branch:
client.tsx
25017 bytesRaw
/**
* Chat Rooms — Client
*
* Left sidebar: room list with create/delete/clear.
* Main area: chat for the active room.
*
* Data sources:
* - Room list: from Agent state sync (useAgent onStateUpdate)
* - Chat messages & streaming: useChat with custom AgentChatTransport
* - Room CRUD: via agent.call() RPC
*
* The AgentChatTransport bridges the AI SDK's useChat hook with the Agent
* WebSocket connection: sendMessages() triggers the server-side RPC, then
* pipes WS stream-event messages into a ReadableStream<UIMessageChunk>
* that useChat consumes and renders.
*/
import {
Suspense,
useCallback,
useState,
useEffect,
useRef,
useMemo
} from "react";
import { useAgent } from "agents/react";
import { useChat } from "@ai-sdk/react";
import type { UIMessage, UIMessageChunk, ChatTransport } from "ai";
import { Button, Badge, InputArea, Empty, Text } from "@cloudflare/kumo";
import {
ConnectionIndicator,
ModeToggle,
PoweredByAgents,
type ConnectionStatus
} from "@cloudflare/agents-ui";
import {
PaperPlaneRightIcon,
PlusIcon,
ChatCircleIcon,
BroomIcon,
HashIcon
} from "@phosphor-icons/react";
import { Streamdown } from "streamdown";
import type { RoomsState, RoomInfo, ChatMessage } from "./server";
// ─── Helpers ──────────────────────────────────────────────────────────────
/** Convert server-side ChatMessage[] to UIMessage[] for useChat */
function chatToUIMessages(msgs: ChatMessage[]): UIMessage[] {
return msgs.map((m) => ({
id: m.id,
role: m.role,
parts: [{ type: "text" as const, text: m.content }]
}));
}
/** Extract concatenated text from a UIMessage's text parts */
function getMessageText(msg: UIMessage): string {
return msg.parts
.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("");
}
// ─── Custom Transport ─────────────────────────────────────────────────────
/** Minimal interface for the agent socket used by the transport */
interface AgentSocket {
addEventListener(
type: "message",
handler: (event: MessageEvent) => void,
options?: { signal?: AbortSignal }
): void;
removeEventListener(
type: "message",
handler: (event: MessageEvent) => void
): void;
call(method: string, args?: unknown[]): Promise<unknown>;
send(data: string): void;
}
/**
* Bridges useChat with the Agent WebSocket connection.
*
* Features:
* - Request ID correlation: each request gets a unique ID, only matching
* WS messages are processed
* - Cancel: sends { type: "cancel", requestId } to stop server-side streaming
* - Completion guard: close/error/abort are idempotent
* - Signal-based cleanup: uses AbortController signal on addEventListener
* - Stream resumption: reconnectToStream sends resume-request, server replays
* buffered chunks
*/
class AgentChatTransport implements ChatTransport<UIMessage> {
#agent: AgentSocket;
#activeRequestIds = new Set<string>();
#currentFinish: (() => void) | null = null;
constructor(agent: AgentSocket) {
this.#agent = agent;
}
/**
* Silently close the client-side stream without cancelling the server.
* The server keeps generating; when the user switches back, switchRoom
* will fetch the completed messages.
*/
detach() {
this.#currentFinish?.();
this.#currentFinish = null;
}
async sendMessages({
messages,
abortSignal
}: Parameters<ChatTransport<UIMessage>["sendMessages"]>[0]): Promise<
ReadableStream<UIMessageChunk>
> {
const lastMessage = messages[messages.length - 1];
const text = getMessageText(lastMessage);
const requestId = crypto.randomUUID().slice(0, 8);
let completed = false;
const abortController = new AbortController();
let streamController!: ReadableStreamDefaultController<UIMessageChunk>;
// Single cleanup helper — every terminal path goes through here once
const finish = (action: () => void) => {
if (completed) return;
completed = true;
this.#currentFinish = null;
try {
action();
} catch {
/* stream may already be closed */
}
this.#activeRequestIds.delete(requestId);
abortController.abort();
};
// Expose a detach-friendly finish that closes the stream gracefully
this.#currentFinish = () => finish(() => streamController.close());
// Abort handler: notify server, then terminate stream
const onAbort = () => {
if (completed) return;
try {
this.#agent.send(JSON.stringify({ type: "cancel", requestId }));
} catch {
/* ignore send failures */
}
finish(() =>
streamController.error(
Object.assign(new Error("Aborted"), { name: "AbortError" })
)
);
};
const stream = new ReadableStream<UIMessageChunk>({
start(controller) {
streamController = controller;
},
cancel() {
onAbort();
}
});
// Listen for stream events filtered by requestId
this.#agent.addEventListener(
"message",
(event: MessageEvent) => {
if (typeof event.data !== "string") return;
try {
const msg = JSON.parse(event.data);
if (msg.requestId !== requestId) return;
if (msg.type === "stream-event") {
const chunk: UIMessageChunk = JSON.parse(msg.event);
streamController.enqueue(chunk);
} else if (msg.type === "stream-done") {
finish(() => streamController.close());
}
} catch {
/* ignore parse errors */
}
},
{ signal: abortController.signal }
);
// Handle abort from caller
if (abortSignal) {
abortSignal.addEventListener("abort", onAbort, { once: true });
if (abortSignal.aborted) onAbort();
}
// Track this request
this.#activeRequestIds.add(requestId);
// Fire-and-forget RPC — response comes via WS events
this.#agent.call("sendMessage", [text, requestId]).catch((error: Error) => {
finish(() => streamController.error(error));
});
return stream;
}
async reconnectToStream(): Promise<ReadableStream<UIMessageChunk> | null> {
return new Promise<ReadableStream<UIMessageChunk> | null>((resolve) => {
let resolved = false;
let timeout: ReturnType<typeof setTimeout> | undefined;
const done = (value: ReadableStream<UIMessageChunk> | null) => {
if (resolved) return;
resolved = true;
if (timeout) clearTimeout(timeout);
this.#agent.removeEventListener("message", handler);
resolve(value);
};
const handler = (event: MessageEvent) => {
if (typeof event.data !== "string") return;
try {
const msg = JSON.parse(event.data);
if (msg.type === "stream-resuming") {
done(this.#createResumeStream(msg.requestId));
}
} catch {
/* ignore */
}
};
this.#agent.addEventListener("message", handler);
try {
this.#agent.send(JSON.stringify({ type: "resume-request" }));
} catch {
/* WebSocket may not be open yet */
}
// Short timeout: server responds immediately if there's an active stream
timeout = setTimeout(() => done(null), 500);
});
}
/** Create a ReadableStream that receives resumed stream chunks. */
#createResumeStream(requestId: string): ReadableStream<UIMessageChunk> {
const abortController = new AbortController();
let completed = false;
const finish = (action: () => void) => {
if (completed) return;
completed = true;
try {
action();
} catch {
/* stream may already be closed */
}
this.#activeRequestIds.delete(requestId);
abortController.abort();
};
this.#activeRequestIds.add(requestId);
return new ReadableStream<UIMessageChunk>({
start: (controller) => {
this.#agent.addEventListener(
"message",
(event: MessageEvent) => {
if (typeof event.data !== "string") return;
try {
const msg = JSON.parse(event.data);
if (msg.requestId !== requestId) return;
if (msg.type === "stream-event") {
const chunk: UIMessageChunk = JSON.parse(msg.event);
controller.enqueue(chunk);
} else if (msg.type === "stream-done") {
finish(() => controller.close());
}
} catch {
/* ignore */
}
},
{ signal: abortController.signal }
);
},
cancel() {
finish(() => {});
}
});
}
}
// ─── Room Sidebar ──────────────────────────────────────────────────────────
function RoomSidebar({
rooms,
activeRoomId,
onSwitch,
onCreate,
onDelete,
onClear
}: {
rooms: RoomInfo[];
activeRoomId: string | null;
onSwitch: (id: string) => void;
onCreate: () => void;
onDelete: (id: string) => void;
onClear: (id: string) => void;
}) {
return (
<div className="flex flex-col h-full">
<div className="px-4 py-3 border-b border-kumo-line flex items-center justify-between">
<div className="flex items-center gap-2">
<ChatCircleIcon size={18} className="text-kumo-brand" />
<Text size="sm" bold>
Rooms
</Text>
<Badge variant="secondary">{rooms.length}</Badge>
</div>
<Button
variant="primary"
size="sm"
icon={<PlusIcon size={14} />}
onClick={onCreate}
>
New
</Button>
</div>
<div className="flex-1 overflow-y-auto p-2 space-y-1">
{rooms.length === 0 && (
<div className="px-2 py-8 text-center">
<Text size="xs" variant="secondary">
No rooms yet. Create one to start chatting.
</Text>
</div>
)}
{rooms.map((room) => {
const isActive = room.id === activeRoomId;
return (
<div
key={room.id}
// oxlint-disable-next-line prefer-tag-over-role
role="button"
tabIndex={0}
className={`group rounded-lg px-3 py-2 cursor-pointer transition-colors w-full text-left ${
isActive
? "bg-kumo-tint ring-1 ring-kumo-ring"
: "hover:bg-kumo-tint/50"
}`}
onClick={() => onSwitch(room.id)}
onKeyDown={(e) => {
if (e.key === "Enter" || e.key === " ") {
e.preventDefault();
onSwitch(room.id);
}
}}
>
<div className="flex items-center justify-between">
<div className="flex items-center gap-2 min-w-0">
<HashIcon
size={14}
className={
isActive ? "text-kumo-brand" : "text-kumo-inactive"
}
/>
<Text size="sm" bold>
{room.name}
</Text>
</div>
{room.messageCount > 0 && (
<Badge variant="secondary">{room.messageCount}</Badge>
)}
</div>
<div
className={`flex items-center gap-1 mt-1.5 ${
isActive ? "opacity-100" : "opacity-0 group-hover:opacity-100"
} transition-opacity`}
>
<Button
variant="secondary"
size="sm"
onClick={(e) => {
e.stopPropagation();
onClear(room.id);
}}
>
Clear
</Button>
<Button
variant="destructive"
size="sm"
onClick={(e) => {
e.stopPropagation();
onDelete(room.id);
}}
>
Delete
</Button>
</div>
</div>
);
})}
</div>
</div>
);
}
// ─── Messages ──────────────────────────────────────────────────────────────
function Messages({
messages,
status
}: {
messages: UIMessage[];
status: string;
}) {
const endRef = useRef<HTMLDivElement>(null);
const isBusy = status === "submitted" || status === "streaming";
useEffect(() => {
endRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages, isBusy]);
if (messages.length === 0 && !isBusy) {
return (
<Empty
icon={<ChatCircleIcon size={32} />}
title="Empty room"
description="Type a message below to start the conversation"
/>
);
}
return (
<div className="space-y-4">
{messages.map((msg) => (
<div key={msg.id}>
{msg.role === "user" ? (
<div className="flex justify-end">
<div className="max-w-[85%] px-4 py-2.5 rounded-2xl rounded-br-md bg-kumo-contrast text-kumo-inverse leading-relaxed">
{getMessageText(msg)}
</div>
</div>
) : (
<div className="flex justify-start">
<div className="max-w-[85%] rounded-2xl rounded-bl-md bg-kumo-base text-kumo-default leading-relaxed overflow-hidden">
{msg.parts.map((part, i) => {
if (part.type === "reasoning") {
return (
<details
key={i}
className="px-4 py-2 border-b border-kumo-line"
open={"state" in part && part.state === "streaming"}
>
<summary className="cursor-pointer text-xs text-kumo-inactive select-none">
Reasoning
</summary>
<div className="mt-1 text-xs text-kumo-secondary italic whitespace-pre-wrap">
{part.text}
</div>
</details>
);
}
if ("toolName" in part && "toolCallId" in part) {
const tp = part as unknown as {
toolName: string;
toolCallId: string;
state: string;
input: unknown;
output?: unknown;
};
return (
<div
key={i}
className="px-4 py-2.5 border-b border-kumo-line"
>
<div className="flex items-center gap-2">
<Text size="xs" bold>
{tp.toolName}
</Text>
<Badge variant="secondary">{tp.state}</Badge>
</div>
{tp.input != null &&
Object.keys(tp.input as Record<string, unknown>)
.length > 0 && (
<pre className="mt-1 text-xs text-kumo-secondary overflow-auto">
{JSON.stringify(tp.input, null, 2)}
</pre>
)}
{tp.state === "output-available" &&
tp.output != null && (
<pre className="mt-1 text-xs text-kumo-brand overflow-auto">
{JSON.stringify(tp.output, null, 2)}
</pre>
)}
</div>
);
}
if (part.type === "text") {
return (
<Streamdown
key={i}
className="sd-theme px-4 py-2.5"
controls={false}
isAnimating={
"state" in part && part.state === "streaming"
}
>
{part.text}
</Streamdown>
);
}
return null;
})}
</div>
</div>
)}
</div>
))}
{status === "submitted" && (
<div className="flex justify-start">
<div className="px-4 py-2.5 rounded-2xl rounded-bl-md bg-kumo-base">
<div className="flex items-center gap-2">
<div className="w-2 h-2 bg-kumo-brand rounded-full animate-pulse" />
<Text size="xs" variant="secondary">
Thinking...
</Text>
</div>
</div>
</div>
)}
<div ref={endRef} />
</div>
);
}
// ─── Main ──────────────────────────────────────────────────────────────────
function App() {
const [connectionStatus, setConnectionStatus] =
useState<ConnectionStatus>("connecting");
const [rooms, setRooms] = useState<RoomInfo[]>([]);
const [activeRoomId, setActiveRoomId] = useState<string | null>(null);
const [input, setInput] = useState("");
// Ref bridges useAgent's onMessage → useChat's setMessages
// (useChat is declared after useAgent, so we use a ref to avoid ordering issues)
const setChatMessagesRef = useRef<((messages: UIMessage[]) => void) | null>(
null
);
const handleServerMessage = useCallback((event: MessageEvent) => {
if (typeof event.data !== "string") return;
try {
const msg = JSON.parse(event.data);
if (msg.type === "messages") {
setActiveRoomId(msg.roomId);
setChatMessagesRef.current?.(chatToUIMessages(msg.messages));
}
// stream-start, stream-event, stream-done are handled by the transport
} catch {
/* ignore parse errors */
}
}, []);
const agent = useAgent<RoomsState>({
agent: "OverseerAgent",
onOpen: useCallback(() => setConnectionStatus("connected"), []),
onClose: useCallback(() => setConnectionStatus("disconnected"), []),
onError: useCallback(
(error: Event) => console.error("WebSocket error:", error),
[]
),
onStateUpdate: useCallback(
(state: RoomsState) => setRooms(state.rooms),
[]
),
onMessage: handleServerMessage
});
const transport = useMemo(() => new AgentChatTransport(agent), [agent]);
const {
messages,
setMessages: setChatMessages,
sendMessage,
resumeStream,
status
} = useChat({ transport });
// Keep the ref in sync so onMessage can call setChatMessages
setChatMessagesRef.current = setChatMessages;
const isConnected = connectionStatus === "connected";
const isBusy = status === "submitted" || status === "streaming";
const handleCreate = useCallback(async () => {
const name = `Room ${(rooms.length ?? 0) + 1}`;
await agent.call("createRoom", [name]);
}, [agent, rooms]);
const handleDelete = useCallback(
async (id: string) => {
transport.detach();
await agent.call("deleteRoom", [id]);
if (activeRoomId === id) {
setActiveRoomId(null);
setChatMessages([]);
}
},
[agent, activeRoomId, setChatMessages, transport]
);
const handleClear = useCallback(
async (id: string) => agent.call("clearRoom", [id]),
[agent]
);
const handleSwitch = useCallback(
async (id: string) => {
transport.detach();
await agent.call("switchRoom", [id]);
// If the target room has an active stream, resume it.
// The server filters by the connection's active room.
resumeStream();
},
[agent, transport, resumeStream]
);
const send = useCallback(() => {
const text = input.trim();
if (!text || isBusy || !activeRoomId) return;
setInput("");
sendMessage({ text });
}, [input, isBusy, activeRoomId, sendMessage]);
const activeRoom = rooms.find((r) => r.id === activeRoomId);
return (
<div className="flex h-screen bg-kumo-elevated">
{/* Left: Room sidebar */}
<div className="w-[260px] bg-kumo-base border-r border-kumo-line shrink-0">
<RoomSidebar
rooms={rooms}
activeRoomId={activeRoomId}
onSwitch={handleSwitch}
onCreate={handleCreate}
onDelete={handleDelete}
onClear={handleClear}
/>
</div>
{/* Main: Chat */}
<div className="flex flex-col flex-1 min-w-0">
<header className="px-5 py-4 bg-kumo-base border-b border-kumo-line">
<div className="flex items-center justify-between">
<div className="flex items-center gap-3">
{activeRoom ? (
<>
<HashIcon size={20} className="text-kumo-brand" />
<Text size="lg" bold>
{activeRoom.name}
</Text>
<Badge variant="secondary">
{activeRoom.messageCount} messages
</Badge>
</>
) : (
<Text size="lg" bold variant="secondary">
No room selected
</Text>
)}
</div>
<div className="flex items-center gap-3">
<ConnectionIndicator status={connectionStatus} />
<ModeToggle />
{activeRoom && (
<Button
variant="secondary"
size="sm"
icon={<BroomIcon size={14} />}
onClick={() => handleClear(activeRoom.id)}
>
Clear
</Button>
)}
</div>
</div>
</header>
<div className="flex-1 overflow-y-auto">
<div className="max-w-3xl mx-auto px-5 py-6">
{activeRoomId ? (
<Messages messages={messages} status={status} />
) : (
<Empty
icon={<ChatCircleIcon size={32} />}
title="Create a room to start"
description='Click "New" in the sidebar to create your first chat room'
/>
)}
</div>
</div>
<div className="border-t border-kumo-line bg-kumo-base">
<form
onSubmit={(e) => {
e.preventDefault();
send();
}}
className="max-w-3xl mx-auto px-5 py-4"
>
<div className="flex items-end gap-3 rounded-xl border border-kumo-line bg-kumo-base p-3 shadow-sm focus-within:ring-2 focus-within:ring-kumo-ring focus-within:border-transparent transition-shadow">
<InputArea
value={input}
onValueChange={setInput}
onKeyDown={(e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
send();
}
}}
placeholder={
activeRoomId ? "Type a message..." : "Create a room first..."
}
disabled={!isConnected || isBusy || !activeRoomId}
rows={2}
className="flex-1 !ring-0 focus:!ring-0 !shadow-none !bg-transparent !outline-none"
/>
<Button
type="submit"
variant="primary"
shape="square"
aria-label="Send message"
disabled={
!input.trim() || !isConnected || isBusy || !activeRoomId
}
icon={<PaperPlaneRightIcon size={18} />}
loading={isBusy}
className="mb-0.5"
/>
</div>
</form>
<div className="flex justify-center pb-3">
<PoweredByAgents />
</div>
</div>
</div>
</div>
);
}
export default function AppRoot() {
return (
<Suspense
fallback={
<div className="flex items-center justify-center h-screen text-kumo-inactive">
Loading...
</div>
}
>
<App />
</Suspense>
);
}