import { Suspense, useCallback, useState, useEffect, useRef } from "react";
import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
import { Button, Badge, InputArea, Empty } from "@cloudflare/kumo";
import {
ConnectionIndicator,
ModeToggle,
PoweredByAgents,
type ConnectionStatus
} from "@cloudflare/agents-ui";
import {
PaperPlaneRightIcon,
TrashIcon,
ArrowClockwiseIcon,
MagnifyingGlassIcon,
BrainIcon,
ChartBarIcon
} from "@phosphor-icons/react";
import type { UIMessage } from "ai";
// ── Typed data parts ──────────────────────────────────────────
// Payload of a `data-sources` part: lifecycle of a source search.
// `results` is only meaningful once `status` is "found".
type SourcesData = {
query: string;
status: "searching" | "found";
results: string[];
};
// Payload of a transient `data-thinking` part (delivered via onData,
// never persisted into message.parts — see Chat below).
type ThinkingData = {
model: string;
startedAt: string;
};
// Payload of a `data-usage` part: per-response token/latency stats.
type UsageData = {
model: string;
inputTokens: number;
outputTokens: number;
latencyMs: number;
};
/**
 * Custom message type with typed data parts.
 *
 * First type argument is the message metadata (unused here, hence
 * `unknown`); the second maps data-part names to their payload types,
 * so `part.type === "data-thinking"` narrows `part.data` to ThinkingData.
 */
type ChatMessage = UIMessage<
unknown,
{
sources: SourcesData;
thinking: ThinkingData;
usage: UsageData;
}
>;
// ── Data part renderers ─────────────────────────────────────────────
/**
 * Renders a `data-sources` part: a "searching" progress line while the
 * lookup is in flight, then the list of result strings once found.
 *
 * NOTE(review): the JSX markup appears to have been stripped from this
 * view (bare text/expressions inside `return (...)`); restore the
 * original elements before this compiles. `isStreaming` is unused in
 * the visible body — presumably consumed by the stripped markup; verify.
 */
function SourcesPart({
data,
isStreaming
}: {
data: SourcesData;
isStreaming: boolean;
}) {
// While the search is still running, show only the query.
if (data.status === "searching") {
return (
Searching for “{data.query}”…
);
}
// status === "found": render each result as a list entry.
return (
Sources
{data.results.map((source) => (
-
{source}
))}
);
}
/**
 * Renders the transient thinking indicator for the model named in the
 * `data-thinking` payload. (`data.startedAt` is unused in the visible
 * body — possibly consumed by markup stripped from this view.)
 *
 * NOTE(review): JSX elements appear stripped from the return below.
 */
function ThinkingPart({ data }: { data: ThinkingData }) {
return (
Thinking with {data.model}…
);
}
/**
 * Renders a `data-usage` footer: model name, combined token count
 * (input + output), and latency formatted as seconds with one decimal.
 *
 * NOTE(review): JSX elements appear stripped from the return below;
 * only the interleaved text/expressions remain.
 */
function UsagePart({ data }: { data: UsageData }) {
// Total tokens across prompt and completion.
const totalTokens = data.inputTokens + data.outputTokens;
// Milliseconds → seconds, fixed to one decimal place (string).
const latencySec = (data.latencyMs / 1000).toFixed(1);
return (
{data.model}
|
{totalTokens} tokens
|
{latencySec}s
);
}
// ── Message helpers ─────────────────────────────────────────────────
/**
 * Extract plain text from a message's parts.
 *
 * Walks the message's parts in order and concatenates the `text` of
 * every part whose type is `"text"`, skipping data parts entirely.
 * Returns "" for a message with no text parts.
 */
