branch:
server.ts
7612 bytesRaw
/**
* Long-Running Agent — Durable Fibers Demo
*
* Demonstrates:
* - spawnFiber() for fire-and-forget durable execution
* - stashFiber() for checkpointing progress that survives eviction
* - onFiberRecovered() for custom recovery after DO restart
* - onFiberComplete() for handling completion
* - cancelFiber() for stopping a running fiber
* - getFiber() for querying fiber state
* - Real-time progress via broadcast() to connected clients
*
* No API keys needed — research steps are simulated with delays.
*/
import { Agent, callable, routeAgentRequest } from "agents";
import {
withFibers,
type FiberContext,
type FiberRecoveryContext,
type FiberCompleteContext,
type FiberState
} from "agents/experimental/forever";
// ── Types shared with the client ──────────────────────────────────────
export type ResearchStep = {
name: string;
result: string;
completedAt: number;
};
export type ResearchPayload = {
topic: string;
steps: string[];
};
export type ResearchSnapshot = {
topic: string;
completedSteps: ResearchStep[];
currentStep: string;
totalSteps: number;
};
export type AgentState = {
activeFiberId: string | null;
};
export type ProgressMessage =
| {
type: "research:started";
fiberId: string;
topic: string;
steps: string[];
}
| {
type: "research:step";
fiberId: string;
step: string;
stepIndex: number;
totalSteps: number;
result: string;
}
| {
type: "research:complete";
fiberId: string;
results: ResearchStep[];
}
| {
type: "research:recovered";
fiberId: string;
skippedSteps: number;
remainingSteps: number;
}
| {
type: "research:failed";
fiberId: string;
error: string;
}
| {
type: "research:cancelled";
fiberId: string;
};
// ── Simulated research work ───────────────────────────────────────────
const RESEARCH_FINDINGS: Record<string, string[]> = {
default: [
"Found 47 relevant papers from the last 5 years.",
"Identified 3 major competing approaches in the literature.",
"Cross-referenced citations reveal a key insight connecting two subfields.",
"Statistical meta-analysis shows a strong effect size (d=0.82).",
"Synthesized findings into a coherent narrative with 5 key takeaways."
]
};
function getFindings(topic: string): string[] {
return RESEARCH_FINDINGS[topic.toLowerCase()] || RESEARCH_FINDINGS.default;
}
// ── The Agent ─────────────────────────────────────────────────────────
const FiberAgent = withFibers(Agent, { debugFibers: true });
export class ResearchAgent extends FiberAgent<Env, AgentState> {
initialState: AgentState = { activeFiberId: null };
// ── Research fiber method ───────────────────────────────────────
async doResearch(
payload: ResearchPayload,
fiberCtx: FiberContext
): Promise<{ results: ResearchStep[] }> {
const { topic, steps } = payload;
const findings = getFindings(topic);
const snapshot = fiberCtx.snapshot as ResearchSnapshot | null;
const completedSteps = snapshot?.completedSteps ?? [];
const startIndex = completedSteps.length;
if (startIndex > 0) {
this.broadcast(
JSON.stringify({
type: "research:recovered",
fiberId: fiberCtx.id,
skippedSteps: startIndex,
remainingSteps: steps.length - startIndex
} satisfies ProgressMessage)
);
}
for (let i = startIndex; i < steps.length; i++) {
const step = steps[i];
const duration = 1000 + Math.random() * 1000;
await new Promise((resolve) => setTimeout(resolve, duration));
const result =
findings[i % findings.length] || `Completed analysis for "${step}".`;
const stepResult: ResearchStep = {
name: step,
result,
completedAt: Date.now()
};
completedSteps.push(stepResult);
this.stashFiber({
topic,
completedSteps: [...completedSteps],
currentStep: step,
totalSteps: steps.length
} satisfies ResearchSnapshot);
this.broadcast(
JSON.stringify({
type: "research:step",
fiberId: fiberCtx.id,
step,
stepIndex: i,
totalSteps: steps.length,
result
} satisfies ProgressMessage)
);
}
return { results: completedSteps };
}
// ── Lifecycle hooks ─────────────────────────────────────────────
override onFiberComplete(ctx: FiberCompleteContext) {
const results = (ctx.result as { results: ResearchStep[] })?.results;
this.broadcast(
JSON.stringify({
type: "research:complete",
fiberId: ctx.id,
results: results ?? []
} satisfies ProgressMessage)
);
this.setState({ activeFiberId: null });
}
override onFiberRecovered(ctx: FiberRecoveryContext) {
this.restartFiber(ctx.id);
}
// ── Callable methods (client-facing API) ────────────────────────
@callable()
startResearch(topic: string): {
fiberId: string;
steps: string[];
} {
const steps = [
"Literature Review",
"Data Collection",
"Analysis",
"Cross-referencing",
"Synthesis"
];
const fiberId = this.spawnFiber("doResearch", {
topic,
steps
} satisfies ResearchPayload);
this.setState({ activeFiberId: fiberId });
this.broadcast(
JSON.stringify({
type: "research:started",
fiberId,
topic,
steps
} satisfies ProgressMessage)
);
return { fiberId, steps };
}
@callable()
cancelResearch(): boolean {
const { activeFiberId } = this.state;
if (!activeFiberId) return false;
const cancelled = this.cancelFiber(activeFiberId);
if (cancelled) {
this.setState({ activeFiberId: null });
this.broadcast(
JSON.stringify({
type: "research:cancelled",
fiberId: activeFiberId
} satisfies ProgressMessage)
);
}
return cancelled;
}
@callable()
getResearchStatus(): FiberState | null {
const { activeFiberId } = this.state;
if (!activeFiberId) return null;
return this.getFiber(activeFiberId);
}
@callable()
async simulateKillAndRecover(): Promise<boolean> {
const { activeFiberId } = this.state;
if (!activeFiberId) return false;
this.cancelFiber(activeFiberId);
const now = Date.now();
this.sql`
UPDATE cf_agents_fibers
SET status = 'running', updated_at = ${now}
WHERE id = ${activeFiberId}
`;
// (cancelFiber already cleared in-memory tracking)
await this.checkFibers();
return true;
}
}
// ── Request handler ───────────────────────────────────────────────────
export default {
async fetch(request: Request, env: Env, _ctx: ExecutionContext) {
return (
(await routeAgentRequest(request, env)) ||
new Response("Not found", { status: 404 })
);
}
};