/** * Chat Rooms — Client * * Left sidebar: room list with create/delete/clear. * Main area: chat for the active room. * * Data sources: * - Room list: from Agent state sync (useAgent onStateUpdate) * - Chat messages & streaming: useChat with custom AgentChatTransport * - Room CRUD: via agent.call() RPC * * The AgentChatTransport bridges the AI SDK's useChat hook with the Agent * WebSocket connection: sendMessages() triggers the server-side RPC, then * pipes WS stream-event messages into a ReadableStream * that useChat consumes and renders. */ import { Suspense, useCallback, useState, useEffect, useRef, useMemo } from "react"; import { useAgent } from "agents/react"; import { useChat } from "@ai-sdk/react"; import type { UIMessage, UIMessageChunk, ChatTransport } from "ai"; import { Button, Badge, InputArea, Empty, Text } from "@cloudflare/kumo"; import { ConnectionIndicator, ModeToggle, PoweredByAgents, type ConnectionStatus } from "@cloudflare/agents-ui"; import { PaperPlaneRightIcon, PlusIcon, ChatCircleIcon, BroomIcon, HashIcon } from "@phosphor-icons/react"; import { Streamdown } from "streamdown"; import type { RoomsState, RoomInfo, ChatMessage } from "./server"; // ─── Helpers ────────────────────────────────────────────────────────────── /** Convert server-side ChatMessage[] to UIMessage[] for useChat */ function chatToUIMessages(msgs: ChatMessage[]): UIMessage[] { return msgs.map((m) => ({ id: m.id, role: m.role, parts: [{ type: "text" as const, text: m.content }] })); } /** Extract concatenated text from a UIMessage's text parts */ function getMessageText(msg: UIMessage): string { return msg.parts .filter((p): p is { type: "text"; text: string } => p.type === "text") .map((p) => p.text) .join(""); } // ─── Custom Transport ───────────────────────────────────────────────────── /** Minimal interface for the agent socket used by the transport */ interface AgentSocket { addEventListener( type: "message", handler: (event: MessageEvent) => void, options?: { signal?: AbortSignal } ): void; removeEventListener( type: "message", handler: (event: MessageEvent) => void ): void; call(method: string, args?: unknown[]): Promise; send(data: string): void; } /** * Bridges useChat with the Agent WebSocket connection. * * Features: * - Request ID correlation: each request gets a unique ID, only matching * WS messages are processed * - Cancel: sends { type: "cancel", requestId } to stop server-side streaming * - Completion guard: close/error/abort are idempotent * - Signal-based cleanup: uses AbortController signal on addEventListener * - Stream resumption: reconnectToStream sends resume-request, server replays * buffered chunks */ class AgentChatTransport implements ChatTransport { #agent: AgentSocket; #activeRequestIds = new Set(); #currentFinish: (() => void) | null = null; constructor(agent: AgentSocket) { this.#agent = agent; } /** * Silently close the client-side stream without cancelling the server. * The server keeps generating; when the user switches back, switchRoom * will fetch the completed messages. */ detach() { this.#currentFinish?.(); this.#currentFinish = null; } async sendMessages({ messages, abortSignal }: Parameters["sendMessages"]>[0]): Promise< ReadableStream > { const lastMessage = messages[messages.length - 1]; const text = getMessageText(lastMessage); const requestId = crypto.randomUUID().slice(0, 8); let completed = false; const abortController = new AbortController(); let streamController!: ReadableStreamDefaultController; // Single cleanup helper — every terminal path goes through here once const finish = (action: () => void) => { if (completed) return; completed = true; this.#currentFinish = null; try { action(); } catch { /* stream may already be closed */ } this.#activeRequestIds.delete(requestId); abortController.abort(); }; // Expose a detach-friendly finish that closes the stream gracefully this.#currentFinish = () => finish(() => streamController.close()); // Abort handler: notify server, then terminate stream const onAbort = () => { if (completed) return; try { this.#agent.send(JSON.stringify({ type: "cancel", requestId })); } catch { /* ignore send failures */ } finish(() => streamController.error( Object.assign(new Error("Aborted"), { name: "AbortError" }) ) ); }; const stream = new ReadableStream({ start(controller) { streamController = controller; }, cancel() { onAbort(); } }); // Listen for stream events filtered by requestId this.#agent.addEventListener( "message", (event: MessageEvent) => { if (typeof event.data !== "string") return; try { const msg = JSON.parse(event.data); if (msg.requestId !== requestId) return; if (msg.type === "stream-event") { const chunk: UIMessageChunk = JSON.parse(msg.event); streamController.enqueue(chunk); } else if (msg.type === "stream-done") { finish(() => streamController.close()); } } catch { /* ignore parse errors */ } }, { signal: abortController.signal } ); // Handle abort from caller if (abortSignal) { abortSignal.addEventListener("abort", onAbort, { once: true }); if (abortSignal.aborted) onAbort(); } // Track this request this.#activeRequestIds.add(requestId); // Fire-and-forget RPC — response comes via WS events this.#agent.call("sendMessage", [text, requestId]).catch((error: Error) => { finish(() => streamController.error(error)); }); return stream; } async reconnectToStream(): Promise | null> { return new Promise | null>((resolve) => { let resolved = false; let timeout: ReturnType | undefined; const done = (value: ReadableStream | null) => { if (resolved) return; resolved = true; if (timeout) clearTimeout(timeout); this.#agent.removeEventListener("message", handler); resolve(value); }; const handler = (event: MessageEvent) => { if (typeof event.data !== "string") return; try { const msg = JSON.parse(event.data); if (msg.type === "stream-resuming") { done(this.#createResumeStream(msg.requestId)); } } catch { /* ignore */ } }; this.#agent.addEventListener("message", handler); try { this.#agent.send(JSON.stringify({ type: "resume-request" })); } catch { /* WebSocket may not be open yet */ } // Short timeout: server responds immediately if there's an active stream timeout = setTimeout(() => done(null), 500); }); } /** Create a ReadableStream that receives resumed stream chunks. */ #createResumeStream(requestId: string): ReadableStream { const abortController = new AbortController(); let completed = false; const finish = (action: () => void) => { if (completed) return; completed = true; try { action(); } catch { /* stream may already be closed */ } this.#activeRequestIds.delete(requestId); abortController.abort(); }; this.#activeRequestIds.add(requestId); return new ReadableStream({ start: (controller) => { this.#agent.addEventListener( "message", (event: MessageEvent) => { if (typeof event.data !== "string") return; try { const msg = JSON.parse(event.data); if (msg.requestId !== requestId) return; if (msg.type === "stream-event") { const chunk: UIMessageChunk = JSON.parse(msg.event); controller.enqueue(chunk); } else if (msg.type === "stream-done") { finish(() => controller.close()); } } catch { /* ignore */ } }, { signal: abortController.signal } ); }, cancel() { finish(() => {}); } }); } } // ─── Room Sidebar ────────────────────────────────────────────────────────── function RoomSidebar({ rooms, activeRoomId, onSwitch, onCreate, onDelete, onClear }: { rooms: RoomInfo[]; activeRoomId: string | null; onSwitch: (id: string) => void; onCreate: () => void; onDelete: (id: string) => void; onClear: (id: string) => void; }) { return (
Rooms {rooms.length}
{rooms.length === 0 && (
No rooms yet. Create one to start chatting.
)} {rooms.map((room) => { const isActive = room.id === activeRoomId; return (
onSwitch(room.id)} onKeyDown={(e) => { if (e.key === "Enter" || e.key === " ") { e.preventDefault(); onSwitch(room.id); } }} >
{room.name}
{room.messageCount > 0 && ( {room.messageCount} )}
); })}
); } // ─── Messages ────────────────────────────────────────────────────────────── function Messages({ messages, status }: { messages: UIMessage[]; status: string; }) { const endRef = useRef(null); const isBusy = status === "submitted" || status === "streaming"; useEffect(() => { endRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages, isBusy]); if (messages.length === 0 && !isBusy) { return ( } title="Empty room" description="Type a message below to start the conversation" /> ); } return (
{messages.map((msg) => (
{msg.role === "user" ? (
{getMessageText(msg)}
) : (
{msg.parts.map((part, i) => { if (part.type === "reasoning") { return (
Reasoning
{part.text}
); } if ("toolName" in part && "toolCallId" in part) { const tp = part as unknown as { toolName: string; toolCallId: string; state: string; input: unknown; output?: unknown; }; return (
{tp.toolName} {tp.state}
{tp.input != null && Object.keys(tp.input as Record) .length > 0 && (
                              {JSON.stringify(tp.input, null, 2)}
                            
)} {tp.state === "output-available" && tp.output != null && (
                              {JSON.stringify(tp.output, null, 2)}
                            
)}
); } if (part.type === "text") { return ( {part.text} ); } return null; })}
)}
))} {status === "submitted" && (
Thinking...
)}
); } // ─── Main ────────────────────────────────────────────────────────────────── function App() { const [connectionStatus, setConnectionStatus] = useState("connecting"); const [rooms, setRooms] = useState([]); const [activeRoomId, setActiveRoomId] = useState(null); const [input, setInput] = useState(""); // Ref bridges useAgent's onMessage → useChat's setMessages // (useChat is declared after useAgent, so we use a ref to avoid ordering issues) const setChatMessagesRef = useRef<((messages: UIMessage[]) => void) | null>( null ); const handleServerMessage = useCallback((event: MessageEvent) => { if (typeof event.data !== "string") return; try { const msg = JSON.parse(event.data); if (msg.type === "messages") { setActiveRoomId(msg.roomId); setChatMessagesRef.current?.(chatToUIMessages(msg.messages)); } // stream-start, stream-event, stream-done are handled by the transport } catch { /* ignore parse errors */ } }, []); const agent = useAgent({ agent: "OverseerAgent", onOpen: useCallback(() => setConnectionStatus("connected"), []), onClose: useCallback(() => setConnectionStatus("disconnected"), []), onError: useCallback( (error: Event) => console.error("WebSocket error:", error), [] ), onStateUpdate: useCallback( (state: RoomsState) => setRooms(state.rooms), [] ), onMessage: handleServerMessage }); const transport = useMemo(() => new AgentChatTransport(agent), [agent]); const { messages, setMessages: setChatMessages, sendMessage, resumeStream, status } = useChat({ transport }); // Keep the ref in sync so onMessage can call setChatMessages setChatMessagesRef.current = setChatMessages; const isConnected = connectionStatus === "connected"; const isBusy = status === "submitted" || status === "streaming"; const handleCreate = useCallback(async () => { const name = `Room ${(rooms.length ?? 0) + 1}`; await agent.call("createRoom", [name]); }, [agent, rooms]); const handleDelete = useCallback( async (id: string) => { transport.detach(); await agent.call("deleteRoom", [id]); if (activeRoomId === id) { setActiveRoomId(null); setChatMessages([]); } }, [agent, activeRoomId, setChatMessages, transport] ); const handleClear = useCallback( async (id: string) => agent.call("clearRoom", [id]), [agent] ); const handleSwitch = useCallback( async (id: string) => { transport.detach(); await agent.call("switchRoom", [id]); // If the target room has an active stream, resume it. // The server filters by the connection's active room. resumeStream(); }, [agent, transport, resumeStream] ); const send = useCallback(() => { const text = input.trim(); if (!text || isBusy || !activeRoomId) return; setInput(""); sendMessage({ text }); }, [input, isBusy, activeRoomId, sendMessage]); const activeRoom = rooms.find((r) => r.id === activeRoomId); return (
{/* Left: Room sidebar */}
{/* Main: Chat */}
{activeRoom ? ( <> {activeRoom.name} {activeRoom.messageCount} messages ) : ( No room selected )}
{activeRoom && ( )}
{activeRoomId ? ( ) : ( } title="Create a room to start" description='Click "New" in the sidebar to create your first chat room' /> )}
{ e.preventDefault(); send(); }} className="max-w-3xl mx-auto px-5 py-4" >
{ if (e.key === "Enter" && !e.shiftKey) { e.preventDefault(); send(); } }} placeholder={ activeRoomId ? "Type a message..." : "Create a room first..." } disabled={!isConnected || isBusy || !activeRoomId} rows={2} className="flex-1 !ring-0 focus:!ring-0 !shadow-none !bg-transparent !outline-none" />
); } export default function AppRoot() { return ( Loading...
} > ); }