branch:
index.ts
87001 bytesRaw
import type {
  UIMessage as ChatMessage,
  JSONSchema7,
  ProviderMetadata,
  ReasoningUIPart,
  StreamTextOnFinishCallback,
  TextUIPart,
  Tool,
  ToolSet,
  UIMessageChunk
} from "ai";
import { tool, jsonSchema } from "ai";
import {
  Agent,
  __DO_NOT_USE_WILL_BREAK__agentContext as agentContext,
  type AgentContext,
  type Connection,
  type ConnectionContext,
  type WSMessage
} from "agents";

import {
  MessageType,
  type IncomingMessage,
  type OutgoingMessage
} from "./types";
import { autoTransformMessages } from "./ai-chat-v5-migration";
import { applyChunkToParts } from "./message-builder";
import { ResumableStream } from "./resumable-stream";
import { nanoid } from "nanoid";

/**
 * Shared encoder for UTF-8 byte length measurement.
 * Hoisted to module scope so `AIChatAgent._byteLength` reuses a single
 * instance instead of constructing one per call.
 */
const textEncoder = new TextEncoder();

/**
 * Structural sanity check for a message loaded from storage.
 *
 * Rejects anything that would blow up later (for example inside
 * `convertToModelMessages` or the UI layer). A message passes when:
 * - it is a non-null object,
 * - `id` is a non-empty string,
 * - `role` is `"user"`, `"assistant"`, or `"system"`,
 * - `parts` is an array. An empty array is accepted: the AI SDK requires
 *   non-empty parts on incoming messages, but persisted data is treated
 *   leniently.
 *
 * @param msg - Arbitrary parsed value.
 * @returns `true` when `msg` can be treated as a `ChatMessage`.
 */
function isValidMessageStructure(msg: unknown): msg is ChatMessage {
  if (msg === null || typeof msg !== "object") {
    return false;
  }

  const { id, role, parts } = msg as Record<string, unknown>;

  const hasUsableId = typeof id === "string" && id !== "";
  const hasKnownRole =
    role === "user" || role === "assistant" || role === "system";

  return hasUsableId && hasKnownRole && Array.isArray(parts);
}

/**
 * Schema for a client-defined tool sent from the browser.
 * These tools are executed on the client, not the server.
 *
 * **For most apps**, define tools on the server with `tool()` from `"ai"` —
 * you get full Zod type safety, server-side execution, and simpler code.
 * Use `onToolCall` in `useAgentChat` for tools that need client-side execution.
 *
 * **For SDKs and platforms** where the tool surface is determined dynamically
 * by the embedding application at runtime, client tool schemas let the
 * client register tools the server does not know about at deploy time.
 *
 * Note: Uses `parameters` (JSONSchema7) rather than AI SDK's `inputSchema`
 * because this is the wire format. Zod schemas cannot be serialized.
 */
export type ClientToolSchema = {
  /** Unique name for the tool (also the key in the generated `ToolSet`) */
  name: string;
  /**
   * Human-readable description of what the tool does.
   * `createToolsFromClientSchemas` substitutes `""` when omitted.
   */
  description?: Tool["description"];
  /**
   * JSON Schema defining the tool's input parameters.
   * `createToolsFromClientSchemas` substitutes `{ type: "object" }` when omitted.
   */
  parameters?: JSONSchema7;
};

/**
 * Options passed to the onChatMessage handler.
 */
export type OnChatMessageOptions = {
  /**
   * Unique ID for this chat message exchange.
   *
   * For initial user messages this is the client-generated ID from the
   * `CF_AGENT_USE_CHAT_REQUEST` WebSocket frame. For tool continuations
   * (auto-continue after client tool results or approvals) this is a
   * server-generated ID.
   */
  requestId: string;
  /**
   * AbortSignal for cancelling the request. It is tied to `requestId`, so a
   * `CF_AGENT_CHAT_REQUEST_CANCEL` frame for that ID cancels this request.
   */
  abortSignal?: AbortSignal;
  /**
   * Tool schemas sent from the client for dynamic tool registration.
   * These represent tools that will be executed on the client side.
   * Use `createToolsFromClientSchemas()` to convert these to AI SDK tool format.
   *
   * During tool continuations this comes from one of two sources:
   * - after a tool result, the schemas attached to that tool-result frame,
   *   or the most recently cached schemas if the frame had none;
   * - after a tool approval, the most recently cached schemas.
   *
   * **For most apps**, you do not need this — define tools on the server with
   * `tool()` from `"ai"` and use `onToolCall` for client-side execution.
   *
   * **For SDKs and platforms** where tools are defined dynamically by the
   * client at runtime and the server does not know the tool surface ahead
   * of time, this field carries the client-provided tool schemas.
   */
  clientTools?: ClientToolSchema[];
  /**
   * Custom body data sent from the client via `prepareSendMessagesRequest`
   * or the AI SDK's `body` option in `sendMessage`.
   *
   * Contains all fields from the request body except `messages`, `clientTools`
   * and `trigger`, which are handled separately.
   *
   * During tool continuations (auto-continue after client tool results), this
   * contains the body from the most recent chat request. The value is persisted
   * to SQLite so it survives Durable Object hibernation. It is cleared when the
   * chat is cleared via `CF_AGENT_CHAT_CLEAR`.
   */
  body?: Record<string, unknown>;
};

/**
 * Converts client tool schemas to AI SDK tool format.
 *
 * These tools have no `execute` function — when the AI model calls them,
 * the tool call is sent back to the client for execution.
 *
 * **For most apps**, define tools on the server with `tool()` from `"ai"`
 * for full Zod type safety. This helper is intended for SDK/platform use
 * cases where the tool surface is determined dynamically by the client.
 *
 * The schemas usually arrive from an untrusted WebSocket payload, so input
 * is validated defensively:
 * - A non-array value yields an empty tool set.
 * - Entries that are not objects, or that lack a non-empty string `name`,
 *   are skipped with a warning.
 * - Duplicate names trigger a warning; the later definition wins.
 *
 * @param clientTools - Array of tool schemas from the client
 * @returns Record of AI SDK tools that can be spread into your tools object
 *
 * @example
 * ```typescript
 * // In onChatMessage:
 * const tools = {
 *   ...createToolsFromClientSchemas(options.clientTools),
 *   // server-defined tools with execute:
 *   myServerTool: tool({ ... }),
 * };
 * ```
 */
export function createToolsFromClientSchemas(
  clientTools?: ClientToolSchema[]
): ToolSet {
  if (!clientTools || clientTools.length === 0) {
    return {};
  }

  // The static type is only a promise made by a cast on wire data.
  if (!Array.isArray(clientTools)) {
    console.warn(
      "[createToolsFromClientSchemas] Expected an array of client tool schemas, ignoring."
    );
    return {};
  }

  // Drop entries that cannot form a valid tool. Without this, a missing
  // name would become the key "undefined" and a null entry would throw.
  const validTools = clientTools.filter((t) => {
    const isValid =
      typeof t === "object" &&
      t !== null &&
      typeof t.name === "string" &&
      t.name.length > 0;
    if (!isValid) {
      console.warn(
        "[createToolsFromClientSchemas] Skipping client tool without a valid name."
      );
    }
    return isValid;
  });

  // Check for duplicate tool names
  const seenNames = new Set<string>();
  for (const t of validTools) {
    if (seenNames.has(t.name)) {
      console.warn(
        `[createToolsFromClientSchemas] Duplicate tool name "${t.name}" found. Later definitions will override earlier ones.`
      );
    }
    seenNames.add(t.name);
  }

  // Object.fromEntries defines own properties, so a name like "__proto__"
  // cannot replace the result's prototype (plain assignment would).
  return Object.fromEntries(
    validTools.map((t) => [
      t.name,
      tool({
        description: t.description ?? "",
        inputSchema: jsonSchema(t.parameters ?? { type: "object" })
        // No execute function = tool call is sent back to client
      })
    ])
  );
}

/**
 * Shared UTF-8 decoder, hoisted to module scope so it is constructed once.
 * NOTE(review): not referenced in the portion of this file reviewed here —
 * presumably used further down to decode streamed response bytes; confirm.
 */
const decoder = new TextDecoder();

/**
 * Extension of Agent with built-in chat capabilities
 * @template Env Environment type containing bindings
 */
export class AIChatAgent<
  Env extends Cloudflare.Env = Cloudflare.Env,
  State = unknown
