// File: basic-workflow-agent.ts
import { callable } from "agents";
import { PlaygroundAgent as Agent } from "../../shared/playground-agent";
import type { ProcessingResult } from "./processing-workflow";
/**
 * Agent state for the basic workflow demo.
 *
 * Progress is transient (not persisted by the SDK), so the agent keeps the
 * most recent progress report for each workflow in its own state.
 */
export interface BasicWorkflowState {
  /** Latest progress report, keyed by workflow id. */
  progress: {
    [workflowId: string]: { step: number; total: number; message: string };
  };
}
/**
 * Workflow record in a form that can be sent over RPC: every timestamp is an
 * ISO-8601 string instead of a `Date`, and the agent's transient progress
 * entry is attached when one exists.
 */
export interface WorkflowWithProgress {
  /** `id` copied from the SDK's tracked workflow record. */
  id: string;
  /** `workflowId` copied from the SDK record; also the key into the progress state. */
  workflowId: string;
  /** `workflowName` copied from the SDK record. */
  workflowName: string;
  /** `status` copied from the SDK record. */
  status: string;
  /** Name stored in the workflow metadata, or the workflow name when none was stored. */
  name: string;
  /** Error details copied from the SDK record, or `null` when there are none. */
  error: null | { name: string; message: string };
  /** Creation time as an ISO-8601 string. */
  createdAt: string;
  /** Last-update time as an ISO-8601 string. */
  updatedAt: string;
  /** Completion time as an ISO-8601 string, or `null` when the record has no completion time. */
  completedAt: null | string;
  /** Most recent progress report held in agent state, if any. */
  progress?: { step: number; total: number; message: string };
}
/** Workflow name handed to `runWorkflow` and used to filter SDK workflow queries. */
const PROCESSING_WORKFLOW = "ProcessingWorkflow";

/**
 * Page size requested while counting workflows in `getStats`.
 * NOTE(review): 100 is assumed to be within the SDK's allowed `limit` range —
 * confirm against the `agents` workflow documentation.
 */
const STATS_PAGE_SIZE = 100;

/** Per-workflow progress map held in the agent state. */
type ProgressByWorkflow = BasicWorkflowState["progress"];

/** One page of results as returned by the agent's `getWorkflows`. */
type TrackedWorkflowPage = ReturnType<BasicWorkflowAgent["getWorkflows"]>;

/**
 * Returns a copy of `progress` without the entry for `workflowId`.
 * The input map is never mutated, so the result can be passed to `setState`.
 */
function omitProgress(
  progress: ProgressByWorkflow,
  workflowId: string
): ProgressByWorkflow {
  return Object.fromEntries(
    Object.entries(progress).filter(([id]) => id !== workflowId)
  );
}

/**
 * Agent that starts "ProcessingWorkflow" runs, mirrors their progress into
 * agent state, and broadcasts lifecycle events (as JSON strings) to connected
 * clients.
 */
export class BasicWorkflowAgent extends Agent<Env, BasicWorkflowState> {
  /** State used before any workflow has reported progress. */
  initialState: BasicWorkflowState = {
    progress: {}
  };

  // ─────────────────────────────────────────────────────────────────────────────
  // Workflow lifecycle callbacks
  // ─────────────────────────────────────────────────────────────────────────────

  /**
   * Records the latest progress report for a workflow and broadcasts it.
   *
   * @param workflowName - Name of the workflow reporting progress.
   * @param workflowId - Id of the workflow instance; used as the state key.
   * @param progress - Current step, total steps and a status message.
   */
  async onWorkflowProgress(
    workflowName: string,
    workflowId: string,
    progress: { step: number; total: number; message: string }
  ): Promise<void> {
    // Progress is transient (not tracked by the SDK), so keep it in state.
    this.setState({
      ...this.state,
      progress: {
        ...this.state.progress,
        [workflowId]: progress
      }
    });

    this.broadcast(
      JSON.stringify({
        type: "workflow_progress",
        workflowId,
        workflowName,
        progress
      })
    );
  }

  /**
   * Drops the workflow's progress entry and broadcasts a completion event.
   *
   * @param workflowName - Name of the workflow that finished.
   * @param workflowId - Id of the workflow instance that finished.
   * @param result - Result produced by the workflow, when one is provided.
   */
  async onWorkflowComplete(
    workflowName: string,
    workflowId: string,
    result?: ProcessingResult
  ): Promise<void> {
    this.setState({
      ...this.state,
      progress: omitProgress(this.state.progress, workflowId)
    });

    this.broadcast(
      JSON.stringify({
        type: "workflow_complete",
        workflowId,
        workflowName,
        result
      })
    );
  }

