# Queue System The Agents SDK provides a built-in queue system that allows you to schedule tasks for asynchronous execution. This is particularly useful for background processing, delayed operations, and managing workloads that don't need immediate execution. ## Overview The queue system is built into the base `Agent` class. Tasks are stored in a SQLite table and processed automatically in FIFO (First In, First Out) order. ## QueueItem Type ```typescript export type QueueItem = { id: string; // Unique identifier for the queued task payload: T; // Data to pass to the callback function callback: keyof Agent; // Name of the method to call created_at: number; // Timestamp when the task was created retry?: RetryOptions; // Retry options (if configured) }; ``` ## Core Methods ### queue() Adds a task to the queue for future execution. ```typescript async queue( callback: keyof this, payload: T, options?: { retry?: RetryOptions } ): Promise ``` **Parameters:** - `callback`: The name of the method to call when processing the task - `payload`: Data to pass to the callback method - `options.retry`: Optional retry configuration. See [Retries](./retries.md) for details. **Returns:** The unique ID of the queued task **Example:** ```typescript class MyAgent extends Agent { async processEmail(data: { email: string; subject: string }) { // Process the email console.log(`Processing email: ${data.subject}`); } async onMessage(message: string) { // Queue an email processing task const taskId = await this.queue("processEmail", { email: "user@example.com", subject: "Welcome!" }); console.log(`Queued task with ID: ${taskId}`); } } ``` ### dequeue() Removes a specific task from the queue by ID. ```typescript dequeue(id: string): void ``` **Parameters:** - `id`: The ID of the task to remove **Example:** ```typescript // Remove a specific task agent.dequeue("abc123def"); ``` ### dequeueAll() Removes all tasks from the queue. ```typescript dequeueAll(): void ``` **Example:** ```typescript // Clear the entire queue agent.dequeueAll(); ``` ### dequeueAllByCallback() Removes all tasks that match a specific callback method. ```typescript dequeueAllByCallback(callback: string): void ``` **Parameters:** - `callback`: Name of the callback method **Example:** ```typescript // Remove all email processing tasks agent.dequeueAllByCallback("processEmail"); ``` ### getQueue() Retrieves a specific queued task by ID. ```typescript getQueue(id: string): QueueItem | undefined ``` **Parameters:** - `id`: The ID of the task to retrieve **Returns:** The QueueItem with parsed payload or undefined if not found **Note:** The payload is automatically parsed from JSON before being returned **Example:** ```typescript const task = agent.getQueue("abc123def"); if (task) { console.log(`Task callback: ${task.callback}`); console.log(`Task payload:`, task.payload); } ``` ### getQueues() Retrieves all queued tasks that match a specific key-value pair in their payload. ```typescript getQueues(key: string, value: string): QueueItem[] ``` **Parameters:** - `key`: The key to filter by in the payload - `value`: The value to match **Returns:** Array of matching QueueItem objects **Note:** This method fetches all queue items and filters them in memory by parsing each payload and checking if the specified key matches the value **Example:** ```typescript // Find all tasks for a specific user const userTasks = await agent.getQueues("userId", "12345"); ``` ## How Queue Processing Works 1. **Validation**: When calling `queue()`, the method validates that the callback exists as a function on the agent 2. **Automatic Processing**: After queuing, the system automatically attempts to flush the queue 3. **FIFO Order**: Tasks are processed in the order they were created (`created_at` timestamp) 4. **Context Preservation**: Each queued task runs with the same agent context (connection, request, email) 5. **Automatic Retries**: If a callback fails, it is retried with exponential backoff (configurable per task) 6. **Automatic Dequeue**: Tasks are removed from the queue after successful execution or after all retry attempts are exhausted 7. **Error Handling**: If a callback method does not exist at execution time, an error is logged and the task is skipped 8. **Persistence**: Tasks are stored in the `cf_agents_queues` table and survive agent restarts ## Queue Callback Methods When defining callback methods for queued tasks, they must follow this signature: ```typescript async callbackMethod(payload: unknown, queueItem: QueueItem): Promise ``` **Example:** ```typescript class MyAgent extends Agent { async sendNotification( payload: { userId: string; message: string }, queueItem: QueueItem ) { console.log(`Processing task ${queueItem.id}`); console.log( `Sending notification to user ${payload.userId}: ${payload.message}` ); // Your notification logic here await this.notificationService.send(payload.userId, payload.message); } async onUserSignup(userData: any) { // Queue a welcome notification await this.queue("sendNotification", { userId: userData.id, message: "Welcome to our platform!" }); } } ``` ## Use Cases ### Background Processing ```typescript class DataProcessor extends Agent { async processLargeDataset(data: { datasetId: string; userId: string }) { const results = await this.heavyComputation(data.datasetId); await this.notifyUser(data.userId, results); } async onDataUpload(uploadData: any) { // Queue the processing instead of doing it synchronously await this.queue("processLargeDataset", { datasetId: uploadData.id, userId: uploadData.userId }); return { message: "Data upload received, processing started" }; } } ``` ### Delayed Operations ```typescript class ReminderAgent extends Agent { async sendReminder(data: { userId: string; message: string }) { await this.emailService.send(data.userId, data.message); } async scheduleReminder(userId: string, message: string, delayMs: number) { // Note: For true delayed execution, combine with the scheduling system // This example shows queueing for later processing await this.queue("sendReminder", { userId, message }); } } ``` ### Batch Operations ```typescript class BatchProcessor extends Agent { async processBatch(data: { items: any[]; batchId: string }) { for (const item of data.items) { await this.processItem(item); } console.log(`Completed batch ${data.batchId}`); } async onLargeRequest(items: any[]) { // Split large requests into smaller batches const batchSize = 10; for (let i = 0; i < items.length; i += batchSize) { const batch = items.slice(i, i + batchSize); await this.queue("processBatch", { items: batch, batchId: `batch-${i / batchSize + 1}` }); } } } ``` ## Best Practices 1. **Keep Payloads Small**: Payloads are JSON-serialized and stored in the database 2. **Idempotent Operations**: Design callback methods to be safe to retry 3. **Error Handling**: Include proper error handling in callback methods 4. **Monitoring**: Use logging to track queue processing 5. **Cleanup**: Regularly clean up completed or failed tasks if needed ## Error Handling and Retries Queued tasks are automatically retried on failure with exponential backoff. The default is 3 attempts. You can customize this per task: ```typescript // Retry up to 5 times with custom backoff await this.queue("reliableTask", payload, { retry: { maxAttempts: 5, baseDelayMs: 500 } }); ``` If you need custom error handling in the callback: ```typescript class RobustAgent extends Agent { async reliableTask(payload: { data: string }, queueItem: QueueItem) { try { await this.doSomethingRisky(payload); } catch (error) { console.error(`Task ${queueItem.id} failed:`, error); // The retry system will catch this error and retry automatically throw error; } } } ``` See [Retries](./retries.md) for full documentation on retry options and patterns. ## Integration with Other Features The queue system works seamlessly with other Agent SDK features: - **State Management**: Access agent state within queued callbacks - **Scheduling**: Combine with `schedule()` for time-based queue processing - **Retries**: Built-in retry with exponential backoff. See [Retries](./retries.md). - **Context**: Queued tasks maintain the original request context - **Database**: Uses the same database as other agent data ## Limitations - Tasks are processed sequentially, not in parallel - No priority system (FIFO only) - Queue processing happens during agent execution, not as separate background jobs - Failed tasks are dequeued after all retry attempts are exhausted (no dead-letter queue)