branch:
message-builder.ts
13563 bytesRaw
/**
 * Shared message builder for reconstructing UIMessage parts from stream chunks.
 *
 * Used by both the server (_streamSSEReply) and client (onAgentMessage in react.tsx)
 * to avoid duplicating the chunk-type switch/case logic. The server handles additional
 * chunk types (tool-input-start, tool-input-delta, etc.) on top of this shared base.
 *
 * Operates on a mutable parts array for performance (avoids allocating new arrays
 * on every chunk during streaming).
 */

import type { UIMessage } from "ai";

/** The parts array type from UIMessage */
export type MessageParts = UIMessage["parts"];

/** A single part from the UIMessage parts array */
export type MessagePart = MessageParts[number];

/**
 * Parsed chunk data from an AI SDK stream event.
 * This is the JSON-parsed body of a CF_AGENT_USE_CHAT_RESPONSE message,
 * or the `data:` payload of an SSE line.
 */
export type StreamChunkData = {
  type: string;
  id?: string;
  delta?: string;
  text?: string;
  mediaType?: string;
  url?: string;
  sourceId?: string;
  title?: string;
  filename?: string;
  toolCallId?: string;
  toolName?: string;
  input?: unknown;
  inputTextDelta?: string;
  output?: unknown;
  state?: string;
  errorText?: string;
  /** When true, the output is preliminary (may be updated by a later chunk) */
  preliminary?: boolean;
  /** Approval ID for tools with needsApproval */
  approvalId?: string;
  providerMetadata?: Record<string, unknown>;
  /** Whether the tool was executed by the provider (e.g. Gemini code execution) */
  providerExecuted?: boolean;
  /** Payload for data-* parts (developer-defined typed JSON) */
  data?: unknown;
  /** When true, data parts are ephemeral and not persisted to message.parts */
  transient?: boolean;
  [key: string]: unknown;
};

/**
 * Applies a stream chunk to a mutable parts array, building up the message
 * incrementally. Returns true if the chunk was handled, false if it was
 * an unrecognized type (caller may handle it with additional logic).
 *
 * Handles all common chunk types that both server and client need:
 * - text-start / text-delta / text-end
 * - reasoning-start / reasoning-delta / reasoning-end
 * - file
 * - source-url / source-document
 * - tool-input-start / tool-input-delta / tool-input-available / tool-input-error
 * - tool-output-available / tool-output-error
 * - step-start (aliased from start-step)
 * - data-* (developer-defined typed JSON blobs)
 *
 * @param parts - The mutable parts array to update
 * @param chunk - The parsed stream chunk data
 * @returns true if handled, false if the chunk type is not recognized
 */
