# Queue System

The Agents SDK provides a built-in queue system that allows you to schedule tasks for asynchronous execution. This is particularly useful for background processing, delayed operations, and managing workloads that don't need immediate execution.

## Overview

The queue system is built into the base `Agent` class. Tasks are stored in a SQLite table and processed automatically in FIFO (First In, First Out) order.

## QueueItem Type

```typescript
export type QueueItem<T = string> = {
  id: string; // Unique identifier for the queued task
  payload: T; // Data to pass to the callback function
  callback: keyof Agent<unknown>; // Name of the method to call
  created_at: number; // Timestamp when the task was created
  retry?: RetryOptions; // Retry options (if configured)
};
```

## Core Methods

### queue()

Adds a task to the queue for future execution.

```typescript
async queue<T = unknown>(
  callback: keyof this,
  payload: T,
  options?: { retry?: RetryOptions }
): Promise<string>
```

**Parameters:**

- `callback`: The name of the method to call when processing the task
- `payload`: Data to pass to the callback method
- `options.retry`: Optional retry configuration. See [Retries](./retries.md) for details.

**Returns:** The unique ID of the queued task

**Example:**

```typescript
class MyAgent extends Agent {
  async processEmail(data: { email: string; subject: string }) {
    // Process the email
    console.log(`Processing email: ${data.subject}`);
  }

  async onMessage(message: string) {
    // Queue an email processing task
    const taskId = await this.queue("processEmail", {
      email: "user@example.com",
      subject: "Welcome!"
    });

    console.log(`Queued task with ID: ${taskId}`);
  }
}
```

### dequeue()

Removes a specific task from the queue by ID.

```typescript
dequeue(id: string): void
```

**Parameters:**

- `id`: The ID of the task to remove

**Example:**

```typescript
// Remove a specific task
agent.dequeue("abc123def");
```

### dequeueAll()

Removes all tasks from the queue.

```typescript
dequeueAll(): void
```

**Example:**

```typescript
// Clear the entire queue
agent.dequeueAll();
```

### dequeueAllByCallback()

Removes all tasks that match a specific callback method.

```typescript
dequeueAllByCallback(callback: string): void
```

**Parameters:**

- `callback`: Name of the callback method

**Example:**

```typescript
// Remove all email processing tasks
agent.dequeueAllByCallback("processEmail");
```

### getQueue()

Retrieves a specific queued task by ID.

```typescript
getQueue(id: string): QueueItem<string> | undefined
```

**Parameters:**

- `id`: The ID of the task to retrieve

**Returns:** The QueueItem with parsed payload or undefined if not found

**Note:** The payload is automatically parsed from JSON before being returned

**Example:**

```typescript
const task = agent.getQueue("abc123def");
if (task) {
  console.log(`Task callback: ${task.callback}`);
  console.log(`Task payload:`, task.payload);
}
```

### getQueues()

Retrieves all queued tasks that match a specific key-value pair in their payload.

```typescript
getQueues(key: string, value: string): QueueItem<string>[]
```

**Parameters:**

- `key`: The key to filter by in the payload
- `value`: The value to match

**Returns:** Array of matching QueueItem objects

**Note:** This method fetches all queue items and filters them in memory by parsing each payload and checking whether the specified key matches the value, so its cost grows with the size of the queue.

**Example:**

```typescript
// Find all tasks for a specific user
const userTasks = agent.getQueues("userId", "12345");
```
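
The in-memory filtering described in the note can be pictured roughly as follows. This is a sketch under assumptions, not the SDK's code: `StoredItem` and `filterByPayloadKey` are hypothetical names, and the use of strict equality against a top-level key is a guess at the matching rule.

```typescript
// Hypothetical row shape: the payload is still a JSON string at this point.
type StoredItem = {
  id: string;
  callback: string;
  payload: string;
  created_at: number;
};

// Parse every payload and keep rows whose top-level `key` equals `value`.
function filterByPayloadKey(
  rows: StoredItem[],
  key: string,
  value: string
): StoredItem[] {
  return rows.filter((row) => {
    const parsed: unknown = JSON.parse(row.payload);
    return (
      typeof parsed === "object" &&
      parsed !== null &&
      (parsed as Record<string, unknown>)[key] === value
    );
  });
}

const sampleRows: StoredItem[] = [
  { id: "a", callback: "sendReminder", payload: '{"userId":"12345"}', created_at: 1 },
  { id: "b", callback: "sendReminder", payload: '{"userId":"67890"}', created_at: 2 },
  { id: "c", callback: "sendReminder", payload: '{"userId":12345}', created_at: 3 }
];

console.log(filterByPayloadKey(sampleRows, "userId", "12345").map((r) => r.id)); // ["a"]
```

Under this assumption a numeric `12345` does not match the string `"12345"`, so storing IDs as strings keeps lookups predictable.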

## How Queue Processing Works

1. **Validation**: When `queue()` is called, it validates that the callback exists as a function on the agent
2. **Automatic Processing**: After queuing, the system automatically attempts to flush the queue
3. **FIFO Order**: Tasks are processed in the order they were created (`created_at` timestamp)
4. **Context Preservation**: Each queued task runs with the same agent context (connection, request, email)
5. **Automatic Retries**: If a callback fails, it is retried with exponential backoff (configurable per task)
6. **Automatic Dequeue**: Tasks are removed from the queue after successful execution or after all retry attempts are exhausted
7. **Error Handling**: If a callback method does not exist at execution time, an error is logged and the task is skipped
8. **Persistence**: Tasks are stored in the `cf_agents_queues` table and survive agent restarts
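
The steps above can be sketched with a small in-memory model. It is only an illustration: `createQueueModel` is a hypothetical helper, callbacks run synchronously, retries happen immediately with no backoff delay, and `flush()` is called by hand rather than automatically after `queue()`. The real implementation stores tasks in SQLite and runs async callbacks.

```typescript
type Task = { id: string; callback: string; payload: unknown; created_at: number };
type Handler = (payload: unknown, task: Task) => void;

// Hypothetical model of validation, FIFO ordering, retries, and dequeueing.
function createQueueModel(handlers: Record<string, Handler>, maxAttempts = 3) {
  let tasks: Task[] = [];
  let nextId = 1;
  const log: string[] = [];

  function queue(callback: string, payload: unknown): string {
    // Step 1: reject callbacks that are not functions.
    if (typeof handlers[callback] !== "function") {
      throw new Error(`Callback ${callback} is not a function`);
    }
    const task: Task = { id: `task-${nextId++}`, callback, payload, created_at: Date.now() };
    tasks.push(task);
    return task.id;
  }

  function flush(): void {
    // Step 3: oldest first. The sort is stable, so ties keep insertion order.
    const ordered = [...tasks].sort((a, b) => a.created_at - b.created_at);
    for (const task of ordered) {
      // Step 5: retry up to maxAttempts (the documented default is 3).
      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          handlers[task.callback](task.payload, task);
          log.push(`${task.id} ok (attempt ${attempt})`);
          break;
        } catch {
          log.push(`${task.id} failed (attempt ${attempt})`);
        }
      }
      // Step 6: remove the task after success or once attempts are exhausted.
      tasks = tasks.filter((t) => t.id !== task.id);
    }
  }

  return { queue, flush, log, size: () => tasks.length };
}

let flakyCalls = 0;
const model = createQueueModel({
  sendEmail: () => {},
  flaky: () => {
    flakyCalls++;
    if (flakyCalls < 2) throw new Error("transient failure");
  }
});
model.queue("sendEmail", { to: "user@example.com" });
model.queue("flaky", {});
model.flush();
console.log(model.log);
// ["task-1 ok (attempt 1)", "task-2 failed (attempt 1)", "task-2 ok (attempt 2)"]
```

Note that a task which fails every attempt is still removed from the model, which matches the "no dead-letter queue" limitation listed later in this document.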

## Queue Callback Methods

Callback methods for queued tasks must follow this signature:

```typescript
async callbackMethod(payload: unknown, queueItem: QueueItem<string>): Promise<void>
```

**Example:**

```typescript
class MyAgent extends Agent {
  async sendNotification(
    payload: { userId: string; message: string },
    queueItem: QueueItem<string>
  ) {
    console.log(`Processing task ${queueItem.id}`);
    console.log(
      `Sending notification to user ${payload.userId}: ${payload.message}`
    );

    // Your notification logic here (notificationService stands in for your own service)
    await this.notificationService.send(payload.userId, payload.message);
  }

  async onUserSignup(userData: any) {
    // Queue a welcome notification
    await this.queue("sendNotification", {
      userId: userData.id,
      message: "Welcome to our platform!"
    });
  }
}
```

## Use Cases

### Background Processing

```typescript
class DataProcessor extends Agent {
  async processLargeDataset(data: { datasetId: string; userId: string }) {
    const results = await this.heavyComputation(data.datasetId);
    await this.notifyUser(data.userId, results);
  }

  async onDataUpload(uploadData: any) {
    // Queue the processing instead of doing it synchronously
    await this.queue("processLargeDataset", {
      datasetId: uploadData.id,
      userId: uploadData.userId
    });

    return { message: "Data upload received, processing started" };
  }
}
```

### Delayed Operations

```typescript
class ReminderAgent extends Agent {
  async sendReminder(data: { userId: string; message: string }) {
    await this.emailService.send(data.userId, data.message);
  }

  async scheduleReminder(userId: string, message: string, delayMs: number) {
    // Note: queue() runs tasks as soon as possible, so delayMs is not applied here.
    // For true delayed execution, combine with the scheduling system.
    await this.queue("sendReminder", { userId, message });
  }
}
```

### Batch Operations

```typescript
class BatchProcessor extends Agent {
  async processBatch(data: { items: any[]; batchId: string }) {
    for (const item of data.items) {
      await this.processItem(item);
    }
    console.log(`Completed batch ${data.batchId}`);
  }

  async onLargeRequest(items: any[]) {
    // Split large requests into smaller batches
    const batchSize = 10;
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      await this.queue("processBatch", {
        items: batch,
        batchId: `batch-${i / batchSize + 1}`
      });
    }
  }
}
```

## Best Practices

1. **Keep Payloads Small**: Payloads are JSON-serialized and stored in the database
2. **Idempotent Operations**: Design callback methods to be safe to retry
3. **Error Handling**: Include proper error handling in callback methods
4. **Monitoring**: Use logging to track queue processing
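5. **Cleanup**: Tasks are removed automatically once they succeed or exhaust their retries; use `dequeue()`, `dequeueAllByCallback()`, or `dequeueAll()` to cancel pending tasks that are no longer needed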
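
As one way to approach the idempotency advice above, a callback can record an idempotency key before treating work as done and skip payloads it has already applied. The sketch below is a minimal illustration with hypothetical names (`ChargePayload`, `applied`, `applyCharge`); it keeps the record in memory, whereas a real agent would persist it, for example in agent state.

```typescript
type ChargePayload = { orderId: string; amountCents: number };

// Hypothetical record of orders that were already charged (orderId -> amount).
const applied = new Map<string, number>();

// Safe to call repeatedly: a retry with the same orderId has no extra effect.
function applyCharge(payload: ChargePayload): "applied" | "skipped" {
  if (applied.has(payload.orderId)) {
    return "skipped";
  }
  applied.set(payload.orderId, payload.amountCents);
  return "applied";
}

console.log(applyCharge({ orderId: "order-1", amountCents: 500 })); // "applied"
console.log(applyCharge({ orderId: "order-1", amountCents: 500 })); // "skipped"
```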

## Error Handling and Retries

Queued tasks are automatically retried on failure with exponential backoff. The default is 3 attempts. You can customize this per task:

```typescript
// Retry up to 5 times with custom backoff
await this.queue("reliableTask", payload, {
  retry: { maxAttempts: 5, baseDelayMs: 500 }
});
```

If you need custom error handling in the callback:

```typescript
class RobustAgent extends Agent {
  async reliableTask(payload: { data: string }, queueItem: QueueItem<string>) {
    try {
      await this.doSomethingRisky(payload);
    } catch (error) {
      console.error(`Task ${queueItem.id} failed:`, error);
      // The retry system will catch this error and retry automatically
      throw error;
    }
  }
}
```

See [Retries](./retries.md) for full documentation on retry options and patterns.
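
To get a feel for how `maxAttempts` and `baseDelayMs` interact, the sketch below computes a delay schedule under a simple assumption: the wait doubles after each failed attempt, starting from `baseDelayMs`, with no jitter. The SDK's actual formula may differ, and both `backoffDelays` and its `capMs` parameter are hypothetical.

```typescript
// Hypothetical helper: delays (in ms) between attempts under plain doubling.
function backoffDelays(
  maxAttempts: number,
  baseDelayMs: number,
  capMs: number = Infinity
): number[] {
  const delays: number[] = [];
  // There is no wait before the first attempt, so N attempts give N - 1 delays.
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    delays.push(Math.min(baseDelayMs * 2 ** retry, capMs));
  }
  return delays;
}

console.log(backoffDelays(5, 500)); // [500, 1000, 2000, 4000]
console.log(backoffDelays(5, 500, 1500)); // [500, 1000, 1500, 1500]
```

Under this assumption, a task queued with `{ maxAttempts: 5, baseDelayMs: 500 }` would spend about 7.5 seconds waiting before it is given up on, which is worth keeping in mind because tasks are processed sequentially.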

## Integration with Other Features

The queue system works with other Agents SDK features:

- **State Management**: Access agent state within queued callbacks
- **Scheduling**: Combine with `schedule()` for time-based queue processing
- **Retries**: Built-in retry with exponential backoff. See [Retries](./retries.md).
- **Context**: Queued tasks maintain the original request context
- **Database**: Uses the same database as other agent data

## Limitations

- Tasks are processed sequentially, not in parallel
- No priority system (FIFO only)
- Queue processing happens during agent execution, not as separate background jobs
- Failed tasks are dequeued after all retry attempts are exhausted (no dead-letter queue)