branch:
StreamingDemo.tsx
9669 bytesRaw
import { useAgent } from "agents/react";
import { useState } from "react";
import { Button, Input, Surface, Text } from "@cloudflare/kumo";
import { DemoWrapper } from "../../layout";
import {
LogPanel,
ConnectionStatus,
CodeExplanation,
HighlightedCode,
HighlightedJson,
type CodeSection
} from "../../components";
import { useLogs, useUserId, useToast } from "../../hooks";
import type { StreamingAgent } from "./streaming-agent";
const codeSections: CodeSection[] = [
{
title: "Define a streaming method",
description:
"Add streaming: true to the @callable decorator. The method receives a StreamingResponse as its first argument — call stream.send() to push chunks and stream.end() to finish.",
code: `import { Agent, callable, type StreamingResponse } from "agents";
class StreamingAgent extends Agent<Env> {
@callable({ streaming: true })
async countdown(stream: StreamingResponse, from: number) {
for (let i = from; i >= 0; i--) {
stream.send({ count: i, label: i === 0 ? "Liftoff!" : \`\${i}...\` });
await new Promise(r => setTimeout(r, 500));
}
stream.end({ launched: true });
}
}`
},
{
title: "Consume the stream on the client",
description:
"Pass onChunk, onDone, and onError callbacks as the third argument to agent.call(). Chunks arrive as they are sent from the server.",
code: `await agent.call("countdown", [5], {
onChunk: (chunk) => {
// { count: 4, label: "4..." }
console.log(chunk);
},
onDone: (final) => {
// { launched: true }
console.log("Stream complete:", final);
},
onError: (error) => {
console.error("Stream error:", error);
},
});`
}
];
export function StreamingDemo() {
const userId = useUserId();
const { logs, addLog, clearLogs } = useLogs();
const { toast } = useToast();
const [chunks, setChunks] = useState<unknown[]>([]);
const [finalResult, setFinalResult] = useState<unknown>(null);
const [isStreaming, setIsStreaming] = useState(false);
const [count, setCount] = useState("10");
const [countdown, setCountdown] = useState("5");
const [errorAfter, setErrorAfter] = useState("3");
const agent = useAgent<StreamingAgent, {}>({
agent: "streaming-agent",
name: `streaming-demo-${userId}`,
onOpen: () => addLog("info", "connected"),
onClose: () => addLog("info", "disconnected"),
onError: () => addLog("error", "error", "Connection error")
});
const handleStream = async (method: string, args: unknown[]) => {
setChunks([]);
setFinalResult(null);
setIsStreaming(true);
addLog("out", "stream_start", { method, args });
try {
await (
agent.call as (
m: string,
a?: unknown[],
opts?: unknown
) => Promise<unknown>
)(method, args, {
onChunk: (chunk: unknown) => {
addLog("in", "chunk", chunk);
setChunks((prev) => [...prev, chunk]);
},
onDone: (final: unknown) => {
addLog("in", "stream_done", final);
setFinalResult(final);
setIsStreaming(false);
toast("Stream complete — " + chunks.length + " chunks", "success");
},
onError: (error: string) => {
addLog("error", "stream_error", error);
setIsStreaming(false);
toast("Stream error", "error");
}
});
} catch (e) {
addLog("error", "error", e instanceof Error ? e.message : String(e));
setIsStreaming(false);
toast(e instanceof Error ? e.message : String(e), "error");
}
};
return (
<DemoWrapper
title="Streaming RPC"
description={
<>
Add{" "}
<code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
streaming: true
</code>{" "}
to any{" "}
<code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
@callable
</code>{" "}
decorator to stream data chunk-by-chunk from server to client. The
method receives a{" "}
<code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
StreamingResponse
</code>{" "}
object — call{" "}
<code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
stream.send()
</code>{" "}
to push data and{" "}
<code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
stream.end()
</code>{" "}
to finish. Try the countdown to see streaming with delays.
</>
}
statusIndicator={
<ConnectionStatus
status={
agent.readyState === WebSocket.OPEN ? "connected" : "connecting"
}
/>
}
>
<div className="grid grid-cols-1 lg:grid-cols-2 gap-6">
{/* Controls */}
<div className="space-y-6">
{/* Stream Numbers */}
<Surface className="p-4 rounded-lg ring ring-kumo-line">
<div className="mb-4">
<Text variant="heading3">Stream Numbers</Text>
</div>
<p className="text-sm text-kumo-subtle mb-3">
Streams numbers from 1 to N synchronously
</p>
<div className="flex gap-2">
<Input
aria-label="Number count"
type="number"
value={count}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setCount(e.target.value)
}
className="w-20"
min={1}
max={100}
/>
<Button
variant="primary"
onClick={() => handleStream("streamNumbers", [Number(count)])}
disabled={isStreaming}
>
{isStreaming ? "Streaming..." : `Stream ${count} numbers`}
</Button>
</div>
</Surface>
{/* Countdown */}
<Surface className="p-4 rounded-lg ring ring-kumo-line">
<div className="mb-4">
<Text variant="heading3">Countdown</Text>
</div>
<p className="text-sm text-kumo-subtle mb-3">
Streams a countdown with 500ms delays between numbers
</p>
<div className="flex gap-2">
<Input
aria-label="Countdown start"
type="number"
value={countdown}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setCountdown(e.target.value)
}
className="w-20"
min={1}
max={20}
/>
<Button
variant="primary"
onClick={() => handleStream("countdown", [Number(countdown)])}
disabled={isStreaming}
>
{isStreaming ? "Streaming..." : `Countdown from ${countdown}`}
</Button>
</div>
</Surface>
{/* Stream with Error */}
<Surface className="p-4 rounded-lg ring ring-kumo-line">
<div className="mb-4">
<Text variant="heading3">Stream with Error</Text>
</div>
<p className="text-sm text-kumo-subtle mb-3">
Sends N chunks then errors (tests error handling mid-stream)
</p>
<div className="flex gap-2">
<Input
aria-label="Error after N items"
type="number"
value={errorAfter}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setErrorAfter(e.target.value)
}
className="w-20"
min={1}
max={10}
/>
<Button
variant="destructive"
onClick={() =>
handleStream("streamWithError", [Number(errorAfter)])
}
disabled={isStreaming}
>
Error after {errorAfter} chunks
</Button>
</div>
</Surface>
{/* Stream Output */}
<Surface className="p-4 rounded-lg ring ring-kumo-line">
<div className="mb-4">
<Text variant="heading3">
Stream Output
{isStreaming && (
<span className="ml-2 text-xs font-normal text-kumo-subtle animate-pulse">
receiving...
</span>
)}
</Text>
</div>
<div className="space-y-2">
<div>
<span className="text-xs text-kumo-subtle">
Chunks ({chunks.length})
</span>
{chunks.length === 0 ? (
<p className="text-xs text-kumo-inactive">No chunks yet</p>
) : (
<HighlightedCode
code={chunks.map((c) => JSON.stringify(c)).join("\n")}
lang="json"
/>
)}
</div>
{finalResult !== null && (
<div>
<span className="text-xs text-kumo-subtle">Final Result</span>
<HighlightedJson data={finalResult} />
</div>
)}
</div>
</Surface>
</div>
{/* Logs */}
<div className="space-y-6">
<LogPanel logs={logs} onClear={clearLogs} maxHeight="400px" />
</div>
</div>
<CodeExplanation sections={codeSections} />
</DemoWrapper>
);
}