/** * Shared message builder for reconstructing UIMessage parts from stream chunks. * * Used by both the server (_streamSSEReply) and client (onAgentMessage in react.tsx) * to avoid duplicating the chunk-type switch/case logic. The server handles additional * chunk types (tool-input-start, tool-input-delta, etc.) on top of this shared base. * * Operates on a mutable parts array for performance (avoids allocating new arrays * on every chunk during streaming). */ import type { UIMessage } from "ai"; /** The parts array type from UIMessage */ export type MessageParts = UIMessage["parts"]; /** A single part from the UIMessage parts array */ export type MessagePart = MessageParts[number]; /** * Parsed chunk data from an AI SDK stream event. * This is the JSON-parsed body of a CF_AGENT_USE_CHAT_RESPONSE message, * or the `data:` payload of an SSE line. */ export type StreamChunkData = { type: string; id?: string; delta?: string; text?: string; mediaType?: string; url?: string; sourceId?: string; title?: string; filename?: string; toolCallId?: string; toolName?: string; input?: unknown; inputTextDelta?: string; output?: unknown; state?: string; errorText?: string; /** When true, the output is preliminary (may be updated by a later chunk) */ preliminary?: boolean; /** Approval ID for tools with needsApproval */ approvalId?: string; providerMetadata?: Record; /** Whether the tool was executed by the provider (e.g. Gemini code execution) */ providerExecuted?: boolean; /** Payload for data-* parts (developer-defined typed JSON) */ data?: unknown; /** When true, data parts are ephemeral and not persisted to message.parts */ transient?: boolean; [key: string]: unknown; }; /** * Applies a stream chunk to a mutable parts array, building up the message * incrementally. Returns true if the chunk was handled, false if it was * an unrecognized type (caller may handle it with additional logic). * * Handles all common chunk types that both server and client need: * - text-start / text-delta / text-end * - reasoning-start / reasoning-delta / reasoning-end * - file * - source-url / source-document * - tool-input-start / tool-input-delta / tool-input-available / tool-input-error * - tool-output-available / tool-output-error * - step-start (aliased from start-step) * - data-* (developer-defined typed JSON blobs) * * @param parts - The mutable parts array to update * @param chunk - The parsed stream chunk data * @returns true if handled, false if the chunk type is not recognized */ export function applyChunkToParts( parts: MessagePart[], chunk: StreamChunkData ): boolean { switch (chunk.type) { case "text-start": { parts.push({ type: "text", text: "", state: "streaming" } as MessagePart); return true; } case "text-delta": { const lastTextPart = findLastPartByType(parts, "text"); if (lastTextPart && lastTextPart.type === "text") { (lastTextPart as { text: string }).text += chunk.delta ?? ""; } else { // No text-start received — create a new text part (stream resumption fallback) parts.push({ type: "text", text: chunk.delta ?? "", state: "streaming" } as MessagePart); } return true; } case "text-end": { const lastTextPart = findLastPartByType(parts, "text"); if (lastTextPart && "state" in lastTextPart) { (lastTextPart as { state: string }).state = "done"; } return true; } case "reasoning-start": { parts.push({ type: "reasoning", text: "", state: "streaming" } as MessagePart); return true; } case "reasoning-delta": { const lastReasoningPart = findLastPartByType(parts, "reasoning"); if (lastReasoningPart && lastReasoningPart.type === "reasoning") { (lastReasoningPart as { text: string }).text += chunk.delta ?? ""; } else { // No reasoning-start received — create a new reasoning part (stream resumption fallback) parts.push({ type: "reasoning", text: chunk.delta ?? "", state: "streaming" } as MessagePart); } return true; } case "reasoning-end": { const lastReasoningPart = findLastPartByType(parts, "reasoning"); if (lastReasoningPart && "state" in lastReasoningPart) { (lastReasoningPart as { state: string }).state = "done"; } return true; } case "file": { parts.push({ type: "file", mediaType: chunk.mediaType, url: chunk.url } as MessagePart); return true; } case "source-url": { parts.push({ type: "source-url", sourceId: chunk.sourceId, url: chunk.url, title: chunk.title, providerMetadata: chunk.providerMetadata } as MessagePart); return true; } case "source-document": { parts.push({ type: "source-document", sourceId: chunk.sourceId, mediaType: chunk.mediaType, title: chunk.title, filename: chunk.filename, providerMetadata: chunk.providerMetadata } as MessagePart); return true; } case "tool-input-start": { // Create a tool part in input-streaming state with no input yet. // Cross-tab clients see the tool appear immediately with "streaming" indicator. parts.push({ type: `tool-${chunk.toolName}`, toolCallId: chunk.toolCallId, toolName: chunk.toolName, state: "input-streaming", input: undefined, ...(chunk.providerExecuted != null ? { providerExecuted: chunk.providerExecuted } : {}), ...(chunk.providerMetadata != null ? { callProviderMetadata: chunk.providerMetadata } : {}), ...(chunk.title != null ? { title: chunk.title } : {}) } as MessagePart); return true; } case "tool-input-delta": { // Update the existing tool part with partial input as it streams in. const toolPart = findToolPartByCallId(parts, chunk.toolCallId); if (toolPart) { (toolPart as Record).input = chunk.input; } return true; } case "tool-input-available": { // Finalize the tool input. If tool-input-start was received, update // the existing part; otherwise create a new one (for non-streaming tools). const existing = findToolPartByCallId(parts, chunk.toolCallId); if (existing) { const p = existing as Record; p.state = "input-available"; p.input = chunk.input; if (chunk.providerExecuted != null) { p.providerExecuted = chunk.providerExecuted; } if (chunk.providerMetadata != null) { p.callProviderMetadata = chunk.providerMetadata; } if (chunk.title != null) { p.title = chunk.title; } } else { parts.push({ type: `tool-${chunk.toolName}`, toolCallId: chunk.toolCallId, toolName: chunk.toolName, state: "input-available", input: chunk.input, ...(chunk.providerExecuted != null ? { providerExecuted: chunk.providerExecuted } : {}), ...(chunk.providerMetadata != null ? { callProviderMetadata: chunk.providerMetadata } : {}), ...(chunk.title != null ? { title: chunk.title } : {}) } as MessagePart); } return true; } case "tool-input-error": { // Tool input parsing failed. Update existing part or create one. const existing = findToolPartByCallId(parts, chunk.toolCallId); if (existing) { const p = existing as Record; p.state = "output-error"; p.errorText = chunk.errorText; p.input = chunk.input; if (chunk.providerExecuted != null) { p.providerExecuted = chunk.providerExecuted; } if (chunk.providerMetadata != null) { p.callProviderMetadata = chunk.providerMetadata; } } else { parts.push({ type: `tool-${chunk.toolName}`, toolCallId: chunk.toolCallId, toolName: chunk.toolName, state: "output-error", input: chunk.input, errorText: chunk.errorText, ...(chunk.providerExecuted != null ? { providerExecuted: chunk.providerExecuted } : {}), ...(chunk.providerMetadata != null ? { callProviderMetadata: chunk.providerMetadata } : {}) } as MessagePart); } return true; } case "tool-approval-request": { // Tool requires user approval before executing. // Transition the tool part to approval-requested state with the approval ID. const toolPart = findToolPartByCallId(parts, chunk.toolCallId); if (toolPart) { const p = toolPart as Record; p.state = "approval-requested"; p.approval = { id: chunk.approvalId }; } return true; } case "tool-output-denied": { // User rejected the tool approval request. const toolPart = findToolPartByCallId(parts, chunk.toolCallId); if (toolPart) { const p = toolPart as Record; p.state = "output-denied"; } return true; } case "tool-output-available": { // Update existing tool part with output. // Supports `preliminary: true` for streaming tool results — // the output may be updated by a subsequent chunk. const toolPart = findToolPartByCallId(parts, chunk.toolCallId); if (toolPart) { const p = toolPart as Record; p.state = "output-available"; p.output = chunk.output; if (chunk.preliminary !== undefined) { p.preliminary = chunk.preliminary; } } return true; } case "tool-output-error": { // Tool execution failed. Update the existing tool part. const toolPart = findToolPartByCallId(parts, chunk.toolCallId); if (toolPart) { const p = toolPart as Record; p.state = "output-error"; p.errorText = chunk.errorText; } return true; } // Both "step-start" (client convention) and "start-step" (server convention) case "step-start": case "start-step": { parts.push({ type: "step-start" } as MessagePart); return true; } default: { // https://ai-sdk.dev/docs/ai-sdk-ui/streaming-data if (chunk.type.startsWith("data-")) { // Transient parts are ephemeral — the AI SDK client fires an onData // callback instead of adding them to message.parts. On the server we // still broadcast them (so connected clients see them in real time) // but skip persisting them into the stored message parts. if (chunk.transient) { return true; } // Reconciliation: if a part with the same type AND id already exists, // update its data in-place instead of appending a duplicate. if (chunk.id != null) { const existing = findDataPartByTypeAndId(parts, chunk.type, chunk.id); if (existing) { (existing as Record).data = chunk.data; return true; } } // Append new data parts to the array directly. // Note: `chunk.data` should always be provided — if omitted, the // persisted part will have `data: undefined` which JSON.stringify // drops, so the part will have no `data` field on reload. // The cast is needed because UIMessage["parts"] doesn't include // data-* types in its union because they're an open extension point. parts.push({ type: chunk.type, ...(chunk.id != null && { id: chunk.id }), data: chunk.data } as MessagePart); return true; } return false; } } } /** * Finds the last part in the array matching the given type. * Searches from the end for efficiency (the part we want is usually recent). */ function findLastPartByType( parts: MessagePart[], type: string ): MessagePart | undefined { for (let i = parts.length - 1; i >= 0; i--) { if (parts[i].type === type) { return parts[i]; } } return undefined; } /** * Finds a tool part by its toolCallId. * Searches from the end since the tool part is usually recent. */ function findToolPartByCallId( parts: MessagePart[], toolCallId: string | undefined ): MessagePart | undefined { if (!toolCallId) return undefined; for (let i = parts.length - 1; i >= 0; i--) { const p = parts[i]; if ("toolCallId" in p && p.toolCallId === toolCallId) { return p; } } return undefined; } /** * Finds a data part by its type and id for reconciliation. * Data parts use type+id as a composite key so when the same combination * is seen again, the existing part's data is updated in-place. */ function findDataPartByTypeAndId( parts: MessagePart[], type: string, id: string ): MessagePart | undefined { for (let i = parts.length - 1; i >= 0; i--) { const p = parts[i]; if (p.type === type && "id" in p && (p as { id: string }).id === id) { return p; } } return undefined; }