export function applyChunkToParts(
  parts: MessagePart[],
  chunk: StreamChunkData
): boolean {
  switch (chunk.type) {
    case "text-start": {
      parts.push({
        type: "text",
        text: "",
        state: "streaming"
      } as MessagePart);
      return true;
    }

    case "text-delta": {
      const lastTextPart = findLastPartByType(parts, "text");
      if (lastTextPart && lastTextPart.type === "text") {
        (lastTextPart as { text: string }).text += chunk.delta ?? "";
      } else {
        // No text-start received — create a new text part (stream resumption fallback)
        parts.push({
          type: "text",
          text: chunk.delta ?? "",
          state: "streaming"
        } as MessagePart);
      }
      return true;
    }

    case "text-end": {
      const lastTextPart = findLastPartByType(parts, "text");
      if (lastTextPart && "state" in lastTextPart) {
        (lastTextPart as { state: string }).state = "done";
      }
      return true;
    }

    case "reasoning-start": {
      parts.push({
        type: "reasoning",
        text: "",
        state: "streaming"
      } as MessagePart);
      return true;
    }

    case "reasoning-delta": {
      const lastReasoningPart = findLastPartByType(parts, "reasoning");
      if (lastReasoningPart && lastReasoningPart.type === "reasoning") {
        (lastReasoningPart as { text: string }).text += chunk.delta ?? "";
      } else {
        // No reasoning-start received — create a new reasoning part (stream resumption fallback)
        parts.push({
          type: "reasoning",
          text: chunk.delta ?? "",
          state: "streaming"
        } as MessagePart);
      }
      return true;
    }

    case "reasoning-end": {
      const lastReasoningPart = findLastPartByType(parts, "reasoning");
      if (lastReasoningPart && "state" in lastReasoningPart) {
        (lastReasoningPart as { state: string }).state = "done";
      }
      return true;
    }

    case "file": {
      parts.push({
        type: "file",
        mediaType: chunk.mediaType,
        url: chunk.url
      } as MessagePart);
      return true;
    }

    case "source-url": {
      parts.push({
        type: "source-url",
        sourceId: chunk.sourceId,
        url: chunk.url,
        title: chunk.title,
        providerMetadata: chunk.providerMetadata
      } as MessagePart);
      return true;
    }

    case "source-document": {
      parts.push({
        type: "source-document",
        sourceId: chunk.sourceId,
        mediaType: chunk.mediaType,
        title: chunk.title,
        filename: chunk.filename,
        providerMetadata: chunk.providerMetadata
      } as MessagePart);
      return true;
    }

    case "tool-input-start": {
      // Create a tool part in input-streaming state with no input yet.
      // Cross-tab clients see the tool appear immediately with "streaming" indicator.
      parts.push({
        type: `tool-${chunk.toolName}`,
        toolCallId: chunk.toolCallId,
        toolName: chunk.toolName,
        state: "input-streaming",
        input: undefined,
        ...(chunk.providerExecuted != null
          ? { providerExecuted: chunk.providerExecuted }
          : {}),
        ...(chunk.providerMetadata != null
          ? { callProviderMetadata: chunk.providerMetadata }
          : {}),
        ...(chunk.title != null ? { title: chunk.title } : {})
      } as MessagePart);
      return true;
    }

    case "tool-input-delta": {
      // Update the existing tool part with partial input as it streams in.
      const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
      if (toolPart) {
        (toolPart as Record<string, unknown>).input = chunk.input;
      }
      return true;
    }

    case "tool-input-available": {
      // Finalize the tool input. If tool-input-start was received, update
      // the existing part; otherwise create a new one (for non-streaming tools).
      const existing = findToolPartByCallId(parts, chunk.toolCallId);
      if (existing) {
        const p = existing as Record<string, unknown>;
        p.state = "input-available";
        p.input = chunk.input;
        if (chunk.providerExecuted != null) {
          p.providerExecuted = chunk.providerExecuted;
        }
        if (chunk.providerMetadata != null) {
          p.callProviderMetadata = chunk.providerMetadata;
        }
        if (chunk.title != null) {
          p.title = chunk.title;
        }
      } else {
        parts.push({
          type: `tool-${chunk.toolName}`,
          toolCallId: chunk.toolCallId,
          toolName: chunk.toolName,
          state: "input-available",
          input: chunk.input,
          ...(chunk.providerExecuted != null
            ? { providerExecuted: chunk.providerExecuted }
            : {}),
          ...(chunk.providerMetadata != null
            ? { callProviderMetadata: chunk.providerMetadata }
            : {}),
          ...(chunk.title != null ? { title: chunk.title } : {})
        } as MessagePart);
      }
      return true;
    }

    case "tool-input-error": {
      // Tool input parsing failed. Update existing part or create one.
      const existing = findToolPartByCallId(parts, chunk.toolCallId);
      if (existing) {
        const p = existing as Record<string, unknown>;
        p.state = "output-error";
        p.errorText = chunk.errorText;
        p.input = chunk.input;
        if (chunk.providerExecuted != null) {
          p.providerExecuted = chunk.providerExecuted;
        }
        if (chunk.providerMetadata != null) {
          p.callProviderMetadata = chunk.providerMetadata;
        }
      } else {
        parts.push({
          type: `tool-${chunk.toolName}`,
          toolCallId: chunk.toolCallId,
          toolName: chunk.toolName,
          state: "output-error",
          input: chunk.input,
          errorText: chunk.errorText,
          ...(chunk.providerExecuted != null
            ? { providerExecuted: chunk.providerExecuted }
            : {}),
          ...(chunk.providerMetadata != null
            ? { callProviderMetadata: chunk.providerMetadata }
            : {})
        } as MessagePart);
      }
      return true;
    }

    case "tool-approval-request": {
      // Tool requires user approval before executing.
      // Transition the tool part to approval-requested state with the approval ID.
      const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
      if (toolPart) {
        const p = toolPart as Record<string, unknown>;
        p.state = "approval-requested";
        p.approval = { id: chunk.approvalId };
      }
      return true;
    }

    case "tool-output-denied": {
      // User rejected the tool approval request.
      const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
      if (toolPart) {
        const p = toolPart as Record<string, unknown>;
        p.state = "output-denied";
      }
      return true;
    }

    case "tool-output-available": {
      // Update existing tool part with output.
      // Supports `preliminary: true` for streaming tool results —
      // the output may be updated by a subsequent chunk.
      const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
      if (toolPart) {
        const p = toolPart as Record<string, unknown>;
        p.state = "output-available";
        p.output = chunk.output;
        if (chunk.preliminary !== undefined) {
          p.preliminary = chunk.preliminary;
        }
      }
      return true;
    }

    case "tool-output-error": {
      // Tool execution failed. Update the existing tool part.
      const toolPart = findToolPartByCallId(parts, chunk.toolCallId);
      if (toolPart) {
        const p = toolPart as Record<string, unknown>;
        p.state = "output-error";
        p.errorText = chunk.errorText;
      }
      return true;
    }

    // Both "step-start" (client convention) and "start-step" (server convention)
    case "step-start":
    case "start-step": {
      parts.push({ type: "step-start" } as MessagePart);
      return true;
    }

    default: {
      // https://ai-sdk.dev/docs/ai-sdk-ui/streaming-data
      if (chunk.type.startsWith("data-")) {
        // Transient parts are ephemeral — the AI SDK client fires an onData
        // callback instead of adding them to message.parts. On the server we
        // still broadcast them (so connected clients see them in real time)
        // but skip persisting them into the stored message parts.
        if (chunk.transient) {
          return true;
        }

        // Reconciliation: if a part with the same type AND id already exists,
        // update its data in-place instead of appending a duplicate.
        if (chunk.id != null) {
          const existing = findDataPartByTypeAndId(parts, chunk.type, chunk.id);
          if (existing) {
            (existing as Record<string, unknown>).data = chunk.data;
            return true;
          }
        }

        // Append new data parts to the array directly.
        // Note: `chunk.data` should always be provided — if omitted, the
        // persisted part will have `data: undefined` which JSON.stringify
        // drops, so the part will have no `data` field on reload.
        // The cast is needed because UIMessage["parts"] doesn't include
        // data-* types in its union because they're an open extension point.
        parts.push({
          type: chunk.type,
          ...(chunk.id != null && { id: chunk.id }),
          data: chunk.data
        } as MessagePart);
        return true;
      }

      return false;
    }
  }
}

