/**
 * Chat Rooms — Multiple conversations via SubAgent facets
 *
 * Architecture:
 *   - OverseerAgent (parent): room registry, WebSocket routing
 *   - ChatRoom (sub-agent): messages, LLM calls, streaming via RpcTarget callback
 *
 * Data flow:
 *   - Room list: broadcast to ALL clients via setState({ rooms })
 *   - Active room: per-connection via connection.setState({ activeRoomId })
 *   - Messages & streaming: per-connection via direct WebSocket messages
 *
 *   OverseerAgent
 *     ├── Room registry (own SQLite, shared state)
 *     ├── Per-connection routing
 *     │
 *     ├── subAgent("room-abc") → ChatRoom (isolated SQLite + LLM)
 *     ├── subAgent("room-def") → ChatRoom (isolated SQLite + LLM)
 *     └── subAgent("room-ghi") → ChatRoom (isolated SQLite + LLM)
 */

import { createWorkersAI } from "workers-ai-provider";
import { Agent, getCurrentAgent, routeAgentRequest, callable } from "agents";
import type { Connection } from "agents";
import { streamText, tool, stepCountIs } from "ai";
import { RpcTarget } from "cloudflare:workers";
import { z } from "zod";

// ─────────────────────────────────────────────────────────────────────────────
// Types (shared with client)
// ─────────────────────────────────────────────────────────────────────────────

export type RoomInfo = {
  id: string;
  name: string;
  messageCount: number;
  createdAt: string;
  lastActiveAt: string;
};

export type ChatMessage = {
  id: string;
  role: "user" | "assistant";
  content: string;
  createdAt: string;
};

/** Broadcast state — room list only. Synced to all clients via setState. */
export type RoomsState = {
  rooms: RoomInfo[];
};

/** Per-connection state — stored via connection.setState(). */
export type ConnectionData = {
  activeRoomId: string | null;
};

/**
 * Messages sent to individual clients via connection.send().
 *
 * stream-event carries a serialized UIMessageChunk from the AI SDK's
 * toUIMessageStream(). The client can process text-delta, reasoning-delta,
 * tool-input-*, etc. without importing from the AI SDK.
 */
export type ServerMessage =
  | { type: "messages"; roomId: string; messages: ChatMessage[] }
  | { type: "stream-start"; roomId: string; requestId: string }
  | { type: "stream-event"; requestId: string; event: string; replay?: boolean }
  | { type: "stream-done"; requestId: string; message: ChatMessage }
  | { type: "stream-resuming"; requestId: string };

/** Messages the client sends directly on the WebSocket (not RPC). */
export type ClientMessage =
  | { type: "cancel"; requestId: string }
  | { type: "resume-request" };

/**
 * Parse and validate a raw WebSocket payload as a ClientMessage.
 *
 * @param raw - The text frame received from the client.
 * @returns A well-formed ClientMessage, or null when the payload is not
 *   valid JSON or does not match one of the known message shapes.
 */
function parseClientMessage(raw: string): ClientMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    // Not JSON at all — by definition not a ClientMessage.
    return null;
  }

  // JSON.parse can legally return null, numbers, strings, arrays, ...
  if (typeof value !== "object" || value === null) return null;

  const candidate = value as { type?: unknown; requestId?: unknown };
  if (candidate.type === "resume-request") {
    return { type: "resume-request" };
  }
  if (candidate.type === "cancel" && typeof candidate.requestId === "string") {
    return { type: "cancel", requestId: candidate.requestId };
  }
  return null;
}

// ─────────────────────────────────────────────────────────────────────────────
// ChatRoom — sub-agent with isolated SQLite + LLM calls
// ─────────────────────────────────────────────────────────────────────────────

export class ChatRoom extends Agent<Env> {
  /** Create the messages table on first start (no-op afterwards). */
  onStart() {
    this.sql`
      CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        role TEXT NOT NULL,
        content TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `;
  }

