/**
 * Workflow Demo - Task Processing with Approval
 *
 * This example demonstrates:
 * - Multiple concurrent workflows with progress tracking
 * - Human-in-the-loop approval gate per workflow
 * - Real-time state sync to connected clients
 * - Workflow list with status tracking
 * - Approve/reject specific workflows from the Agent
 */

import { Agent, callable, routeAgentRequest } from "agents";
import { AgentWorkflow } from "agents/workflows";
import type {
  AgentWorkflowEvent,
  AgentWorkflowStep,
  DefaultProgress,
  WorkflowInfo
} from "agents/workflows";

/** Payload handed to each task workflow run. */
type TaskParams = {
  taskId: string;
  taskName: string;
};

/**
 * UI state persisted inside the workflow's metadata.
 * Unlike real-time progress broadcasts, this survives page refreshes.
 */
type PersistedUIState = {
  waitingForApproval?: boolean;
  lastProgress?: DefaultProgress;
};

/** Shape of the metadata stored alongside each tracked workflow. */
type WorkflowMetadata = {
  taskName: string;
  uiState?: PersistedUIState;
};

/** UI status (subset of WorkflowInfo status with "waiting" for approval UI). */
type UIStatus =
  | "queued"
  | "running"
  | "waiting"
  | "complete"
  | "errored"
  | "paused";

/**
 * Per-workflow UI item: WorkflowInfo with `status` replaced by the UI status
 * and extended with transient UI fields.
 */
export type WorkflowItem = Omit<WorkflowInfo, "status"> & {
  /** UI-friendly status */
  status: UIStatus;
  /** Display name for the task */
  taskName: string;
  /** Real-time progress (not persisted in tracking table) */
  progress: DefaultProgress | null;
  /** Whether workflow is waiting for user approval */
  waitingForApproval: boolean;
  /** Workflow result (not persisted in tracking table) */
  result?: unknown;
};

/**
 * Shared agent state (minimal - real-time updates only).
 * Pagination is client-side, not shared, so the state is an empty object.
 */
export type AgentState = Record<string, never>;

/** Page result returned by listWorkflows. */
export type WorkflowPage = {
  workflows: WorkflowItem[];
  total: number;
  nextCursor: string | null;
};

/** Broadcast message types for real-time updates. */
export type WorkflowUpdate =
  | { type: "workflow:added"; workflow: WorkflowItem }
  | {
      type: "workflow:updated";
      workflowId: string;
      updates: Partial<WorkflowItem>;
    }
  | { type: "workflow:removed"; workflowId: string };

/**
 * TaskAgent - manages multiple task workflows and syncs state to clients
 */
export class TaskAgent extends Agent<Env, AgentState> {
  // Empty state - pagination is client-side
  initialState: AgentState = {};

  /**
   * Convert WorkflowInfo to WorkflowItem for UI.
   * Reads persisted UI state from metadata.uiState.
   *
   * @param info - Tracked workflow record.
   * @param overrides - Values that take precedence over the persisted UI state.
   * @returns The UI item; `result` is only populated from `overrides`.
   */
  private toWorkflowItem(
    info: WorkflowInfo,
    overrides: Partial<WorkflowItem> = {}
  ): WorkflowItem {
    const metadata = info.metadata as WorkflowMetadata | null;
    const uiState = metadata?.uiState;

    // Use overrides first, then persisted uiState, then defaults
    const waitingForApproval =
      overrides.waitingForApproval ?? uiState?.waitingForApproval ?? false;
    const progress = overrides.progress ?? uiState?.lastProgress ?? null;

    return {
      ...info,
      status: this.mapStatus(info.status, waitingForApproval),
      taskName: metadata?.taskName || "Unknown Task",
      progress,
      waitingForApproval,
      result: overrides.result
    };
  }

  /**
   * Merge `uiState` into the persisted UI state in the workflow's metadata.
   * Does nothing when the workflow is not found; a failed write is logged
   * and not rethrown.
   */
  private updateUIState(workflowId: string, uiState: PersistedUIState): void {
    const workflow = this.getWorkflow(workflowId);
    if (!workflow) return;

    const metadata = (workflow.metadata as WorkflowMetadata) || {};
    const updatedMetadata: WorkflowMetadata = {
      ...metadata,
      uiState: { ...metadata.uiState, ...uiState }
    };

    try {
      // Writes the whole metadata JSON back to the tracking table row.
      this.sql` UPDATE cf_agents_workflows SET metadata = ${JSON.stringify(updatedMetadata)} WHERE workflow_id = ${workflowId} `;
    } catch (err) {
      console.error(`Failed to update UI state for ${workflowId}:`, err);
    }
  }