/**
 * Finds the last part in the array matching the given type.
 * Searches from the end for efficiency (the part we want is usually recent).
 */
function findLastPartByType(
  parts: MessagePart[],
  type: string
): MessagePart | undefined {
  for (let i = parts.length - 1; i >= 0; i--) {
    if (parts[i].type === type) {
      return parts[i];
    }
  }
  return undefined;
}

/**
 * Finds a tool part by its toolCallId.
 * Searches from the end since the tool part is usually recent.
 */
function findToolPartByCallId(
  parts: MessagePart[],
  toolCallId: string | undefined
): MessagePart | undefined {
  if (!toolCallId) return undefined;
  for (let i = parts.length - 1; i >= 0; i--) {
    const p = parts[i];
    if ("toolCallId" in p && p.toolCallId === toolCallId) {
      return p;
    }
  }
  return undefined;
}

/**
 * Finds a data part by its type and id for reconciliation.
 * Data parts use type+id as a composite key so when the same combination
 * is seen again, the existing part's data is updated in-place.
 */
function findDataPartByTypeAndId(
  parts: MessagePart[],
  type: string,
  id: string
): MessagePart | undefined {
  for (let i = parts.length - 1; i >= 0; i--) {
    const p = parts[i];
    if (p.type === type && "id" in p && (p as { id: string }).id === id) {
      return p;
    }
  }
  return undefined;
}