import { createWorkersAI } from "workers-ai-provider";
import { routeAgentRequest } from "agents";
import { AIChatAgent } from "agents/ai-chat-agent";
import {
  streamText,
  convertToModelMessages,
  createUIMessageStream,
  createUIMessageStreamResponse
} from "ai";
/**
 * Resumable Streaming Chat Agent
 *
 * This example demonstrates the automatic resumable streaming built into
 * AIChatAgent. When a client disconnects and reconnects mid-stream:
 *   1. The server automatically detects the active stream.
 *   2. It sends a CF_AGENT_STREAM_RESUMING notification.
 *   3. The client ACKs and receives all buffered chunks.
 *
 * No special setup is required; just implement onChatMessage() as usual.
 *
 * It also demonstrates data parts: typed JSON blobs attached to messages
 * alongside text. Three patterns are shown:
 *
 *   1. data-sources: reconciliation (same type + id updates in place,
 *      searching → found)
 *   2. data-thinking: ephemeral status (transient, not persisted)
 *   3. data-usage: persisted metadata (token counts and latency, survives reload)
 */
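// Illustrative shapes of the three data parts written by onChatMessage()
// below. These aliases only document the payloads passed to writer.write();
// nothing in this file enforces them. They are exported solely so strict
// unused-symbol checks do not flag them.
export type SourcesData = {
  query: string;
  status: "searching" | "found";
  results: string[];
};
export type ThinkingData = { model: string; startedAt: string };
export type UsageData = {
  model: string;
  inputTokens: number;
  outputTokens: number;
  latencyMs: number;
};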
export class ResumableStreamingChat extends AIChatAgent<Env> {
  /**
   * Handle incoming chat messages.
   */
  async onChatMessage() {
    // Grab the last user message for the simulated source lookup
    let lastUserMsg;
    for (let i = this.messages.length - 1; i >= 0; i--) {
      if (this.messages[i].role === "user") {
        lastUserMsg = this.messages[i];
        break;
      }
    }
    const query =
      lastUserMsg?.parts
        .filter((p): p is { type: "text"; text: string } => p.type === "text")
        .map((p) => p.text)
        .join("") || "unknown";

    const workersai = createWorkersAI({ binding: this.env.AI });
    const modelId = "@cf/moonshotai/kimi-k2.5";
    const startTime = Date.now();
    const stream = createUIMessageStream({
      execute: async ({ writer }) => {
        const result = streamText({
          model: workersai(modelId, {
            sessionAffinity: this.sessionAffinity
          }),
          messages: await convertToModelMessages(this.messages)
        });

        // Merge the LLM stream first — subsequent writer.write() calls
        // interleave into the same message on the client.
        writer.merge(result.toUIMessageStream());

        // First write: "searching" state
        writer.write({
          type: "data-sources",
          id: "src-1",
          data: { query, status: "searching", results: [] }
        });

        // Simulate a lookup delay
        await new Promise((r) => setTimeout(r, 1000));

        // Second write with same type+id: replaces "searching" in-place.
        // Results are hardcoded here to keep the example focused on the
        // data parts plumbing — in a real app you'd query a vector DB or
        // search index and return actual results.
        writer.write({
          type: "data-sources",
          id: "src-1",
          data: {
            query,
            status: "found",
            results: [
              "Cloudflare Agents SDK docs",
              "AI SDK streaming reference",
              "Durable Objects SQLite guide"
            ]
          }
        });

        // Transient "thinking" data part — not persisted to message.parts
        await new Promise((r) => setTimeout(r, 300));
        writer.write({
          transient: true,
          type: "data-thinking",
          data: { model: modelId, startedAt: new Date().toISOString() }
        });

        // Wait for the LLM to finish so we can report token counts
        const usage = await result.usage;
        writer.write({
          type: "data-usage",
          data: {
            model: modelId,
            inputTokens: usage?.inputTokens ?? 0,
            outputTokens: usage?.outputTokens ?? 0,
            latencyMs: Date.now() - startTime
          }
        });
      }
    });

    return createUIMessageStreamResponse({ stream });
  }
}
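// Bindings this Worker relies on (a minimal wrangler.jsonc sketch; binding
// names are assumptions, adjust them to your own project):
//
//   {
//     "ai": { "binding": "AI" },
//     "durable_objects": {
//       "bindings": [
//         {
//           "name": "ResumableStreamingChat",
//           "class_name": "ResumableStreamingChat"
//         }
//       ]
//     },
//     "migrations": [
//       { "tag": "v1", "new_sqlite_classes": ["ResumableStreamingChat"] }
//     ]
//   }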
export default {
  async fetch(request: Request, env: Env) {
    return (
      (await routeAgentRequest(request, env)) ||
      new Response("Not found", { status: 404 })
    );
  }
} satisfies ExportedHandler<Env>;
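// Client-side consumption (a minimal React sketch, not part of this Worker;
// it assumes the Agents SDK React hooks and an agent instance name of
// "resumable-streaming-chat", both of which may differ in your setup):
//
//   import { useAgent } from "agents/react";
//   import { useAgentChat } from "agents/ai-react";
//
//   const agent = useAgent({ agent: "resumable-streaming-chat" });
//   const { messages } = useAgentChat({ agent });
//
//   // Custom data parts arrive in message.parts alongside text parts.
//   // The two "data-sources" writes above share id "src-1", so the client
//   // sees a single part whose data flips from "searching" to "found";
//   // the transient "data-thinking" part is delivered but never persisted.
//   for (const part of messages.at(-1)?.parts ?? []) {
//     if (part.type === "data-sources") {
//       console.log(part.data); // { query, status, results }
//     }
//   }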