  /**
   * Drops the workflow's progress entry and broadcasts an error event.
   *
   * @param workflowName - Name of the workflow that failed.
   * @param workflowId - Id of the workflow instance that failed.
   * @param error - Error description forwarded to clients unchanged.
   */
  async onWorkflowError(
    workflowName: string,
    workflowId: string,
    error: string
  ): Promise<void> {
    this.setState({
      ...this.state,
      progress: omitProgress(this.state.progress, workflowId)
    });

    this.broadcast(
      JSON.stringify({
        type: "workflow_error",
        workflowId,
        workflowName,
        error
      })
    );
  }

  // ─────────────────────────────────────────────────────────────────────────────
  // Callable methods
  // ─────────────────────────────────────────────────────────────────────────────

  /**
   * Starts a processing workflow and announces it to connected clients.
   *
   * @param name - Display name; passed as a workflow parameter and stored in
   *   the workflow metadata so `listWorkflows` can show it later.
   * @param stepCount - Step count passed to the workflow and stored in metadata.
   * @returns The id of the started workflow.
   */
  @callable({ description: "Start a new processing workflow" })
  async startWorkflow(name: string, stepCount: number): Promise<string> {
    const workflowId = await this.runWorkflow(
      PROCESSING_WORKFLOW,
      { name, stepCount },
      { metadata: { name, stepCount } }
    );

    this.broadcast(
      JSON.stringify({
        type: "workflow_started",
        workflowId,
        name,
        stepCount
      })
    );

    return workflowId;
  }

  /**
   * Lists tracked processing workflows in a serializable form, merged with
   * the progress held in agent state.
   *
   * Only the page returned by a single `getWorkflows` call is included; no
   * further pages are fetched here.
   *
   * @returns One entry per workflow on that page.
   */
  @callable({ description: "Get all workflows with progress" })
  listWorkflows(): WorkflowWithProgress[] {
    const { workflows } = this.getWorkflows({
      workflowName: PROCESSING_WORKFLOW
    });

    return workflows.map((w) => {
      const metadata = w.metadata as { name?: string } | null;
      return {
        id: w.id,
        workflowId: w.workflowId,
        workflowName: w.workflowName,
        status: w.status,
        // `||` (not `??`) so an empty stored name also falls back.
        name: metadata?.name || w.workflowName,
        error: w.error,
        createdAt: w.createdAt.toISOString(),
        updatedAt: w.updatedAt.toISOString(),
        completedAt: w.completedAt?.toISOString() ?? null,
        progress: this.state.progress[w.workflowId]
      };
    });
  }

  /**
   * Deletes tracked workflows whose status is complete, errored or
   * terminated, then broadcasts how many were removed.
   *
   * No workflow-name filter is passed, so the status list is the only
   * deletion criterion.
   *
   * @returns The count reported by `deleteWorkflows`.
   */
  @callable({ description: "Clear completed/errored workflows" })
  clearWorkflows(): number {
    const count = this.deleteWorkflows({
      status: ["complete", "errored", "terminated"]
    });

    this.broadcast(
      JSON.stringify({
        type: "workflows_cleared",
        count
      })
    );

    return count;
  }

  /**
   * Counts processing workflows by status.
   *
   * `getWorkflows` is paginated, so every page is visited via its cursor and
   * tallied in a single pass; reading only the first page would under-count
   * once there are more workflows than fit on one page. Statuses other than
   * the four reported ones are ignored.
   *
   * @returns Number of queued, running, completed and errored workflows.
   */
  @callable({ description: "Get workflow stats" })
  getStats(): {
    queued: number;
    running: number;
    completed: number;
    errored: number;
  } {
    const stats = { queued: 0, running: 0, completed: 0, errored: 0 };
    let cursor: string | null = null;

    do {
      // Explicit annotation keeps the page type independent of the loop's
      // control-flow analysis of `cursor`.
      const page: TrackedWorkflowPage = this.getWorkflows({
        workflowName: PROCESSING_WORKFLOW,
        limit: STATS_PAGE_SIZE,
        ...(cursor === null ? {} : { cursor })
      });

      for (const workflow of page.workflows) {
        switch (workflow.status) {
          case "queued":
            stats.queued += 1;
            break;
          case "running":
            stats.running += 1;
            break;
          case "complete":
            stats.completed += 1;
            break;
          case "errored":
            stats.errored += 1;
            break;
        }
      }

      // Stop on an empty page as well, so an unexpected non-null cursor
      // cannot keep the loop running forever.
      cursor = page.workflows.length > 0 ? (page.nextCursor ?? null) : null;
    } while (cursor !== null);

    return stats;
  }
}