  /**
   * Stream a chat response using the AI SDK's UIMessageStream protocol.
   *
   * The sub-agent owns the full lifecycle:
   *   store user message → load history → call LLM → stream UIMessageChunk
   *   events via callback → store assistant response.
   *
   * Each chunk is a serialized UIMessageChunk JSON string (text-delta,
   * reasoning-delta, tool-input-*, etc.) relayed to the parent.
   *
   * @param userMessage - The text the user sent; stored before the LLM call.
   * @param callback - Receives every serialized chunk via onEvent() and the
   *   persisted assistant message via onDone().
   * @throws If the stream fails, or if the stored assistant row cannot be
   *   read back.
   */
  async chatStream(
    userMessage: string,
    callback: {
      onEvent(json: string): void;
      onDone(message: ChatMessage): void;
    }
  ): Promise<void> {
    // Store user message
    this.sql`
      INSERT INTO messages (id, role, content)
      VALUES (${crypto.randomUUID()}, 'user', ${userMessage})
    `;

    // Load conversation history. created_at only has one-second resolution,
    // so rowid breaks ties and keeps same-second messages in insertion order.
    const history = this.sql<Pick<ChatMessage, "role" | "content">>`
      SELECT role, content FROM messages ORDER BY created_at, rowid
    `;

    // Stream the LLM response using the AI SDK's UIMessageStream protocol
    const workersai = createWorkersAI({ binding: this.env.AI });

    const result = streamText({
      model: workersai("@cf/moonshotai/kimi-k2.5", {
        sessionAffinity: this.sessionAffinity
      }),
      system:
        "You are a helpful assistant. Each chat room has its own independent " +
        "conversation history. Be concise and helpful. " +
        "You have access to tools — use them when appropriate.",
      messages: history.map((m) => ({ role: m.role, content: m.content })),
      tools: {
        get_current_time: tool({
          description: "Get the current date and time",
          inputSchema: z.object({}),
          execute: async () => new Date().toISOString()
        }),
        roll_dice: tool({
          description: "Roll a dice with the specified number of sides",
          inputSchema: z.object({
            // A die needs a positive whole number of sides; anything else
            // would make the roll formula below return nonsense.
            sides: z
              .number()
              .int()
              .min(1)
              .describe("Number of sides on the dice")
              .default(6)
          }),
          execute: async ({ sides }) => ({
            result: Math.floor(Math.random() * sides) + 1,
            sides
          })
        })
      },
      stopWhen: stepCountIs(5)
    });

    // Iterate over typed UIMessageChunk objects from the AI SDK.
    // Each chunk is serialized and relayed to the parent via the callback.
    // We also accumulate text from text-delta events for persistence.
    let accumulated = "";
    for await (const chunk of result.toUIMessageStream()) {
      if (chunk.type === "text-delta" && "delta" in chunk) {
        accumulated += (chunk as { delta: string }).delta;
      }
      await callback.onEvent(JSON.stringify(chunk));
    }

    // Store the assistant response
    const msgId = crypto.randomUUID();
    this.sql`
      INSERT INTO messages (id, role, content)
      VALUES (${msgId}, 'assistant', ${accumulated})
    `;

    // Get the stored message with its timestamp
    const [stored] = this.sql<ChatMessage>`
      SELECT id, role, content, created_at as createdAt
      FROM messages WHERE id = ${msgId}
    `;
    if (!stored) {
      throw new Error(`Assistant message ${msgId} was not persisted`);
    }

    await callback.onDone(stored);
  }

  /** Return every stored message, oldest first (insertion order on ties). */
  getMessages(): ChatMessage[] {
    return this.sql<ChatMessage>`
      SELECT id, role, content, created_at as createdAt
      FROM messages ORDER BY created_at, rowid
    `;
  }

  /** Return the number of stored messages (0 for an empty room). */
  getMessageCount(): number {
    const rows = this.sql<{ cnt: number }>`SELECT COUNT(*) as cnt FROM messages`;
    return rows[0]?.cnt ?? 0;
  }