  /**
   * Remove the persisted UI state from the workflow's metadata, keeping the
   * other metadata fields. Does nothing when the workflow is not found; a
   * failed write is logged and not rethrown.
   */
  private clearUIState(workflowId: string): void {
    const workflow = this.getWorkflow(workflowId);
    if (!workflow) return;

    const metadata = (workflow.metadata as WorkflowMetadata) || {};
    // Drop uiState, keep everything else.
    const { uiState: _, ...rest } = metadata;

    try {
      this.sql` UPDATE cf_agents_workflows SET metadata = ${JSON.stringify(rest)} WHERE workflow_id = ${workflowId} `;
    } catch (err) {
      console.error(`Failed to clear UI state for ${workflowId}:`, err);
    }
  }

  /**
   * Broadcast a workflow update to all connected clients (JSON-encoded).
   */
  private broadcastUpdate(update: WorkflowUpdate): void {
    this.broadcast(JSON.stringify(update));
  }

  /**
   * Map tracking table status to UI status.
   *
   * A pending approval always wins and yields "waiting". "terminated" is
   * shown as "errored"; any status not in the table falls back to "running".
   */
  private mapStatus(
    status: string,
    waitingForApproval: boolean
  ): WorkflowItem["status"] {
    if (waitingForApproval) return "waiting";

    const known: Record<string, WorkflowItem["status"]> = {
      queued: "queued",
      running: "running",
      waiting: "waiting",
      paused: "paused",
      complete: "complete",
      errored: "errored",
      terminated: "errored"
    };
    return known[status] ?? "running";
  }

  /**
   * Broadcast a partial "workflow:updated" message for one workflow to all
   * clients.
   */
  private updateWorkflow(
    workflowId: string,
    updates: Partial<WorkflowItem>
  ): void {
    this.broadcastUpdate({ type: "workflow:updated", workflowId, updates });
  }

  /**
   * Submit a new task for processing.
   *
   * Starts a "TASK_WORKFLOW" run, broadcasts a "workflow:added" message and
   * returns the same freshly built UI item to the caller.
   */
  @callable()
  async submitTask(taskName: string): Promise<WorkflowItem> {
    const taskId = crypto.randomUUID();

    // Start the workflow
    const workflowId = await this.runWorkflow(
      "TASK_WORKFLOW",
      { taskId, taskName },
      { metadata: { taskName } }
    );

    // Create workflow item for UI
    const now = new Date();
    const newWorkflow: WorkflowItem = {
      id: workflowId,
      workflowId,
      workflowName: "TASK_WORKFLOW",
      metadata: { taskName },
      error: null,
      createdAt: now,
      updatedAt: now,
      completedAt: null,
      status: "queued",
      taskName,
      progress: { step: "starting", status: "pending", percent: 0 },
      waitingForApproval: false
    };

    // Broadcast to all clients
    this.broadcastUpdate({ type: "workflow:added", workflow: newWorkflow });

    return newWorkflow;
  }

  /**
   * Terminate a specific workflow and tell clients to remove it.
   */
  @callable()
  async terminate(workflowId: string): Promise<void> {
    await this.terminateWorkflow(workflowId);
    this.broadcastUpdate({ type: "workflow:removed", workflowId });
  }

  /**
   * Pause a running workflow
   */
  @callable()
  async pause(workflowId: string): Promise<void> {
    await this.pauseWorkflow(workflowId);
    this.updateWorkflow(workflowId, { status: "paused" });
  }

  /**
   * Resume a paused workflow
   */
  @callable()
  async resume(workflowId: string): Promise<void> {
    await this.resumeWorkflow(workflowId);
    this.updateWorkflow(workflowId, { status: "running" });
  }

  /**
   * Restart a completed/errored workflow.
   * Broadcasts a reset item (queued, 0%, no error/result) once restarted.
   */
  @callable()
  async restart(workflowId: string): Promise<void> {
    // Clear persisted UI state before restart
    this.clearUIState(workflowId);

    await this.restartWorkflow(workflowId);
    this.updateWorkflow(workflowId, {
      status: "queued",
      progress: { step: "restarting", status: "pending", percent: 0 },
      waitingForApproval: false,
      error: undefined,
      result: undefined
    });
  }

