/**
 * Client for the resumable streaming chat demo.
 *
 * Contents, in order of appearance:
 *  - SourcesData / ThinkingData / UsageData: payload shapes of the custom
 *    data parts, combined into the ChatMessage type.
 *  - SourcesPart / ThinkingPart / UsagePart: renderers for those parts.
 *    SourcesPart switches on data.status ("searching" vs. anything else).
 *    UsagePart shows inputTokens + outputTokens as the token total and
 *    latencyMs / 1000 formatted with one decimal place.
 *  - getMessageText: concatenates the `text` of every part whose type is
 *    "text", with no separator.
 *  - Chat: connects through useAgent (agent "ResumableStreamingChat",
 *    name "demo") and drives the conversation with useAgentChat.
 *      - Socket open/close is mirrored into connectionStatus; socket errors
 *        are only logged.
 *      - The latest "data-thinking" part received via onData is kept in
 *        local state and reset to null whenever status is not "streaming".
 *      - messagesEndRef is scrolled into view each time `messages` changes.
 *      - send() trims the input, returns early when it is empty or a
 *        response is streaming, clears the input, then sends a user text
 *        message; a failed send is logged and not rethrown.
 *  - App: default export. Presumably wraps Chat in Suspense with a
 *    "Loading..." fallback; the markup is not visible here, confirm.
 *
 * NOTE(review): this copy of the file looks damaged and will not compile
 * as written:
 *  - JSX element tags appear to have been stripped. Each `return (` is
 *    followed only by text content, and fragments such as
 *    `} title=... />` or `{ e.preventDefault(); send(); }} className=...`
 *    are props whose opening tags are gone.
 *  - Generic type arguments appear to be missing as well, e.g.
 *    useState("connecting"), useState(null), useRef(null) and useAgent({...}).
 *    Presumably these were useState<ConnectionStatus>,
 *    useState<ThinkingData | null>, and similar; confirm against the original.
 *  - Line breaks were collapsed, so several statements share a physical
 *    line with a preceding `//` comment and are commented out as laid out.
 * Restore the file from its original source rather than reconstructing the
 * markup by hand.
 */
import { Suspense, useCallback, useState, useEffect, useRef } from "react"; import { useAgent } from "agents/react"; import { useAgentChat } from "@cloudflare/ai-chat/react"; import { Button, Badge, InputArea, Empty } from "@cloudflare/kumo"; import { ConnectionIndicator, ModeToggle, PoweredByAgents, type ConnectionStatus } from "@cloudflare/agents-ui"; import { PaperPlaneRightIcon, TrashIcon, ArrowClockwiseIcon, MagnifyingGlassIcon, BrainIcon, ChartBarIcon } from "@phosphor-icons/react"; import type { UIMessage } from "ai"; // ── Typed data parts ────────────────────────────────────────── type SourcesData = { query: string; status: "searching" | "found"; results: string[]; }; type ThinkingData = { model: string; startedAt: string; }; type UsageData = { model: string; inputTokens: number; outputTokens: number; latencyMs: number; }; /** Custom message type with typed data parts. */ type ChatMessage = UIMessage< unknown, { sources: SourcesData; thinking: ThinkingData; usage: UsageData; } >; // ── Data part renderers ───────────────────────────────────────────── function SourcesPart({ data, isStreaming }: { data: SourcesData; isStreaming: boolean; }) { if (data.status === "searching") { return (
Searching for “{data.query}”…
); } return (
Sources
); } function ThinkingPart({ data }: { data: ThinkingData }) { return (
Thinking with {data.model}…
); } function UsagePart({ data }: { data: UsageData }) { const totalTokens = data.inputTokens + data.outputTokens; const latencySec = (data.latencyMs / 1000).toFixed(1); return (
{data.model} | {totalTokens} tokens | {latencySec}s
); } // ── Message helpers ───────────────────────────────────────────────── /** Extract plain text from a message's parts. */ function getMessageText(message: ChatMessage): string { return message.parts .filter((part) => part.type === "text") .map((part) => part.text) .join(""); } /** * Resumable Streaming Chat Client * * Demonstrates automatic resumable streaming with useAgentChat. * When you disconnect and reconnect during streaming: * 1. useAgentChat automatically detects the active stream * 2. Sends ACK to server * 3. Receives all buffered chunks and continues streaming * * Try it: Start a long response, refresh the page, and watch it resume! */ function Chat() { const [connectionStatus, setConnectionStatus] = useState("connecting"); const [input, setInput] = useState(""); const messagesEndRef = useRef(null); // Transient data parts are not added to message.parts, they only // fire the onData callback. We store the latest thinking part in // local state so we can render it while streaming. const [thinkingData, setThinkingData] = useState(null); const handleOpen = useCallback(() => setConnectionStatus("connected"), []); const handleClose = useCallback( () => setConnectionStatus("disconnected"), [] ); const handleError = useCallback( (error: Event) => console.error("WebSocket error:", error), [] ); const agent = useAgent({ agent: "ResumableStreamingChat", name: "demo", onOpen: handleOpen, onClose: handleClose, onError: handleError }); const { messages, sendMessage, clearHistory, status } = useAgentChat< unknown, ChatMessage >({ agent, onData(part) { // Capture transient thinking parts from the onData callback. // These are ephemeral — not persisted and not in message.parts. 
// Only "data-thinking" parts are handled here; any other part type falls through and is ignored.
if (part.type === "data-thinking") { // part.data is typed as ThinkingData here — no cast needed setThinkingData(part.data); } } }); const isStreaming = status === "streaming"; const isConnected = connectionStatus === "connected"; // Clear transient thinking state when streaming ends useEffect(() => { if (!isStreaming) { setThinkingData(null); } }, [isStreaming]); useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages]); const send = useCallback(async () => { const text = input.trim(); if (!text || isStreaming) return; setInput(""); try { await sendMessage({ role: "user", parts: [{ type: "text", text }] }); } catch (error) { console.error("Failed to send message:", error); } }, [input, isStreaming, sendMessage]); return (
{/* Header */}

Resumable Chat

Auto-resume
{/* Messages */}
{messages.length === 0 && ( } title="Send a message to start chatting" description="Try refreshing mid-response — the stream picks up where it left off." /> )} {messages.map((message, index) => { const isUser = message.role === "user"; const isLastAssistant = message.role === "assistant" && index === messages.length - 1; const text = getMessageText(message); if (isUser) { return (
{text}
); } // Transient parts (like data-thinking) are not in message.parts, // they're captured via onData and stored in local state instead. const sourcesPart = message.parts.find( (p) => p.type === "data-sources" ); const usagePart = message.parts.find( (p) => p.type === "data-usage" ); return (
{sourcesPart && ( )} {/* Transient thinking indicator that is captured via onData and only visible on the last assistant message while streaming */} {thinkingData && isLastAssistant && isStreaming && ( )} {/* Message text */}
{text} {isLastAssistant && isStreaming && ( )}
{usagePart && }
); })}
{/* Input */}
{ e.preventDefault(); send(); }} className="max-w-3xl mx-auto px-5 py-4" >
{ if (e.key === "Enter" && !e.shiftKey) { e.preventDefault(); send(); } }} placeholder="Type a message..." disabled={!isConnected || isStreaming} rows={2} className="flex-1 !ring-0 focus:!ring-0 !shadow-none !bg-transparent !outline-none" />
); } export default function App() { return ( Loading...
} > ); }