  /** Delete every message in this room. */
  clearMessages(): void {
    this.sql`DELETE FROM messages`;
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// StreamRelay — RpcTarget that relays chunks from sub-agent to a WebSocket
// ─────────────────────────────────────────────────────────────────────────────

class StreamRelay extends RpcTarget {
  #connection: Connection;
  readonly #requestId: string;
  /** Every relayed chunk, kept so a reconnecting client can be caught up. */
  readonly #chunks: string[] = [];
  #aborted = false;
  /** True once a send failure has been logged for the current connection. */
  #sendFailureLogged = false;

  constructor(connection: Connection, requestId: string) {
    super();
    this.#connection = connection;
    this.#requestId = requestId;
  }

  /** Stop forwarding events (called on cancel). */
  abort() {
    this.#aborted = true;
  }

  /** Reassign to a new connection (called on resume after reconnect). */
  updateConnection(connection: Connection) {
    this.#connection = connection;
    this.#sendFailureLogged = false;
  }

  /** Get buffered chunks for replay on resume. */
  getChunks(): string[] {
    return this.#chunks;
  }

  /**
   * Relay a serialized UIMessageChunk event to the WebSocket connection.
   *
   * The chunk is buffered first so it can be replayed on resume even when
   * the current connection can no longer receive it. Nothing is buffered or
   * sent once the relay has been aborted.
   */
  onEvent(json: string) {
    if (this.#aborted) return;
    this.#chunks.push(json);
    this.#send({
      type: "stream-event",
      requestId: this.#requestId,
      event: json
    });
  }

  /** Relay the final persisted assistant message, unless aborted. */
  onDone(message: ChatMessage) {
    if (this.#aborted) return;
    this.#send({
      type: "stream-done",
      requestId: this.#requestId,
      message
    });
  }

  /**
   * Send a message to the current connection without letting a transport
   * failure escape. These methods are invoked as callbacks from the
   * sub-agent's stream loop, so an exception here would abort generation
   * and the assistant reply would never be stored or resumable.
   * NOTE(review): assumes send() can throw on a closed socket — confirm.
   */
  #send(msg: ServerMessage): void {
    try {
      this.#connection.send(JSON.stringify(msg));
    } catch (err) {
      // Log once per connection to avoid one log line per chunk.
      if (!this.#sendFailureLogged) {
        this.#sendFailureLogged = true;
        console.warn(
          `StreamRelay: send failed for request ${this.#requestId}`,
          err
        );
      }
    }
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// OverseerAgent — room registry + WebSocket router
// ─────────────────────────────────────────────────────────────────────────────

export class OverseerAgent extends Agent<Env, RoomsState> {
  initialState: RoomsState = { rooms: [] };

  /** Active streams keyed by requestId — used for cancel and resume. */
  #activeStreams = new Map<
    string,
    { relay: StreamRelay; roomId: string; connectionId: string }
  >();

  async onStart() {
    this._initRoomTable();
    await this._broadcastRooms();
  }

  /**
   * Handle non-RPC WebSocket messages: cancel and resume-request.
   * These bypass the RPC system and arrive as raw messages.
   *
   * Payloads that are not valid ClientMessages are ignored. Failures while
   * handling a valid message are logged rather than silently discarded.
   */
  async onMessage(connection: Connection, message: string | ArrayBuffer) {
    if (typeof message !== "string") return;

    const msg = parseClientMessage(message);
    if (!msg) return; // not a ClientMessage — ignore

    try {
      switch (msg.type) {
        case "cancel":
          this._cancelStream(msg.requestId);
          break;
        case "resume-request":
          this._resumeStream(connection);
          break;
      }
    } catch (err) {
      console.error(`Failed to handle "${msg.type}" message`, err);
    }
  }

  /** Stop relaying the given stream and forget it; no-op if unknown. */
  private _cancelStream(requestId: string) {
    const stream = this.#activeStreams.get(requestId);
    if (!stream) return;
    stream.relay.abort();
    this.#activeStreams.delete(requestId);
  }