  /**
   * Approve a specific workflow.
   *
   * @param workflowId - Workflow waiting at the approval gate.
   * @param reason - Optional reason; defaults to "Approved by user".
   */
  @callable()
  async approve(workflowId: string, reason?: string): Promise<void> {
    await this.approveWorkflow(workflowId, {
      reason: reason || "Approved by user",
      metadata: { approvedAt: Date.now() }
    });

    // Clear persisted UI state
    this.clearUIState(workflowId);

    this.updateWorkflow(workflowId, {
      waitingForApproval: false,
      status: "running",
      progress: {
        step: "approved",
        status: "running",
        percent: 0.6,
        message: "Approval received, continuing..."
      }
    });
  }

  /**
   * Reject a specific workflow.
   *
   * @param workflowId - Workflow waiting at the approval gate.
   * @param reason - Optional reason; defaults to "Rejected by user".
   */
  @callable()
  async reject(workflowId: string, reason?: string): Promise<void> {
    await this.rejectWorkflow(workflowId, {
      reason: reason || "Rejected by user"
    });
    // State will be updated by onWorkflowError callback
  }

  /**
   * List workflows with pagination (client-driven).
   *
   * @param cursor - Cursor from a previous page's `nextCursor`, if any.
   * @param limit - Page size (default 5).
   */
  @callable()
  listWorkflows(cursor?: string, limit = 5): WorkflowPage {
    const page = this.getWorkflows({
      workflowName: "TASK_WORKFLOW",
      orderBy: "desc",
      limit,
      cursor
    });

    // toWorkflowItem reads persisted UI state from metadata.uiState
    const workflows = page.workflows.map((info) => this.toWorkflowItem(info));

    return { workflows, total: page.total, nextCursor: page.nextCursor };
  }

  /**
   * Clear completed and errored workflows.
   *
   * @returns The number of workflows deleted.
   */
  @callable()
  clearCompleted(): { clearedCount: number } {
    const count = this.deleteWorkflows({ status: ["complete", "errored"] });

    // Broadcast that clients should refresh their lists.
    // "workflows:cleared" is not a member of WorkflowUpdate, so this is sent
    // through broadcast() directly instead of broadcastUpdate().
    this.broadcast(JSON.stringify({ type: "workflows:cleared", count }));

    return { clearedCount: count };
  }

  /**
   * Remove a specific workflow from tracking and tell clients to remove it.
   */
  @callable()
  dismissWorkflow(workflowId: string): void {
    this.deleteWorkflow(workflowId);
    this.broadcastUpdate({ type: "workflow:removed", workflowId });
  }

  // Lifecycle callbacks from workflow - broadcast updates to all clients

  /**
   * Progress callback: persists the latest progress and approval flag, then
   * broadcasts them with a status of "waiting" or "running".
   */
  async onWorkflowProgress(
    workflowName: string,
    workflowId: string,
    progress: unknown
  ): Promise<void> {
    // The workflow in this file reports DefaultProgress, optionally with a
    // waitingForApproval flag; the payload is not validated here.
    const p = progress as DefaultProgress & { waitingForApproval?: boolean };
    console.log(`Progress: ${workflowName}/${workflowId}`, p);

    const waitingForApproval = p.waitingForApproval ?? false;

    // Persist UI state so it survives page refresh
    this.updateUIState(workflowId, { waitingForApproval, lastProgress: p });

    this.updateWorkflow(workflowId, {
      progress: p,
      waitingForApproval,
      status: waitingForApproval ? "waiting" : "running"
    });
  }

  /**
   * Completion callback: clears persisted UI state and broadcasts the final
   * "complete" item together with the workflow result.
   */
  async onWorkflowComplete(
    workflowName: string,
    workflowId: string,
    result?: unknown
  ): Promise<void> {
    console.log(`Complete: ${workflowName}/${workflowId}`, result);

    // Clear persisted UI state on completion
    this.clearUIState(workflowId);

    this.updateWorkflow(workflowId, {
      progress: {
        step: "done",
        status: "complete",
        percent: 1,
        message: "Task completed!"
      },
      status: "complete",
      result,
      waitingForApproval: false
    });
  }

