import { useChat, type UseChatOptions } from "@ai-sdk/react"; import { getToolName, isToolUIPart } from "ai"; import type { ChatInit, JSONSchema7, Tool, UIMessage as Message, UIMessage } from "ai"; import { nanoid } from "nanoid"; import { use, useCallback, useEffect, useMemo, useRef, useState } from "react"; import type { OutgoingMessage } from "./types"; import { MessageType } from "./types"; import { applyChunkToParts, type MessageParts } from "./message-builder"; import { WebSocketChatTransport } from "./ws-chat-transport"; import type { useAgent } from "agents/react"; /** * One-shot deprecation warnings (warns once per key per session). */ const _deprecationWarnings = new Set(); function warnDeprecated(id: string, message: string) { if (!_deprecationWarnings.has(id)) { _deprecationWarnings.add(id); console.warn(`[@cloudflare/ai-chat] Deprecated: ${message}`); } } // ── DEPRECATED TYPES AND FUNCTIONS ────────────────────────────────── // Everything in this section is deprecated and will be removed in the // next major version. Use server-side tools with tool() from "ai" and // the onToolCall callback in useAgentChat instead. /** * JSON Schema type for tool parameters. * Re-exported from the AI SDK for convenience. * @deprecated Import JSONSchema7 directly from "ai" instead. Will be removed in the next major version. */ export type JSONSchemaType = JSONSchema7; /** * Definition for a tool that can be executed on the client. * Tools with an `execute` function are automatically registered with the server. * * **For most apps**, define tools on the server with `tool()` from `"ai"` — * you get full Zod type safety and simpler code. Use `onToolCall` in * `useAgentChat` for tools that need browser-side execution. * * **For SDKs and platforms** where the tool surface is determined dynamically * by the embedding application at runtime, this type lets the client register * tools the server does not know about at deploy time. * * Note: Uses `parameters` (JSONSchema7) because client tools must be * serializable for the wire format. Zod schemas cannot be serialized. */ export type AITool = { /** Human-readable description of what the tool does */ description?: Tool["description"]; /** JSON Schema defining the tool's input parameters */ parameters?: JSONSchema7; /** * @deprecated Use `parameters` instead. Will be removed in a future version. */ inputSchema?: JSONSchema7; /** * Function to execute the tool on the client. * If provided, the tool schema is automatically sent to the server. */ execute?: (input: Input) => Output | Promise; }; /** * Schema for a client tool sent to the server. * This is the wire format — what gets sent in the request body. * Must match the server-side ClientToolSchema type. * * Most apps do not need this directly — it is used internally when the * `tools` option on `useAgentChat` contains tools with `execute` functions. */ export type ClientToolSchema = { /** Unique name for the tool */ name: string; /** Human-readable description of what the tool does */ description?: Tool["description"]; /** JSON Schema defining the tool's input parameters */ parameters?: JSONSchema7; }; /** * Extracts tool schemas from tools that have client-side execute functions. * These schemas are automatically sent to the server with each request. * * Called internally by `useAgentChat` when `tools` are provided. * Most apps do not need to call this directly. * * @param tools - Record of tool name to tool definition * @returns Array of tool schemas to send to server, or undefined if none */ export function extractClientToolSchemas( tools?: Record> ): ClientToolSchema[] | undefined { if (!tools) return undefined; const schemas: ClientToolSchema[] = Object.entries(tools) .filter(([_, tool]) => tool.execute) // Only tools with client-side execute .map(([name, tool]) => { if (tool.inputSchema && !tool.parameters) { console.warn( `[useAgentChat] Tool "${name}" uses deprecated 'inputSchema'. Please migrate to 'parameters'.` ); } return { name, description: tool.description, parameters: tool.parameters ?? tool.inputSchema }; }); return schemas.length > 0 ? schemas : undefined; } // ── END DEPRECATED TYPES AND FUNCTIONS ───────────────────────────── type GetInitialMessagesOptions = { agent: string; name: string; url: string; }; // v5 useChat parameters type UseChatParams = ChatInit & UseChatOptions; /** * Options for preparing the send messages request. * Used by prepareSendMessagesRequest callback. */ export type PrepareSendMessagesRequestOptions< ChatMessage extends UIMessage = UIMessage > = { /** The chat ID */ id: string; /** Messages to send */ messages: ChatMessage[]; /** What triggered this request */ trigger: "submit-message" | "regenerate-message"; /** ID of the message being sent (if applicable) */ messageId?: string; /** Request metadata */ requestMetadata?: unknown; /** Current body (if any) */ body?: Record; /** Current credentials (if any) */ credentials?: RequestCredentials; /** Current headers (if any) */ headers?: HeadersInit; /** API endpoint */ api?: string; }; /** * Return type for prepareSendMessagesRequest callback. * Allows customizing headers, body, and credentials for each request. * All fields are optional; only specify what you need to customize. */ export type PrepareSendMessagesRequestResult = { /** Custom headers to send with the request */ headers?: HeadersInit; /** Custom body data to merge with the request */ body?: Record; /** Custom credentials option */ credentials?: RequestCredentials; /** Custom API endpoint */ api?: string; }; /** * Options for addToolOutput function */ type AddToolOutputOptions = { /** The ID of the tool call to provide output for */ toolCallId: string; /** The name of the tool (optional, for type safety) */ toolName?: string; /** The output to provide */ output?: unknown; /** Override the tool part state (e.g. "output-error" for custom denial) */ state?: "output-available" | "output-error"; /** Error message when state is "output-error" */ errorText?: string; }; /** * Callback for handling client-side tool execution. * Called when a tool without server-side execute is invoked. */ export type OnToolCallCallback = (options: { /** The tool call that needs to be handled */ toolCall: { toolCallId: string; toolName: string; input: unknown; }; /** Function to provide the tool output (or signal an error/denial) */ addToolOutput: (options: Omit) => void; }) => void | Promise; /** * Options for the useAgentChat hook */ type UseAgentChatOptions< State, ChatMessage extends UIMessage = UIMessage > = Omit, "fetch" | "onToolCall"> & { /** Agent connection from useAgent */ agent: ReturnType>; getInitialMessages?: | undefined | null | ((options: GetInitialMessagesOptions) => Promise); /** Request credentials */ credentials?: RequestCredentials; /** Request headers */ headers?: HeadersInit; /** * Callback for handling client-side tool execution. * Called when a tool without server-side `execute` is invoked by the LLM. * * Use this for: * - Tools that need browser APIs (geolocation, camera, etc.) * - Tools that need user interaction before providing a result * - Tools requiring approval before execution * * @example * ```typescript * onToolCall: async ({ toolCall, addToolOutput }) => { * if (toolCall.toolName === 'getLocation') { * const position = await navigator.geolocation.getCurrentPosition(); * addToolOutput({ * toolCallId: toolCall.toolCallId, * output: { lat: position.coords.latitude, lng: position.coords.longitude } * }); * } * } * ``` */ onToolCall?: OnToolCallCallback; /** * @deprecated Use `onToolCall` callback instead for automatic tool execution. * @description Whether to automatically resolve tool calls that do not require human interaction. * @experimental */ experimental_automaticToolResolution?: boolean; /** * Tools that can be executed on the client. Tool schemas are automatically * sent to the server and tool calls are routed back for client execution. * * **For most apps**, define tools on the server with `tool()` from `"ai"` * and handle client-side execution via `onToolCall`. This gives you full * Zod type safety and keeps tool definitions in one place. * * **For SDKs and platforms** where tools are defined dynamically by the * embedding application at runtime, this option lets the client register * tools the server does not know about at deploy time. */ tools?: Record>; /** * @deprecated Use `needsApproval` on server-side tools instead. * @description Manual override for tools requiring confirmation. * If not provided, will auto-detect from tools object (tools without execute require confirmation). */ toolsRequiringConfirmation?: string[]; /** * When true (default), the server automatically continues the conversation * after receiving client-side tool results or approvals, similar to how * server-executed tools work with maxSteps in streamText. The continuation * is merged into the same assistant message. * * When false, the client must call sendMessage() after tool results * to continue the conversation, which creates a new assistant message. * * @default true */ autoContinueAfterToolResult?: boolean; /** * @deprecated Use `sendAutomaticallyWhen` from AI SDK instead. * * When true (default), automatically sends the next message only after * all pending confirmation-required tool calls have been resolved. * When false, sends immediately after each tool result. * * Only applies when `autoContinueAfterToolResult` is false. * * @default true */ autoSendAfterAllConfirmationsResolved?: boolean; /** * Set to false to disable automatic stream resumption. * @default true */ resume?: boolean; /** * Custom data to include in every chat request body. * Accepts a static object or a function that returns one (for dynamic values). * These fields are available in `onChatMessage` via `options.body`. * * @example * ```typescript * // Static * body: { timezone: "America/New_York", userId: "abc" } * * // Dynamic (called on each send) * body: () => ({ token: getAuthToken(), timestamp: Date.now() }) * ``` */ body?: | Record | (() => Record | Promise>); /** * Callback to customize the request before sending messages. * For most cases, use the `body` option instead. * Use this for advanced scenarios that need access to the messages or trigger type. * * Note: Client tool schemas are automatically sent when tools have `execute` functions. * This callback can add additional data alongside the auto-extracted schemas. */ prepareSendMessagesRequest?: ( options: PrepareSendMessagesRequestOptions ) => | PrepareSendMessagesRequestResult | Promise; }; /** * Module-level cache for initial message fetches. Intentionally shared across * all useAgentChat instances to deduplicate requests during React Strict Mode * double-renders and re-renders. Cache keys include the agent URL, agent type, * and thread name to prevent cross-agent collisions. */ const requestCache = new Map>(); /** * React hook for building AI chat interfaces using an Agent * @param options Chat options including the agent connection * @returns Chat interface controls and state with added clearHistory method */ /** * Automatically detects which tools require confirmation based on their configuration. * Tools require confirmation if they have no execute function AND are not server-executed. * @param tools - Record of tool name to tool definition * @returns Array of tool names that require confirmation * * @deprecated Use `needsApproval` on server-side tools instead. */ export function detectToolsRequiringConfirmation( tools?: Record> ): string[] { warnDeprecated( "detectToolsRequiringConfirmation", "detectToolsRequiringConfirmation() is deprecated. Use needsApproval on server-side tools instead. Will be removed in the next major version." ); if (!tools) return []; return Object.entries(tools) .filter(([_name, tool]) => !tool.execute) .map(([name]) => name); } export function useAgentChat< State = unknown, ChatMessage extends UIMessage = UIMessage >( options: UseAgentChatOptions ): Omit>, "addToolOutput"> & { clearHistory: () => void; /** * Provide output for a tool call. Use this for tools that require user interaction * or client-side execution. */ addToolOutput: (opts: AddToolOutputOptions) => void; } { const { agent, getInitialMessages, messages: optionsInitialMessages, onToolCall, onData, experimental_automaticToolResolution, tools, toolsRequiringConfirmation: manualToolsRequiringConfirmation, autoContinueAfterToolResult = true, // Server auto-continues after tool results/approvals autoSendAfterAllConfirmationsResolved = true, // Legacy option for client-side batching resume = true, // Enable stream resumption by default body: bodyOption, prepareSendMessagesRequest, ...rest } = options; // Emit deprecation warnings for deprecated options (once per session) if (manualToolsRequiringConfirmation) { warnDeprecated( "useAgentChat.toolsRequiringConfirmation", "The 'toolsRequiringConfirmation' option is deprecated. Use needsApproval on server-side tools instead. Will be removed in the next major version." ); } if (experimental_automaticToolResolution) { warnDeprecated( "useAgentChat.experimental_automaticToolResolution", "The 'experimental_automaticToolResolution' option is deprecated. Use the onToolCall callback instead. Will be removed in the next major version." ); } if (options.autoSendAfterAllConfirmationsResolved !== undefined) { warnDeprecated( "useAgentChat.autoSendAfterAllConfirmationsResolved", "The 'autoSendAfterAllConfirmationsResolved' option is deprecated. Use sendAutomaticallyWhen from AI SDK instead. Will be removed in the next major version." ); } // ── DEPRECATED: client-side tool confirmation ────────────────────── // This block will be removed when toolsRequiringConfirmation is removed. // Only call the deprecated function when deprecated options are actually used. const toolsRequiringConfirmation = useMemo(() => { if (manualToolsRequiringConfirmation) { return manualToolsRequiringConfirmation; } // Inline the logic from detectToolsRequiringConfirmation to avoid // emitting a deprecation warning when tools are provided via the // non-deprecated `tools` option. if (!tools) return []; return Object.entries(tools) .filter(([_name, tool]) => !tool.execute) .map(([name]) => name); }, [manualToolsRequiringConfirmation, tools]); // Keep refs to always point to the latest callbacks const onToolCallRef = useRef(onToolCall); onToolCallRef.current = onToolCall; const onDataRef = useRef(onData); onDataRef.current = onData; const agentUrl = new URL( `${ // @ts-expect-error we're using a protected _url property that includes query params ((agent._url as string | null) || agent._pkurl) ?.replace("ws://", "http://") .replace("wss://", "https://") }` ); agentUrl.searchParams.delete("_pk"); const agentUrlString = agentUrl.toString(); // we need to include agent.name in cache key to prevent collisions during agent switching. // The URL may be stale between updateProperties() and reconnect(), but agent.name // is updated synchronously, so each thread gets its own cache entry const initialMessagesCacheKey = `${agentUrlString}|${agent.agent ?? ""}|${agent.name ?? ""}`; // Keep a ref to always point to the latest agent instance. // Updated synchronously during render (not in useEffect) so the // transport's agent ref is always current. The transport is a // singleton whose .agent is reassigned every render — if we used // useEffect the assignment would lag behind, causing the transport // to send through a stale/closed socket (issue #929). const agentRef = useRef(agent); agentRef.current = agent; async function defaultGetInitialMessagesFetch({ url }: GetInitialMessagesOptions) { const getMessagesUrl = new URL(url); getMessagesUrl.pathname += "/get-messages"; const response = await fetch(getMessagesUrl.toString(), { credentials: options.credentials, headers: options.headers }); if (!response.ok) { console.warn( `Failed to fetch initial messages: ${response.status} ${response.statusText}` ); return []; } const text = await response.text(); if (!text.trim()) { return []; } try { return JSON.parse(text) as ChatMessage[]; } catch (error) { console.warn("Failed to parse initial messages JSON:", error); return []; } } const getInitialMessagesFetch = getInitialMessages || defaultGetInitialMessagesFetch; function doGetInitialMessages( getInitialMessagesOptions: GetInitialMessagesOptions, cacheKey: string ) { if (requestCache.has(cacheKey)) { return requestCache.get(cacheKey)! as Promise; } const promise = getInitialMessagesFetch(getInitialMessagesOptions); requestCache.set(cacheKey, promise); return promise; } const initialMessagesPromise = getInitialMessages === null ? null : doGetInitialMessages( { agent: agent.agent, name: agent.name, url: agentUrlString }, initialMessagesCacheKey ); const initialMessages = initialMessagesPromise ? use(initialMessagesPromise) : (optionsInitialMessages ?? []); useEffect(() => { if (!initialMessagesPromise) { return; } requestCache.set(initialMessagesCacheKey, initialMessagesPromise!); return () => { if ( requestCache.get(initialMessagesCacheKey) === initialMessagesPromise ) { requestCache.delete(initialMessagesCacheKey); } }; }, [initialMessagesCacheKey, initialMessagesPromise]); // Use synchronous ref updates to avoid race conditions between effect runs. // This ensures the ref always has the latest value before any effect reads it. const toolsRef = useRef(tools); toolsRef.current = tools; const prepareSendMessagesRequestRef = useRef(prepareSendMessagesRequest); prepareSendMessagesRequestRef.current = prepareSendMessagesRequest; const bodyOptionRef = useRef(bodyOption); bodyOptionRef.current = bodyOption; /** * Tracks request IDs initiated by this tab via the transport. * Used by onAgentMessage to skip messages already handled by the transport. */ const localRequestIdsRef = useRef>(new Set()); // WebSocket-based transport that speaks the CF_AGENT protocol natively. // Replaces the old aiFetch + DefaultChatTransport indirection. // // The transport is a true singleton (created once, never recreated) so // that the resolver set by reconnectToStream and the handleStreamResuming // call from onAgentMessage always operate on the SAME instance — even // when _pk changes (async queries, socket recreation) or React Strict // Mode double-mounts. The agent reference is updated every render so // sends always go through the latest socket. const customTransportRef = useRef | null>( null ); if (customTransportRef.current === null) { customTransportRef.current = new WebSocketChatTransport({ agent: agentRef.current, activeRequestIds: localRequestIdsRef.current, prepareBody: async ({ messages: msgs, trigger, messageId }) => { // Start with the top-level body option (static or dynamic) let extraBody: Record = {}; const currentBody = bodyOptionRef.current; if (currentBody) { const resolved = typeof currentBody === "function" ? await currentBody() : currentBody; extraBody = { ...resolved }; } // Extract schemas from deprecated client tools (if any) // Only extract client tool schemas when deprecated tools option is used if (toolsRef.current) { const clientToolSchemas = extractClientToolSchemas(toolsRef.current); if (clientToolSchemas) { extraBody.clientTools = clientToolSchemas; } } // Apply user's prepareSendMessagesRequest callback (overrides body option) if (prepareSendMessagesRequestRef.current) { const userResult = await prepareSendMessagesRequestRef.current({ id: agentRef.current._pk, messages: msgs, trigger, messageId }); if (userResult.body) { Object.assign(extraBody, userResult.body); } } return extraBody; } }); } // Always point the transport at the latest socket so sends/listeners // go through the current connection after _pk changes. customTransportRef.current.agent = agentRef.current; const customTransport = customTransportRef.current; // Use a stable Chat ID that doesn't change when _pk changes. // The AI SDK recreates the Chat when `id` changes, which would // abandon any in-flight makeRequest (including resume) and the // resume effect wouldn't re-fire (deps are [resume, chatRef]). // Using the initial messages cache key (URL + agent + name) keeps // the Chat stable across socket recreations. const useChatHelpers = useChat({ ...rest, onData, messages: initialMessages, transport: customTransport, id: initialMessagesCacheKey, // Pass resume so useChat calls transport.reconnectToStream(). // This lets the AI SDK track status ("streaming") during resume. resume }); // Destructure stable method references from useChatHelpers. // These are individually memoized by the AI SDK (via useCallback), so they're // safe to use in dependency arrays without causing re-renders. Using them // directly instead of `useChatHelpers.method` avoids the exhaustive-deps // warning about the unstable `useChatHelpers` object. const { messages: chatMessages, setMessages, addToolResult, addToolApprovalResponse, sendMessage } = useChatHelpers; const processedToolCalls = useRef(new Set()); const isResolvingToolsRef = useRef(false); // Counter to force the tool resolution effect to re-run after completing // a batch of tool calls. Without this, if new tool calls arrive while // isResolvingToolsRef is true (e.g. server auto-continuation), the effect // exits early and never retriggers because the ref reset doesn't cause // a re-render. const [toolResolutionTrigger, setToolResolutionTrigger] = useState(0); // Fix for issue #728: Track client-side tool results in local state // to ensure tool parts show output-available immediately after execution. const [clientToolResults, setClientToolResults] = useState< Map >(new Map()); // Ref to access current messages in callbacks without stale closures const messagesRef = useRef(chatMessages); messagesRef.current = chatMessages; // Calculate pending confirmations for the latest assistant message const lastMessage = chatMessages[chatMessages.length - 1]; const pendingConfirmations = (() => { if (!lastMessage || lastMessage.role !== "assistant") { return { messageId: undefined, toolCallIds: new Set() }; } const pendingIds = new Set(); for (const part of lastMessage.parts ?? []) { if ( isToolUIPart(part) && part.state === "input-available" && toolsRequiringConfirmation.includes(getToolName(part)) ) { pendingIds.add(part.toolCallId); } } return { messageId: lastMessage.id, toolCallIds: pendingIds }; })(); const pendingConfirmationsRef = useRef(pendingConfirmations); pendingConfirmationsRef.current = pendingConfirmations; // ── DEPRECATED: automatic tool resolution effect ──────────────────── // This entire useEffect is deprecated. Use onToolCall instead. useEffect(() => { if (!experimental_automaticToolResolution) { return; } // Prevent re-entry while async operations are in progress if (isResolvingToolsRef.current) { return; } const lastMsg = chatMessages[chatMessages.length - 1]; if (!lastMsg || lastMsg.role !== "assistant") { return; } const toolCalls = lastMsg.parts.filter( (part) => isToolUIPart(part) && part.state === "input-available" && !processedToolCalls.current.has(part.toolCallId) ); if (toolCalls.length > 0) { // Capture tools synchronously before async work const currentTools = toolsRef.current; const toolCallsToResolve = toolCalls.filter( (part) => isToolUIPart(part) && !toolsRequiringConfirmation.includes(getToolName(part)) && currentTools?.[getToolName(part)]?.execute ); if (toolCallsToResolve.length > 0) { isResolvingToolsRef.current = true; (async () => { try { const toolResults: Array<{ toolCallId: string; toolName: string; output: unknown; }> = []; for (const part of toolCallsToResolve) { if (isToolUIPart(part)) { let toolOutput: unknown = null; const toolName = getToolName(part); const tool = currentTools?.[toolName]; if (tool?.execute && part.input !== undefined) { try { toolOutput = await tool.execute(part.input); } catch (error) { toolOutput = `Error executing tool: ${error instanceof Error ? error.message : String(error)}`; } } processedToolCalls.current.add(part.toolCallId); toolResults.push({ toolCallId: part.toolCallId, toolName, output: toolOutput }); } } if (toolResults.length > 0) { // Send tool results to server first (server is source of truth) const clientToolSchemas = extractClientToolSchemas(currentTools); for (const result of toolResults) { agentRef.current.send( JSON.stringify({ type: MessageType.CF_AGENT_TOOL_RESULT, toolCallId: result.toolCallId, toolName: result.toolName, output: result.output, autoContinue: autoContinueAfterToolResult, clientTools: clientToolSchemas }) ); } // Also update local state via AI SDK for immediate UI feedback await Promise.all( toolResults.map((result) => addToolResult({ tool: result.toolName, toolCallId: result.toolCallId, output: result.output }) ) ); setClientToolResults((prev) => { const newMap = new Map(prev); for (const result of toolResults) { newMap.set(result.toolCallId, result.output); } return newMap; }); } // Note: We don't call sendMessage() here anymore. // The server will continue the conversation after applying tool results. } finally { isResolvingToolsRef.current = false; // Trigger a re-run so any tool calls that arrived while we were // busy (e.g. from server auto-continuation) get picked up. setToolResolutionTrigger((c) => c + 1); } })(); } } }, [ chatMessages, experimental_automaticToolResolution, addToolResult, toolsRequiringConfirmation, autoContinueAfterToolResult, toolResolutionTrigger ]); // Helper function to send tool output to server const sendToolOutputToServer = useCallback( ( toolCallId: string, toolName: string, output: unknown, state?: "output-available" | "output-error", errorText?: string ) => { agentRef.current.send( JSON.stringify({ type: MessageType.CF_AGENT_TOOL_RESULT, toolCallId, toolName, output, ...(state ? { state } : {}), ...(errorText !== undefined ? { errorText } : {}), // output-error is a deliberate client action — don't auto-continue. // This differs from addToolApprovalResponse (which auto-continues for // both approvals and rejections). To have the LLM respond to the error, // call sendMessage() after addToolOutput. autoContinue: state === "output-error" ? false : autoContinueAfterToolResult, clientTools: toolsRef.current ? extractClientToolSchemas(toolsRef.current) : undefined }) ); if (state !== "output-error") { setClientToolResults((prev) => new Map(prev).set(toolCallId, output)); } }, [autoContinueAfterToolResult] ); // Helper function to send tool approval to server const sendToolApprovalToServer = useCallback( (toolCallId: string, approved: boolean) => { agentRef.current.send( JSON.stringify({ type: MessageType.CF_AGENT_TOOL_APPROVAL, toolCallId, approved, autoContinue: autoContinueAfterToolResult }) ); }, [autoContinueAfterToolResult] ); // Effect for new onToolCall callback pattern (v6 style) // This fires when there are tool calls that need client-side handling useEffect(() => { const currentOnToolCall = onToolCallRef.current; if (!currentOnToolCall) { return; } const lastMsg = chatMessages[chatMessages.length - 1]; if (!lastMsg || lastMsg.role !== "assistant") { return; } // Find tool calls in input-available state that haven't been processed const pendingToolCalls = lastMsg.parts.filter( (part) => isToolUIPart(part) && part.state === "input-available" && !processedToolCalls.current.has(part.toolCallId) ); for (const part of pendingToolCalls) { if (isToolUIPart(part)) { const toolCallId = part.toolCallId; const toolName = getToolName(part); // Mark as processed to prevent re-triggering processedToolCalls.current.add(toolCallId); // Create addToolOutput function for this specific tool call const addToolOutput = (opts: AddToolOutputOptions) => { sendToolOutputToServer( opts.toolCallId, toolName, opts.output, opts.state, opts.errorText ); // Update local state via AI SDK addToolResult({ tool: toolName, toolCallId: opts.toolCallId, output: opts.state === "output-error" ? (opts.errorText ?? "Tool execution denied by user") : opts.output }); }; // Call the onToolCall callback // The callback is responsible for calling addToolOutput when ready currentOnToolCall({ toolCall: { toolCallId, toolName, input: part.input }, addToolOutput }); } } }, [chatMessages, sendToolOutputToServer, addToolResult]); /** * Contains the request ID, accumulated message parts, metadata, and a unique message ID. * Used for both resumed streams and real-time broadcasts from other tabs. * Metadata is captured from start/finish/message-metadata stream chunks * so that it's included when the partial message is flushed to React state. */ const activeStreamRef = useRef<{ id: string; messageId: string; continuation: boolean; parts: ChatMessage["parts"]; metadata?: Record; } | null>(null); /** * Flush the active stream's accumulated parts into React state. * Extracted as a helper so it can be called both during live streaming * (per-chunk) and after replay completes (once, at done). */ const flushActiveStreamToMessages = useCallback( (activeMsg: { id: string; messageId: string; continuation: boolean; parts: ChatMessage["parts"]; metadata?: Record; }) => { setMessages((prevMessages: ChatMessage[]) => { let existingIdx = prevMessages.findIndex( (m) => m.id === activeMsg.messageId ); if (existingIdx < 0 && activeMsg.continuation) { for (let i = prevMessages.length - 1; i >= 0; i--) { if (prevMessages[i].role === "assistant") { existingIdx = i; break; } } } const messageId = existingIdx >= 0 ? prevMessages[existingIdx].id : activeMsg.messageId; const partialMessage = { id: messageId, role: "assistant" as const, parts: [...activeMsg.parts], ...(activeMsg.metadata != null && { metadata: activeMsg.metadata }) } as unknown as ChatMessage; if (existingIdx >= 0) { const updated = [...prevMessages]; updated[existingIdx] = partialMessage; return updated; } return [...prevMessages, partialMessage]; }); }, [setMessages] ); useEffect(() => { /** * Unified message handler that parses JSON once and dispatches based on type. * Avoids duplicate parsing overhead from separate listeners. */ function onAgentMessage(event: MessageEvent) { if (typeof event.data !== "string") return; let data: OutgoingMessage; try { data = JSON.parse(event.data) as OutgoingMessage; } catch (_error) { return; } switch (data.type) { case MessageType.CF_AGENT_CHAT_CLEAR: setMessages([]); break; case MessageType.CF_AGENT_CHAT_MESSAGES: setMessages(data.messages); break; case MessageType.CF_AGENT_MESSAGE_UPDATED: // Server updated a message (e.g., applied tool result) // Update the specific message in local state setMessages((prevMessages: ChatMessage[]) => { const updatedMessage = data.message; // First try to find by message ID let idx = prevMessages.findIndex((m) => m.id === updatedMessage.id); // If not found by ID, try to find by toolCallId // This handles the case where client has AI SDK-generated IDs // but server has server-generated IDs if (idx < 0) { const updatedToolCallIds = new Set( updatedMessage.parts .filter( (p: ChatMessage["parts"][number]) => "toolCallId" in p && p.toolCallId ) .map( (p: ChatMessage["parts"][number]) => (p as { toolCallId: string }).toolCallId ) ); if (updatedToolCallIds.size > 0) { idx = prevMessages.findIndex((m) => m.parts.some( (p) => "toolCallId" in p && updatedToolCallIds.has( (p as { toolCallId: string }).toolCallId ) ) ); } } if (idx >= 0) { const updated = [...prevMessages]; // Preserve the client's message ID but update the content updated[idx] = { ...updatedMessage, id: prevMessages[idx].id }; return updated; } // Message not found — don't append. CF_AGENT_MESSAGE_UPDATED is // for updating existing messages (e.g. tool result/approval state // changes), not for adding new ones. If the message isn't in // client state yet, it will arrive via the transport stream // (same tab) or CF_AGENT_CHAT_MESSAGES (cross-tab). // Appending here causes temporary duplicates (#1094). return prevMessages; }); break; case MessageType.CF_AGENT_STREAM_RESUME_NONE: // Server confirmed no active stream — let the transport // resolve reconnectToStream immediately with null. customTransport.handleStreamResumeNone(); break; case MessageType.CF_AGENT_STREAM_RESUMING: if (!resume) return; // Let the transport handle it if reconnectToStream is waiting. // This is called synchronously — no addEventListener race. // The transport sends ACK, adds to activeRequestIds, and // creates the ReadableStream that feeds into useChat's pipeline // (which correctly sets status to "streaming"). if (customTransport.handleStreamResuming(data)) return; // Skip if the transport already handled this stream's resume // (server sends STREAM_RESUMING from both onConnect and the // RESUME_REQUEST handler — the second one must not trigger // a duplicate ACK / replay). if (localRequestIdsRef.current.has(data.id)) return; // Fallback for cross-tab broadcasts or cases where the // transport isn't expecting a resume. activeStreamRef.current = null; activeStreamRef.current = { id: data.id, messageId: nanoid(), continuation: false, parts: [] }; agentRef.current.send( JSON.stringify({ type: MessageType.CF_AGENT_STREAM_RESUME_ACK, id: data.id }) ); break; case MessageType.CF_AGENT_USE_CHAT_RESPONSE: { // Skip if this is a response to a request this tab initiated // (handled by the transport listener instead). // On done:true, also clean up the ID — it may have been kept in the // set after an abort to prevent in-flight chunks from creating a // duplicate message (see ws-chat-transport.ts onAbort). if (localRequestIdsRef.current.has(data.id)) { if (data.done) { localRequestIdsRef.current.delete(data.id); } return; } // For continuations, find the last assistant message ID to append to const isContinuation = data.continuation === true; // Initialize stream state for broadcasts from other tabs if ( !activeStreamRef.current || activeStreamRef.current.id !== data.id ) { let messageId = nanoid(); let existingParts: ChatMessage["parts"] = []; let existingMetadata: Record | undefined; // For continuations, use the last assistant message's ID, parts, and metadata if (isContinuation) { const currentMessages = messagesRef.current; for (let i = currentMessages.length - 1; i >= 0; i--) { if (currentMessages[i].role === "assistant") { messageId = currentMessages[i].id; existingParts = [...currentMessages[i].parts]; if (currentMessages[i].metadata != null) { existingMetadata = { ...(currentMessages[i].metadata as Record< string, unknown >) }; } break; } } } activeStreamRef.current = { id: data.id, messageId, continuation: isContinuation, parts: existingParts, metadata: existingMetadata }; } const activeMsg = activeStreamRef.current; const isReplay = data.replay === true; if (data.body?.trim()) { try { const chunkData = JSON.parse(data.body); // Apply chunk to parts using shared parser. // Handles text, reasoning, file, source, tool lifecycle // (including streaming: tool-input-start, tool-input-delta, // tool-input-available, tool-output-available), step, and // data-* chunks. const handled = applyChunkToParts( activeMsg.parts as MessageParts, chunkData ); // Fire onData callback for data-* parts (stream resumption // and cross-tab broadcasts). For the transport path (new // messages from this tab), the AI SDK's pipeline invokes // onData internally. if ( typeof chunkData.type === "string" && chunkData.type.startsWith("data-") && onDataRef.current ) { onDataRef.current(chunkData); } // Capture message metadata from start/finish/message-metadata // chunks. These carry metadata like timestamps, model info, and // token usage that should be attached at the message level. if ( !handled && (chunkData.type === "start" || chunkData.type === "finish" || chunkData.type === "message-metadata") ) { if ( chunkData.messageId != null && chunkData.type === "start" && !isContinuation ) { activeMsg.messageId = chunkData.messageId; } if (chunkData.messageMetadata != null) { activeMsg.metadata = activeMsg.metadata ? { ...activeMsg.metadata, ...chunkData.messageMetadata } : { ...chunkData.messageMetadata }; } } // For replayed chunks, skip intermediate setMessages calls. // Replayed chunks arrive synchronously in a tight loop, so React // would batch all state updates into a single render anyway — // causing intermediate states (like "Thinking...") to be lost. // We defer the render until replay is complete (done signal). if (!isReplay) { flushActiveStreamToMessages(activeMsg); } } catch (parseError) { console.warn( "[useAgentChat] Failed to parse stream chunk:", parseError instanceof Error ? parseError.message : parseError, "body:", data.body?.slice(0, 100) ); } } // On completion or error, flush final state to messages if (data.done || data.error) { // For replayed streams, this is the single render point — // all parts have been accumulated, now render them at once. if (isReplay && activeMsg) { flushActiveStreamToMessages(activeMsg); } activeStreamRef.current = null; } else if (data.replayComplete && activeMsg) { // Replay of stored chunks is complete but the stream is still // active (e.g. model is still thinking). Flush the accumulated // parts to React state so the UI shows progress. Without this, // replayed chunks sit in activeStreamRef unflushed until the // next live chunk arrives. flushActiveStreamToMessages(activeMsg); } break; } } } agent.addEventListener("message", onAgentMessage); // Stream resume is now primarily handled by the transport's // reconnectToStream (which sends CF_AGENT_STREAM_RESUME_REQUEST). // The onAgentMessage handler above serves as fallback for cross-tab // broadcasts and cases where the transport didn't handle the resume. return () => { agent.removeEventListener("message", onAgentMessage); // Clear active stream state on cleanup to prevent memory leak activeStreamRef.current = null; }; }, [ agent, setMessages, resume, flushActiveStreamToMessages, customTransport ]); // ── DEPRECATED: addToolResult wrapper with confirmation batching ──── // This wrapper is deprecated. Use addToolOutput or addToolApprovalResponse instead. const addToolResultAndSendMessage: typeof addToolResult = async (args) => { const { toolCallId } = args; const toolName = "tool" in args ? args.tool : ""; const output = "output" in args ? args.output : undefined; // Send tool result to server (server is source of truth) // Include flag to tell server whether to auto-continue agentRef.current.send( JSON.stringify({ type: MessageType.CF_AGENT_TOOL_RESULT, toolCallId, toolName, output, autoContinue: autoContinueAfterToolResult, clientTools: toolsRef.current ? extractClientToolSchemas(toolsRef.current) : undefined }) ); setClientToolResults((prev) => new Map(prev).set(toolCallId, output)); // Call AI SDK's addToolResult for local state update (non-blocking) // We don't await this since clientToolResults provides immediate UI feedback addToolResult(args); // If server auto-continuation is disabled, client needs to trigger continuation if (!autoContinueAfterToolResult) { // Use legacy behavior: batch confirmations or send immediately if (!autoSendAfterAllConfirmationsResolved) { // Always send immediately sendMessage(); return; } // Wait for all confirmations before sending const pending = pendingConfirmationsRef.current?.toolCallIds; if (!pending) { sendMessage(); return; } const wasLast = pending.size === 1 && pending.has(toolCallId); if (pending.has(toolCallId)) { pending.delete(toolCallId); } if (wasLast || pending.size === 0) { sendMessage(); } } // If autoContinueAfterToolResult is true, server handles continuation }; // Wrapper that sends tool approval to server before updating local state. // This prevents duplicate messages by ensuring server updates the message // in place with the existing ID, rather than relying on ID resolution // when sendMessage() is called later. const addToolApprovalResponseAndNotifyServer: typeof addToolApprovalResponse = (args) => { const { id: approvalId, approved } = args; // Find the toolCallId from the approval ID // The approval ID is stored on the tool part's approval.id field let toolCallId: string | undefined; for (const msg of messagesRef.current) { for (const part of msg.parts) { if ( "toolCallId" in part && "approval" in part && (part.approval as { id?: string })?.id === approvalId ) { toolCallId = part.toolCallId as string; break; } } if (toolCallId) break; } if (toolCallId) { // Send approval to server first (server updates message in place) sendToolApprovalToServer(toolCallId, approved); } else { console.warn( `[useAgentChat] addToolApprovalResponse: Could not find toolCallId for approval ID "${approvalId}". ` + "Server will not be notified, which may cause duplicate messages." ); } // Call AI SDK's addToolApprovalResponse for local state update addToolApprovalResponse(args); }; // Fix for issue #728: Merge client-side tool results with messages // so tool parts show output-available immediately after execution const messagesWithToolResults = useMemo(() => { if (clientToolResults.size === 0) { return chatMessages; } return chatMessages.map((msg) => ({ ...msg, parts: msg.parts.map((p) => { if ( !("toolCallId" in p) || !("state" in p) || p.state !== "input-available" || !clientToolResults.has(p.toolCallId) ) { return p; } return { ...p, state: "output-available" as const, output: clientToolResults.get(p.toolCallId) }; }) })) as ChatMessage[]; }, [chatMessages, clientToolResults]); // Cleanup stale entries from clientToolResults when messages change // to prevent memory leak in long conversations. // Note: We intentionally exclude clientToolResults from deps to avoid infinite loops. // The functional update form gives us access to the previous state. useEffect(() => { // Collect all current toolCallIds from messages const currentToolCallIds = new Set(); for (const msg of chatMessages) { for (const part of msg.parts) { if ("toolCallId" in part && part.toolCallId) { currentToolCallIds.add(part.toolCallId); } } } // Use functional update to check and clean stale entries atomically setClientToolResults((prev) => { if (prev.size === 0) return prev; // Check if any entries are stale let hasStaleEntries = false; for (const toolCallId of prev.keys()) { if (!currentToolCallIds.has(toolCallId)) { hasStaleEntries = true; break; } } // Only create new Map if there are stale entries to remove if (!hasStaleEntries) return prev; const newMap = new Map(); for (const [id, output] of prev) { if (currentToolCallIds.has(id)) { newMap.set(id, output); } } return newMap; }); // Also cleanup processedToolCalls to prevent issues in long conversations for (const toolCallId of processedToolCalls.current) { if (!currentToolCallIds.has(toolCallId)) { processedToolCalls.current.delete(toolCallId); } } // eslint-disable-next-line react-hooks/exhaustive-deps }, [chatMessages]); // Create addToolOutput function for external use const addToolOutput = useCallback( (opts: AddToolOutputOptions) => { const toolName = opts.toolName ?? ""; sendToolOutputToServer( opts.toolCallId, toolName, opts.output, opts.state, opts.errorText ); // Update local state via AI SDK addToolResult({ tool: toolName, toolCallId: opts.toolCallId, output: opts.state === "output-error" ? (opts.errorText ?? "Tool execution denied by user") : opts.output }); }, [sendToolOutputToServer, addToolResult] ); return { ...useChatHelpers, messages: messagesWithToolResults, /** * Provide output for a tool call. Use this for tools that require user interaction * or client-side execution. */ addToolOutput, /** * @deprecated Use `addToolOutput` instead. */ addToolResult: addToolResultAndSendMessage, /** * Respond to a tool approval request. Use this for tools with `needsApproval`. * This wrapper notifies the server before updating local state, preventing * duplicate messages when sendMessage() is called afterward. */ addToolApprovalResponse: addToolApprovalResponseAndNotifyServer, clearHistory: () => { setMessages([]); setClientToolResults(new Map()); processedToolCalls.current.clear(); agent.send( JSON.stringify({ type: MessageType.CF_AGENT_CHAT_CLEAR }) ); }, setMessages: (messagesOrUpdater: Parameters[0]) => { // Resolve functional updaters to get the actual messages array // before syncing to server. Without this, updater functions would // send an empty array and wipe server-side messages. let resolvedMessages: ChatMessage[]; if (typeof messagesOrUpdater === "function") { resolvedMessages = messagesOrUpdater(messagesRef.current); } else { resolvedMessages = messagesOrUpdater; } setMessages(resolvedMessages); agent.send( JSON.stringify({ messages: resolvedMessages, type: MessageType.CF_AGENT_CHAT_MESSAGES }) ); } }; }