> extends Agent<Env, State> {
  /**
   * Map of request `id`s to `AbortController`s, used to propagate request
   * cancellation signals to any external calls made by the agent.
   * Initialized in the constructor. Presumably keyed by the IDs passed to
   * `_getAbortSignal` (chat request IDs and continuation IDs) — confirm
   * against that helper.
   */
  private _chatMessageAbortControllers: Map<string, AbortController>;

  /**
   * Resumable stream manager -- handles chunk buffering, persistence, and replay.
   * Assigned in the constructor (hence the definite-assignment `!`).
   * @internal Protected for testing purposes.
   */
  protected _resumableStream!: ResumableStream;

  /**
   * The message currently being streamed. Used to apply tool results
   * before the message is persisted. `null` when nothing is streaming.
   * @internal
   */
  private _streamingMessage: ChatMessage | null = null;

  /**
   * Tracks the ID of a streaming message that was persisted early due to
   * a tool entering approval-requested state. When set, stream completion
   * updates the existing persisted message instead of appending a new one.
   * @internal
   */
  private _approvalPersistedMessageId: string | null = null;

  /**
   * Promise that resolves when the current stream completes.
   * Used to wait for message persistence before continuing after tool results.
   * When it is `null` at the moment a tool continuation is scheduled, the
   * continuation falls back to a fixed 500ms delay instead.
   * @internal
   */
  private _streamCompletionPromise: Promise<void> | null = null;
  /**
   * Resolver paired with `_streamCompletionPromise`. NOTE(review): set and
   * invoked outside the portion of the file reviewed here.
   * @internal
   */
  private _streamCompletionResolve: (() => void) | null = null;

  /**
   * Set of connection IDs that are pending stream resume.
   * These connections have received CF_AGENT_STREAM_RESUMING but haven't sent ACK yet.
   * They should be excluded from live stream broadcasts until they ACK.
   * Entries are removed on ACK, on close, on chat clear, and when a stream completes.
   * @internal
   */
  private _pendingResumeConnections: Set<string> = new Set();

  /**
   * Client tool schemas from the most recent chat request.
   * Stored so they can be passed to onChatMessage during tool continuations.
   * Persisted to SQLite (`cf_ai_chat_request_context`) and restored in the
   * constructor so they survive hibernation.
   * @internal
   */
  private _lastClientTools: ClientToolSchema[] | undefined;

  /**
   * Custom body data from the most recent chat request.
   * Stored so it can be passed to onChatMessage during tool continuations.
   * Persisted to SQLite (`cf_ai_chat_request_context`) and restored in the
   * constructor so it survives hibernation.
   * @internal
   */
  private _lastBody: Record<string, unknown> | undefined;

  /**
   * Cache of last-persisted JSON for each message ID.
   * Used for incremental persistence: skip SQL writes for unchanged messages.
   * Lost on hibernation, repopulated from SQLite on wake; cleared on chat clear.
   * @internal
   */
  private _persistedMessageCache: Map<string, string> = new Map();

  /**
   * Upper bound, in UTF-8 bytes, for one serialized message before it is
   * compacted. 1.8MB keeps headroom under SQLite's 2MB row limit.
   */
  private static ROW_MAX_BYTES = 1_800_000;

  /**
   * UTF-8 encoded size of `s` in bytes. Unlike `s.length`, which counts UTF-16
   * code units, this is the number that matters for SQLite row limits.
   */
  private static _byteLength(s: string): number {
    const encoded = textEncoder.encode(s);
    return encoded.length;
  }

  /**
   * Upper limit on how many messages are kept in SQLite storage.
   * Once the conversation grows past it, the oldest messages are deleted
   * after each persist. Leave as `undefined` (the default) for no limit.
   *
   * Storage only: this does not change what the LLM sees. To trim LLM
   * context, call `pruneMessages()` from the AI SDK inside `onChatMessage`.
   *
   * @example
   * ```typescript
   * class MyAgent extends AIChatAgent<Env> {
   *   maxPersistedMessages = 100; // Keep last 100 messages in storage
   * }
   * ```
   */
  maxPersistedMessages: number | undefined = undefined;

  /**
   * When enabled, waits for all MCP server connections to be ready before
   * calling `onChatMessage`. This prevents the race condition where
   * `getAITools()` returns an incomplete set because connections are still
   * restoring after Durable Object hibernation.
   *
   * - `{ timeout: 10_000 }` (default) — waits up to 10 seconds.
   * - `false` — non-blocking; `onChatMessage` runs immediately.
   * - `true` — waits with no timeout for all connections to settle.
   * - `{ timeout: number }` — waits up to `timeout` milliseconds.
   *
   * For lower-level control, call `this.mcp.waitForConnections()` directly
   * inside your `onChatMessage` instead.
   *
   * @example
   * ```typescript
   * // Opt out of waiting entirely
   * class MyAgent extends AIChatAgent<Env> {
   *   waitForMcpConnections = false;
   * }
   * ```
   *
   * @example
   * ```typescript
   * // Wait with no timeout
   * class MyAgent extends AIChatAgent<Env> {
   *   waitForMcpConnections = true;
   * }
   * ```
   *
   * @example
   * ```typescript
   * // Wait longer than the default
   * class MyAgent extends AIChatAgent<Env> {
   *   waitForMcpConnections = { timeout: 30_000 };
   * }
   * ```
   */
  waitForMcpConnections: boolean | { timeout: number } = { timeout: 10_000 };

  /** Array of chat messages for the current conversation */
  messages: ChatMessage[];

  constructor(ctx: AgentContext, env: Env) {
    super(ctx, env);
    this.sql`create table if not exists cf_ai_chat_agent_messages (
      id text primary key,
      message text not null,
      created_at datetime default current_timestamp
    )`;

    // Key-value table for request context that must survive hibernation
    // (e.g., custom body fields, client tools from the last chat request).
    this.sql`create table if not exists cf_ai_chat_request_context (
      key text primary key,
      value text not null
    )`;

    // Restore request context from SQLite (survives hibernation)
    this._restoreRequestContext();

    // Initialize resumable stream manager (creates its own tables + restores state)
    this._resumableStream = new ResumableStream(this.sql.bind(this));

    // Load messages and automatically transform them to v5 format.
    // Note: _loadMessagesFromDb() runs structural validation which requires
    // `parts` to be an array. Legacy v4 messages (with `content` instead of
    // `parts`) would fail this check — but that's fine because autoTransformMessages
    // already migrated them on a previous load, and persistMessages wrote them back.
    // Any message still without `parts` at this point is genuinely corrupt.
    const rawMessages = this._loadMessagesFromDb();

    // Automatic migration following https://jhak.im/blog/ai-sdk-migration-handling-previously-saved-messages
    this.messages = autoTransformMessages(rawMessages);

    this._chatMessageAbortControllers = new Map();

    const _onConnect = this.onConnect.bind(this);
    this.onConnect = async (connection: Connection, ctx: ConnectionContext) => {
      // Notify client about active streams that can be resumed
      if (this._resumableStream.hasActiveStream()) {
        this._notifyStreamResuming(connection);
      }
      // Call consumer's onConnect
      return _onConnect(connection, ctx);
    };

    // Wrap onClose to clean up pending resume connections
    const _onClose = this.onClose.bind(this);
    this.onClose = async (
      connection: Connection,
      code: number,
      reason: string,
      wasClean: boolean
    ) => {
      // Clean up pending resume state for this connection
      this._pendingResumeConnections.delete(connection.id);
      // Call consumer's onClose
      return _onClose(connection, code, reason, wasClean);
    };

    // Wrap onMessage: AIChatAgent's protocol frames are handled here, and
    // everything else is forwarded to the consumer's onMessage.
    const _onMessage = this.onMessage.bind(this);
    this.onMessage = async (connection: Connection, message: WSMessage) => {
      // Binary frames are never chat-protocol frames.
      if (typeof message !== "string") {
        return _onMessage(connection, message);
      }

      let parsedFrame: unknown;
      try {
        parsedFrame = JSON.parse(message);
      } catch (_error) {
        // Not JSON, forward to consumer
        return _onMessage(connection, message);
      }

      // JSON primitives (notably the literal `null`) cannot be protocol
      // frames; reading `.type` off `null` would throw, so forward them.
      if (typeof parsedFrame !== "object" || parsedFrame === null) {
        return _onMessage(connection, message);
      }
      const data = parsedFrame as IncomingMessage;

      // Handle chat request
      if (
        data.type === MessageType.CF_AGENT_USE_CHAT_REQUEST &&
        data.init.method === "POST"
      ) {
        // Optionally wait for in-flight MCP connections to settle (e.g. after hibernation restore)
        // so that getAITools() returns the full set of tools in onChatMessage
        if (this.waitForMcpConnections) {
          const timeout =
            typeof this.waitForMcpConnections === "object"
              ? this.waitForMcpConnections.timeout
              : undefined;
          await this.mcp.waitForConnections(
            timeout != null ? { timeout } : undefined
          );
        }

        const { body } = data.init;
        if (!body) {
          console.warn(
            "[AIChatAgent] Received chat request with empty body, ignoring"
          );
          return;
        }

        let parsed: unknown;
        try {
          parsed = JSON.parse(body as string);
        } catch (_parseError) {
          console.warn(
            "[AIChatAgent] Received chat request with invalid JSON body, ignoring"
          );
          return;
        }

        // `JSON.parse("null")` succeeds, but destructuring `null` throws and
        // other primitives would continue with no `messages` at all.
        if (typeof parsed !== "object" || parsed === null) {
          console.warn(
            "[AIChatAgent] Received chat request with non-object JSON body, ignoring"
          );
          return;
        }

        const {
          messages,
          clientTools,
          trigger: _trigger,
          ...customBody
        } = parsed as {
          messages: ChatMessage[];
          clientTools?: ClientToolSchema[];
          trigger?: string;
          [key: string]: unknown;
        };

        // Store client tools and body for use during tool continuations
        this._lastClientTools = clientTools?.length ? clientTools : undefined;
        this._lastBody =
          Object.keys(customBody).length > 0 ? customBody : undefined;
        this._persistRequestContext();

        // Automatically transform any incoming messages
        const transformedMessages = autoTransformMessages(messages);

        this._broadcastChatMessage(
          {
            messages: transformedMessages,
            type: MessageType.CF_AGENT_CHAT_MESSAGES
          },
          [connection.id]
        );

        await this.persistMessages(transformedMessages, [connection.id], {
          _deleteStaleRows: true
        });

        this._emit("message:request");

        const chatMessageId = data.id;
        const abortSignal = this._getAbortSignal(chatMessageId);

        return this._tryCatchChat(async () => {
          // Wrap in agentContext.run() to propagate connection context to onChatMessage
          // This ensures getCurrentAgent() returns the connection inside tool execute functions
          return agentContext.run(
            { agent: this, connection, request: undefined, email: undefined },
            async () => {
              const response = await this.onChatMessage(
                async (_finishResult) => {
                  // User-provided hook. Cleanup is now handled by _reply,
                  // so this is optional for the user to pass to streamText.
                },
                {
                  requestId: chatMessageId,
                  abortSignal,
                  clientTools,
                  body: this._lastBody
                }
              );

              if (response) {
                await this._reply(data.id, response, [connection.id], {
                  chatMessageId
                });
              } else {
                console.warn(
                  `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
                );
                this._broadcastChatMessage(
                  {
                    body: "No response was generated by the agent.",
                    done: true,
                    id: data.id,
                    type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
                  },
                  [connection.id]
                );
              }
            }
          );
        });
      }

      // Handle clear chat
      if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) {
        this._destroyAbortControllers();
        this.sql`delete from cf_ai_chat_agent_messages`;
        this._resumableStream.clearAll();
        this._pendingResumeConnections.clear();
        this._lastClientTools = undefined;
        this._lastBody = undefined;
        this._persistRequestContext();
        this._persistedMessageCache.clear();
        this.messages = [];
        this._broadcastChatMessage(
          { type: MessageType.CF_AGENT_CHAT_CLEAR },
          [connection.id]
        );
        this._emit("message:clear");
        return;
      }

      // Handle message replacement
      if (data.type === MessageType.CF_AGENT_CHAT_MESSAGES) {
        const transformedMessages = autoTransformMessages(data.messages);
        await this.persistMessages(transformedMessages, [connection.id]);
        return;
      }

      // Handle request cancellation
      if (data.type === MessageType.CF_AGENT_CHAT_REQUEST_CANCEL) {
        this._cancelChatRequest(data.id);
        this._emit("message:cancel", { requestId: data.id });
        return;
      }

      // Handle client-initiated stream resume request.
      // The client sends this after its message handler is registered,
      // avoiding the race condition where CF_AGENT_STREAM_RESUMING sent
      // in onConnect arrives before the client's handler is ready.
      if (data.type === MessageType.CF_AGENT_STREAM_RESUME_REQUEST) {
        if (this._resumableStream.hasActiveStream()) {
          this._notifyStreamResuming(connection);
        } else {
          connection.send(
            JSON.stringify({
              type: MessageType.CF_AGENT_STREAM_RESUME_NONE
            })
          );
        }
        return;
      }

      // Handle stream resume acknowledgment
      if (data.type === MessageType.CF_AGENT_STREAM_RESUME_ACK) {
        this._pendingResumeConnections.delete(connection.id);

        if (
          this._resumableStream.hasActiveStream() &&
          this._resumableStream.activeRequestId === data.id
        ) {
          const orphanedStreamId = this._resumableStream.replayChunks(
            connection,
            this._resumableStream.activeRequestId
          );

          // If the stream was orphaned (restored from SQLite after
          // hibernation with no live reader), reconstruct the partial
          // assistant message from stored chunks and persist it so it
          // survives further page refreshes.
          if (orphanedStreamId) {
            this._persistOrphanedStream(orphanedStreamId);
          }
        }
        return;
      }

      // Handle client-side tool result
      if (data.type === MessageType.CF_AGENT_TOOL_RESULT) {
        const {
          toolCallId,
          toolName,
          output,
          state,
          errorText,
          autoContinue,
          clientTools
        } = data;

        // Update cached client tools so subsequent continuations use the latest schemas
        if (clientTools?.length) {
          this._lastClientTools = clientTools as ClientToolSchema[];
          this._persistRequestContext();
        }

        const overrideState =
          state === "output-error" ? "output-error" : undefined;

        this._emit("tool:result", { toolCallId, toolName });

        // Apply the tool result. Fire-and-forget, so rejections must be
        // caught here rather than surfacing as unhandled.
        this._applyToolResult(
          toolCallId,
          toolName,
          output,
          overrideState,
          errorText
        )
          .then((applied) => {
            // Only auto-continue if client requested it (opt-in behavior)
            // This mimics server-executed tool behavior where the LLM
            // automatically continues after seeing tool results
            if (applied && autoContinue) {
              this._scheduleToolContinuation(
                connection,
                () => clientTools ?? this._lastClientTools,
                "[AIChatAgent] Tool continuation failed:"
              );
            }
          })
          .catch((error) => {
            console.error("[AIChatAgent] Failed to apply tool result:", error);
          });
        return;
      }

      // Handle client-side tool approval response
      if (data.type === MessageType.CF_AGENT_TOOL_APPROVAL) {
        const { toolCallId, approved, autoContinue } = data;
        this._emit("tool:approval", { toolCallId, approved });
        this._applyToolApproval(toolCallId, approved)
          .then((applied) => {
            // Auto-continue for both approvals and rejections so the LLM
            // sees the tool_result and can respond accordingly.
            if (applied && autoContinue) {
              this._scheduleToolContinuation(
                connection,
                () => this._lastClientTools,
                "[AIChatAgent] Tool approval continuation failed:"
              );
            }
          })
          .catch((error) => {
            console.error(
              "[AIChatAgent] Failed to apply tool approval:",
              error
            );
          });
        return;
      }

      // Forward unhandled messages to consumer's onMessage
      return _onMessage(connection, message);
    };

    const _onRequest = this.onRequest.bind(this);
    this.onRequest = async (request: Request) => {
      return this._tryCatchChat(async () => {
        const url = new URL(request.url);
        if (url.pathname.split("/").pop() === "get-messages") {
          return Response.json(this._loadMessagesFromDb());
        }
        return _onRequest(request);
      });
    };
  }

  /**
   * Continue the conversation after a client tool result or approval.
   *
   * Waits for the in-flight stream (if any) to finish so `this.messages`
   * includes the tool result. It then re-invokes `onChatMessage` under a
   * fresh server-generated request ID and streams the reply to every
   * connection, including the sender: no `aiFetch` listener is waiting for a
   * continuation, so the client relies on the broadcast. Runs
   * fire-and-forget; failures are logged with `failureLabel`.
   *
   * @param connection - Connection whose context is propagated via agentContext.
   * @param resolveClientTools - Evaluated after the wait, so the latest cached
   *   client tools are used.
   * @param failureLabel - Prefix for the error log on failure.
   * @internal
   */
  private _scheduleToolContinuation(
    connection: Connection,
    resolveClientTools: () => ClientToolSchema[] | undefined,
    failureLabel: string
  ): void {
    const waitForStream = async () => {
      if (this._streamCompletionPromise) {
        await this._streamCompletionPromise;
      } else {
        // TODO: The completion promise can be null if the stream finished
        // before the tool result arrived (race between stream end and tool
        // apply). The 500ms fallback is a pragmatic workaround — consider
        // a more deterministic signal (e.g. always setting the promise).
        await new Promise((resolve) => setTimeout(resolve, 500));
      }
    };

    void waitForStream()
      .then(() => {
        const continuationId = nanoid();
        const abortSignal = this._getAbortSignal(continuationId);

        return this._tryCatchChat(async () => {
          return agentContext.run(
            {
              agent: this,
              connection,
              request: undefined,
              email: undefined
            },
            async () => {
              const response = await this.onChatMessage(
                async (_finishResult) => {
                  // User-provided hook. Cleanup handled by _reply.
                },
                {
                  requestId: continuationId,
                  abortSignal,
                  clientTools: resolveClientTools(),
                  body: this._lastBody
                }
              );

              if (response) {
                // Continuation flag merges parts into the last assistant
                // message; empty exclude list so the sender receives it too.
                await this._reply(continuationId, response, [], {
                  continuation: true,
                  chatMessageId: continuationId
                });
              }
            }
          );
        });
      })
      .catch((error) => {
        console.error(failureLabel, error);
      });
  }

  /**
   * Tell `connection` that a resumable stream is in flight.
   *
   * The connection is parked in the pending-resume set, which keeps it out of
   * live chunk broadcasts. Once the client replies with
   * CF_AGENT_STREAM_RESUME_ACK, it receives the full replay instead.
   * Does nothing when no stream is active.
   *
   * @param connection - The WebSocket connection to notify
   */
  private _notifyStreamResuming(connection: Connection) {
    const stream = this._resumableStream;
    if (stream.hasActiveStream()) {
      this._pendingResumeConnections.add(connection.id);

      const resumingNotice = {
        type: MessageType.CF_AGENT_STREAM_RESUMING,
        id: stream.activeRequestId
      };
      connection.send(JSON.stringify(resumingNotice));
    }
  }

  // ── Backward-compatible delegates to _resumableStream ─────────
  // Thin protected wrappers kept so existing test workers that call these
  // names directly continue to work after the stream logic moved out.

  /** @internal Active stream ID, or `null` when none (delegates to _resumableStream). */
  protected get _activeStreamId(): string | null {
    const stream = this._resumableStream;
    return stream ? (stream.activeStreamId ?? null) : null;
  }

  /** @internal Active request ID, or `null` when none (delegates to _resumableStream). */
  protected get _activeRequestId(): string | null {
    const stream = this._resumableStream;
    return stream ? (stream.activeRequestId ?? null) : null;
  }

  /** @internal Start a stream for `requestId` and return its stream ID (delegates to _resumableStream). */
  protected _startStream(requestId: string): string {
    const streamId = this._resumableStream.start(requestId);
    return streamId;
  }

  /** @internal Complete `streamId`, then drop all pending-resume connections. */
  protected _completeStream(streamId: string) {
    const stream = this._resumableStream;
    stream.complete(streamId);
    this._pendingResumeConnections.clear();
  }

  /** @internal Buffer one chunk body for `streamId` (delegates to _resumableStream). */
  protected _storeStreamChunk(streamId: string, body: string) {
    const stream = this._resumableStream;
    stream.storeChunk(streamId, body);
  }

  /** @internal Flush buffered chunks (delegates to _resumableStream). */
  protected _flushChunkBuffer() {
    const stream = this._resumableStream;
    stream.flushBuffer();
  }

  /** @internal Restore active-stream state (delegates to _resumableStream). */
  protected _restoreActiveStream() {
    const stream = this._resumableStream;
    stream.restore();
  }

  /** @internal Mark `streamId` as errored (delegates to _resumableStream). */
  protected _markStreamError(streamId: string) {
    const stream = this._resumableStream;
    stream.markError(streamId);
  }

  /**
   * Reconstruct and persist a partial assistant message from an orphaned
   * stream's stored chunks. Called when the DO wakes from hibernation and
   * discovers an active stream with no live LLM reader.
   *
   * Replays each chunk body through `applyChunkToParts` to rebuild the
   * message parts, then persists the result so it survives further refreshes.
   * - The message ID comes from a `start` chunk's `messageId` when present;
   *   otherwise a random `assistant_…` ID is generated.
   * - `messageMetadata` from `start` / `finish` / `message-metadata` chunks is
   *   shallow-merged in order.
   * - Malformed chunk bodies are skipped.
   * - If a message with the same ID already exists (e.g. persisted early
   *   during tool approval), it is replaced in place; otherwise the message
   *   is appended.
   *
   * Persistence is fire-and-forget (the caller is a synchronous message
   * handler), so failures are logged rather than left as unhandled rejections.
   * @internal
   */
  private _persistOrphanedStream(streamId: string) {
    const chunks = this._resumableStream.getStreamChunks(streamId);
    if (!chunks.length) return;

    const message: ChatMessage = {
      id: `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      role: "assistant",
      parts: []
    };

    for (const chunk of chunks) {
      try {
        const data = JSON.parse(chunk.body);

        // Capture message ID from the "start" event if present
        if (data.type === "start" && data.messageId != null) {
          message.id = data.messageId;
        }
        if (
          (data.type === "start" ||
            data.type === "finish" ||
            data.type === "message-metadata") &&
          data.messageMetadata != null
        ) {
          message.metadata = message.metadata
            ? { ...message.metadata, ...data.messageMetadata }
            : data.messageMetadata;
        }

        applyChunkToParts(message.parts, data);
      } catch {
        // Skip malformed chunk bodies
      }
    }

    // Nothing renderable was recovered; don't persist an empty message.
    if (message.parts.length === 0) return;

    // Check if a message with this ID already exists (e.g., from an
    // early persist during tool approval). Update in place if so.
    const existingIdx = this.messages.findIndex((m) => m.id === message.id);
    const updatedMessages =
      existingIdx >= 0
        ? this.messages.map((m, i) => (i === existingIdx ? message : m))
        : [...this.messages, message];

    // persistMessages is awaited elsewhere (it is async), but this method is
    // synchronous and invoked fire-and-forget, so attach a rejection handler.
    void Promise.resolve(this.persistMessages(updatedMessages)).catch(
      (error) => {
        console.error(
          "[AIChatAgent] Failed to persist orphaned stream message:",
          error
        );
      }
    );
  }

  /**
   * Restore _lastBody and _lastClientTools from SQLite.
   * Called in the constructor so these values survive DO hibernation.
   *
   * A row is ignored when it is corrupted, meaning unparseable JSON or a
   * value of the wrong shape. A body must be a plain JSON object and client
   * tools must be an array. The next request then overwrites the row.
   * @internal
   */
  private _restoreRequestContext() {
    const rows =
      this.sql<{ key: string; value: string }>`
        select key, value from cf_ai_chat_request_context
      ` || [];

    for (const row of rows) {
      try {
        if (row.key === "lastBody") {
          const body: unknown = JSON.parse(row.value);
          // e.g. a stored `null` must not become `_lastBody = null`
          if (
            typeof body === "object" &&
            body !== null &&
            !Array.isArray(body)
          ) {
            this._lastBody = body as Record<string, unknown>;
          }
        } else if (row.key === "lastClientTools") {
          const tools: unknown = JSON.parse(row.value);
          if (Array.isArray(tools)) {
            this._lastClientTools = tools as ClientToolSchema[];
          }
        }
      } catch {
        // Corrupted row — ignore and let the next request overwrite it
      }
    }
  }

  /**
   * Mirrors `_lastBody` and `_lastClientTools` into SQLite so they survive
   * hibernation. A present (truthy) value is upserted with INSERT OR REPLACE;
   * an absent one has its row deleted. Safe to call repeatedly.
   * @internal
   */
  private _persistRequestContext() {
    const body = this._lastBody;
    if (!body) {
      this.sql`delete from cf_ai_chat_request_context where key = 'lastBody'`;
    } else {
      this.sql`
        insert or replace into cf_ai_chat_request_context (key, value)
        values ('lastBody', ${JSON.stringify(body)})
      `;
    }

    const clientTools = this._lastClientTools;
    if (!clientTools) {
      this.sql`delete from cf_ai_chat_request_context where key = 'lastClientTools'`;
    } else {
      this.sql`
        insert or replace into cf_ai_chat_request_context (key, value)
        values ('lastClientTools', ${JSON.stringify(clientTools)})
      `;
    }
  }

  /**
   * Serializes `message` and broadcasts it to connected clients, skipping the
   * explicitly excluded connection IDs. Connections still pending a stream
   * resume are always skipped as well: they get the full replay via
   * _sendStreamChunks once they ACK, not live chunks now.
   */
  private _broadcastChatMessage(message: OutgoingMessage, exclude?: string[]) {
    const excludedIds = (exclude ?? []).concat([
      ...this._pendingResumeConnections
    ]);
    this.broadcast(JSON.stringify(message), excludedIds);
  }

  /**
   * Records and broadcasts one text stream event for a non-SSE (plain text)
   * response, so plain text replies follow the AI SDK v5 stream protocol.
   * The serialized event is first stored as a stream chunk, then broadcast.
   *
   * @param streamId - Stream whose stored chunks receive this event
   * @param event - A text-start, text-delta (with `delta`), or text-end payload
   * @param continuation - When true, the broadcast is flagged `continuation: true`
   */
  private _broadcastTextEvent(
    streamId: string,
    event:
      | { type: "text-start"; id: string }
      | { type: "text-delta"; id: string; delta: string }
      | { type: "text-end"; id: string },
    continuation: boolean
  ) {
    const serializedEvent = JSON.stringify(event);
    this._storeStreamChunk(streamId, serializedEvent);

    const { id: eventId } = event;
    this._broadcastChatMessage({
      body: serializedEvent,
      done: false,
      id: eventId,
      type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
      ...(continuation && { continuation: true })
    });
  }

  /**
   * Reads every stored chat message ordered by `created_at`, skipping rows
   * that fail to parse or fail structural validation. Rebuilds
   * `_persistedMessageCache` (message ID -> raw stored JSON) along the way so
   * incremental persistence can skip unchanged messages.
   */
  private _loadMessagesFromDb(): ChatMessage[] {
    const rows =
      this.sql`select * from cf_ai_chat_agent_messages order by created_at` ||
      [];

    this._persistedMessageCache.clear();

    const loaded: ChatMessage[] = [];
    for (const row of rows) {
      try {
        const raw = row.message as string;
        const candidate = JSON.parse(raw) as ChatMessage;

        // Reject rows that parse but lack a usable id/role/parts shape
        // (corruption, manual edits, older schemas) instead of crashing.
        if (!isValidMessageStructure(candidate)) {
          console.warn(
            `[AIChatAgent] Skipping invalid message ${row.id}: ` +
              "missing or malformed id, role, or parts"
          );
          continue;
        }

        this._persistedMessageCache.set(candidate.id, raw);
        loaded.push(candidate);
      } catch (error) {
        console.error(`Failed to parse message ${row.id}:`, error);
      }
    }

    return loaded;
  }

  /**
   * Runs `fn` and resolves with its (awaited) result. Any thrown error or
   * rejection is handed to `onError`, and whatever `onError` returns is thrown.
   */
  private async _tryCatchChat<T>(fn: () => T | Promise<T>) {
    let result: Awaited<T>;
    try {
      result = await fn();
    } catch (error) {
      throw this.onError(error);
    }
    return result;
  }

  /**
   * Produces the reply to an incoming chat message. Subclasses are expected
   * to override this; the base implementation always throws.
   * @param onFinish Callback to invoke once the response has finished
   * @param options Request options, including the abort signal and client-defined tools
   * @returns The Response to send back to the client, or undefined for no reply
   */
  async onChatMessage(
    // oxlint-disable-next-line eslint(no-unused-vars) -- params used by subclass overrides
    onFinish: StreamTextOnFinishCallback<ToolSet>,
    // oxlint-disable-next-line eslint(no-unused-vars) -- params used by subclass overrides
    options?: OnChatMessageOptions
  ): Promise<Response | undefined> {
    const overrideHint =
      "received a chat message, override onChatMessage and return a Response to send to the client";
    throw new Error(overrideHint);
  }

  /**
   * Saves messages on the server side, then asks `onChatMessage` for a reply
   * (reusing the last request body and client tools) and sends it if one is
   * returned. Errors from the reply path are routed through `onError`.
   * @param messages Chat messages to save
   */
  async saveMessages(messages: ChatMessage[]) {
    await this.persistMessages(messages);
    await this._tryCatchChat(async () => {
      const requestId = nanoid();
      const response = await this.onChatMessage(() => {}, {
        requestId,
        abortSignal: this._getAbortSignal(requestId),
        clientTools: this._lastClientTools,
        body: this._lastBody
      });
      if (!response) return;
      this._reply(requestId, response);
    });
  }

  /**
   * Persists a conversation snapshot and broadcasts it to connected clients.
   *
   * Incoming messages are first merged with server state (restoring tool
   * outputs the client has not seen and reusing server IDs for matching
   * assistant messages). Each message is then sanitized, resolved to an
   * existing server ID by toolCallId, size-limited, and upserted only when
   * its serialized JSON differs from the cached last-persisted version.
   *
   * @param messages - Message list to persist (typically the client's view)
   * @param excludeBroadcastIds - Connection IDs that should not receive the broadcast
   * @param options - Internal flags; `_deleteStaleRows` removes DB rows absent
   *   from the incoming set when that set only references known server messages
   */
  async persistMessages(
    messages: ChatMessage[],
    excludeBroadcastIds: string[] = [],
    /** @internal */
    options?: { _deleteStaleRows?: boolean }
  ) {
    // Merge incoming messages with existing server state to preserve tool outputs.
    // This is critical for client-side tools: the client sends messages without
    // tool outputs, but the server has them via _applyToolResult.
    const mergedMessages = this._mergeIncomingWithServerState(messages);

    // IDs of the rows actually targeted by this call (after toolCallId
    // resolution). Used below so stale-row cleanup never deletes a row that
    // was just upserted.
    const persistedIds = new Set<string>();

    // Persist only new or changed messages (incremental persistence).
    // Compares serialized JSON against a cache of last-persisted versions.
    for (const message of mergedMessages) {
      const sanitizedMessage = this._sanitizeMessageForPersistence(message);
      const resolved = this._resolveMessageForToolMerge(sanitizedMessage);
      const safe = this._enforceRowSizeLimit(resolved);
      const json = JSON.stringify(safe);
      persistedIds.add(safe.id);

      // Skip SQL write if the message is identical to what's already persisted
      if (this._persistedMessageCache.get(safe.id) === json) {
        continue;
      }

      this.sql`
        insert into cf_ai_chat_agent_messages (id, message)
        values (${safe.id}, ${json})
        on conflict(id) do update set message = excluded.message
      `;
      this._persistedMessageCache.set(safe.id, json);
    }

    // Reconcile: delete DB rows not present in the incoming message set.
    // Only safe when the incoming set is a subset of the server state
    // (e.g. regenerate() trims the last assistant message). When the
    // client appends new messages (IDs unknown to the server), it may
    // not have the full history, so deleting "missing" rows would
    // destroy server-generated assistant messages the client hasn't
    // seen yet.
    // The subset gate uses mergedMessages (post-merge IDs) because
    // _mergeIncomingWithServerState can remap client IDs to server IDs.
    if (options?._deleteStaleRows) {
      const serverIds = new Set(this.messages.map((m) => m.id));
      const isSubsetOfServer = mergedMessages.every((m) => serverIds.has(m.id));

      if (isSubsetOfServer) {
        // Keep every post-merge ID plus every ID written above:
        // _resolveMessageForToolMerge may have redirected a message to a
        // different server row, which must not then be deleted as stale.
        const keepIds = new Set(persistedIds);
        for (const m of mergedMessages) keepIds.add(m.id);

        const allDbRows =
          this.sql<{ id: string }>`
            select id from cf_ai_chat_agent_messages
          ` || [];
        for (const row of allDbRows) {
          if (!keepIds.has(row.id)) {
            this.sql`
              delete from cf_ai_chat_agent_messages where id = ${row.id}
            `;
            this._persistedMessageCache.delete(row.id);
          }
        }
      }
    }

    // Enforce maxPersistedMessages: delete oldest messages if over the limit
    if (this.maxPersistedMessages != null) {
      this._enforceMaxPersistedMessages();
    }

    // refresh in-memory messages
    const persisted = this._loadMessagesFromDb();
    this.messages = autoTransformMessages(persisted);
    this._broadcastChatMessage(
      {
        messages: mergedMessages,
        type: MessageType.CF_AGENT_CHAT_MESSAGES
      },
      excludeBroadcastIds
    );
  }

  /**
   * Reconciles client-sent messages with what the server already knows.
   *
   * Tool outputs recorded server-side (e.g. via _applyToolResult) are copied
   * onto incoming tool parts that the client still reports in a pre-output
   * state. The result then has its assistant IDs reconciled with server state.
   *
   * @param incomingMessages - Messages from the client
   * @returns Messages with server's tool outputs preserved
   */
  private _mergeIncomingWithServerState(
    incomingMessages: ChatMessage[]
  ): ChatMessage[] {
    // toolCallId -> output for every server assistant tool part that has
    // already reached output-available (later parts win on duplicate IDs).
    const knownOutputs = new Map<string, unknown>();
    this.messages
      .filter((msg) => msg.role === "assistant")
      .forEach((msg) => {
        for (const part of msg.parts) {
          if (
            "toolCallId" in part &&
            "state" in part &&
            part.state === "output-available" &&
            "output" in part
          ) {
            knownOutputs.set(
              part.toolCallId as string,
              (part as { output: unknown }).output
            );
          }
        }
      });

    if (knownOutputs.size === 0) {
      return this._reconcileAssistantIdsWithServerState(incomingMessages);
    }

    // States a client may still report after the server has produced output:
    //   - input-available: the tool result has not reached the client yet
    //   - approval-requested: approval UI shown, but the server may already
    //     have executed via a parallel path
    //   - approval-responded: approval sent, but the execution result (run by
    //     the server via onChatMessage between turns) has not arrived
    // Each is upgraded to the server's output-available state and output.
    const staleStates = new Set<string>([
      "input-available",
      "approval-requested",
      "approval-responded"
    ]);

    const merged = incomingMessages.map((msg) => {
      if (msg.role !== "assistant") return msg;

      let touched = false;
      const parts = msg.parts.map((part) => {
        if (
          "toolCallId" in part &&
          "state" in part &&
          staleStates.has(part.state as string) &&
          knownOutputs.has(part.toolCallId as string)
        ) {
          touched = true;
          return {
            ...part,
            state: "output-available" as const,
            output: knownOutputs.get(part.toolCallId as string)
          };
        }
        return part;
      }) as ChatMessage["parts"];

      return touched ? { ...msg, parts } : msg;
    });

    return this._reconcileAssistantIdsWithServerState(merged);
  }

  /**
   * Reconciles assistant message IDs between incoming client state and server state.
   *
   * The client can keep a different local ID for an assistant message than the one
   * persisted on the server (e.g. optimistic/local IDs). When that full history is
   * sent back, persisting by ID alone creates duplicate assistant rows. To prevent
   * this, we reuse the server ID for assistant messages that match by content,
   * while leaving tool-call messages to _resolveMessageForToolMerge.
   *
   * Uses a two-pass approach:
   *  - Pass 1: resolve all exact-ID matches, claiming server indices.
   *  - Pass 2: content-based matching for remaining non-tool assistant messages,
   *    scanning only unclaimed server indices left-to-right.
   *
   * The two-pass design prevents exact-ID matches from advancing a cursor past
   * server messages that a later incoming message needs for content matching.
   * This fixes mismatches when two assistant messages have identical text
   * (e.g. "Sure", "I understand") — see #1008.
   *
   * Server IDs are indexed once and server content keys are computed lazily
   * and cached, so the cost no longer multiplies JSON serialization of the
   * whole server history by the number of incoming messages.
   */
  private _reconcileAssistantIdsWithServerState(
    incomingMessages: ChatMessage[]
  ): ChatMessage[] {
    if (this.messages.length === 0) {
      return incomingMessages;
    }

    const serverMessages = this.messages;

    // Server positions per ID, in ascending order. A list (not a single
    // index) preserves "first unclaimed match" semantics even if an ID were
    // ever duplicated.
    const serverIndicesById = new Map<string, number[]>();
    serverMessages.forEach((serverMessage, serverIdx) => {
      const bucket = serverIndicesById.get(serverMessage.id);
      if (bucket) {
        bucket.push(serverIdx);
      } else {
        serverIndicesById.set(serverMessage.id, [serverIdx]);
      }
    });

    // Pass 1: Resolve exact-ID matches first.
    // This prevents content-based matching from claiming a server message
    // that has a direct ID match with a later incoming message.
    const claimedServerIndices = new Set<number>();
    const exactMatchIncoming = new Set<number>();

    incomingMessages.forEach((incoming, incomingIdx) => {
      const serverIdx = serverIndicesById
        .get(incoming.id)
        ?.find((si) => !claimedServerIndices.has(si));
      if (serverIdx !== undefined) {
        claimedServerIndices.add(serverIdx);
        exactMatchIncoming.add(incomingIdx);
      }
    });

    // Lazily computed, memoized content keys for server messages.
    const serverContentKeys = new Map<number, string | undefined>();
    const serverContentKey = (serverIdx: number): string | undefined => {
      if (!serverContentKeys.has(serverIdx)) {
        serverContentKeys.set(
          serverIdx,
          this._assistantMessageContentKey(serverMessages[serverIdx])
        );
      }
      return serverContentKeys.get(serverIdx);
    };

    // Pass 2: Content-based matching for remaining non-tool assistant messages.
    // Scans unclaimed server messages left-to-right to preserve ordering.
    return incomingMessages.map((incomingMessage, incomingIdx) => {
      if (exactMatchIncoming.has(incomingIdx)) {
        return incomingMessage;
      }

      if (
        incomingMessage.role !== "assistant" ||
        this._hasToolCallPart(incomingMessage)
      ) {
        // Content-based reconciliation is only for non-tool assistant messages.
        // Tool-bearing assistant messages are reconciled by _resolveMessageForToolMerge.
        return incomingMessage;
      }

      const incomingKey = this._assistantMessageContentKey(incomingMessage);
      if (!incomingKey) {
        return incomingMessage;
      }

      for (let i = 0; i < serverMessages.length; i++) {
        if (claimedServerIndices.has(i)) {
          continue;
        }
        const serverMessage = serverMessages[i];
        if (
          serverMessage.role !== "assistant" ||
          this._hasToolCallPart(serverMessage)
        ) {
          continue;
        }

        if (serverContentKey(i) === incomingKey) {
          claimedServerIndices.add(i);

          return {
            ...incomingMessage,
            id: serverMessage.id
          };
        }
      }

      return incomingMessage;
    });
  }

  /** True when at least one part of `message` carries a `toolCallId`. */
  private _hasToolCallPart(message: ChatMessage): boolean {
    return !message.parts.every((part) => !("toolCallId" in part));
  }

  /**
   * Content fingerprint for an assistant message: the JSON of its parts after
   * persistence sanitization. Non-assistant messages have no key (undefined).
   */
  private _assistantMessageContentKey(
    message: ChatMessage
  ): string | undefined {
    return message.role === "assistant"
      ? JSON.stringify(this._sanitizeMessageForPersistence(message).parts)
      : undefined;
  }

  /**
   * Picks the persistence ID for an assistant message carrying tool parts.
   * Parts are checked in order. At the first part with a toolCallId already
   * owned by a server-side message with a different ID, a copy carrying that
   * server ID is returned. Otherwise the message is returned unchanged.
   * This avoids duplicate rows when the client's IDs (e.g. AI SDK nanoids)
   * differ from server-stamped ones. Tool call IDs are unique per
   * conversation, so matching on them is safe in any tool state (#1094).
   *
   * @param message - The message to potentially merge
   * @returns The message with the correct ID (either original or merged)
   */
  private _resolveMessageForToolMerge(message: ChatMessage): ChatMessage {
    if (message.role !== "assistant") return message;

    for (const part of message.parts) {
      if (!("toolCallId" in part) || !part.toolCallId) continue;

      const owner = this._findMessageByToolCallId(part.toolCallId as string);
      if (owner !== undefined && owner.id !== message.id) {
        return { ...message, id: owner.id };
      }
    }

    return message;
  }

  /**
   * Returns the first assistant message in `this.messages` that has a part
   * with the given toolCallId, or undefined when none does. Lets callers
   * update an existing message instead of creating a new one.
   *
   * @param toolCallId - The tool call ID to search for
   * @returns The existing message if found, undefined otherwise
   */
  private _findMessageByToolCallId(
    toolCallId: string
  ): ChatMessage | undefined {
    return this.messages.find(
      (msg) =>
        msg.role === "assistant" &&
        msg.parts.some(
          (part) => "toolCallId" in part && part.toolCallId === toolCallId
        )
    );
  }

  /**
   * Returns a copy of `message` without ephemeral provider-specific data that
   * should neither be stored nor sent back in later requests.
   *
   * 1. OpenAI ephemeral fields (`itemId`, `reasoningEncryptedContent`) are
   *    stripped from `providerMetadata.openai` and `callProviderMetadata.openai`.
   *    The Responses API used by @ai-sdk/openai (v2.0.x+) assigns these, and
   *    OpenAI rejects duplicate itemIds when they are sent back.
   * 2. Reasoning parts with blank text AND no remaining providerMetadata are
   *    dropped. Parts that still carry provider data (e.g. Anthropic
   *    redacted_thinking via providerMetadata.anthropic.redactedData) are kept
   *    because the provider needs them for round-tripping.
   *
   * @param message - The message to sanitize
   * @returns A new message with ephemeral provider data removed
   */
  private _sanitizeMessageForPersistence(message: ChatMessage): ChatMessage {
    // Metadata slots inspected on every part, in this order.
    const metadataKeys = ["providerMetadata", "callProviderMetadata"] as const;

    const strippedParts = message.parts.map((part) => {
      let current = part;
      for (const key of metadataKeys) {
        if (!(key in current)) continue;
        const metadata = (current as Record<string, unknown>)[key];
        if (metadata && typeof metadata === "object" && "openai" in metadata) {
          current = this._stripOpenAIMetadata(current, key);
        }
      }
      return current;
    }) as ChatMessage["parts"];

    const sanitizedParts = strippedParts.filter((part) => {
      if (part.type !== "reasoning") return true;

      const { text, providerMetadata } = part as ReasoningUIPart;
      if (text && text.trim() !== "") return true;

      // Blank reasoning survives only if it still carries provider data.
      return (
        !!providerMetadata &&
        typeof providerMetadata === "object" &&
        Object.keys(providerMetadata).length > 0
      );
    });

    return { ...message, parts: sanitizedParts };
  }

  /**
   * Returns a copy of `part` whose `metadataKey` entry no longer contains the
   * OpenAI ephemeral fields `itemId` and `reasoningEncryptedContent`. Other
   * OpenAI fields and other providers' entries are kept. An emptied `openai`
   * entry is removed, and the metadata key itself is dropped when nothing
   * remains. Parts without OpenAI metadata are returned as-is.
   */
  private _stripOpenAIMetadata<T extends ChatMessage["parts"][number]>(
    part: T,
    metadataKey: "providerMetadata" | "callProviderMetadata"
  ): T {
    const metadata = (part as Record<string, unknown>)[metadataKey] as {
      openai?: Record<string, unknown>;
      [key: string]: unknown;
    };

    const openaiMeta = metadata?.openai;
    if (!openaiMeta) return part;

    const {
      itemId: _droppedItemId,
      reasoningEncryptedContent: _droppedEncryptedContent,
      ...keptOpenai
    } = openaiMeta;
    const { openai: _droppedOpenai, ...otherProviders } = metadata;
    const { [metadataKey]: _previousMetadata, ...partWithoutMetadata } =
      part as Record<string, unknown>;

    let replacement: ProviderMetadata | undefined;
    if (Object.keys(keptOpenai).length > 0) {
      replacement = {
        ...otherProviders,
        openai: keptOpenai
      } as ProviderMetadata;
    } else if (Object.keys(otherProviders).length > 0) {
      replacement = otherProviders as ProviderMetadata;
    }

    if (!replacement) return partWithoutMetadata as T;
    return { ...partWithoutMetadata, [metadataKey]: replacement } as T;
  }

  /**
   * Trims stored messages down to `maxPersistedMessages`, removing the oldest
   * rows (by created_at) from SQLite and from the persistence cache.
   * No-op when no limit is configured or the count is within the limit.
   */
  private _enforceMaxPersistedMessages() {
    const limit = this.maxPersistedMessages;
    if (limit == null) return;

    const total =
      this.sql<{ cnt: number }>`
      select count(*) as cnt from cf_ai_chat_agent_messages
    `?.[0]?.cnt ?? 0;
    if (total <= limit) return;

    // Oldest rows first; each is removed from storage and from the cache.
    const oldest = this.sql<{ id: string }>`
      select id from cf_ai_chat_agent_messages 
      order by created_at asc 
      limit ${total - limit}
    `;

    for (const { id } of oldest ?? []) {
      this.sql`delete from cf_ai_chat_agent_messages where id = ${id}`;
      this._persistedMessageCache.delete(id);
    }
  }

  /**
   * Enforces SQLite row size limits by compacting tool outputs and text parts
   * when a serialized message exceeds AIChatAgent.ROW_MAX_BYTES.
   *
   * Only fires in pathological cases (extremely large tool outputs or text).
   * Returns the message unchanged if it fits within limits.
   *
   * Compaction strategy:
   * 1. (assistant messages) Replace tool outputs whose JSON exceeds 1000
   *    UTF-8 bytes with an LLM-friendly summary plus a short preview
   * 2. If still too big — or for user/system messages — truncate long text parts
   * 3. Record what was compacted in message metadata so clients can detect it
   *
   * @param message - The message to check
   * @returns The message, compacted if necessary
   */
  private _enforceRowSizeLimit(message: ChatMessage): ChatMessage {
    let size = AIChatAgent._byteLength(JSON.stringify(message));
    if (size <= AIChatAgent.ROW_MAX_BYTES) return message;

    if (message.role !== "assistant") {
      // User/system messages carry no tool outputs to compact, so go straight
      // to truncating their text parts.
      console.warn(
        `[AIChatAgent] Non-assistant message ${message.id} is ${size} bytes, ` +
          `exceeds row limit. Truncating text parts.`
      );
      return this._truncateTextParts(message);
    }

    console.warn(
      `[AIChatAgent] Message ${message.id} is ${size} bytes, ` +
        `compacting tool outputs to fit SQLite row limit`
    );

    // Pass 1: compact tool outputs
    const compactedToolCallIds: string[] = [];
    const compactedParts = message.parts.map((part) => {
      if (
        "output" in part &&
        "toolCallId" in part &&
        "state" in part &&
        part.state === "output-available"
      ) {
        // JSON.stringify yields undefined (despite its declared type) for an
        // undefined output. Such an output takes no space, so leave the part
        // alone instead of crashing on `.length`.
        const outputJson: string | undefined = JSON.stringify(
          (part as { output: unknown }).output
        );
        if (outputJson === undefined) return part;

        // Measure in UTF-8 bytes so the threshold and the reported size
        // match what storage actually holds.
        const outputBytes = AIChatAgent._byteLength(outputJson);
        if (outputBytes > 1000) {
          compactedToolCallIds.push(part.toolCallId as string);
          return {
            ...part,
            output:
              "This tool output was too large to persist in storage " +
              `(${outputBytes} bytes). ` +
              "If the user asks about this data, suggest re-running the tool. " +
              `Preview: ${outputJson.slice(0, 500)}...`
          };
        }
      }
      return part;
    }) as ChatMessage["parts"];

    const result: ChatMessage = {
      ...message,
      parts: compactedParts
    };

    if (compactedToolCallIds.length > 0) {
      result.metadata = {
        ...(result.metadata ?? {}),
        compactedToolOutputs: compactedToolCallIds
      };
    }

    // Check if tool compaction was enough
    size = AIChatAgent._byteLength(JSON.stringify(result));
    if (size <= AIChatAgent.ROW_MAX_BYTES) return result;

    // Pass 2: truncate text parts
    console.warn(
      `[AIChatAgent] Message ${message.id} still ${size} bytes after tool compaction, truncating text parts`
    );
    return this._truncateTextParts(result);
  }

  /**
   * Truncates text parts in a message to fit within the row size limit.
   * Text parts are visited first to last. Each one longer than 1000 characters
   * is replaced by a placeholder holding its first 500 characters, and the
   * walk stops as soon as the message fits, so later text parts are the most
   * likely to survive intact. Indices of truncated parts are recorded in
   * `metadata.compactedTextParts`.
   *
   * If the result still exceeds the limit (e.g. the bulk is in non-text
   * parts), a warning is logged, since the subsequent write may fail.
   */
  private _truncateTextParts(message: ChatMessage): ChatMessage {
    const compactedTextPartIndices: number[] = [];
    const parts = [...message.parts];

    // Truncate text parts from oldest to newest until we fit
    for (let i = 0; i < parts.length; i++) {
      const part = parts[i];
      if (part.type === "text" && "text" in part) {
        const text = (part as { text: string }).text;
        if (text.length > 1000) {
          compactedTextPartIndices.push(i);
          parts[i] = {
            ...part,
            text:
              `[Text truncated for storage (${text.length} chars). ` +
              `First 500 chars: ${text.slice(0, 500)}...]`
          } as ChatMessage["parts"][number];

          // Check if we fit now
          const candidate = { ...message, parts };
          if (
            AIChatAgent._byteLength(JSON.stringify(candidate)) <=
            AIChatAgent.ROW_MAX_BYTES
          ) {
            break;
          }
        }
      }
    }

    const result: ChatMessage = { ...message, parts };
    if (compactedTextPartIndices.length > 0) {
      result.metadata = {
        ...(result.metadata ?? {}),
        compactedTextParts: compactedTextPartIndices
      };
    }

    // Surface the case where truncation alone could not bring the row
    // under the limit instead of returning an oversized message silently.
    const finalSize = AIChatAgent._byteLength(JSON.stringify(result));
    if (finalSize > AIChatAgent.ROW_MAX_BYTES) {
      console.warn(
        `[AIChatAgent] Message ${message.id} is still ${finalSize} bytes after truncating text parts; ` +
          "it exceeds the row size limit and may fail to persist"
      );
    }

    return result;
  }

  /**
   * Shared helper for finding a tool part by toolCallId and applying an update.
   * Handles both streaming (in-memory) and persisted (SQLite) messages.
   *
   * Tool results/approvals can arrive while the AI is still streaming, so each
   * lookup attempt checks _streamingMessage first and then the persisted
   * messages. The lookup is retried up to 10 times with a fixed 100 ms pause
   * between attempts (none after the last). This covers a tool call that
   * reaches the streaming message or storage shortly after the result arrives.
   *
   * @param toolCallId - The tool call ID to find
   * @param callerName - Name for log messages (e.g. "_applyToolResult")
   * @param matchStates - Which tool part states to match
   * @param applyUpdate - Mutation to apply to the matched part (streaming: in-place, persisted: spread)
   * @returns true if the update was applied, false if not found or state didn't match
   */
  private async _findAndUpdateToolPart(
    toolCallId: string,
    callerName: string,
    matchStates: string[],
    applyUpdate: (part: Record<string, unknown>) => Record<string, unknown>
  ): Promise<boolean> {
    const maxLookupAttempts = 10;
    const lookupRetryDelayMs = 100;

    // The current streaming message, if it already contains this tool call.
    const findInStreamingMessage = (): ChatMessage | undefined => {
      const streaming = this._streamingMessage;
      if (!streaming) return undefined;
      return streaming.parts.some(
        (part) => "toolCallId" in part && part.toolCallId === toolCallId
      )
        ? streaming
        : undefined;
    };

    let message: ChatMessage | undefined;
    for (let attempt = 0; attempt < maxLookupAttempts; attempt++) {
      // Re-check the streaming message every attempt: a stream that was
      // mid-flight when the result arrived may only now contain the call.
      message =
        findInStreamingMessage() ?? this._findMessageByToolCallId(toolCallId);
      if (message) break;
      if (attempt < maxLookupAttempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, lookupRetryDelayMs));
      }
    }

    if (!message) {
      console.warn(
        `[AIChatAgent] ${callerName}: Could not find message with toolCallId ${toolCallId} after retries`
      );
      return false;
    }

    const isStreamingMessage = message === this._streamingMessage;
    let updated = false;

    if (isStreamingMessage) {
      // Update in place -- the message will be persisted when streaming completes
      for (const part of message.parts) {
        if (
          "toolCallId" in part &&
          part.toolCallId === toolCallId &&
          "state" in part &&
          matchStates.includes(part.state as string)
        ) {
          const applied = applyUpdate(part as Record<string, unknown>);
          Object.assign(part, applied);
          updated = true;
          break;
        }
      }
    } else {
      // For persisted messages, create updated parts immutably
      const updatedParts = message.parts.map((part) => {
        if (
          "toolCallId" in part &&
          part.toolCallId === toolCallId &&
          "state" in part &&
          matchStates.includes(part.state as string)
        ) {
          updated = true;
          return applyUpdate(part as Record<string, unknown>);
        }
        return part;
      }) as ChatMessage["parts"];

      if (updated) {
        const updatedMessage: ChatMessage = this._sanitizeMessageForPersistence(
          { ...message, parts: updatedParts }
        );
        const safe = this._enforceRowSizeLimit(updatedMessage);
        const json = JSON.stringify(safe);

        this.sql`
          update cf_ai_chat_agent_messages 
          set message = ${json}
          where id = ${message.id}
        `;
        this._persistedMessageCache.set(message.id, json);

        const persisted = this._loadMessagesFromDb();
        this.messages = autoTransformMessages(persisted);
      }
    }

    if (!updated) {
      console.warn(
        `[AIChatAgent] ${callerName}: Tool part with toolCallId ${toolCallId} not in expected state (expected: ${matchStates.join("|")})`
      );
      return false;
    }

    // Broadcast the update to all clients.
    // For persisted messages, re-fetch the latest state from this.messages.
    // For streaming messages, broadcast the in-memory snapshot so clients
    // get immediate confirmation that the tool result/approval was applied.
    if (isStreamingMessage) {
      this._broadcastChatMessage({
        type: MessageType.CF_AGENT_MESSAGE_UPDATED,
        message
      });
    } else {
      const broadcastMessage = this._findMessageByToolCallId(toolCallId);
      if (broadcastMessage) {
        this._broadcastChatMessage({
          type: MessageType.CF_AGENT_MESSAGE_UPDATED,
          message: broadcastMessage
        });
      }
    }

    return true;
  }

  /**
   * Records a client-side tool result on the assistant message that owns the
   * tool call, then broadcasts the update. Used for CF_AGENT_TOOL_RESULT; the
   * server is the source of truth for the resulting message state.
   *
   * @param toolCallId - The tool call ID this result is for
   * @param _toolName - The name of the tool (unused, kept for API compat)
   * @param output - The output from the tool execution
   * @param overrideState - Optional state override ("output-error" to signal denial/failure)
   * @param errorText - Error message when overrideState is "output-error"
   * @returns true if the result was applied, false if the message was not found
   */
  private async _applyToolResult(
    toolCallId: string,
    _toolName: string,
    output: unknown,
    overrideState?: "output-error",
    errorText?: string
  ): Promise<boolean> {
    // Fields merged over the matched tool part.
    const patch =
      overrideState === "output-error"
        ? {
            state: "output-error",
            errorText: errorText ?? "Tool execution denied by user"
          }
        : { state: "output-available", output, preliminary: false };

    return this._findAndUpdateToolPart(
      toolCallId,
      "_applyToolResult",
      ["input-available", "approval-requested", "approval-responded"],
      (part) => ({ ...part, ...patch })
    );
  }

  /**
   * Consumes an AI SDK UI-message SSE body (`data: <json>` lines), folding each
   * chunk into `message`, storing it for resumable replay and broadcasting it
   * to connected clients.
   *
   * Framing: bytes are decoded with a per-call streaming `TextDecoder` and
   * split on `\n`, carrying an incomplete trailing line over to the next read.
   * This keeps `data:` lines and multi-byte UTF-8 characters intact when they
   * straddle network chunk boundaries; decoding and splitting each chunk in
   * isolation would make such lines fail to parse and be dropped.
   *
   * @param id - Request id echoed on every CF_AGENT_USE_CHAT_RESPONSE frame
   * @param streamId - Resumable-stream id that chunks are stored under
   * @param reader - Reader over the response body; cancelled when `abortSignal` fires
   * @param message - Assistant message being built in place
   * @param streamCompleted - Set to true once the final `done` frame has been sent
   * @param continuation - Tag frames as a continuation of a previous assistant message
   * @param abortSignal - Optional signal that stops reading (e.g. client pressed stop)
   */
  private async _streamSSEReply(
    id: string,
    streamId: string,
    reader: ReadableStreamDefaultReader<Uint8Array>,
    message: ChatMessage,
    streamCompleted: { value: boolean },
    continuation = false,
    abortSignal?: AbortSignal
  ) {
    streamCompleted.value = false;

    // Cancel the reader when the abort signal fires (e.g. client pressed stop).
    // This ensures we stop broadcasting chunks even if the underlying stream
    // hasn't been connected to the abort signal (e.g. user forgot to pass it
    // to streamText).
    const cancelReader = () => {
      reader.cancel().catch(() => {});
    };
    if (abortSignal && !abortSignal.aborted) {
      abortSignal.addEventListener("abort", cancelReader, { once: true });
    }

    // Per-call decoder: `stream: true` buffers partial multi-byte sequences
    // between reads, which a decoder shared between streams cannot do safely.
    const utf8 = new TextDecoder();
    // Text after the last "\n" seen so far; completed by a later read.
    let pendingLine = "";

    try {
      while (true) {
        if (abortSignal?.aborted) break;
        let readResult: ReadableStreamReadResult<Uint8Array>;
        try {
          readResult = await reader.read();
        } catch {
          // reader.read() throws after cancel() — treat as abort.
          // NOTE(review): a read error from an upstream failure also lands
          // here and is handled the same way.
          break;
        }
        const { done, value } = readResult;
        if (done) {
          // Flush bytes the decoder held back and handle a final line that
          // had no terminating newline.
          pendingLine += utf8.decode();
          if (pendingLine.length > 0) {
            await this._handleSSEDataLine(
              pendingLine,
              id,
              streamId,
              message,
              continuation
            );
            pendingLine = "";
          }
          this._completeStream(streamId);
          streamCompleted.value = true;
          this._broadcastChatMessage({
            body: "",
            done: true,
            id,
            type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
            ...(continuation && { continuation: true })
          });
          break;
        }

        pendingLine += utf8.decode(value, { stream: true });
        const lines = pendingLine.split("\n");
        // The last element is "" when the chunk ended on a newline, otherwise
        // an incomplete line; keep it for the next read.
        pendingLine = lines.pop() ?? "";
        for (const line of lines) {
          await this._handleSSEDataLine(
            line,
            id,
            streamId,
            message,
            continuation
          );
        }
      }
    } finally {
      // Don't leave the listener attached to a signal that may outlive this stream.
      abortSignal?.removeEventListener("abort", cancelReader);
    }

    // If we exited due to abort, send a done signal so clients know the stream ended
    if (!streamCompleted.value) {
      console.warn(
        "[AIChatAgent] Stream was still active when cancel was received. " +
          "Pass options.abortSignal to streamText() in your onChatMessage() " +
          "to cancel the upstream LLM call and avoid wasted work."
      );
      this._completeStream(streamId);
      streamCompleted.value = true;
      this._broadcastChatMessage({
        body: "",
        done: true,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
    }
  }

  /**
   * Handles one complete line of an AI SDK SSE stream for `_streamSSEReply`:
   * applies the chunk to `message`, performs server-side side effects
   * (early approval persistence, cross-message tool output updates), then
   * stores and broadcasts the (possibly normalized) chunk.
   *
   * Lines that are not `data:` lines, the `[DONE]` sentinel and malformed or
   * non-object JSON payloads are skipped. Other processing errors are logged
   * and the chunk is skipped, so one bad chunk does not end the stream.
   */
  private async _handleSSEDataLine(
    rawLine: string,
    id: string,
    streamId: string,
    message: ChatMessage,
    continuation: boolean
  ): Promise<void> {
    // Tolerate CRLF line endings.
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith("data: ") || line === "data: [DONE]") return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line.slice(6));
    } catch {
      // Skip malformed JSON lines silently
      return;
    }
    // Non-object payloads (e.g. `data: 42`) cannot be UI message chunks.
    if (typeof parsed !== "object" || parsed === null) return;
    const data = parsed as UIMessageChunk;

    try {
      // Delegate message building to the shared parser.
      // It handles: text, reasoning, file, source, tool lifecycle,
      // step boundaries — all the part types needed for UIMessage.
      const handled = applyChunkToParts(message.parts, data);

      // When a tool enters approval-requested state, the stream is
      // paused waiting for user approval. Persist the streaming message
      // immediately so the approval UI survives page refresh. Without
      // this, a refresh would reload from SQLite where the tool part
      // is still in input-available state, showing "Running..." instead
      // of the Approve/Reject buttons.
      if (data.type === "tool-approval-request" && this._streamingMessage) {
        // Persist directly to SQLite without broadcasting.
        // The client already has this data from the SSE stream —
        // broadcasting would cause the approval UI to render twice.
        // Only the SQL row is written; this.messages is not updated here.
        const snapshot: ChatMessage = {
          ...this._streamingMessage,
          parts: [...this._streamingMessage.parts]
        };
        const sanitized = this._sanitizeMessageForPersistence(snapshot);
        const json = JSON.stringify(sanitized);
        this.sql`
          INSERT INTO cf_ai_chat_agent_messages (id, message)
          VALUES (${sanitized.id}, ${json})
          ON CONFLICT(id) DO UPDATE SET message = excluded.message
        `;
        // Track that we persisted early so stream completion can update
        // in place rather than appending a duplicate.
        this._approvalPersistedMessageId = sanitized.id;
      }

      // Cross-message tool output fallback:
      // When a tool with needsApproval is approved, the continuation
      // stream emits tool-output-available/tool-output-error for a
      // tool call that lives in a *previous* assistant message.
      // applyChunkToParts only searches the current message's parts,
      // so the update is silently skipped. Fall back to searching
      // this.messages and update the persisted message directly.
      // Note: checked independently of `handled` — applyChunkToParts
      // returns true for recognized chunk types even when it cannot
      // find the target part, so `handled` is not a reliable signal.
      if (
        (data.type === "tool-output-available" ||
          data.type === "tool-output-error") &&
        data.toolCallId
      ) {
        const foundInCurrentMessage = message.parts.some(
          (p) => "toolCallId" in p && p.toolCallId === data.toolCallId
        );
        if (!foundInCurrentMessage) {
          // Awaited so the update lands before the stream completes and
          // _reply persists; failures are logged here so the chunk itself is
          // still stored and broadcast below.
          try {
            if (data.type === "tool-output-available") {
              await this._findAndUpdateToolPart(
                data.toolCallId,
                "_streamSSEReply",
                [
                  "input-available",
                  "input-streaming",
                  "approval-responded",
                  "approval-requested"
                ],
                (part) => ({
                  ...part,
                  state: "output-available",
                  output: data.output,
                  ...(data.preliminary !== undefined && {
                    preliminary: data.preliminary
                  })
                })
              );
            } else {
              await this._findAndUpdateToolPart(
                data.toolCallId,
                "_streamSSEReply",
                [
                  "input-available",
                  "input-streaming",
                  "approval-responded",
                  "approval-requested"
                ],
                (part) => ({
                  ...part,
                  state: "output-error",
                  errorText: data.errorText
                })
              );
            }
          } catch (error) {
            console.error(
              `[AIChatAgent] _streamSSEReply: failed to apply ${data.type} for toolCallId ${data.toolCallId}`,
              error
            );
          }
        }
      }

      // Handle server-specific chunk types not covered by the shared parser
      if (!handled) {
        switch (data.type) {
          case "start": {
            if (data.messageId != null) {
              message.id = data.messageId;
            }
            if (data.messageMetadata != null) {
              message.metadata = message.metadata
                ? { ...message.metadata, ...data.messageMetadata }
                : data.messageMetadata;
            }
            break;
          }
          case "finish":
          case "message-metadata": {
            if (data.messageMetadata != null) {
              message.metadata = message.metadata
                ? { ...message.metadata, ...data.messageMetadata }
                : data.messageMetadata;
            }
            break;
          }
          case "finish-step": {
            // No-op for message building (shared parser handles step-start)
            break;
          }
          case "error": {
            this._broadcastChatMessage({
              error: true,
              body: data.errorText ?? JSON.stringify(data),
              done: false,
              id,
              type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
            });
            break;
          }
        }
      }

      // Convert internal AI SDK stream events to valid UIMessageStreamPart format.
      // The "finish" event with "finishReason" is an internal LanguageModelV3StreamPart,
      // not a UIMessageStreamPart (which expects "messageMetadata" instead).
      // See: https://github.com/cloudflare/agents/issues/677
      let eventToSend: unknown = data;
      if (data.type === "finish" && "finishReason" in data) {
        const { finishReason, ...rest } = data as {
          finishReason: string;
          [key: string]: unknown;
        };
        eventToSend = {
          ...rest,
          type: "finish",
          messageMetadata: { finishReason }
        };
      }

      // Store chunk for replay and broadcast to clients
      const chunkBody = JSON.stringify(eventToSend);
      this._storeStreamChunk(streamId, chunkBody);
      this._broadcastChatMessage({
        body: chunkBody,
        done: false,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
    } catch (error) {
      // Keep streaming, but don't hide the failure.
      console.error("[AIChatAgent] Failed to process stream chunk:", error);
    }
  }

  /**
   * Streams a non-SSE (plain text) response body, e.g. one produced from
   * `generateText`, wrapping it in synthesized text-start / text-delta /
   * text-end events.
   *
   * All text accumulates into a single text part on `message`, so the
   * persisted message has one text part regardless of how many network
   * chunks the body spans. Bytes are decoded with a per-call streaming
   * `TextDecoder`, so a multi-byte UTF-8 character split across two chunks
   * is emitted intact rather than as replacement characters.
   *
   * @param id - Request id; also used as the text part id in emitted events
   * @param streamId - Resumable-stream id that events are stored under
   * @param reader - Reader over the response body; cancelled when `abortSignal` fires
   * @param message - Assistant message that receives the text part
   * @param streamCompleted - Set to true once the final `done` frame has been sent
   * @param continuation - Tag frames as a continuation of a previous assistant message
   * @param abortSignal - Optional signal that stops reading (e.g. client pressed stop)
   */
  private async _sendPlaintextReply(
    id: string,
    streamId: string,
    reader: ReadableStreamDefaultReader<Uint8Array>,
    message: ChatMessage,
    streamCompleted: { value: boolean },
    continuation = false,
    abortSignal?: AbortSignal
  ) {
    // if not AI SDK SSE format, we need to inject text-start and text-end events ourselves
    this._broadcastTextEvent(
      streamId,
      { type: "text-start", id },
      continuation
    );

    // Use a single text part and accumulate into it, so the persisted message
    // has one text part regardless of how many network chunks the response spans.
    const textPart: TextUIPart = { type: "text", text: "", state: "streaming" };
    message.parts.push(textPart);

    // Cancel the reader when the abort signal fires
    const cancelReader = () => {
      reader.cancel().catch(() => {});
    };
    if (abortSignal && !abortSignal.aborted) {
      abortSignal.addEventListener("abort", cancelReader, { once: true });
    }

    // Per-call decoder: `stream: true` buffers partial multi-byte sequences
    // between reads, which a decoder shared between streams cannot do safely.
    const utf8 = new TextDecoder();

    // Accumulate into the single text part to preserve exact formatting,
    // and forward the same text as a delta event.
    const appendText = (text: string) => {
      if (text.length === 0) return;
      textPart.text += text;
      this._broadcastTextEvent(
        streamId,
        { type: "text-delta", id, delta: text },
        continuation
      );
    };

    try {
      while (true) {
        if (abortSignal?.aborted) break;
        let readResult: ReadableStreamReadResult<Uint8Array>;
        try {
          readResult = await reader.read();
        } catch {
          // reader.read() throws after cancel() — treat as abort
          break;
        }
        const { done, value } = readResult;
        if (done) {
          // Emit any bytes the decoder was still holding back.
          appendText(utf8.decode());
          textPart.state = "done";

          this._broadcastTextEvent(
            streamId,
            { type: "text-end", id },
            continuation
          );

          // Mark the stream as completed
          this._completeStream(streamId);
          streamCompleted.value = true;
          // Send final completion signal
          this._broadcastChatMessage({
            body: "",
            done: true,
            id,
            type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
            ...(continuation && { continuation: true })
          });
          break;
        }

        appendText(utf8.decode(value, { stream: true }));
      }
    } finally {
      // Don't leave the listener attached to a signal that may outlive this stream.
      abortSignal?.removeEventListener("abort", cancelReader);
    }

    // If we exited due to abort, send a done signal so clients know the stream ended
    if (!streamCompleted.value) {
      console.warn(
        "[AIChatAgent] Stream was still active when cancel was received. " +
          "Pass options.abortSignal to streamText() in your onChatMessage() " +
          "to cancel the upstream LLM call and avoid wasted work."
      );
      textPart.state = "done";
      this._broadcastTextEvent(
        streamId,
        { type: "text-end", id },
        continuation
      );
      this._completeStream(streamId);
      streamCompleted.value = true;
      this._broadcastChatMessage({
        body: "",
        done: true,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
    }
  }

  /**
   * Records the user's decision on a tool call gated by `needsApproval`.
   *
   * Invoked for CF_AGENT_TOOL_APPROVAL messages. Only parts currently in
   * `input-available` or `approval-requested` are eligible:
   * - approved → `approval-responded`
   * - rejected → `output-denied`, so convertToModelMessages still emits a
   *   tool_result for providers (e.g. Anthropic) that require one.
   *
   * @param toolCallId - Id of the tool call being approved or rejected
   * @param approved - The user's decision
   * @returns true when the part was found in an eligible state and updated, false otherwise
   */
  private async _applyToolApproval(
    toolCallId: string,
    approved: boolean
  ): Promise<boolean> {
    const nextState = approved ? "approval-responded" : "output-denied";

    return this._findAndUpdateToolPart(
      toolCallId,
      "_applyToolApproval",
      ["input-available", "approval-requested"],
      (part) => {
        // Keep the prior approval fields (notably `id`): convertToModelMessages
        // needs approval.id to build a tool-approval-request part with approvalId.
        const previousApproval = part.approval as
          | Record<string, unknown>
          | undefined;
        return {
          ...part,
          state: nextState,
          approval: { ...previousApproval, approved }
        };
      }
    );
  }

  /**
   * Streams an `onChatMessage` response to connected clients, then persists
   * the assistant message built from it.
   *
   * Bodies with a `text/event-stream` content type are parsed as AI SDK SSE
   * chunks; any other body is treated as plain text. The DO is kept alive
   * for the duration via `keepAliveWhile`, and the stream is registered with
   * `_startStream` so it can be resumed.
   *
   * @param id - Request id echoed on every CF_AGENT_USE_CHAT_RESPONSE frame
   * @param response - Response returned by `onChatMessage`
   * @param excludeBroadcastIds - Ids passed through to `persistMessages` to exclude from its broadcast
   * @param options.continuation - Append new parts to the last assistant message instead of adding one
   * @param options.chatMessageId - Key of the AbortController that can cancel this reply
   */
  private async _reply(
    id: string,
    response: Response,
    excludeBroadcastIds: string[] = [],
    options: { continuation?: boolean; chatMessageId?: string } = {}
  ) {
    const { continuation = false, chatMessageId } = options;
    // Look up the abort signal for this request so we can cancel the reader
    // loop if the client sends a cancel message. This is a safety net —
    // users should also pass abortSignal to streamText for proper cancellation.
    const abortSignal = chatMessageId
      ? this._chatMessageAbortControllers.get(chatMessageId)?.signal
      : undefined;

    // Keep the DO alive during streaming to prevent idle eviction
    return this.keepAliveWhile(() =>
      this._tryCatchChat(async () => {
        if (!response.body) {
          // Send empty response if no body
          this._broadcastChatMessage({
            body: "",
            done: true,
            id,
            type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
            ...(continuation && { continuation: true })
          });
          return;
        }

        // Start tracking this stream for resumability
        const streamId = this._startStream(id);

        const reader = response.body.getReader();

        // Parsing state adapted from:
        // https://github.com/vercel/ai/blob/main/packages/ai/src/ui-message-stream/ui-message-chunks.ts#L295
        const message: ChatMessage = {
          id: `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`, // default
          role: "assistant",
          parts: []
        };
        // Track the streaming message so tool results can be applied before persistence
        this._streamingMessage = message;
        // Set up completion promise for tool continuation to wait on
        this._streamCompletionPromise = new Promise((resolve) => {
          this._streamCompletionResolve = resolve;
        });

        // Determine response format based on content-type
        const contentType = response.headers.get("content-type") || "";
        const isSSE = contentType.includes("text/event-stream"); // AI SDK v5 SSE format
        const streamCompleted = { value: false };
        // Capture before try so it's available after finally.
        // _approvalPersistedMessageId is set inside _streamSSEReply when a
        // tool enters approval-requested state and the message is persisted early.
        let earlyPersistedId: string | null = null;

        try {
          if (isSSE) {
            // AI SDK v5 SSE format
            await this._streamSSEReply(
              id,
              streamId,
              reader,
              message,
              streamCompleted,
              continuation,
              abortSignal
            );
          } else {
            await this._sendPlaintextReply(
              id,
              streamId,
              reader,
              message,
              streamCompleted,
              continuation,
              abortSignal
            );
          }
        } catch (error) {
          // Mark stream as error if not already completed
          if (!streamCompleted.value) {
            this._markStreamError(streamId);
            // Notify clients of the error
            this._broadcastChatMessage({
              body: error instanceof Error ? error.message : "Stream error",
              done: true,
              error: true,
              id,
              type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
              ...(continuation && { continuation: true })
            });
            this._emit("message:error", {
              error: error instanceof Error ? error.message : String(error)
            });
          }
          throw error;
        } finally {
          reader.releaseLock();

          // Always clear the streaming message reference and resolve completion
          // promise, even on error. Without this, tool continuations waiting on
          // _streamCompletionPromise would hang forever after a stream error.
          this._streamingMessage = null;
          // Capture and clear early-persist tracking. The persistence step
          // after the finally uses the local to update in place.
          earlyPersistedId = this._approvalPersistedMessageId;
          this._approvalPersistedMessageId = null;
          if (this._streamCompletionResolve) {
            this._streamCompletionResolve();
            this._streamCompletionResolve = null;
            this._streamCompletionPromise = null;
          }

          // Framework-level cleanup: always remove abort controller.
          // Only emit observability on success (not on error path).
          if (chatMessageId) {
            this._removeAbortController(chatMessageId);
            if (streamCompleted.value) {
              this._emit("message:response");
            }
          }
        }

        if (message.parts.length > 0) {
          await this._persistStreamedMessage(
            message,
            earlyPersistedId,
            continuation,
            excludeBroadcastIds
          );
        }
      })
    );
  }

  /**
   * Persists the assistant message produced by `_reply` once streaming ends.
   *
   * - Early-persisted (approval) messages replace their existing entry in
   *   `this.messages`, or are appended when that entry is absent.
   * - Continuations append their parts to the last assistant message
   *   (or become a new message when there is none).
   * - Everything else is appended as a new message.
   */
  private async _persistStreamedMessage(
    message: ChatMessage,
    earlyPersistedId: string | null,
    continuation: boolean,
    excludeBroadcastIds: string[]
  ): Promise<void> {
    if (earlyPersistedId) {
      // The early persist in _streamSSEReply writes only to SQLite, so the
      // message is not guaranteed to be in this.messages. Replace it when it
      // is present; otherwise append it, so the final streaming state is not
      // silently dropped from the persisted/broadcast list.
      // Note: early-persisted messages come from the initial stream
      // (before approval), which is never a continuation. The
      // continuation stream starts fresh after approval, so
      // earlyPersistedId will always be null for continuations.
      const alreadyLoaded = this.messages.some(
        (msg) => msg.id === earlyPersistedId
      );
      const updatedMessages = alreadyLoaded
        ? this.messages.map((msg) =>
            msg.id === earlyPersistedId ? message : msg
          )
        : [...this.messages, message];
      await this.persistMessages(updatedMessages, excludeBroadcastIds);
      return;
    }

    if (continuation) {
      // Find the last assistant message and append parts to it
      let lastAssistantIdx = -1;
      for (let i = this.messages.length - 1; i >= 0; i--) {
        if (this.messages[i].role === "assistant") {
          lastAssistantIdx = i;
          break;
        }
      }
      if (lastAssistantIdx >= 0) {
        const lastAssistant = this.messages[lastAssistantIdx];
        const mergedMessage: ChatMessage = {
          ...lastAssistant,
          parts: [...lastAssistant.parts, ...message.parts]
        };
        const updatedMessages = [...this.messages];
        updatedMessages[lastAssistantIdx] = mergedMessage;
        await this.persistMessages(updatedMessages, excludeBroadcastIds);
        return;
      }
      // No assistant message to append to: fall through and create a new one.
    }

    await this.persistMessages([...this.messages, message], excludeBroadcastIds);
  }

  /**
   * Returns the AbortSignal registered for a chat message id. On first use of
   * an id, an AbortController is created and cached.
   *
   * @param id - Chat message id used as the cache key
   * @returns The cached controller's signal, or undefined when `id` is not a string
   */
  private _getAbortSignal(id: string): AbortSignal | undefined {
    // Message types are coerced at runtime, so guard against a non-string id.
    if (typeof id !== "string") return undefined;

    const controllers = this._chatMessageAbortControllers;
    if (!controllers.has(id)) controllers.set(id, new AbortController());
    return controllers.get(id)?.signal;
  }

  /**
   * Drops the AbortController cached for `id` without aborting it.
   *
   * @param id - Chat message id whose controller should be forgotten
   */
  private _removeAbortController(id: string) {
    const controllers = this._chatMessageAbortControllers;
    controllers.delete(id);
  }

  /**
   * Aborts the request tracked under `id`, if any. The cached controller is
   * not removed here.
   *
   * @param id - Chat message id whose request should be cancelled
   */
  private _cancelChatRequest(id: string) {
    this._chatMessageAbortControllers.get(id)?.abort();
  }

  /**
   * Aborts every tracked chat request, then empties the controller cache.
   */
  private _destroyAbortControllers() {
    const controllers = this._chatMessageAbortControllers;
    controllers.forEach((controller) => controller?.abort());
    controllers.clear();
  }

  /**
   * When the DO is destroyed, cancel all pending requests, release the
   * resumable-stream resources, then run the base class's `destroy()`.
   *
   * The base teardown runs in a `finally` so a failure in the local cleanup
   * cannot skip it; that failure is still propagated to the caller.
   */
  async destroy() {
    try {
      this._destroyAbortControllers();
      this._resumableStream.destroy();
    } finally {
      await super.destroy();
    }
  }
}