  /**
   * Re-attach the first active stream of the connection's current room to
   * this connection and replay everything buffered so far.
   */
  private _resumeStream(connection: Connection) {
    // Find active stream matching the connection's current room
    const activeId = this._getActiveRoomId(connection);
    if (!activeId) return;

    for (const [requestId, stream] of this.#activeStreams) {
      if (stream.roomId !== activeId) continue;

      stream.relay.updateConnection(connection);
      stream.connectionId = connection.id;

      // Send resuming signal
      const resumeMsg: ServerMessage = { type: "stream-resuming", requestId };
      connection.send(JSON.stringify(resumeMsg));

      // Replay buffered chunks
      for (const chunk of stream.relay.getChunks()) {
        const replayMsg: ServerMessage = {
          type: "stream-event",
          requestId,
          event: chunk,
          replay: true
        };
        connection.send(JSON.stringify(replayMsg));
      }

      // Only the first matching stream is resumed.
      return;
    }
  }

  private _initRoomTable() {
    this.sql`
      CREATE TABLE IF NOT EXISTS rooms (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        last_active_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `;
  }

  // ─── Helpers ──────────────────────────────────────────────────────────

  /** Broadcast the room list to all connected clients via shared state. */
  private async _broadcastRooms() {
    const rooms = this.sql<{
      id: string;
      name: string;
      created_at: string;
      last_active_at: string;
    }>`
      SELECT id, name, created_at, last_active_at
      FROM rooms ORDER BY last_active_at DESC
    `;

    // Message counts live in each room's sub-agent, so ask them in parallel.
    const roomInfos: RoomInfo[] = await Promise.all(
      rooms.map(async (r) => {
        const room = await this.subAgent(ChatRoom, `room-${r.id}`);
        return {
          id: r.id,
          name: r.name,
          messageCount: await room.getMessageCount(),
          createdAt: r.created_at,
          lastActiveAt: r.last_active_at
        };
      })
    );

    this.setState({ rooms: roomInfos });
  }

  /** Send the messages of a room to a specific connection. */
  private async _sendRoomMessages(connection: Connection, roomId: string) {
    const room = await this.subAgent(ChatRoom, `room-${roomId}`);
    const messages = await room.getMessages();
    const msg: ServerMessage = { type: "messages", roomId, messages };
    connection.send(JSON.stringify(msg));
  }

  /**
   * Get the calling connection from the agent context.
   *
   * @throws If there is no connection in the current context.
   */
  private _getConnection(): Connection {
    const { connection } = getCurrentAgent();
    if (!connection) throw new Error("No connection in context");
    return connection;
  }

  /** Get the active room ID for a connection. */
  private _getActiveRoomId(connection: Connection): string | null {
    const data = connection.state as ConnectionData | null;
    return data?.activeRoomId ?? null;
  }

  /**
   * Ensure a room is present in the registry.
   *
   * Calling subAgent() for an unregistered id would create a sub-agent that
   * nothing in the registry refers to, so callers check here first.
   *
   * @throws If no room with this id exists.
   */
  private _assertRoomExists(roomId: string): void {
    const rows = this.sql<{ id: string }>`
      SELECT id FROM rooms WHERE id = ${roomId}
    `;
    if (rows.length === 0) throw new Error(`Room not found: ${roomId}`);
  }

  // ─── Room CRUD ───────────────────────────────────────────────────────

  /**
   * Create a room and switch the calling connection to it.
   *
   * @returns The id of the new room.
   * @throws If there is no calling connection (checked before inserting, so
   *   a failed call does not leave a room behind).
   */
  @callable()
  async createRoom(name: string): Promise<string> {
    const connection = this._getConnection();

    const id = crypto.randomUUID().slice(0, 8);
    this.sql`INSERT INTO rooms (id, name) VALUES (${id}, ${name})`;

    // Auto-switch the calling connection to the new room
    connection.setState({ activeRoomId: id } satisfies ConnectionData);
    await this._sendRoomMessages(connection, id);

    await this._broadcastRooms();
    return id;
  }