function getMessageText(message: ChatMessage): string {
let combined = "";
for (const part of message.parts) {
if (part.type === "text") {
combined += part.text;
}
}
return combined;
}
/**
* Resumable Streaming Chat Client
*
* Demonstrates automatic resumable streaming with useAgentChat.
* When you disconnect and reconnect during streaming:
* 1. useAgentChat automatically detects the active stream
* 2. Sends ACK to server
* 3. Receives all buffered chunks and continues streaming
*
* Try it: Start a long response, refresh the page, and watch it resume!
*/
function Chat() {
// Connection state driven by the useAgent lifecycle callbacks below.
// NOTE(review): likely `useState<ConnectionStatus>("connecting")` —
// single-line generic arguments appear stripped from this view; confirm.
const [connectionStatus, setConnectionStatus] =
useState("connecting");
// Draft text for the input box; cleared optimistically on send.
const [input, setInput] = useState("");
// Anchor element scrolled into view whenever `messages` changes.
// NOTE(review): presumably `useRef<HTMLDivElement>(null)` — generic stripped.
const messagesEndRef = useRef(null);
// Transient data parts are not added to message.parts, they only
// fire the onData callback. We store the latest thinking part in
// local state so we can render it while streaming.
// NOTE(review): presumably `useState<ThinkingData | null>(null)` — confirm.
const [thinkingData, setThinkingData] = useState(null);
// Stable (empty-deps) lifecycle handlers so useAgent's options don't
// change identity across renders.
const handleOpen = useCallback(() => setConnectionStatus("connected"), []);
const handleClose = useCallback(
() => setConnectionStatus("disconnected"),
[]
);
const handleError = useCallback(
(error: Event) => console.error("WebSocket error:", error),
[]
);
// WebSocket-backed connection to the "ResumableStreamingChat" agent
// instance named "demo".
const agent = useAgent({
agent: "ResumableStreamingChat",
name: "demo",
onOpen: handleOpen,
onClose: handleClose,
onError: handleError
});
// NOTE(review): `clearHistory` is destructured but unused in the
// visible body — presumably wired to a button in the stripped JSX.
const { messages, sendMessage, clearHistory, status } = useAgentChat<
unknown,
ChatMessage
>({
agent,
onData(part) {
// Capture transient thinking parts from the onData callback.
// These are ephemeral — not persisted and not in message.parts.
if (part.type === "data-thinking") {
// part.data is typed as ThinkingData here — no cast needed
setThinkingData(part.data);
}
}
});
const isStreaming = status === "streaming";
// NOTE(review): `isConnected` unused below — likely fed to the stripped
// ConnectionIndicator markup; verify.
const isConnected = connectionStatus === "connected";
// Clear transient thinking state when streaming ends
useEffect(() => {
if (!isStreaming) {
setThinkingData(null);
}
}, [isStreaming]);
// Auto-scroll to the bottom whenever the message list changes.
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
// Send the trimmed draft as a user message. No-ops on empty input or
// while a response is streaming; clears the box before awaiting.
const send = useCallback(async () => {
const text = input.trim();
if (!text || isStreaming) return;
setInput("");
try {
await sendMessage({
role: "user",
parts: [{ type: "text", text }]
});
} catch (error) {
console.error("Failed to send message:", error);
}
}, [input, isStreaming, sendMessage]);
// NOTE(review): the JSX tree below has had its elements stripped from
// this view — only comments, text, and embedded expressions remain.
// Restore the original markup before compiling.
return (
{/* Header */}
{/* Messages */}
{messages.length === 0 && (
}
title="Send a message to start chatting"
description="Try refreshing mid-response — the stream picks up where it left off."
/>
)}
{messages.map((message, index) => {
const isUser = message.role === "user";
const isLastAssistant =
message.role === "assistant" && index === messages.length - 1;
const text = getMessageText(message);
if (isUser) {
return (
);
}
// Transient parts (like data-thinking) are not in message.parts,
// they're captured via onData and stored in local state instead.
const sourcesPart = message.parts.find(
(p) => p.type === "data-sources"
);
const usagePart = message.parts.find(
(p) => p.type === "data-usage"
);
return (
{sourcesPart && (
)}
{/* Transient thinking indicator that is captured via onData and
only visible on the last assistant message while streaming */}
{thinkingData && isLastAssistant && isStreaming && (
)}
{/* Message text */}
{text}
{isLastAssistant && isStreaming && (
)}
{usagePart &&
}
);
})}
{/* Input */}
);
}
/**
 * App root: wraps Chat in a Suspense boundary with a "Loading..."
 * fallback.
 *
 * NOTE(review): the Suspense element and fallback markup appear
 * stripped from this view — only the fallback text and closing tokens
 * remain. Restore before compiling.
 */
export default function App() {
return (
Loading...
}
>
);
}