# Workflows Integration

Integrate [Cloudflare Workflows](https://developers.cloudflare.com/workflows/) with Agents for durable, multi-step background processing while Agents handle real-time communication.

## Quick Links

- [Start a workflow](#2-start-workflow-from-agent)
- [Human approval](#human-in-the-loop-approval)
- [Pagination](#getworkflowscriteria)
- [Workflow controls](#terminateworkflowinstanceid)

## Introduction

### What are Cloudflare Workflows?

Cloudflare Workflows provide durable, multi-step execution that survives failures, retries automatically, and can pause to wait for external events. They're ideal for:

- Long-running background tasks (data processing, report generation)
- Multi-step pipelines with retry logic
- Human-in-the-loop approval flows
- Tasks that shouldn't block user requests

### Why Integrate with Agents?

Agents excel at real-time communication and state management, while Workflows excel at durable execution. Together they provide:

| Feature                | Agent   | Workflow | Combined         |
| ---------------------- | ------- | -------- | ---------------- |
| Real-time WebSocket    | ✓       | ✗        | Agent handles    |
| Long-running tasks     | Limited | ✓        | Workflow handles |
| State persistence      | ✓       | ✓        | Both             |
| Automatic retries      | ✗       | ✓        | Workflow handles |
| External event waiting | ✗       | ✓        | Workflow handles |

### When to Use What

| Use Case                      | Recommendation                 |
| ----------------------------- | ------------------------------ |
| Chat/messaging                | Agent only                     |
| Quick API calls               | Agent only                     |
| Background processing (< 30s) | Agent `queue()`                |
| Long-running tasks (> 30s)    | Agent + Workflow               |
| Multi-step pipelines          | Workflow                       |
| Human approval flows          | Agent + Workflow               |
| Scheduled tasks               | Agent `schedule()` or Workflow |

## Quick Start

### 1. Define Your Workflow

Create a Workflow that extends `AgentWorkflow` to get typed access to the originating Agent:

```typescript
// src/workflows/processing.ts
import { AgentWorkflow } from "agents/workflows";
import type { AgentWorkflowEvent, AgentWorkflowStep } from "agents/workflows";
import type { MyAgent } from "../agent";

type TaskParams = {
  taskId: string;
  data: string;
};

export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Step 1: Process data
    const result = await step.do("process-data", async () => {
      // Durable step - will retry on failure
      return processData(params.data);
    });

    // Report progress to Agent (non-durable, lightweight)
    await this.reportProgress({
      step: "process",
      status: "complete",
      percent: 0.5
    });

    // Step 2: Save results
    await step.do("save-results", async () => {
      // Call Agent method via RPC
      await this.agent.saveResult(params.taskId, result);
    });

    // Broadcast to connected clients (non-durable)
    this.broadcastToClients({
      type: "task-complete",
      taskId: params.taskId
    });

    // Report completion (durable via step)
    await step.reportComplete(result);

    return result;
  }
}
```

### 2. Start Workflow from Agent

Use `runWorkflow()` to start a workflow with automatic tracking:

```typescript
// src/agent.ts
import { Agent } from "agents";

export class MyAgent extends Agent {
  async startTask(taskId: string, data: string) {
    // Start workflow - automatically tracked in Agent's database
    const instanceId = await this.runWorkflow("PROCESSING_WORKFLOW", {
      taskId,
      data
    });

    return { instanceId };
  }

  // Called when the workflow reports progress (progress is a typed object)
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Cast to your progress type
    const p = progress as { step?: string; status?: string; percent?: number };
    console.log(
      `Workflow ${workflowName}/${instanceId}: ${p.step} - ${p.status} (${(p.percent ?? 0) * 100}%)`
    );

    // Broadcast to connected clients
    this.broadcast(
      JSON.stringify({
        type: "workflow-progress",
        workflowName,
        instanceId,
        progress
      })
    );
  }

  // Called when workflow completes
  async onWorkflowComplete(
    workflowName: string,
    instanceId: string,
    result?: unknown
  ) {
    console.log(`Workflow ${workflowName}/${instanceId} completed:`, result);
  }

  // Method called by workflow via RPC
  async saveResult(taskId: string, result: unknown) {
    this.sql`INSERT INTO results (task_id, data) VALUES (${taskId}, ${JSON.stringify(result)})`;
  }
}
```

### 3. Configure Wrangler

```jsonc
// wrangler.jsonc
{
  "name": "my-app",
  "main": "src/index.ts",
  "compatibility_date": "2025-02-11",
  "durable_objects": {
    "bindings": [{ "name": "MY_AGENT", "class_name": "MyAgent" }]
  },
  "workflows": [
    {
      "name": "processing-workflow",
      "binding": "PROCESSING_WORKFLOW",
      "class_name": "ProcessingWorkflow"
    }
  ],
  "migrations": [{ "tag": "v1", "new_sqlite_classes": ["MyAgent"] }]
}
```

## API Reference

### `AgentWorkflow<AgentType, Params, ProgressType, Env>`

Base class for Workflows that integrate with Agents.

**Type Parameters:**

- `AgentType` - The Agent class type (for typed RPC)
- `Params` - User params passed to the workflow (optional)
- `ProgressType` - Type for progress reporting (defaults to `DefaultProgress`)
- `Env` - Environment type (defaults to `Cloudflare.Env`)

**Properties:**

- `agent` - Typed stub for calling Agent methods via RPC
- `instanceId` - The workflow instance ID
- `workflowName` - The workflow binding name
- `env` - Environment bindings

**Methods on `this` (non-durable, may repeat on retry):**

| Method                         | Description                                   |
| ------------------------------ | --------------------------------------------- |
| `reportProgress(progress)`     | Report typed progress object to the Agent     |
| `broadcastToClients(message)`  | Broadcast message to all WebSocket clients    |
| `waitForApproval(step, opts?)` | Wait for approval event (throws on rejection) |

**Methods on `step` (durable, idempotent, won't repeat on retry):**

| Method                          | Description                                    |
| ------------------------------- | ---------------------------------------------- |
| `step.reportComplete(result?)`  | Report successful completion                   |
| `step.reportError(error)`       | Report an error                                |
| `step.sendEvent(event)`         | Send a custom event to the Agent               |
| `step.updateAgentState(state)`  | Replace Agent state (broadcasts to clients)    |
| `step.mergeAgentState(partial)` | Merge into Agent state (broadcasts to clients) |
| `step.resetAgentState()`        | Reset Agent state to initialState              |
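
The difference between the two tables above can be illustrated with a toy replay model. This is a minimal sketch, not the Workflows engine or the `AgentWorkflow` API: `ReplayLog`, `runOnce`, and the counters are hypothetical names, and the model assumes that a retry re-runs the workflow function from the top while results of finished steps are reused.

```typescript
// Toy model of step replay. Hypothetical code, not the real Workflows engine.
class ReplayLog {
  private results = new Map<string, unknown>();

  // Runs fn the first time a step name is seen; later calls reuse the stored result.
  do<T>(name: string, fn: () => T): T {
    if (this.results.has(name)) {
      return this.results.get(name) as T;
    }
    const value = fn();
    this.results.set(name, value);
    return value;
  }
}

let durableRuns = 0;
let progressReports = 0;

function runOnce(log: ReplayLog, failAfterProgress: boolean): string {
  const result = log.do("process-data", () => {
    durableRuns += 1;
    return "processed";
  });

  // Stands in for a non-durable call such as reportProgress(): nothing is memoized.
  progressReports += 1;

  if (failAfterProgress) {
    throw new Error("simulated crash");
  }
  return result;
}

const log = new ReplayLog();
try {
  runOnce(log, true); // first attempt crashes after reporting progress
} catch {
  // in this model, a retry simply runs the function again with the same log
}
const finalResult = runOnce(log, false);
```

In this model the step body runs once while the progress report fires on both attempts, which is why receivers of non-durable callbacks should tolerate duplicates.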

**DefaultProgress Type:**

```typescript
type DefaultProgress = {
  step?: string;
  status?: "pending" | "running" | "complete" | "error";
  message?: string;
  percent?: number;
  [key: string]: unknown; // extensible
};
```
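
As an illustration of consuming this type, the sketch below turns a progress object into a display string. `formatProgress` is a hypothetical helper, not part of the library, and it assumes `percent` is a fraction between 0 and 1, as in the examples in this document.

```typescript
// Local copy of the type shown above so the sketch is self-contained.
type DefaultProgress = {
  step?: string;
  status?: "pending" | "running" | "complete" | "error";
  message?: string;
  percent?: number;
  [key: string]: unknown;
};

// Hypothetical helper: renders a progress object as a single log line.
function formatProgress(progress: DefaultProgress): string {
  // Clamp to [0, 1] so out-of-range values do not produce odd percentages.
  const fraction = Math.min(1, Math.max(0, progress.percent ?? 0));
  const percentText = `${Math.round(fraction * 100)}%`;
  const label = progress.step ?? "workflow";
  const status = progress.status ?? "running";
  const base = `${label}: ${status} (${percentText})`;
  return progress.message ? `${base} - ${progress.message}` : base;
}

const line = formatProgress({ step: "process", status: "complete", percent: 0.5 });
```

Rounding avoids long decimals such as `33.33333333333333%` when the fraction is something like `1 / 3`.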

### Agent Workflow Methods

Methods added to the `Agent` class:

#### `runWorkflow(workflowName, params, options?)`

Start a workflow and track it in the Agent's database.

```typescript
const instanceId = await this.runWorkflow(
  "MY_WORKFLOW",
  { taskId: "123", data: "process this" },
  {
    id: "custom-id", // optional - auto-generated if not provided
    metadata: { userId: "user-456", priority: "high" }, // optional - for querying
    agentBinding: "MyAgent" // optional - auto-detected from class name if not provided
  }
);
```

**Parameters:**

- `workflowName` - Workflow binding name from `env`
- `params` - Params to pass to the workflow
- `options.id` - Custom workflow ID (auto-generated if not provided)
- `options.metadata` - Optional metadata stored for querying (not passed to workflow)
- `options.agentBinding` - Agent binding name (auto-detected from class name if not provided)

**Returns:** Workflow instance ID

#### `sendWorkflowEvent(workflowName, instanceId, event)`

Send an event to a running workflow.

```typescript
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
  type: "approval",
  payload: { approved: true }
});
```

#### `getWorkflowStatus(workflowName, instanceId)`

Get the status of a workflow and update its tracking record.

```typescript
const status = await this.getWorkflowStatus("MY_WORKFLOW", instanceId);
// status: { status: 'running', output: null, error: null }
```

#### `getWorkflow(instanceId)`

Get a tracked workflow by ID.

```typescript
const workflow = this.getWorkflow(instanceId);
// { instanceId, workflowName, status, metadata, error, createdAt, ... }
```

#### `getWorkflows(criteria?)`

Query tracked workflows with cursor-based pagination. Returns a `WorkflowPage` with workflows, total count, and cursor for the next page.

```typescript
// Get running workflows (default limit is 50, max is 100)
const { workflows, total } = this.getWorkflows({ status: "running" });

// Get workflows by binding name
const { workflows: processing } = this.getWorkflows({
  workflowName: "PROCESSING_WORKFLOW"
});

// Filter by metadata
const { workflows: userWorkflows } = this.getWorkflows({
  metadata: { userId: "user-456" }
});

// Pagination example
const page1 = this.getWorkflows({
  status: ["complete", "errored"],
  limit: 20,
  orderBy: "desc"
});

console.log(`Showing ${page1.workflows.length} of ${page1.total} workflows`);

// Get next page using cursor
if (page1.nextCursor) {
  const page2 = this.getWorkflows({
    status: ["complete", "errored"],
    limit: 20,
    orderBy: "desc",
    cursor: page1.nextCursor
  });
}
```

The `WorkflowPage` type:

```typescript
type WorkflowPage = {
  workflows: WorkflowInfo[];
  total: number; // Total matching workflows
  nextCursor: string | null; // null when no more pages
};
```
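
Following `nextCursor` until it is `null` walks every page. The sketch below shows that loop against an in-memory stand-in. `collectAll`, `makeFakeFetcher`, and the trimmed `WorkflowInfo` type are hypothetical, and the fake cursor is just an offset; the real cursor should be treated as opaque.

```typescript
// Trimmed, hypothetical versions of the types above, for a self-contained sketch.
type WorkflowInfo = { instanceId: string; status: string };
type WorkflowPage = {
  workflows: WorkflowInfo[];
  total: number;
  nextCursor: string | null;
};
type PageFetcher = (cursor?: string) => WorkflowPage;

// Collects every page by following nextCursor; maxPages guards against runaway loops.
function collectAll(fetchPage: PageFetcher, maxPages = 100): WorkflowInfo[] {
  const all: WorkflowInfo[] = [];
  let cursor: string | undefined;
  for (let i = 0; i < maxPages; i++) {
    const page = fetchPage(cursor);
    all.push(...page.workflows);
    if (page.nextCursor === null) break;
    cursor = page.nextCursor;
  }
  return all;
}

// In-memory stand-in for getWorkflows(); the cursor here is an offset as a string.
function makeFakeFetcher(records: WorkflowInfo[], limit: number): PageFetcher {
  return (cursor) => {
    const start = cursor ? Number(cursor) : 0;
    const end = start + limit;
    return {
      workflows: records.slice(start, end),
      total: records.length,
      nextCursor: end < records.length ? String(end) : null
    };
  };
}

const records: WorkflowInfo[] = Array.from({ length: 45 }, (_, i) => ({
  instanceId: `wf-${i}`,
  status: "complete"
}));
const collected = collectAll(makeFakeFetcher(records, 20));
```

Inside an Agent, the fetcher would presumably wrap `this.getWorkflows()` with the same filter on every call, passing only a different `cursor` each time.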

#### `deleteWorkflow(instanceId)`

Delete a single workflow tracking record.

```typescript
const deleted = this.deleteWorkflow(instanceId);
// true if deleted, false if not found
```

#### `deleteWorkflows(criteria?)`

Delete workflow tracking records matching criteria. Useful for cleanup.

```typescript
// Delete all completed workflows older than 7 days
const count = this.deleteWorkflows({
  status: "complete",
  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
});

// Delete all errored and terminated workflows
const removedCount = this.deleteWorkflows({
  status: ["errored", "terminated"]
});
```
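
The `createdBefore` cutoff is plain date arithmetic. A small helper can make the retention window explicit and testable. `retentionCutoff` and `DAY_MS` are hypothetical names used only for this sketch.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: returns the Date that lies `days` days before `now`.
function retentionCutoff(days: number, now: Date = new Date()): Date {
  if (!Number.isFinite(days) || days < 0) {
    throw new RangeError("days must be a non-negative, finite number");
  }
  return new Date(now.getTime() - days * DAY_MS);
}

// With a fixed clock the result is deterministic.
const cutoff = retentionCutoff(7, new Date("2025-01-08T00:00:00Z"));
```

The result can then be passed as `createdBefore` in the criteria object shown above.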

#### `terminateWorkflow(instanceId)`

Terminate a running workflow immediately.

```typescript
await this.terminateWorkflow(instanceId);
```

This stops the workflow and sets its status to `"terminated"`. Throws if the workflow is not found in the tracking table. Cloudflare will throw if the workflow is already completed, errored, or terminated.

> **Note:** `terminate()` is not yet supported in local development with `wrangler dev`. It works when deployed to Cloudflare. Follow [#823](https://github.com/cloudflare/agents/issues/823) for details and updates.

#### `pauseWorkflow(instanceId)`

Pause a running workflow. The workflow can be resumed later with `resumeWorkflow()`.

```typescript
await this.pauseWorkflow(instanceId);
```

Throws if the workflow is not running. Cloudflare will throw if the workflow is already paused, completed, errored, or terminated.

> **Note:** `pause()` is not yet supported in local development with `wrangler dev`. It works when deployed to Cloudflare. Follow [#823](https://github.com/cloudflare/agents/issues/823) for details and updates.

#### `resumeWorkflow(instanceId)`

Resume a paused workflow.

```typescript
await this.resumeWorkflow(instanceId);
```

Throws if the workflow is not paused. Cloudflare will throw if the workflow is already running, completed, errored, or terminated.

> **Note:** `resume()` is not yet supported in local development with `wrangler dev`. It works when deployed to Cloudflare. Follow [#823](https://github.com/cloudflare/agents/issues/823) for details and updates.

#### `restartWorkflow(instanceId, options?)`

Restart a workflow instance from the beginning with the same ID.

```typescript
// Reset tracking (default) - clears timestamps and error fields
await this.restartWorkflow(instanceId);

// Preserve original timestamps
await this.restartWorkflow(instanceId, { resetTracking: false });
```

This is useful for re-running failed workflows or retrying from scratch. The `resetTracking` option (default: `true`) controls whether to reset the `created_at` timestamp and clear error fields.

> **Note:** `restart()` is not yet supported in local development with `wrangler dev`. It works when deployed to Cloudflare. Follow [#823](https://github.com/cloudflare/agents/issues/823) for details and updates.

### Lifecycle Callbacks

Override these methods in your Agent to handle workflow events:

```typescript
class MyAgent extends Agent {
  // Called when the workflow reports progress (progress is a typed object)
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Cast to your progress type
    const p = progress as { step?: string; percent?: number };
  }

  // Called when workflow completes successfully
  async onWorkflowComplete(
    workflowName: string,
    instanceId: string,
    result?: unknown
  ) {}

  // Called when workflow encounters an error
  async onWorkflowError(
    workflowName: string,
    instanceId: string,
    error: string
  ) {}

  // Called when workflow sends a custom event
  async onWorkflowEvent(
    workflowName: string,
    instanceId: string,
    event: unknown
  ) {}

  // Handle all callbacks in one place (alternative)
  async onWorkflowCallback(callback: WorkflowCallback) {
    // Called for all callback types - callback includes workflowName
  }
}
```

### Approval Methods

Convenience methods for human-in-the-loop approval flows:

```typescript
class MyAgent extends Agent {
  // Approve a waiting workflow
  async handleApproval(instanceId: string, userId: string) {
    await this.approveWorkflow(instanceId, {
      reason: "Approved by admin",
      metadata: { approvedBy: userId }
    });
  }

  // Reject a waiting workflow
  async handleRejection(instanceId: string, reason: string) {
    await this.rejectWorkflow(instanceId, { reason });
  }
}
```

## Workflow Tracking

Workflows started with `runWorkflow()` are automatically tracked in the Agent's SQLite database.

### `cf_agents_workflows` Table

| Column          | Type    | Description                     |
| --------------- | ------- | ------------------------------- |
| `id`            | TEXT    | Internal row ID                 |
| `workflow_id`   | TEXT    | Cloudflare workflow instance ID |
| `workflow_name` | TEXT    | Workflow binding name           |
| `status`        | TEXT    | Current status                  |
| `metadata`      | TEXT    | JSON metadata (for querying)    |
| `error_name`    | TEXT    | Error name (if failed)          |
| `error_message` | TEXT    | Error message (if failed)       |
| `created_at`    | INTEGER | Unix timestamp                  |
| `updated_at`    | INTEGER | Unix timestamp                  |
| `completed_at`  | INTEGER | Unix timestamp (when done)      |

> **Note:** Workflow params and output are not stored by default. Use `metadata` to store queryable information, and store large payloads in your own tables if needed.

### Workflow Status Values

- `queued` - Waiting to start
- `running` - Currently executing
- `paused` - Paused by user
- `waiting` - Waiting for event
- `complete` - Finished successfully
- `errored` - Failed with error
- `terminated` - Manually terminated
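
The status list above maps naturally onto a string union. The sketch below defines one locally and adds a check for statuses that represent a finished instance. Whether the library exports an equivalent type is not stated here, so `WorkflowStatus`, `isWorkflowStatus`, and `isTerminal` are hypothetical local definitions.

```typescript
// Local union built from the status list above (hypothetical, defined for this sketch).
const WORKFLOW_STATUSES = [
  "queued",
  "running",
  "paused",
  "waiting",
  "complete",
  "errored",
  "terminated"
] as const;
type WorkflowStatus = (typeof WORKFLOW_STATUSES)[number];

// Statuses after which an instance makes no further progress on its own.
const TERMINAL_STATUSES: ReadonlySet<WorkflowStatus> = new Set<WorkflowStatus>([
  "complete",
  "errored",
  "terminated"
]);

// Narrows an arbitrary value (for example, a TEXT column) to WorkflowStatus.
function isWorkflowStatus(value: unknown): value is WorkflowStatus {
  return WORKFLOW_STATUSES.some((status) => status === value);
}

function isTerminal(status: WorkflowStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}
```

A check like `isTerminal` is useful when deciding which tracking records are safe to delete.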

## Patterns

### Background Processing with Progress

```typescript
// Workflow with default progress type
export class DataProcessingWorkflow extends AgentWorkflow<
  MyAgent,
  ProcessParams
> {
  async run(event: AgentWorkflowEvent<ProcessParams>, step: AgentWorkflowStep) {
    const params = event.payload;
    const items = params.items;

    for (let i = 0; i < items.length; i++) {
      await step.do(`process-${i}`, async () => {
        await processItem(items[i]);
      });

      // Report progress after each item (non-durable, lightweight)
      await this.reportProgress({
        step: `process-${i}`,
        status: "complete",
        percent: (i + 1) / items.length,
        message: `Processed ${i + 1}/${items.length}`
      });
    }

    await step.reportComplete({ processed: items.length });
  }
}

// Agent
class MyAgent extends Agent {
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    // Broadcast progress to all connected clients
    this.broadcast(
      JSON.stringify({
        type: "processing-progress",
        workflowName,
        instanceId,
        progress
      })
    );
  }
}
```

### Human-in-the-Loop Approval

```typescript
// Workflow using the built-in waitForApproval helper
export class ApprovalWorkflow extends AgentWorkflow<MyAgent, RequestParams> {
  async run(event: AgentWorkflowEvent<RequestParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Prepare request
    const request = await step.do("prepare", async () => {
      return { ...params, preparedAt: Date.now() };
    });

    // Wait for approval (throws WorkflowRejectedError if rejected)
    await this.reportProgress({
      step: "approval",
      status: "pending",
      percent: 0.5,
      message: "Awaiting approval"
    });
    const approvalData = await this.waitForApproval<{ approvedBy: string }>(
      step,
      { timeout: "7 days" }
    );

    console.log("Approved by:", approvalData?.approvedBy);

    // Execute approved action
    const result = await step.do("execute", async () => {
      return executeRequest(request);
    });

    await step.reportComplete(result);
    return result;
  }
}

// Agent using the built-in approval methods
class MyAgent extends Agent {
  // Approve a waiting workflow
  async handleApproval(instanceId: string, userId: string) {
    await this.approveWorkflow(instanceId, {
      reason: "Approved by admin",
      metadata: { approvedBy: userId }
    });
  }

  // Reject a waiting workflow
  async handleRejection(instanceId: string, reason: string) {
    await this.rejectWorkflow(instanceId, { reason });
  }
}
```

### Durable Task Queue with Retries

```typescript
// Workflow with built-in retry logic
export class ResilientTaskWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    const result = await step.do(
      "call-external-api",
      {
        retries: {
          limit: 5,
          delay: "10 seconds",
          backoff: "exponential"
        },
        timeout: "5 minutes"
      },
      async () => {
        const response = await fetch("https://api.example.com/process", {
          method: "POST",
          body: JSON.stringify(params)
        });

        if (!response.ok) {
          throw new Error(`API error: ${response.status}`);
        }

        return response.json();
      }
    );

    await step.reportComplete(result);
    return result;
  }
}
```
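
To get a feel for how long the retry configuration above could keep a step alive, the sketch below computes a delay schedule. The exact formula Workflows uses is not given in this document, so this assumes that exponential backoff doubles the delay on each retry and linear backoff adds the base delay each time. `retryDelaysSeconds` is a hypothetical helper.

```typescript
type Backoff = "constant" | "linear" | "exponential";

// Hypothetical helper: delay before each retry, in seconds, under the assumed formulas.
function retryDelaysSeconds(
  baseSeconds: number,
  limit: number,
  backoff: Backoff
): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= limit; attempt++) {
    if (backoff === "constant") {
      delays.push(baseSeconds);
    } else if (backoff === "linear") {
      delays.push(baseSeconds * attempt);
    } else {
      delays.push(baseSeconds * 2 ** (attempt - 1));
    }
  }
  return delays;
}

// Mirrors the config above: limit 5, delay "10 seconds", exponential backoff.
const delays = retryDelaysSeconds(10, 5, "exponential");
const totalWaitSeconds = delays.reduce((sum, d) => sum + d, 0);
```

Under these assumptions the five retries wait 10, 20, 40, 80, and 160 seconds, a little over five minutes in total, not counting the time each attempt itself takes.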

### State Synchronization

Workflows can update the Agent's state directly (durably via step), which automatically broadcasts to all connected clients:

```typescript
// Workflow that syncs state to Agent
export class ProcessingWorkflow extends AgentWorkflow<MyAgent, TaskParams> {
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Update Agent state (durable, replaces entire state, broadcasts to clients)
    await step.updateAgentState({
      currentTask: {
        id: params.taskId,
        status: "processing",
        startedAt: Date.now()
      }
    });

    const result = await step.do("process", async () => {
      return processTask(params);
    });

    // Merge partial state (durable, keeps existing fields, broadcasts to clients)
    await step.mergeAgentState({
      currentTask: {
        status: "complete",
        result,
        completedAt: Date.now()
      }
    });

    await step.reportComplete(result);
    return result;
  }
}
```
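
The difference between replacing and merging matters most for nested objects. The text above does not say whether the merge is shallow or deep, so the sketch below assumes a shallow merge to show the case worth guarding against. `replaceState`, `shallowMerge`, and the sample state are hypothetical and do not call the library.

```typescript
type AgentState = Record<string, unknown>;

// Replace: the previous state is discarded entirely.
function replaceState(_previous: AgentState, next: AgentState): AgentState {
  return { ...next };
}

// Shallow merge (assumed behaviour): top-level keys in `partial` overwrite `previous`.
function shallowMerge(previous: AgentState, partial: AgentState): AgentState {
  return { ...previous, ...partial };
}

const before: AgentState = {
  queueLength: 3,
  currentTask: { id: "task-1", status: "processing", startedAt: 1000 }
};

// queueLength is dropped because the whole state is replaced.
const replaced = replaceState(before, { currentTask: { id: "task-2" } });

// queueLength survives, but the nested currentTask is swapped wholesale,
// so id and startedAt are lost under a shallow merge.
const merged = shallowMerge(before, {
  currentTask: { status: "complete", completedAt: 2000 }
});

// Spreading the previous nested object keeps its fields under either behaviour.
const previousTask = before.currentTask as Record<string, unknown>;
const safeMerged = shallowMerge(before, {
  currentTask: { ...previousTask, status: "complete", completedAt: 2000 }
});
```

If nested fields such as `currentTask.id` must survive a merge, it is worth confirming the merge depth or passing the full nested object.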

### Custom Progress Types

Define custom progress types for domain-specific reporting:

```typescript
// Custom progress type for data pipeline
type PipelineProgress = {
  stage: "extract" | "transform" | "load";
  recordsProcessed: number;
  totalRecords: number;
  currentTable?: string;
};

// Workflow with custom progress type (3rd type parameter)
export class ETLWorkflow extends AgentWorkflow<
  MyAgent,
  ETLParams,
  PipelineProgress
> {
  async run(event: AgentWorkflowEvent<ETLParams>, step: AgentWorkflowStep) {
    const params = event.payload;

    // Report typed progress (non-durable, lightweight for frequent updates)
    await this.reportProgress({
      stage: "extract",
      recordsProcessed: 0,
      totalRecords: 1000,
      currentTable: "users"
    });

    // ... processing
  }
}

// Agent receives typed progress
class MyAgent extends Agent {
  async onWorkflowProgress(
    workflowName: string,
    instanceId: string,
    progress: unknown
  ) {
    const p = progress as PipelineProgress;
    console.log(`Stage: ${p.stage}, ${p.recordsProcessed}/${p.totalRecords}`);
  }
}
```
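
Because `onWorkflowProgress` receives `progress` as `unknown`, the cast above trusts the sender. A runtime type guard is one way to validate the shape first. `isPipelineProgress` is a hypothetical helper written for this sketch.

```typescript
// Local copy of the custom progress type so the sketch is self-contained.
type PipelineProgress = {
  stage: "extract" | "transform" | "load";
  recordsProcessed: number;
  totalRecords: number;
  currentTable?: string;
};

const STAGES: readonly string[] = ["extract", "transform", "load"];

// Hypothetical type guard: checks the shape at runtime before narrowing.
function isPipelineProgress(value: unknown): value is PipelineProgress {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.stage === "string" &&
    STAGES.some((stage) => stage === v.stage) &&
    typeof v.recordsProcessed === "number" &&
    typeof v.totalRecords === "number" &&
    (v.currentTable === undefined || typeof v.currentTable === "string")
  );
}

// Usage: narrow before reading fields instead of casting.
function describe(progress: unknown): string {
  if (!isPipelineProgress(progress)) return "unrecognized progress payload";
  return `Stage: ${progress.stage}, ${progress.recordsProcessed}/${progress.totalRecords}`;
}
```

This is useful when one Agent receives progress from several workflows with different progress types.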

## Bidirectional Communication

### Workflow → Agent

```typescript
// Direct RPC call (typed)
await this.agent.updateTaskStatus(taskId, "processing");
const data = await this.agent.getData(taskId);

// Non-durable callbacks (may repeat on retry, use for frequent updates)
await this.reportProgress({
  step: "process",
  percent: 0.5,
  message: "Halfway done"
});
this.broadcastToClients({ type: "update", data });

// Durable callbacks via step (idempotent, won't repeat on retry)
await step.reportComplete(result);
await step.reportError("Something went wrong");
await step.sendEvent({ type: "custom", data: {} });

// Durable state synchronization via step (broadcasts to clients)
await step.updateAgentState({ status: "processing" });
await step.mergeAgentState({ progress: 0.5 });
```

### Agent → Workflow

```typescript
// Send event to waiting workflow (generic)
await this.sendWorkflowEvent("MY_WORKFLOW", instanceId, {
  type: "custom-event",
  payload: { action: "proceed" }
});

// Approve/reject workflows using convenience methods
await this.approveWorkflow(instanceId, {
  reason: "Approved by admin",
  metadata: { approvedBy: userId }
});

await this.rejectWorkflow(instanceId, {
  reason: "Request denied"
});

// The workflow waits for approval with:
const approvalData = await this.waitForApproval(step, { timeout: "7 days" });
```

## Best Practices

1. **Keep workflows focused** - One workflow per logical task
2. **Use meaningful step names** - Helps with debugging and observability
3. **Report progress regularly** - Keeps users informed
4. **Handle errors gracefully** - Use `step.reportError()` before throwing
5. **Clean up completed workflows** - The `cf_agents_workflows` table can grow unbounded, so implement a retention policy:

```typescript
// Option 1: Cleanup immediately on completion
async onWorkflowComplete(workflowName: string, instanceId: string, result?: unknown) {
  // Process result first, then delete
  this.deleteWorkflow(instanceId);
}

// Option 2: Scheduled cleanup (keep recent history)
// Call this periodically via a scheduled task or cron
this.deleteWorkflows({
  status: ["complete", "errored"],
  createdBefore: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 7 days
});

// Option 3: Keep all history for compliance/auditing
// Don't call deleteWorkflows() - query historical data as needed
```

6. **Handle workflow binding renames carefully** - If you rename a workflow binding in `wrangler.jsonc`, existing tracked workflows will reference the old name. The agent will warn on startup if it detects this. Use `migrateWorkflowBinding()` to update them:

```typescript
// After renaming OLD_WORKFLOW to NEW_WORKFLOW in wrangler.jsonc
async onStart() {
  // Migrate any existing tracked workflows to the new binding name
  const migrated = this.migrateWorkflowBinding('OLD_WORKFLOW', 'NEW_WORKFLOW');
  // You can remove this code after all agents have migrated
}
```

## Limitations

- Workflows can have at most 1,024 steps
- Maximum 10MB state per workflow
- Events wait for at most 1 year
- No direct WebSocket from workflows (use `broadcastToClients()`)
- Workflow execution time: up to 30 minutes per step
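
The step limit interacts with patterns that run one `step.do()` per item: a list longer than the limit needs batching. The sketch below picks a batch size that keeps the number of batches under a chosen step budget. `chunk` and `batchSizeFor` are hypothetical helpers, and the budget of 1,000 is an arbitrary choice that leaves headroom below 1,024 for other steps.

```typescript
// Splits items into consecutive batches of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Smallest batch size that fits itemCount items into at most maxSteps batches.
function batchSizeFor(itemCount: number, maxSteps: number): number {
  return Math.max(1, Math.ceil(itemCount / maxSteps));
}

const STEP_BUDGET = 1000; // assumption: headroom below the 1,024-step limit
const items = Array.from({ length: 5000 }, (_, i) => i);
const batches = chunk(items, batchSizeFor(items.length, STEP_BUDGET));
```

Each batch would then be processed inside a single step, trading finer-grained retries for a lower step count.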