// server.ts
import { createWorkersAI } from "workers-ai-provider";
import { routeAgentRequest } from "agents";
import { AIChatAgent } from "@cloudflare/ai-chat";
import {
  streamText,
  convertToModelMessages,
  createUIMessageStream,
  createUIMessageStreamResponse
} from "ai";

/**
 * Resumable Streaming Chat Agent
 *
 * This example demonstrates automatic resumable streaming built into AIChatAgent.
 * When a client disconnects and reconnects during streaming:
 * 1. The server automatically detects the active stream
 * 2. Sends CF_AGENT_STREAM_RESUMING notification
 * 3. Client ACKs and receives all buffered chunks
 *
 * No special setup required - just use onChatMessage() as usual.
 *
 * It also demonstrates data parts — typed JSON blobs attached to messages
 * alongside text. Three patterns are shown:
 *
 * 1. data-sources  — reconciliation (loading → found, same type+id updates in-place)
 * 2. data-thinking — ephemeral status (transient, not persisted)
 * 3. data-usage    — persisted metadata (token counts & latency, survives reload)
 */
export class ResumableStreamingChat extends AIChatAgent {
  /**
   * Build the streamed response for the current conversation.
   *
   * The LLM output is merged into a UI message stream, and data parts are
   * written alongside it in this order:
   *
   * 1. `data-sources` with status "searching", then again with status
   *    "found" under the same id
   * 2. a transient `data-thinking` part
   * 3. a `data-usage` part with token counts and elapsed milliseconds,
   *    written once the LLM usage promise has resolved
   *
   * @returns A response wrapping the UI message stream.
   */
  async onChatMessage() {
    // Small helper for the simulated delays below.
    const pause = (ms: number) =>
      new Promise((resolve) => setTimeout(resolve, ms));

    // The most recent user message drives the simulated source lookup.
    const latestUserMessage = [...this.messages]
      .reverse()
      .find((message) => message.role === "user");

    // Concatenate that message's text parts; an empty or missing result
    // falls back to the literal "unknown".
    const textParts =
      latestUserMessage?.parts.filter(
        (part): part is { type: "text"; text: string } => part.type === "text"
      ) ?? [];
    const joinedText = textParts.map((part) => part.text).join("");
    const query = joinedText || "unknown";

    const workersai = createWorkersAI({ binding: this.env.AI });
    const modelId = "@cf/moonshotai/kimi-k2.5";
    const startTime = Date.now();

    const stream = createUIMessageStream({
      execute: async ({ writer }) => {
        const model = workersai(modelId, {
          sessionAffinity: this.sessionAffinity
        });
        const modelMessages = await convertToModelMessages(this.messages);
        const completion = streamText({ model, messages: modelMessages });

        // The LLM stream is merged before any data part is written, so the
        // writes that follow interleave into the same message on the client.
        writer.merge(completion.toUIMessageStream());

        // Initial "searching" state for the sources part.
        writer.write({
          type: "data-sources",
          id: "src-1",
          data: { query, status: "searching", results: [] }
        });

        // Stand-in for the time a real lookup would take.
        await pause(1000);

        // Same type + id as above, so this replaces the "searching" part
        // in-place. The results are hardcoded to keep the example about the
        // data parts plumbing; a real app would query a vector DB or search
        // index here.
        const hardcodedResults = [
          "Cloudflare Agents SDK docs",
          "AI SDK streaming reference",
          "Durable Objects SQLite guide"
        ];
        writer.write({
          type: "data-sources",
          id: "src-1",
          data: { query, status: "found", results: hardcodedResults }
        });

        // Transient "thinking" part: flagged so it is not persisted to
        // message.parts.
        await pause(300);
        writer.write({
          transient: true,
          type: "data-thinking",
          data: { model: modelId, startedAt: new Date().toISOString() }
        });

        // Token counts are only available once the LLM has finished.
        const usage = await completion.usage;
        const inputTokens = usage?.inputTokens ?? 0;
        const outputTokens = usage?.outputTokens ?? 0;

        writer.write({
          type: "data-usage",
          data: {
            model: modelId,
            inputTokens,
            outputTokens,
            latencyMs: Date.now() - startTime
          }
        });
      }
    });

    return createUIMessageStreamResponse({ stream });
  }
}

export default {
  /**
   * Worker entry point: hand the request to the agent router and answer
   * with a 404 when the router does not produce a response.
   */
  async fetch(request: Request, env: Env) {
    const agentResponse = await routeAgentRequest(request, env);
    if (agentResponse) {
      return agentResponse;
    }
    return new Response("Not found", { status: 404 });
  }
} satisfies ExportedHandler<Env>;