  /**
   * Error callback: clears persisted UI state and broadcasts an "errored"
   * item carrying the error message.
   */
  async onWorkflowError(
    workflowName: string,
    workflowId: string,
    error: string
  ): Promise<void> {
    console.log(`Error: ${workflowName}/${workflowId}`, error);

    // Clear persisted UI state on error
    this.clearUIState(workflowId);

    this.updateWorkflow(workflowId, {
      progress: { step: "error", status: "error", percent: 0, message: error },
      status: "errored",
      error: { name: "WorkflowError", message: error },
      waitingForApproval: false
    });
  }
}

/**
 * TaskProcessingWorkflow - multi-step workflow with approval gate
 */
export class TaskProcessingWorkflow extends AgentWorkflow<
  TaskAgent,
  TaskParams
> {
  /**
   * Run the task: validate, process, wait for approval, finalize.
   * Progress is reported before and after each step.
   *
   * @returns The processing result extended with finalization fields.
   */
  async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
    const params = event.payload;
    console.log(`Starting workflow for task: ${params.taskName}`);

    // Step 1: Validate
    await this.reportProgress({
      step: "validate",
      status: "running",
      percent: 0.1,
      message: "Validating task..."
    });

    await step.do("validate", async () => {
      // Simulate validation work
      await sleep(1000);
      return { valid: true };
    });

    // Simulated failure is raised after the step, outside step.do.
    maybeThrow("validate");

    await this.reportProgress({
      step: "validate",
      status: "complete",
      percent: 0.25,
      message: "Validation complete"
    });

    // Step 2: Process
    await this.reportProgress({
      step: "process",
      status: "running",
      percent: 0.3,
      message: "Processing task..."
    });

    const processResult = await step.do("process", async () => {
      // Simulate processing work
      await sleep(1500);
      return {
        processed: true,
        taskId: params.taskId,
        data: `Processed: ${params.taskName}`
      };
    });

    maybeThrow("process");

    await this.reportProgress({
      step: "process",
      status: "complete",
      percent: 0.5,
      message: "Processing complete - awaiting approval"
    });

    // Step 3: Wait for human approval
    // Signal waiting state via progress (agent will set waitingForApproval)
    await this.reportProgress({
      step: "approval",
      status: "pending",
      percent: 0.5,
      message: "Waiting for approval...",
      waitingForApproval: true
    });

    // This will throw WorkflowRejectedError if rejected
    const approvalData = await this.waitForApproval<{ approvedAt: number }>(
      step,
      { timeout: "1 hour" }
    );

    await this.reportProgress({
      step: "approval",
      status: "complete",
      percent: 0.7,
      message: "Approved! Finalizing..."
    });

    // Step 4: Finalize
    await this.reportProgress({
      step: "finalize",
      status: "running",
      percent: 0.8,
      message: "Finalizing task..."
    });

    const finalResult = await step.do("finalize", async () => {
      // Simulate finalization work
      await sleep(1000);
      return {
        ...processResult,
        finalized: true,
        approvedAt: approvalData?.approvedAt,
        completedAt: Date.now()
      };
    });

    maybeThrow("finalize");

    await this.reportProgress({
      step: "finalize",
      status: "complete",
      percent: 1,
      message: "Task completed successfully!"
    });

    await step.reportComplete(finalResult);

    return finalResult;
  }
}

/** Helper to simulate work: resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Randomly throw an error with a given probability (0-1).
 *
 * NOTE(review): with the default of 0.9 and three call sites in
 * TaskProcessingWorkflow.run, almost every run fails before completing.
 * Confirm this default is intended.
 *
 * @param stepName - Name included in the error message.
 * @param probability - Chance of throwing on each call.
 * @throws Error when the random draw is below `probability`.
 */
function maybeThrow(stepName: string, probability = 0.9): void {
  if (Math.random() < probability) {
    console.warn("throwing a random error for testing");
    throw new Error(
      `Random failure in "${stepName}" (this is intentional for testing)`
    );
  }
}

/** Main request handler: routes to the agent, otherwise responds 404. */
export default {
  async fetch(request: Request, env: Env, _ctx: ExecutionContext) {
    return (
      (await routeAgentRequest(request, env)) ||
      new Response("Not found", { status: 404 })
    );
  }
};