/**
 * Fan-out demo page: a React component that sends a comma-separated list of
 * items to a manager agent and displays the per-worker results it gets back.
 *
 * Contents of this module, in order: the `codeSections` snippets, the
 * `PRESETS` sample inputs, and the exported `WorkersDemo` component.
 */
import { useAgent } from "agents/react"; import { useState } from "react"; import { Button, Input, Surface, Text } from "@cloudflare/kumo"; import { DemoWrapper } from "../../layout"; import { LogPanel, ConnectionStatus, CodeExplanation, type CodeSection } from "../../components"; import { useLogs, useUserId, useToast } from "../../hooks"; import type { ManagerAgent, ManagerState } from "./manager-agent"; import type { WorkerResult } from "./fanout-worker-agent";
/**
 * Explanatory snippets for this demo. Each entry has a title, a description
 * and a `code` string holding example source text (display text only — the
 * template literals are never executed here).
 * NOTE(review): no reference to `codeSections` is visible in the render
 * section as shown; presumably it is passed to `CodeExplanation` — confirm.
 */
const codeSections: CodeSection[] = [ { title: "Fan out work to parallel agents", description: "The manager splits work into chunks and spawns worker agents using getAgentByName(). Each worker processes its chunk concurrently as a separate Durable Object, and results are aggregated with Promise.all().", code: `import { Agent, callable, getAgentByName } from "agents"; class ManagerAgent extends Agent { @callable() async processItems(items: string[], workerCount: number) { const chunkSize = Math.ceil(items.length / workerCount); const chunks = []; for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)); } const results = await Promise.all( chunks.map(async (chunk, i) => { const worker = await getAgentByName( this.env.FanoutWorkerAgent, \`worker-\${i}\` ); return worker.processChunk(\`worker-\${i}\`, chunk); }) ); return results; } }` }, { title: "Worker agents process independently", description: "Each worker is a separate Durable Object with its own isolated execution. 
Workers do not need @callable — the manager calls them directly via Durable Object RPC.", code: `class FanoutWorkerAgent extends Agent { async processChunk(workerId: string, items: string[]) { const processed = items.map(item => { return item.toUpperCase(); }); return { workerId, items, processed }; } }` } ];
/**
 * Sample comma-separated item lists. The first entry is the initial value of
 * the items input, and the whole list is mapped over in the render section.
 */
const PRESETS = [ "apple, banana, cherry, date, elderberry, fig, grape, honeydew", "react, vue, svelte, angular, solid, preact, lit, qwik", "paris, london, tokyo, new york, sydney, berlin, rome, cairo, mumbai, toronto, seoul, lisbon" ];
/**
 * Demo component for fanning work out from a manager agent to workers.
 *
 * Local state:
 * - `items`: the raw comma-separated input string (starts as `PRESETS[0]`).
 * - `workerCount`: kept as a string ("3" initially) and converted with
 *   `Number()` only when it is logged or sent to the agent.
 * - `isProcessing`: true from just before the `processItems` call until its
 *   `finally` block runs.
 * - `lastRun`: starts as `null`; set from agent state updates whose
 *   `lastRun` is truthy, and from the result of a `processItems` call.
 *
 * The agent connection targets agent "manager-agent" under the name
 * `workers-demo-${userId}`; open, close and error events are written to the
 * log via `addLog`.
 *
 * The render section reads `lastRun.totalDuration` and `lastRun.results`,
 * and for each worker its `workerId`, `duration` and `processed` entries.
 */
export function WorkersDemo() { const userId = useUserId(); const { logs, addLog, clearLogs } = useLogs(); const { toast } = useToast(); const [items, setItems] = useState(PRESETS[0]); const [workerCount, setWorkerCount] = useState("3"); const [isProcessing, setIsProcessing] = useState(false); const [lastRun, setLastRun] = useState(null); const agent = useAgent({ agent: "manager-agent", name: `workers-demo-${userId}`, onOpen: () => addLog("info", "connected"), onClose: () => addLog("info", "disconnected"), onError: () => addLog("error", "error", "Connection error"), onStateUpdate: (newState) => { if (newState?.lastRun) setLastRun(newState.lastRun); } });
/*
 * Sends the current items to the agent's "processItems" method.
 *
 * The input is split on commas, each piece is trimmed, and empty pieces are
 * dropped; if nothing remains the handler returns without calling the agent.
 * The call result is cast to `ManagerState["lastRun"]` (not validated at
 * runtime), logged, stored as `lastRun`, and reported in a success toast.
 * A failure is only written to the log — no toast is raised on that path.
 * `isProcessing` is reset in `finally`, so it clears on success and failure.
 */
const handleProcess = async () => { const parsed = items .split(",") .map((s) => s.trim()) .filter(Boolean); if (parsed.length === 0) return; setIsProcessing(true); addLog("out", "processItems", { items: parsed.length, workers: Number(workerCount) }); try { const result = await agent.call("processItems", [ parsed, Number(workerCount) ]); addLog("in", "result", { workers: (result as ManagerState["lastRun"])?.workerCount, totalMs: (result as ManagerState["lastRun"])?.totalDuration }); setLastRun(result as ManagerState["lastRun"]); const run = result as ManagerState["lastRun"]; toast( "Processed by " + run?.workerCount + " workers in " + run?.totalDuration + "ms", "success" ); } catch (e) { addLog("error", "error", e instanceof Error ? 
e.message : String(e)); } finally { setIsProcessing(false); } };
/*
 * NOTE(review): from here to the end of the function the JSX element tags
 * appear to be missing in this copy of the file (only text content,
 * attribute fragments and expression containers remain), so the markup below
 * is not valid TSX as shown. Restore it from the original source before
 * editing the render output.
 */
return ( A manager agent splits work into chunks and distributes them across multiple worker agents using{" "} getAgentByName() . Workers process their chunks concurrently as separate Durable Objects, and the manager aggregates results with{" "} Promise.all() . Enter some items below and fan them out. } statusIndicator={ } >
{/* Controls */}
{/* Input */}
Items to Process
) => setItems(e.target.value) } className="w-full mb-3" placeholder="apple, banana, cherry, ..." />
{PRESETS.map((preset, i) => ( ))}
{/* Worker Count */}
Workers
{["1", "2", "3", "4"].map((n) => ( ))}
{/* Results */} {lastRun && (
Results {lastRun.totalDuration}ms total
{lastRun.results.map((worker: WorkerResult) => (
{worker.workerId} {worker.duration}ms
{worker.processed.map((result: string, j: number) => (
{result}
))}
))}
)}
{/* Logs */}
); }