branch:
server.ts
14714 bytesRaw
/**
* Workflow Demo - Task Processing with Approval
*
* This example demonstrates:
* - Multiple concurrent workflows with progress tracking
* - Human-in-the-loop approval gate per workflow
* - Real-time state sync to connected clients
* - Workflow list with status tracking
* - Approve/reject specific workflows from the Agent
*/
import { Agent, callable, routeAgentRequest } from "agents";
import { AgentWorkflow } from "agents/workflows";
import type {
AgentWorkflowEvent,
AgentWorkflowStep,
DefaultProgress,
WorkflowInfo
} from "agents/workflows";
// Workflow parameters
type TaskParams = {
taskId: string;
taskName: string;
};
// Persisted UI state stored in workflow metadata
// This survives page refreshes unlike real-time progress updates
type PersistedUIState = {
waitingForApproval?: boolean;
lastProgress?: DefaultProgress;
};
// Workflow metadata structure
type WorkflowMetadata = {
taskName: string;
uiState?: PersistedUIState;
};
// UI status (subset of WorkflowInfo status with "waiting" for approval UI)
type UIStatus =
| "queued"
| "running"
| "waiting"
| "complete"
| "errored"
| "paused";
// Per-workflow UI state extends WorkflowInfo with transient UI fields
export type WorkflowItem = Omit<WorkflowInfo, "status"> & {
/** UI-friendly status */
status: UIStatus;
/** Display name for the task */
taskName: string;
/** Real-time progress (not persisted in tracking table) */
progress: DefaultProgress | null;
/** Whether workflow is waiting for user approval */
waitingForApproval: boolean;
/** Workflow result (not persisted in tracking table) */
result?: unknown;
};
// Shared agent state (minimal - real-time updates only)
// Pagination is client-side, not shared
export type AgentState = Record<string, never>; // Empty state
// Page result returned by listWorkflows
export type WorkflowPage = {
workflows: WorkflowItem[];
total: number;
nextCursor: string | null;
};
// Broadcast message types for real-time updates
export type WorkflowUpdate =
| { type: "workflow:added"; workflow: WorkflowItem }
| {
type: "workflow:updated";
workflowId: string;
updates: Partial<WorkflowItem>;
}
| { type: "workflow:removed"; workflowId: string };
/**
* TaskAgent - manages multiple task workflows and syncs state to clients
*/
export class TaskAgent extends Agent<Env, AgentState> {
// Empty state - pagination is client-side
initialState: AgentState = {};
/**
* Convert WorkflowInfo to WorkflowItem for UI
* Reads persisted UI state from metadata.uiState
*/
private toWorkflowItem(
info: WorkflowInfo,
overrides: Partial<WorkflowItem> = {}
): WorkflowItem {
const metadata = info.metadata as WorkflowMetadata | null;
const uiState = metadata?.uiState;
// Use overrides first, then persisted uiState, then defaults
const waitingForApproval =
overrides.waitingForApproval ?? uiState?.waitingForApproval ?? false;
const progress = overrides.progress ?? uiState?.lastProgress ?? null;
return {
...info,
status: this.mapStatus(info.status, waitingForApproval),
taskName: metadata?.taskName || "Unknown Task",
progress,
waitingForApproval,
result: overrides.result
};
}
/**
* Update persisted UI state in metadata
*/
private updateUIState(workflowId: string, uiState: PersistedUIState): void {
const workflow = this.getWorkflow(workflowId);
if (!workflow) return;
const metadata = (workflow.metadata as WorkflowMetadata) || {};
const updatedMetadata: WorkflowMetadata = {
...metadata,
uiState: { ...metadata.uiState, ...uiState }
};
try {
this.sql`
UPDATE cf_agents_workflows
SET metadata = ${JSON.stringify(updatedMetadata)}
WHERE workflow_id = ${workflowId}
`;
} catch (err) {
console.error(`Failed to update UI state for ${workflowId}:`, err);
}
}
/**
* Clear persisted UI state from metadata
*/
private clearUIState(workflowId: string): void {
const workflow = this.getWorkflow(workflowId);
if (!workflow) return;
const metadata = (workflow.metadata as WorkflowMetadata) || {};
const { uiState: _, ...rest } = metadata;
try {
this.sql`
UPDATE cf_agents_workflows
SET metadata = ${JSON.stringify(rest)}
WHERE workflow_id = ${workflowId}
`;
} catch (err) {
console.error(`Failed to clear UI state for ${workflowId}:`, err);
}
}
/**
* Broadcast a workflow update to all connected clients
*/
private broadcastUpdate(update: WorkflowUpdate): void {
this.broadcast(JSON.stringify(update));
}
/**
* Map tracking table status to UI status
*/
private mapStatus(
status: string,
waitingForApproval: boolean
): WorkflowItem["status"] {
if (waitingForApproval) return "waiting";
const known: Record<string, UIStatus> = {
queued: "queued",
running: "running",
waiting: "waiting",
paused: "paused",
complete: "complete",
errored: "errored",
terminated: "errored"
};
return known[status] ?? "running";
}
/**
* Broadcast a workflow update to all clients
*/
private updateWorkflow(
workflowId: string,
updates: Partial<WorkflowItem>
): void {
this.broadcastUpdate({ type: "workflow:updated", workflowId, updates });
}
/**
* Submit a new task for processing
*/
@callable()
async submitTask(taskName: string): Promise<WorkflowItem> {
const taskId = crypto.randomUUID();
// Start the workflow
const workflowId = await this.runWorkflow(
"TASK_WORKFLOW",
{ taskId, taskName },
{ metadata: { taskName } }
);
// Create workflow item for UI
const now = new Date();
const newWorkflow: WorkflowItem = {
id: workflowId,
workflowId,
workflowName: "TASK_WORKFLOW",
metadata: { taskName },
error: null,
createdAt: now,
updatedAt: now,
completedAt: null,
status: "queued",
taskName,
progress: { step: "starting", status: "pending", percent: 0 },
waitingForApproval: false
};
// Broadcast to all clients
this.broadcastUpdate({ type: "workflow:added", workflow: newWorkflow });
return newWorkflow;
}
/**
* Terminate a specific workflow
*/
@callable()
async terminate(workflowId: string): Promise<void> {
await this.terminateWorkflow(workflowId);
this.broadcastUpdate({ type: "workflow:removed", workflowId });
}
/**
* Pause a running workflow
*/
@callable()
async pause(workflowId: string): Promise<void> {
await this.pauseWorkflow(workflowId);
this.updateWorkflow(workflowId, { status: "paused" });
}
/**
* Resume a paused workflow
*/
@callable()
async resume(workflowId: string): Promise<void> {
await this.resumeWorkflow(workflowId);
this.updateWorkflow(workflowId, { status: "running" });
}
/**
* Restart a completed/errored workflow
*/
@callable()
async restart(workflowId: string): Promise<void> {
// Clear persisted UI state before restart
this.clearUIState(workflowId);
await this.restartWorkflow(workflowId);
this.updateWorkflow(workflowId, {
status: "queued",
progress: { step: "restarting", status: "pending", percent: 0 },
waitingForApproval: false,
error: undefined,
result: undefined
});
}
/**
* Approve a specific workflow
*/
@callable()
async approve(workflowId: string, reason?: string): Promise<void> {
await this.approveWorkflow(workflowId, {
reason: reason || "Approved by user",
metadata: { approvedAt: Date.now() }
});
// Clear persisted UI state
this.clearUIState(workflowId);
this.updateWorkflow(workflowId, {
waitingForApproval: false,
status: "running",
progress: {
step: "approved",
status: "running",
percent: 0.6,
message: "Approval received, continuing..."
}
});
}
/**
* Reject a specific workflow
*/
@callable()
async reject(workflowId: string, reason?: string): Promise<void> {
await this.rejectWorkflow(workflowId, {
reason: reason || "Rejected by user"
});
// State will be updated by onWorkflowError callback
}
/**
* List workflows with pagination (client-driven)
*/
@callable()
listWorkflows(cursor?: string, limit = 5): WorkflowPage {
const page = this.getWorkflows({
workflowName: "TASK_WORKFLOW",
orderBy: "desc",
limit,
cursor
});
// toWorkflowItem reads persisted UI state from metadata.uiState
const workflows = page.workflows.map((info) => this.toWorkflowItem(info));
return {
workflows,
total: page.total,
nextCursor: page.nextCursor
};
}
/**
* Clear completed and errored workflows
*/
@callable()
clearCompleted(): { clearedCount: number } {
const count = this.deleteWorkflows({ status: ["complete", "errored"] });
// Broadcast that clients should refresh their lists
this.broadcast(JSON.stringify({ type: "workflows:cleared", count }));
return { clearedCount: count };
}
/**
* Remove a specific workflow
*/
@callable()
dismissWorkflow(workflowId: string): void {
this.deleteWorkflow(workflowId);
this.broadcastUpdate({ type: "workflow:removed", workflowId });
}
// Lifecycle callbacks from workflow - broadcast updates to all clients
async onWorkflowProgress(
workflowName: string,
workflowId: string,
progress: unknown
): Promise<void> {
const p = progress as DefaultProgress & { waitingForApproval?: boolean };
console.log(`Progress: ${workflowName}/${workflowId}`, p);
const waitingForApproval = p.waitingForApproval ?? false;
// Persist UI state so it survives page refresh
this.updateUIState(workflowId, {
waitingForApproval,
lastProgress: p
});
this.updateWorkflow(workflowId, {
progress: p,
waitingForApproval,
status: waitingForApproval ? "waiting" : "running"
});
}
async onWorkflowComplete(
workflowName: string,
workflowId: string,
result?: unknown
): Promise<void> {
console.log(`Complete: ${workflowName}/${workflowId}`, result);
// Clear persisted UI state on completion
this.clearUIState(workflowId);
this.updateWorkflow(workflowId, {
progress: {
step: "done",
status: "complete",
percent: 1,
message: "Task completed!"
},
status: "complete",
result,
waitingForApproval: false
});
}
async onWorkflowError(
workflowName: string,
workflowId: string,
error: string
): Promise<void> {
console.log(`Error: ${workflowName}/${workflowId}`, error);
// Clear persisted UI state on error
this.clearUIState(workflowId);
this.updateWorkflow(workflowId, {
progress: { step: "error", status: "error", percent: 0, message: error },
status: "errored",
error: { name: "WorkflowError", message: error },
waitingForApproval: false
});
}
}
/**
* TaskProcessingWorkflow - multi-step workflow with approval gate
*/
export class TaskProcessingWorkflow extends AgentWorkflow<
TaskAgent,
TaskParams
> {
async run(event: AgentWorkflowEvent<TaskParams>, step: AgentWorkflowStep) {
const params = event.payload;
console.log(`Starting workflow for task: ${params.taskName}`);
// Step 1: Validate
await this.reportProgress({
step: "validate",
status: "running",
percent: 0.1,
message: "Validating task..."
});
await step.do("validate", async () => {
// Simulate validation work
await sleep(1000);
return { valid: true };
});
maybeThrow("validate");
await this.reportProgress({
step: "validate",
status: "complete",
percent: 0.25,
message: "Validation complete"
});
// Step 2: Process
await this.reportProgress({
step: "process",
status: "running",
percent: 0.3,
message: "Processing task..."
});
const processResult = await step.do("process", async () => {
// Simulate processing work
await sleep(1500);
return {
processed: true,
taskId: params.taskId,
data: `Processed: ${params.taskName}`
};
});
maybeThrow("process");
await this.reportProgress({
step: "process",
status: "complete",
percent: 0.5,
message: "Processing complete - awaiting approval"
});
// Step 3: Wait for human approval
// Signal waiting state via progress (agent will set waitingForApproval)
await this.reportProgress({
step: "approval",
status: "pending",
percent: 0.5,
message: "Waiting for approval...",
waitingForApproval: true
});
// This will throw WorkflowRejectedError if rejected
const approvalData = await this.waitForApproval<{ approvedAt: number }>(
step,
{
timeout: "1 hour"
}
);
await this.reportProgress({
step: "approval",
status: "complete",
percent: 0.7,
message: "Approved! Finalizing..."
});
// Step 4: Finalize
await this.reportProgress({
step: "finalize",
status: "running",
percent: 0.8,
message: "Finalizing task..."
});
const finalResult = await step.do("finalize", async () => {
// Simulate finalization work
await sleep(1000);
return {
...processResult,
finalized: true,
approvedAt: approvalData?.approvedAt,
completedAt: Date.now()
};
});
maybeThrow("finalize");
await this.reportProgress({
step: "finalize",
status: "complete",
percent: 1,
message: "Task completed successfully!"
});
await step.reportComplete(finalResult);
return finalResult;
}
}
// Helper to simulate work
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// Randomly throw an error with a given probability (0-1)
function maybeThrow(stepName: string, probability = 0.9): void {
if (Math.random() < probability) {
console.warn("throwing a random error for testing");
throw new Error(
`Random failure in "${stepName}" (this is intentional for testing)`
);
}
}
// Main request handler
export default {
async fetch(request: Request, env: Env, _ctx: ExecutionContext) {
return (
(await routeAgentRequest(request, env)) ||
new Response("Not found", { status: 404 })
);
}
};