  /**
   * Delete a room and its sub-agent, then broadcast the updated list.
   *
   * Every connection viewing the room (not just the caller) has its active
   * room cleared, so none of them keeps pointing at a deleted room.
   */
  @callable()
  async deleteRoom(roomId: string) {
    this.sql`DELETE FROM rooms WHERE id = ${roomId}`;
    this.deleteSubAgent(ChatRoom, `room-${roomId}`);

    for (const conn of this.getConnections()) {
      if (this._getActiveRoomId(conn) === roomId) {
        conn.setState({ activeRoomId: null } satisfies ConnectionData);
      }
    }

    await this._broadcastRooms();
  }

  /**
   * Make a room the calling connection's active room and send its messages.
   *
   * @throws If the room does not exist or there is no calling connection.
   */
  @callable()
  async switchRoom(roomId: string) {
    const connection = this._getConnection();
    this._assertRoomExists(roomId);
    connection.setState({ activeRoomId: roomId } satisfies ConnectionData);
    await this._sendRoomMessages(connection, roomId);
  }

  /**
   * Delete all messages in a room and refresh every connection viewing it.
   *
   * @throws If the room does not exist.
   */
  @callable()
  async clearRoom(roomId: string) {
    this._assertRoomExists(roomId);

    const room = await this.subAgent(ChatRoom, `room-${roomId}`);
    await room.clearMessages();

    // Send empty messages to all connections viewing this room
    for (const conn of this.getConnections()) {
      if (this._getActiveRoomId(conn) === roomId) {
        await this._sendRoomMessages(conn, roomId);
      }
    }

    await this._broadcastRooms();
  }

  /** Rename a room and broadcast the updated list. */
  @callable()
  async renameRoom(roomId: string, name: string) {
    this.sql`UPDATE rooms SET name = ${name} WHERE id = ${roomId}`;
    await this._broadcastRooms();
  }

  // ─── Send message ────────────────────────────────────────────────────

  /**
   * Send a message to the active room with streaming.
   *
   * Flow:
   * 1. Get the calling connection's active room
   * 2. Send stream-start to the connection
   * 3. Call chatStream() on the sub-agent with an RpcTarget relay
   *    — the sub-agent stores the user msg, calls LLM, streams chunks
   *    — the relay forwards each chunk to the WebSocket connection
   * 4. On stream-done, broadcast updated room list (message counts)
   *
   * @param text - The user's message.
   * @param requestId - Client-chosen id used to correlate stream events and
   *   to address the stream for cancel/resume.
   * @throws If the calling connection has no active room.
   */
  @callable()
  async sendMessage(text: string, requestId: string) {
    const connection = this._getConnection();
    const activeId = this._getActiveRoomId(connection);
    if (!activeId) throw new Error("No active room");

    const room = await this.subAgent(ChatRoom, `room-${activeId}`);

    // Signal stream start
    const startMsg: ServerMessage = {
      type: "stream-start",
      roomId: activeId,
      requestId
    };
    connection.send(JSON.stringify(startMsg));

    // Stream via RpcTarget callback — sub-agent calls back to relay,
    // relay sends chunks to the WebSocket connection
    const relay = new StreamRelay(connection, requestId);

    // Track active stream for cancel/resume
    this.#activeStreams.set(requestId, {
      relay,
      roomId: activeId,
      connectionId: connection.id
    });

    try {
      await room.chatStream(text, relay);
    } finally {
      // Always forget the stream, whether it finished, failed, or was cancelled.
      this.#activeStreams.delete(requestId);
    }

    // Update room timestamps and broadcast new message counts
    this.sql`
      UPDATE rooms SET last_active_at = CURRENT_TIMESTAMP WHERE id = ${activeId}
    `;
    await this._broadcastRooms();
  }
}

// ─────────────────────────────────────────────────────────────────────────────

export default {
  async fetch(request: Request, env: Env) {
    return (
      (await routeAgentRequest(request, env)) ||
      new Response("Not found", { status: 404 })
    );
  }
} satisfies ExportedHandler<Env>;