// index.ts
import type {
UIMessage as ChatMessage,
JSONSchema7,
ProviderMetadata,
ReasoningUIPart,
StreamTextOnFinishCallback,
TextUIPart,
Tool,
ToolSet,
UIMessageChunk
} from "ai";
import { tool, jsonSchema } from "ai";
import {
Agent,
__DO_NOT_USE_WILL_BREAK__agentContext as agentContext,
type AgentContext,
type Connection,
type ConnectionContext,
type WSMessage
} from "agents";
import {
MessageType,
type IncomingMessage,
type OutgoingMessage
} from "./types";
import { autoTransformMessages } from "./ai-chat-v5-migration";
import { applyChunkToParts } from "./message-builder";
import { ResumableStream } from "./resumable-stream";
import { nanoid } from "nanoid";
/** Shared encoder for UTF-8 byte length measurement (used by `AIChatAgent._byteLength`). */
const textEncoder = new TextEncoder();
/**
 * Structural validation for a parsed chat message.
 * Rejects messages that would cause runtime errors downstream
 * (e.g. in convertToModelMessages or the UI layer).
 *
 * A message is considered valid when:
 * - `id` is a non-empty string
 * - `role` is one of "user" | "assistant" | "system"
 * - `parts` is an array (may be empty — the AI SDK enforces nonempty
 *   on incoming messages, but we are lenient on persisted data)
 */
function isValidMessageStructure(msg: unknown): msg is ChatMessage {
  if (msg === null || typeof msg !== "object") {
    return false;
  }
  const candidate = msg as Record<string, unknown>;
  const hasNonEmptyId =
    typeof candidate.id === "string" && candidate.id.length > 0;
  const hasKnownRole =
    candidate.role === "user" ||
    candidate.role === "assistant" ||
    candidate.role === "system";
  return hasNonEmptyId && hasKnownRole && Array.isArray(candidate.parts);
}
/**
 * Schema for a client-defined tool sent from the browser.
 * These tools are executed on the client, not the server.
 *
 * **For most apps**, define tools on the server with `tool()` from `"ai"` —
 * you get full Zod type safety, server-side execution, and simpler code.
 * Use `onToolCall` in `useAgentChat` for tools that need client-side execution.
 *
 * **For SDKs and platforms** where the tool surface is determined dynamically
 * by the embedding application at runtime, client tool schemas let the
 * client register tools the server does not know about at deploy time.
 *
 * Note: Uses `parameters` (JSONSchema7) rather than AI SDK's `inputSchema`
 * because this is the wire format. Zod schemas cannot be serialized.
 */
export type ClientToolSchema = {
  /** Unique name for the tool. Duplicate names warn and later definitions win. */
  name: string;
  /** Human-readable description of what the tool does (defaults to "" when converted). */
  description?: Tool["description"];
  /** JSON Schema defining the tool's input parameters (defaults to `{ type: "object" }` when converted). */
  parameters?: JSONSchema7;
};
/**
 * Options passed to the onChatMessage handler.
 */
export type OnChatMessageOptions = {
  /**
   * Unique ID for this chat message exchange.
   *
   * For initial user messages this is the client-generated ID from the
   * `CF_AGENT_USE_CHAT_REQUEST` WebSocket frame. For tool continuations
   * (auto-continue after client tool results or approvals) this is a
   * server-generated ID (nanoid).
   */
  requestId: string;
  /** AbortSignal for cancelling the request (fires on `CF_AGENT_CHAT_REQUEST_CANCEL`). */
  abortSignal?: AbortSignal;
  /**
   * Tool schemas sent from the client for dynamic tool registration.
   * These represent tools that will be executed on the client side.
   * Use `createToolsFromClientSchemas()` to convert these to AI SDK tool format.
   *
   * **For most apps**, you do not need this — define tools on the server with
   * `tool()` from `"ai"` and use `onToolCall` for client-side execution.
   *
   * **For SDKs and platforms** where tools are defined dynamically by the
   * client at runtime and the server does not know the tool surface ahead
   * of time, this field carries the client-provided tool schemas.
   */
  clientTools?: ClientToolSchema[];
  /**
   * Custom body data sent from the client via `prepareSendMessagesRequest`
   * or the AI SDK's `body` option in `sendMessage`.
   *
   * Contains all fields from the request body except `messages` and `clientTools`,
   * which are handled separately (`trigger` is also stripped).
   *
   * During tool continuations (auto-continue after client tool results), this
   * contains the body from the most recent chat request. The value is persisted
   * to SQLite so it survives Durable Object hibernation. It is cleared when the
   * chat is cleared via `CF_AGENT_CHAT_CLEAR`.
   */
  body?: Record<string, unknown>;
};
/**
 * Converts client tool schemas to AI SDK tool format.
 *
 * These tools have no `execute` function — when the AI model calls them,
 * the tool call is sent back to the client for execution.
 *
 * **For most apps**, define tools on the server with `tool()` from `"ai"`
 * for full Zod type safety. This helper is intended for SDK/platform use
 * cases where the tool surface is determined dynamically by the client.
 *
 * @param clientTools - Array of tool schemas from the client
 * @returns Record of AI SDK tools that can be spread into your tools object
 *
 * @example
 * ```typescript
 * // In onChatMessage:
 * const tools = {
 *   ...createToolsFromClientSchemas(options.clientTools),
 *   // server-defined tools with execute:
 *   myServerTool: tool({ ... }),
 * };
 * ```
 */
export function createToolsFromClientSchemas(
  clientTools?: ClientToolSchema[]
): ToolSet {
  if (!clientTools?.length) {
    return {};
  }
  const toolSet: ToolSet = {};
  // Track names we've already seen so duplicates can be reported.
  // Later definitions win (last assignment overwrites earlier ones).
  const seenNames = new Set<string>();
  for (const schema of clientTools) {
    if (seenNames.has(schema.name)) {
      console.warn(
        `[createToolsFromClientSchemas] Duplicate tool name "${schema.name}" found. Later definitions will override earlier ones.`
      );
    }
    seenNames.add(schema.name);
    toolSet[schema.name] = tool({
      description: schema.description ?? "",
      inputSchema: jsonSchema(schema.parameters ?? { type: "object" })
      // No execute function = tool call is sent back to client
    });
  }
  return toolSet;
}
const decoder = new TextDecoder();
/**
* Extension of Agent with built-in chat capabilities
* @template Env Environment type containing bindings
*/
export class AIChatAgent<
Env extends Cloudflare.Env = Cloudflare.Env,
State = unknown
> extends Agent<Env, State> {
/**
 * Map of message `id`s to `AbortController`s
 * useful to propagate request cancellation signals for any external calls made by the agent
 */
private _chatMessageAbortControllers: Map<string, AbortController>;
/**
 * Resumable stream manager -- handles chunk buffering, persistence, and replay.
 * @internal Protected for testing purposes.
 */
protected _resumableStream!: ResumableStream;
/**
 * The message currently being streamed. Used to apply tool results
 * before the message is persisted.
 * @internal
 */
private _streamingMessage: ChatMessage | null = null;
/**
 * Tracks the ID of a streaming message that was persisted early due to
 * a tool entering approval-requested state. When set, stream completion
 * updates the existing persisted message instead of appending a new one.
 * @internal
 */
private _approvalPersistedMessageId: string | null = null;
/**
 * Promise that resolves when the current stream completes.
 * Used to wait for message persistence before continuing after tool results.
 * @internal
 */
private _streamCompletionPromise: Promise<void> | null = null;
// Resolver for _streamCompletionPromise; held so stream completion can settle it.
private _streamCompletionResolve: (() => void) | null = null;
/**
 * Set of connection IDs that are pending stream resume.
 * These connections have received CF_AGENT_STREAM_RESUMING but haven't sent ACK yet.
 * They should be excluded from live stream broadcasts until they ACK.
 * @internal
 */
private _pendingResumeConnections: Set<string> = new Set();
/**
 * Client tool schemas from the most recent chat request.
 * Stored so they can be passed to onChatMessage during tool continuations.
 * Persisted to SQLite via _persistRequestContext so it survives hibernation.
 * @internal
 */
private _lastClientTools: ClientToolSchema[] | undefined;
/**
 * Custom body data from the most recent chat request.
 * Stored so it can be passed to onChatMessage during tool continuations.
 * Persisted to SQLite via _persistRequestContext so it survives hibernation.
 * @internal
 */
private _lastBody: Record<string, unknown> | undefined;
/**
 * Cache of last-persisted JSON for each message ID.
 * Used for incremental persistence: skip SQL writes for unchanged messages.
 * Lost on hibernation, repopulated from SQLite on wake.
 * @internal
 */
private _persistedMessageCache: Map<string, string> = new Map();
/** Maximum serialized message size before compaction (bytes). 1.8MB with headroom below SQLite's 2MB limit. */
private static ROW_MAX_BYTES = 1_800_000;
/** Measure UTF-8 byte length of a string (accurate for SQLite row limits). */
private static _byteLength(s: string): number {
  return textEncoder.encode(s).byteLength;
}
/**
 * Maximum number of messages to keep in SQLite storage.
 * When the conversation exceeds this limit, oldest messages are deleted
 * after each persist. Set to `undefined` (default) for no limit.
 *
 * This controls storage only — it does not affect what's sent to the LLM.
 * Use `pruneMessages()` from the AI SDK in your `onChatMessage` to control
 * LLM context separately.
 *
 * @example
 * ```typescript
 * class MyAgent extends AIChatAgent<Env> {
 *   maxPersistedMessages = 100; // Keep last 100 messages in storage
 * }
 * ```
 */
maxPersistedMessages: number | undefined = undefined;
/**
 * When enabled, waits for all MCP server connections to be ready before
 * calling `onChatMessage`. This prevents the race condition where
 * `getAITools()` returns an incomplete set because connections are still
 * restoring after Durable Object hibernation.
 *
 * - `false` — non-blocking; `onChatMessage` runs immediately.
 * - `true` — waits indefinitely for all connections to settle.
 * - `{ timeout: number }` — waits up to `timeout` milliseconds.
 *
 * Defaults to `{ timeout: 10_000 }` (wait up to 10 seconds).
 *
 * For lower-level control, call `this.mcp.waitForConnections()` directly
 * inside your `onChatMessage` instead.
 *
 * @example
 * ```typescript
 * class MyAgent extends AIChatAgent<Env> {
 *   waitForMcpConnections = true;
 * }
 * ```
 *
 * @example
 * ```typescript
 * class MyAgent extends AIChatAgent<Env> {
 *   waitForMcpConnections = { timeout: 10_000 };
 * }
 * ```
 */
waitForMcpConnections: boolean | { timeout: number } = { timeout: 10_000 };
/** Array of chat messages for the current conversation (mirror of SQLite state). */
messages: ChatMessage[];
constructor(ctx: AgentContext, env: Env) {
  super(ctx, env);
  // Message store: one row per chat message, JSON-serialized in `message`.
  this.sql`create table if not exists cf_ai_chat_agent_messages (
    id text primary key,
    message text not null,
    created_at datetime default current_timestamp
  )`;
  // Key-value table for request context that must survive hibernation
  // (e.g., custom body fields, client tools from the last chat request).
  this.sql`create table if not exists cf_ai_chat_request_context (
    key text primary key,
    value text not null
  )`;
  // Restore request context from SQLite (survives hibernation)
  this._restoreRequestContext();
  // Initialize resumable stream manager (creates its own tables + restores state)
  this._resumableStream = new ResumableStream(this.sql.bind(this));
  // Load messages and automatically transform them to v5 format.
  // Note: _loadMessagesFromDb() runs structural validation which requires
  // `parts` to be an array. Legacy v4 messages (with `content` instead of
  // `parts`) would fail this check — but that's fine because autoTransformMessages
  // already migrated them on a previous load, and persistMessages wrote them back.
  // Any message still without `parts` at this point is genuinely corrupt.
  const rawMessages = this._loadMessagesFromDb();
  // Automatic migration following https://jhak.im/blog/ai-sdk-migration-handling-previously-saved-messages
  this.messages = autoTransformMessages(rawMessages);
  this._chatMessageAbortControllers = new Map();
  // From here down, wrap the consumer's lifecycle handlers so AIChatAgent's
  // own protocol frames are intercepted first, then control is delegated.
  const _onConnect = this.onConnect.bind(this);
  this.onConnect = async (connection: Connection, ctx: ConnectionContext) => {
    // Notify client about active streams that can be resumed
    if (this._resumableStream.hasActiveStream()) {
      this._notifyStreamResuming(connection);
    }
    // Call consumer's onConnect
    return _onConnect(connection, ctx);
  };
  // Wrap onClose to clean up pending resume connections
  const _onClose = this.onClose.bind(this);
  this.onClose = async (
    connection: Connection,
    code: number,
    reason: string,
    wasClean: boolean
  ) => {
    // Clean up pending resume state for this connection
    this._pendingResumeConnections.delete(connection.id);
    // Call consumer's onClose
    return _onClose(connection, code, reason, wasClean);
  };
  // Wrap onMessage
  const _onMessage = this.onMessage.bind(this);
  this.onMessage = async (connection: Connection, message: WSMessage) => {
    // Handle AIChatAgent's internal messages first
    if (typeof message === "string") {
      let data: IncomingMessage;
      try {
        data = JSON.parse(message) as IncomingMessage;
      } catch (_error) {
        // Not JSON, forward to consumer
        return _onMessage(connection, message);
      }
      // Handle chat request
      if (
        data.type === MessageType.CF_AGENT_USE_CHAT_REQUEST &&
        data.init.method === "POST"
      ) {
        // Optionally wait for in-flight MCP connections to settle (e.g. after hibernation restore)
        // so that getAITools() returns the full set of tools in onChatMessage
        if (this.waitForMcpConnections) {
          const timeout =
            typeof this.waitForMcpConnections === "object"
              ? this.waitForMcpConnections.timeout
              : undefined;
          await this.mcp.waitForConnections(
            timeout != null ? { timeout } : undefined
          );
        }
        const { body } = data.init;
        if (!body) {
          console.warn(
            "[AIChatAgent] Received chat request with empty body, ignoring"
          );
          return;
        }
        let parsed: Record<string, unknown>;
        try {
          parsed = JSON.parse(body as string);
        } catch (_parseError) {
          console.warn(
            "[AIChatAgent] Received chat request with invalid JSON body, ignoring"
          );
          return;
        }
        // `trigger` is destructured away deliberately so it never leaks
        // into customBody (and therefore never reaches onChatMessage's body).
        const {
          messages,
          clientTools,
          trigger: _trigger,
          ...customBody
        } = parsed as {
          messages: ChatMessage[];
          clientTools?: ClientToolSchema[];
          trigger?: string;
          [key: string]: unknown;
        };
        // Store client tools and body for use during tool continuations
        this._lastClientTools = clientTools?.length ? clientTools : undefined;
        this._lastBody =
          Object.keys(customBody).length > 0 ? customBody : undefined;
        this._persistRequestContext();
        // Automatically transform any incoming messages
        const transformedMessages = autoTransformMessages(messages);
        // Echo the updated message list to every other connection (the
        // sender already has it).
        this._broadcastChatMessage(
          {
            messages: transformedMessages,
            type: MessageType.CF_AGENT_CHAT_MESSAGES
          },
          [connection.id]
        );
        await this.persistMessages(transformedMessages, [connection.id], {
          _deleteStaleRows: true
        });
        this._emit("message:request");
        const chatMessageId = data.id;
        const abortSignal = this._getAbortSignal(chatMessageId);
        return this._tryCatchChat(async () => {
          // Wrap in agentContext.run() to propagate connection context to onChatMessage
          // This ensures getCurrentAgent() returns the connection inside tool execute functions
          return agentContext.run(
            { agent: this, connection, request: undefined, email: undefined },
            async () => {
              const response = await this.onChatMessage(
                async (_finishResult) => {
                  // User-provided hook. Cleanup is now handled by _reply,
                  // so this is optional for the user to pass to streamText.
                },
                {
                  requestId: chatMessageId,
                  abortSignal,
                  clientTools,
                  body: this._lastBody
                }
              );
              if (response) {
                await this._reply(data.id, response, [connection.id], {
                  chatMessageId
                });
              } else {
                console.warn(
                  `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
                );
                // Tell the requesting client the exchange is over so its UI
                // doesn't wait forever for a stream that will never start.
                this._broadcastChatMessage(
                  {
                    body: "No response was generated by the agent.",
                    done: true,
                    id: data.id,
                    type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
                  },
                  [connection.id]
                );
              }
            }
          );
        });
      }
      // Handle clear chat
      if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) {
        // Tear down every piece of per-conversation state: abort in-flight
        // requests, drop persisted rows, reset stream + request context.
        this._destroyAbortControllers();
        this.sql`delete from cf_ai_chat_agent_messages`;
        this._resumableStream.clearAll();
        this._pendingResumeConnections.clear();
        this._lastClientTools = undefined;
        this._lastBody = undefined;
        this._persistRequestContext();
        this._persistedMessageCache.clear();
        this.messages = [];
        this._broadcastChatMessage(
          { type: MessageType.CF_AGENT_CHAT_CLEAR },
          [connection.id]
        );
        this._emit("message:clear");
        return;
      }
      // Handle message replacement
      if (data.type === MessageType.CF_AGENT_CHAT_MESSAGES) {
        const transformedMessages = autoTransformMessages(data.messages);
        await this.persistMessages(transformedMessages, [connection.id]);
        return;
      }
      // Handle request cancellation
      if (data.type === MessageType.CF_AGENT_CHAT_REQUEST_CANCEL) {
        this._cancelChatRequest(data.id);
        this._emit("message:cancel", { requestId: data.id });
        return;
      }
      // Handle client-initiated stream resume request.
      // The client sends this after its message handler is registered,
      // avoiding the race condition where CF_AGENT_STREAM_RESUMING sent
      // in onConnect arrives before the client's handler is ready.
      if (data.type === MessageType.CF_AGENT_STREAM_RESUME_REQUEST) {
        if (this._resumableStream.hasActiveStream()) {
          this._notifyStreamResuming(connection);
        } else {
          connection.send(
            JSON.stringify({
              type: MessageType.CF_AGENT_STREAM_RESUME_NONE
            })
          );
        }
        return;
      }
      // Handle stream resume acknowledgment
      if (data.type === MessageType.CF_AGENT_STREAM_RESUME_ACK) {
        this._pendingResumeConnections.delete(connection.id);
        // Only replay if the ACK matches the stream that is still active;
        // a stale ACK for a finished stream is ignored.
        if (
          this._resumableStream.hasActiveStream() &&
          this._resumableStream.activeRequestId === data.id
        ) {
          const orphanedStreamId = this._resumableStream.replayChunks(
            connection,
            this._resumableStream.activeRequestId
          );
          // If the stream was orphaned (restored from SQLite after
          // hibernation with no live reader), reconstruct the partial
          // assistant message from stored chunks and persist it so it
          // survives further page refreshes.
          if (orphanedStreamId) {
            this._persistOrphanedStream(orphanedStreamId);
          }
        }
        return;
      }
      // Handle client-side tool result
      if (data.type === MessageType.CF_AGENT_TOOL_RESULT) {
        const {
          toolCallId,
          toolName,
          output,
          state,
          errorText,
          autoContinue,
          clientTools
        } = data;
        // Update cached client tools so subsequent continuations use the latest schemas
        if (clientTools?.length) {
          this._lastClientTools = clientTools as ClientToolSchema[];
          this._persistRequestContext();
        }
        const overrideState =
          state === "output-error" ? "output-error" : undefined;
        this._emit("tool:result", { toolCallId, toolName });
        // Apply the tool result
        this._applyToolResult(
          toolCallId,
          toolName,
          output,
          overrideState,
          errorText
        ).then((applied) => {
          // Only auto-continue if client requested it (opt-in behavior)
          // This mimics server-executed tool behavior where the LLM
          // automatically continues after seeing tool results
          if (applied && autoContinue) {
            // Wait for the original stream to complete and message to be persisted
            // before calling onChatMessage, so this.messages includes the tool result
            const waitForStream = async () => {
              if (this._streamCompletionPromise) {
                await this._streamCompletionPromise;
              } else {
                // TODO: The completion promise can be null if the stream finished
                // before the tool result arrived (race between stream end and tool
                // apply). The 500ms fallback is a pragmatic workaround — consider
                // a more deterministic signal (e.g. always setting the promise).
                await new Promise((resolve) => setTimeout(resolve, 500));
              }
            };
            waitForStream()
              .then(() => {
                const continuationId = nanoid();
                const abortSignal = this._getAbortSignal(continuationId);
                return this._tryCatchChat(async () => {
                  return agentContext.run(
                    {
                      agent: this,
                      connection,
                      request: undefined,
                      email: undefined
                    },
                    async () => {
                      const response = await this.onChatMessage(
                        async (_finishResult) => {
                          // User-provided hook. Cleanup handled by _reply.
                        },
                        {
                          requestId: continuationId,
                          abortSignal,
                          clientTools: clientTools ?? this._lastClientTools,
                          body: this._lastBody
                        }
                      );
                      if (response) {
                        // Pass continuation flag to merge parts into last assistant message
                        // Note: We pass an empty excludeBroadcastIds array because the sender
                        // NEEDS to receive the continuation stream. Unlike regular chat requests
                        // where aiFetch handles the response, tool continuations have no listener
                        // waiting - the client relies on the broadcast.
                        await this._reply(
                          continuationId,
                          response,
                          [], // Don't exclude sender - they need the continuation
                          {
                            continuation: true,
                            chatMessageId: continuationId
                          }
                        );
                      }
                    }
                  );
                });
              })
              .catch((error) => {
                console.error(
                  "[AIChatAgent] Tool continuation failed:",
                  error
                );
              });
          }
        });
        return;
      }
      // Handle client-side tool approval response
      if (data.type === MessageType.CF_AGENT_TOOL_APPROVAL) {
        const { toolCallId, approved, autoContinue } = data;
        this._emit("tool:approval", { toolCallId, approved });
        this._applyToolApproval(toolCallId, approved).then((applied) => {
          // Auto-continue for both approvals and rejections so the LLM
          // sees the tool_result and can respond accordingly.
          if (applied && autoContinue) {
            // Same stream-completion wait as the tool-result path above.
            const waitForStream = async () => {
              if (this._streamCompletionPromise) {
                await this._streamCompletionPromise;
              } else {
                await new Promise((resolve) => setTimeout(resolve, 500));
              }
            };
            waitForStream()
              .then(() => {
                const continuationId = nanoid();
                const abortSignal = this._getAbortSignal(continuationId);
                return this._tryCatchChat(async () => {
                  return agentContext.run(
                    {
                      agent: this,
                      connection,
                      request: undefined,
                      email: undefined
                    },
                    async () => {
                      const response = await this.onChatMessage(
                        async (_finishResult) => {},
                        {
                          requestId: continuationId,
                          abortSignal,
                          clientTools: this._lastClientTools,
                          body: this._lastBody
                        }
                      );
                      if (response) {
                        await this._reply(continuationId, response, [], {
                          continuation: true,
                          chatMessageId: continuationId
                        });
                      }
                    }
                  );
                });
              })
              .catch((error) => {
                console.error(
                  "[AIChatAgent] Tool approval continuation failed:",
                  error
                );
              });
          }
        });
        return;
      }
    }
    // Forward unhandled messages to consumer's onMessage
    return _onMessage(connection, message);
  };
  // Wrap onRequest to serve the message history over HTTP.
  const _onRequest = this.onRequest.bind(this);
  this.onRequest = async (request: Request) => {
    return this._tryCatchChat(async () => {
      const url = new URL(request.url);
      if (url.pathname.split("/").pop() === "get-messages") {
        return Response.json(this._loadMessagesFromDb());
      }
      return _onRequest(request);
    });
  };
}
/**
 * Notify a connection about an active stream that can be resumed.
 * The client should respond with CF_AGENT_STREAM_RESUME_ACK to receive chunks.
 * @param connection - The WebSocket connection to notify
 */
private _notifyStreamResuming(connection: Connection) {
  // Nothing to resume — no notification needed.
  const hasStream = this._resumableStream.hasActiveStream();
  if (!hasStream) {
    return;
  }
  // Mark the connection as pending so live broadcasts skip it; it will
  // receive the full replay once it sends the ACK.
  this._pendingResumeConnections.add(connection.id);
  // Tell the client which request is resumable; it ACKs when ready.
  const payload = {
    type: MessageType.CF_AGENT_STREAM_RESUMING,
    id: this._resumableStream.activeRequestId
  };
  connection.send(JSON.stringify(payload));
}
// ── Delegate methods for backward compatibility with tests ─────────
// These protected methods delegate to _resumableStream so existing
// test workers that call them directly continue to work.
/** @internal Delegate to _resumableStream (null before initialization). */
protected get _activeStreamId(): string | null {
  return this._resumableStream?.activeStreamId ?? null;
}
/** @internal Delegate to _resumableStream (null before initialization). */
protected get _activeRequestId(): string | null {
  return this._resumableStream?.activeRequestId ?? null;
}
/** @internal Delegate to _resumableStream; returns the new stream's ID. */
protected _startStream(requestId: string): string {
  return this._resumableStream.start(requestId);
}
/** @internal Delegate to _resumableStream; also clears pending-resume state. */
protected _completeStream(streamId: string) {
  this._resumableStream.complete(streamId);
  this._pendingResumeConnections.clear();
}
/** @internal Delegate to _resumableStream */
protected _storeStreamChunk(streamId: string, body: string) {
  this._resumableStream.storeChunk(streamId, body);
}
/** @internal Delegate to _resumableStream */
protected _flushChunkBuffer() {
  this._resumableStream.flushBuffer();
}
/** @internal Delegate to _resumableStream */
protected _restoreActiveStream() {
  this._resumableStream.restore();
}
/** @internal Delegate to _resumableStream */
protected _markStreamError(streamId: string) {
  this._resumableStream.markError(streamId);
}
/**
 * Reconstruct and persist a partial assistant message from an orphaned
 * stream's stored chunks. Called when the DO wakes from hibernation and
 * discovers an active stream with no live LLM reader.
 *
 * Replays each chunk body through `applyChunkToParts` to rebuild the
 * message parts, then persists the result so it survives further refreshes.
 * @internal
 */
private _persistOrphanedStream(streamId: string) {
  const chunks = this._resumableStream.getStreamChunks(streamId);
  if (!chunks.length) return;
  // Fallback ID; replaced by the stream's own "start" event ID when present.
  const message: ChatMessage = {
    id: `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
    role: "assistant",
    parts: []
  };
  for (const chunk of chunks) {
    try {
      const data = JSON.parse(chunk.body);
      // Capture message ID from the "start" event if present
      if (data.type === "start" && data.messageId != null) {
        message.id = data.messageId;
      }
      // Merge metadata carried on start/finish/message-metadata events.
      if (
        (data.type === "start" ||
          data.type === "finish" ||
          data.type === "message-metadata") &&
        data.messageMetadata != null
      ) {
        message.metadata = message.metadata
          ? { ...message.metadata, ...data.messageMetadata }
          : data.messageMetadata;
      }
      applyChunkToParts(message.parts, data);
    } catch {
      // Skip malformed chunk bodies
    }
  }
  if (message.parts.length > 0) {
    // Check if a message with this ID already exists (e.g., from an
    // early persist during tool approval). Update in place if so.
    const existingIdx = this.messages.findIndex((m) => m.id === message.id);
    const updatedMessages =
      existingIdx >= 0
        ? this.messages.map((m, i) => (i === existingIdx ? message : m))
        : [...this.messages, message];
    // persistMessages is async and this method is not; attach a .catch so
    // a persistence failure is logged instead of surfacing as an
    // unhandled promise rejection.
    this.persistMessages(updatedMessages).catch((error) => {
      console.error(
        "[AIChatAgent] Failed to persist orphaned stream message:",
        error
      );
    });
  }
}
/**
 * Restore _lastBody and _lastClientTools from SQLite.
 * Called in the constructor so these values survive DO hibernation.
 * @internal
 */
private _restoreRequestContext() {
  const rows =
    this.sql<{ key: string; value: string }>`
      select key, value from cf_ai_chat_request_context
    ` || [];
  for (const { key, value } of rows) {
    try {
      switch (key) {
        case "lastBody":
          this._lastBody = JSON.parse(value);
          break;
        case "lastClientTools":
          this._lastClientTools = JSON.parse(value);
          break;
        default:
          // Unknown keys are ignored.
          break;
      }
    } catch {
      // Corrupted row — ignore and let the next request overwrite it
    }
  }
}
/**
 * Persist _lastBody and _lastClientTools to SQLite so they survive hibernation.
 * Uses upsert (INSERT OR REPLACE) so repeated calls are safe.
 * @internal
 */
private _persistRequestContext() {
  // Each entry is upserted when set and deleted when cleared, so the
  // SQLite rows always mirror the in-memory values.
  const contextEntries: Array<[string, unknown]> = [
    ["lastBody", this._lastBody],
    ["lastClientTools", this._lastClientTools]
  ];
  for (const [key, value] of contextEntries) {
    if (value) {
      this.sql`
        insert or replace into cf_ai_chat_request_context (key, value)
        values (${key}, ${JSON.stringify(value)})
      `;
    } else {
      this.sql`delete from cf_ai_chat_request_context where key = ${key}`;
    }
  }
}
/**
 * Broadcast an outgoing protocol frame to all connections, excluding both
 * the explicitly listed connection IDs and any connections that are mid
 * stream-resume (they must not receive live chunks until they ACK, at
 * which point the full replay is sent via _sendStreamChunks).
 */
private _broadcastChatMessage(message: OutgoingMessage, exclude?: string[]) {
  const pendingResume = [...this._pendingResumeConnections];
  const allExclusions = (exclude ?? []).concat(pendingResume);
  this.broadcast(JSON.stringify(message), allExclusions);
}
/**
 * Broadcasts a text event for non-SSE responses.
 * This ensures plain text responses follow the AI SDK v5 stream protocol.
 *
 * @param streamId - The stream identifier for chunk storage
 * @param event - The text event payload (text-start, text-delta with delta, or text-end)
 * @param continuation - Whether this is a continuation of a previous stream
 */
private _broadcastTextEvent(
  streamId: string,
  event:
    | { type: "text-start"; id: string }
    | { type: "text-delta"; id: string; delta: string }
    | { type: "text-end"; id: string },
  continuation: boolean
) {
  const serialized = JSON.stringify(event);
  // Record the chunk for resumable replay before broadcasting it live.
  this._storeStreamChunk(streamId, serialized);
  if (continuation) {
    this._broadcastChatMessage({
      body: serialized,
      done: false,
      id: event.id,
      type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
      continuation: true
    });
  } else {
    this._broadcastChatMessage({
      body: serialized,
      done: false,
      id: event.id,
      type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
    });
  }
}
/**
 * Load and validate all persisted chat messages, oldest first.
 * Rows that fail JSON parsing or structural validation are logged and
 * skipped rather than crashing the agent. Also repopulates the
 * incremental-persistence cache with the raw JSON of each valid row.
 */
private _loadMessagesFromDb(): ChatMessage[] {
  const rows =
    this.sql`select * from cf_ai_chat_agent_messages order by created_at` ||
    [];
  // Populate the persistence cache from DB so incremental persistence
  // can skip SQL writes for messages already stored.
  this._persistedMessageCache.clear();
  const loaded: ChatMessage[] = [];
  for (const row of rows) {
    try {
      const rawJson = row.message as string;
      const candidate = JSON.parse(rawJson) as ChatMessage;
      // Structural validation: ensure required fields exist and have
      // the correct types. This catches corrupted rows, manual tampering,
      // or schema drift from older versions without crashing the agent.
      if (!isValidMessageStructure(candidate)) {
        console.warn(
          `[AIChatAgent] Skipping invalid message ${row.id}: ` +
            "missing or malformed id, role, or parts"
        );
        continue;
      }
      // Cache the raw JSON keyed by message ID
      this._persistedMessageCache.set(candidate.id, rawJson);
      loaded.push(candidate);
    } catch (error) {
      console.error(`Failed to parse message ${row.id}:`, error);
    }
  }
  return loaded;
}
/**
 * Run a chat operation, routing any thrown error through `this.onError`.
 * Whatever `onError` returns is rethrown to the caller.
 */
private async _tryCatchChat<T>(fn: () => T | Promise<T>) {
  try {
    const result = await fn();
    return result;
  } catch (cause) {
    throw this.onError(cause);
  }
}
/**
 * Handle incoming chat messages and generate a response.
 *
 * The base implementation always throws — subclasses MUST override this
 * and return a `Response` to send to the client.
 *
 * @param onFinish Callback to be called when the response is finished
 * @param options Options including abort signal and client-defined tools
 * @returns Response to send to the client or undefined
 * @throws Error if not overridden by a subclass
 */
async onChatMessage(
  // oxlint-disable-next-line eslint(no-unused-vars) -- params used by subclass overrides
  onFinish: StreamTextOnFinishCallback<ToolSet>,
  // oxlint-disable-next-line eslint(no-unused-vars) -- params used by subclass overrides
  options?: OnChatMessageOptions
): Promise<Response | undefined> {
  throw new Error(
    "received a chat message, override onChatMessage and return a Response to send to the client"
  );
}
/**
 * Save messages on the server side, then invoke onChatMessage to generate
 * a follow-up response (replied to all connections via _reply).
 * @param messages Chat messages to save
 */
async saveMessages(messages: ChatMessage[]) {
  await this.persistMessages(messages);
  await this._tryCatchChat(async () => {
    const requestId = nanoid();
    const abortSignal = this._getAbortSignal(requestId);
    const response = await this.onChatMessage(() => {}, {
      requestId,
      abortSignal,
      clientTools: this._lastClientTools,
      body: this._lastBody
    });
    // Await the reply so a failure inside _reply propagates into
    // _tryCatchChat's error handling instead of becoming an unhandled
    // promise rejection (the previous fire-and-forget escaped the catch).
    if (response) await this._reply(requestId, response);
  });
}
/**
 * Persist chat messages to SQLite and broadcast the updated list.
 *
 * Pipeline per message: merge with server tool state → sanitize →
 * resolve tool merges → enforce row-size limit → skip-if-unchanged →
 * upsert. Afterwards optionally reconciles stale rows, enforces the
 * storage cap, reloads `this.messages` from the DB, and broadcasts.
 *
 * @param messages Incoming messages (typically from the client)
 * @param excludeBroadcastIds Connection IDs to skip in the final broadcast
 * @param options `_deleteStaleRows` enables subset-based row reconciliation
 */
async persistMessages(
  messages: ChatMessage[],
  excludeBroadcastIds: string[] = [],
  /** @internal */
  options?: { _deleteStaleRows?: boolean }
) {
  // Merge incoming messages with existing server state to preserve tool outputs.
  // This is critical for client-side tools: the client sends messages without
  // tool outputs, but the server has them via _applyToolResult.
  const mergedMessages = this._mergeIncomingWithServerState(messages);
  // Persist only new or changed messages (incremental persistence).
  // Compares serialized JSON against a cache of last-persisted versions.
  for (const message of mergedMessages) {
    const sanitizedMessage = this._sanitizeMessageForPersistence(message);
    const resolved = this._resolveMessageForToolMerge(sanitizedMessage);
    const safe = this._enforceRowSizeLimit(resolved);
    const json = JSON.stringify(safe);
    // Skip SQL write if the message is identical to what's already persisted
    if (this._persistedMessageCache.get(safe.id) === json) {
      continue;
    }
    this.sql`
      insert into cf_ai_chat_agent_messages (id, message)
      values (${safe.id}, ${json})
      on conflict(id) do update set message = excluded.message
    `;
    this._persistedMessageCache.set(safe.id, json);
  }
  // Reconcile: delete DB rows not present in the incoming message set.
  // Only safe when the incoming set is a subset of the server state
  // (e.g. regenerate() trims the last assistant message). When the
  // client appends new messages (IDs unknown to the server), it may
  // not have the full history, so deleting "missing" rows would
  // destroy server-generated assistant messages the client hasn't
  // seen yet.
  // This MUST use mergedMessages (post-merge IDs) because
  // _mergeIncomingWithServerState can remap client IDs to server IDs.
  if (options?._deleteStaleRows) {
    const serverIds = new Set(this.messages.map((m) => m.id));
    const isSubsetOfServer = mergedMessages.every((m) => serverIds.has(m.id));
    if (isSubsetOfServer) {
      const keepIds = new Set(mergedMessages.map((m) => m.id));
      const allDbRows =
        this.sql<{ id: string }>`
          select id from cf_ai_chat_agent_messages
        ` || [];
      for (const row of allDbRows) {
        if (!keepIds.has(row.id)) {
          this.sql`
            delete from cf_ai_chat_agent_messages where id = ${row.id}
          `;
          // Keep the incremental-persistence cache in sync with the DB.
          this._persistedMessageCache.delete(row.id);
        }
      }
    }
  }
  // Enforce maxPersistedMessages: delete oldest messages if over the limit
  if (this.maxPersistedMessages != null) {
    this._enforceMaxPersistedMessages();
  }
  // refresh in-memory messages from the DB (the source of truth)
  const persisted = this._loadMessagesFromDb();
  this.messages = autoTransformMessages(persisted);
  // Broadcast the merged set (not the raw input) so clients see the
  // server-preserved tool outputs.
  this._broadcastChatMessage(
    {
      messages: mergedMessages,
      type: MessageType.CF_AGENT_CHAT_MESSAGES
    },
    excludeBroadcastIds
  );
}
/**
* Merges incoming messages with existing server state.
* This preserves tool outputs that the server has (via _applyToolResult)
* but the client doesn't have yet.
*
* @param incomingMessages - Messages from the client
* @returns Messages with server's tool outputs preserved
*/
/**
 * Merges incoming client messages with existing server state so tool outputs
 * the server already holds (applied via _applyToolResult) are not lost when
 * the client echoes back a history that predates them.
 *
 * @param incomingMessages - Messages from the client
 * @returns Messages with the server's tool outputs restored, then ID-reconciled
 */
private _mergeIncomingWithServerState(
  incomingMessages: ChatMessage[]
): ChatMessage[] {
  // Index every completed tool output on the server by its toolCallId.
  const outputsByToolCallId = new Map<string, unknown>();
  for (const serverMessage of this.messages) {
    if (serverMessage.role !== "assistant") continue;
    for (const serverPart of serverMessage.parts) {
      const isCompletedToolPart =
        "toolCallId" in serverPart &&
        "state" in serverPart &&
        serverPart.state === "output-available" &&
        "output" in serverPart;
      if (isCompletedToolPart) {
        outputsByToolCallId.set(
          serverPart.toolCallId as string,
          (serverPart as { output: unknown }).output
        );
      }
    }
  }
  // The client may still hold a stale tool state the server has advanced past:
  // - input-available: result hasn't reached the client yet
  // - approval-requested: approval UI shown, response not yet sent (server may
  //   have executed via a parallel path)
  // - approval-responded: approval sent, execution result not yet received
  // For each such part with a known server output, restore output-available.
  const staleClientStates = [
    "input-available",
    "approval-requested",
    "approval-responded"
  ];
  let merged: ChatMessage[];
  if (outputsByToolCallId.size === 0) {
    merged = incomingMessages;
  } else {
    merged = incomingMessages.map((incoming) => {
      if (incoming.role !== "assistant") return incoming;
      let touched = false;
      const rewrittenParts = incoming.parts.map((part) => {
        const isStale =
          "toolCallId" in part &&
          "state" in part &&
          staleClientStates.includes(part.state as string) &&
          outputsByToolCallId.has(part.toolCallId as string);
        if (!isStale) return part;
        touched = true;
        return {
          ...part,
          state: "output-available" as const,
          output: outputsByToolCallId.get(part.toolCallId as string)
        };
      }) as ChatMessage["parts"];
      return touched ? { ...incoming, parts: rewrittenParts } : incoming;
    });
  }
  return this._reconcileAssistantIdsWithServerState(merged);
}
/**
 * Reconciles assistant message IDs between incoming client state and server state.
 *
 * A client can hold a different local ID for an assistant message than the one
 * persisted server-side (optimistic/local IDs). Persisting such a history by ID
 * alone would create duplicate assistant rows, so non-tool assistant messages
 * are matched by content and rewritten to carry the server's ID. Tool-bearing
 * assistant messages are left to _resolveMessageForToolMerge.
 *
 * Two passes:
 *  - Pass 1 claims server indices for every exact-ID match.
 *  - Pass 2 content-matches the remaining non-tool assistant messages against
 *    unclaimed server indices, scanning left-to-right to preserve ordering.
 *
 * Running exact-ID matching first prevents a cursor-style scan from skipping
 * past server messages that a later incoming message needs — this matters when
 * two assistant messages have identical text ("Sure", "I understand"); see #1008.
 */
private _reconcileAssistantIdsWithServerState(
  incomingMessages: ChatMessage[]
): ChatMessage[] {
  if (this.messages.length === 0) {
    return incomingMessages;
  }
  const claimedServerIndices = new Set<number>();
  // Incoming indices that were resolved by exact ID in pass 1.
  const resolvedByExactId = new Set<number>();
  // Pass 1: exact-ID matches claim their server index first.
  for (let incomingIdx = 0; incomingIdx < incomingMessages.length; incomingIdx++) {
    const incomingId = incomingMessages[incomingIdx].id;
    for (let serverIdx = 0; serverIdx < this.messages.length; serverIdx++) {
      if (claimedServerIndices.has(serverIdx)) continue;
      if (this.messages[serverIdx].id === incomingId) {
        claimedServerIndices.add(serverIdx);
        resolvedByExactId.add(incomingIdx);
        break;
      }
    }
  }
  // Pass 2: content-based matching for whatever pass 1 left unresolved.
  return incomingMessages.map((incomingMessage, incomingIdx) => {
    if (resolvedByExactId.has(incomingIdx)) {
      return incomingMessage;
    }
    // Only non-tool assistant messages participate in content matching;
    // tool-bearing ones are reconciled via toolCallId elsewhere.
    if (
      incomingMessage.role !== "assistant" ||
      this._hasToolCallPart(incomingMessage)
    ) {
      return incomingMessage;
    }
    const incomingKey = this._assistantMessageContentKey(incomingMessage);
    if (!incomingKey) {
      return incomingMessage;
    }
    for (let serverIdx = 0; serverIdx < this.messages.length; serverIdx++) {
      if (claimedServerIndices.has(serverIdx)) continue;
      const serverMessage = this.messages[serverIdx];
      if (
        serverMessage.role !== "assistant" ||
        this._hasToolCallPart(serverMessage)
      ) {
        continue;
      }
      if (this._assistantMessageContentKey(serverMessage) === incomingKey) {
        claimedServerIndices.add(serverIdx);
        // Adopt the server's ID so the upsert hits the existing row.
        return { ...incomingMessage, id: serverMessage.id };
      }
    }
    return incomingMessage;
  });
}
/** Whether any part of the message carries a toolCallId (i.e. is a tool part). */
private _hasToolCallPart(message: ChatMessage): boolean {
  for (const part of message.parts) {
    if ("toolCallId" in part) {
      return true;
    }
  }
  return false;
}
/**
 * Content key used for ID reconciliation of assistant messages: the JSON of
 * the sanitized parts. Sanitizing first keeps the key stable across ephemeral
 * provider-metadata differences. Returns undefined for non-assistant messages.
 */
private _assistantMessageContentKey(
  message: ChatMessage
): string | undefined {
  if (message.role === "assistant") {
    return JSON.stringify(this._sanitizeMessageForPersistence(message).parts);
  }
  return undefined;
}
/**
 * Resolves a message for persistence, handling tool result merging.
 *
 * If the message contains a tool part whose toolCallId already belongs to a
 * server-side message with a different ID, the server's ID is adopted. This
 * prevents duplicate rows when the client persists messages under its own
 * generated IDs (e.g. nanoid from the AI SDK) that differ from the
 * server-stamped IDs. Tool call IDs are unique per conversation, so matching
 * by toolCallId is safe regardless of tool state (#1094).
 *
 * @param message - The message to potentially merge
 * @returns The message with the correct ID (either original or merged)
 */
private _resolveMessageForToolMerge(message: ChatMessage): ChatMessage {
  if (message.role !== "assistant") {
    return message;
  }
  const toolParts = message.parts.filter(
    (part) => "toolCallId" in part && part.toolCallId
  );
  for (const toolPart of toolParts) {
    const callId = (toolPart as { toolCallId: unknown }).toolCallId as string;
    const owner = this._findMessageByToolCallId(callId);
    // First tool part owned by a *different* server message wins.
    if (owner && owner.id !== message.id) {
      return { ...message, id: owner.id };
    }
  }
  return message;
}
/**
 * Finds the existing assistant message containing a tool part with the given
 * toolCallId. Used to detect when a tool result should update an existing
 * message rather than creating a new one.
 *
 * @param toolCallId - The tool call ID to search for
 * @returns The existing message if found, undefined otherwise
 */
private _findMessageByToolCallId(
  toolCallId: string
): ChatMessage | undefined {
  return this.messages.find(
    (msg) =>
      msg.role === "assistant" &&
      msg.parts.some(
        (part) => "toolCallId" in part && part.toolCallId === toolCallId
      )
  );
}
/**
 * Sanitizes a message for persistence by removing ephemeral provider-specific
 * data that must not be stored or sent back in subsequent requests.
 *
 * Two steps:
 *
 * 1. **Strip OpenAI ephemeral fields** from both metadata slots. The AI SDK's
 *    @ai-sdk/openai provider (v2.0.x+) defaults to OpenAI's Responses API,
 *    which stamps parts with unique itemIds and reasoningEncryptedContent;
 *    replaying persisted itemIds makes OpenAI reject the request.
 *
 * 2. **Drop truly empty reasoning parts** — no text and no surviving
 *    providerMetadata. Parts that still carry providerMetadata (e.g.
 *    Anthropic's redacted_thinking blocks with
 *    providerMetadata.anthropic.redactedData) are kept, since that data is
 *    required to round-trip with the provider API.
 *
 * @param message - The message to sanitize
 * @returns A new message with ephemeral provider data removed
 */
private _sanitizeMessageForPersistence(message: ChatMessage): ChatMessage {
  // Step 1: strip OpenAI ephemera from providerMetadata and, for tool parts,
  // callProviderMetadata.
  const metadataSlots = ["providerMetadata", "callProviderMetadata"] as const;
  const strippedParts = message.parts.map((originalPart) => {
    let currentPart = originalPart;
    for (const slot of metadataSlots) {
      const slotValue = (currentPart as Record<string, unknown>)[slot];
      if (slotValue && typeof slotValue === "object" && "openai" in slotValue) {
        currentPart = this._stripOpenAIMetadata(currentPart, slot);
      }
    }
    return currentPart;
  }) as ChatMessage["parts"];
  // Step 2: filter reasoning parts that ended up empty after stripping,
  // but keep any that still carry non-empty providerMetadata.
  const sanitizedParts = strippedParts.filter((part) => {
    if (part.type !== "reasoning") {
      return true;
    }
    const reasoningPart = part as ReasoningUIPart;
    const hasText = !!reasoningPart.text && reasoningPart.text.trim() !== "";
    if (hasText) {
      return true;
    }
    const remainingMetadata =
      "providerMetadata" in reasoningPart
        ? reasoningPart.providerMetadata
        : undefined;
    return (
      !!remainingMetadata &&
      typeof remainingMetadata === "object" &&
      Object.keys(remainingMetadata).length > 0
    );
  });
  return { ...message, parts: sanitizedParts };
}
/**
 * Strips OpenAI-specific ephemeral fields (itemId, reasoningEncryptedContent)
 * from the named metadata slot of a part, preserving every other field. The
 * `openai` key is dropped entirely when nothing of it survives, and the whole
 * metadata slot is dropped when no provider entries remain.
 */
private _stripOpenAIMetadata<T extends ChatMessage["parts"][number]>(
  part: T,
  metadataKey: "providerMetadata" | "callProviderMetadata"
): T {
  const metadata = (part as Record<string, unknown>)[metadataKey] as {
    openai?: Record<string, unknown>;
    [key: string]: unknown;
  };
  const openaiMeta = metadata?.openai;
  if (!openaiMeta) return part;
  // Copy over everything except the two ephemeral fields.
  const survivingOpenai: Record<string, unknown> = {};
  for (const [field, fieldValue] of Object.entries(openaiMeta)) {
    if (field !== "itemId" && field !== "reasoningEncryptedContent") {
      survivingOpenai[field] = fieldValue;
    }
  }
  const { openai: _discarded, ...otherProviders } = metadata;
  // Rebuild the slot only if something meaningful remains.
  let rebuiltMetadata: ProviderMetadata | undefined;
  if (Object.keys(survivingOpenai).length > 0) {
    rebuiltMetadata = {
      ...otherProviders,
      openai: survivingOpenai
    } as ProviderMetadata;
  } else if (Object.keys(otherProviders).length > 0) {
    rebuiltMetadata = otherProviders as ProviderMetadata;
  }
  // Recreate the part without the old slot, re-attaching only if rebuilt.
  const { [metadataKey]: _previous, ...partWithoutSlot } = part as Record<
    string,
    unknown
  >;
  if (rebuiltMetadata) {
    return { ...partWithoutSlot, [metadataKey]: rebuiltMetadata } as T;
  }
  return partWithoutSlot as T;
}
/**
 * Deletes the oldest rows (by created_at) from SQLite when the message count
 * exceeds maxPersistedMessages. Called after each persist to bound storage.
 * Evicted IDs are also dropped from the diff cache so a re-insert of the same
 * ID is not skipped.
 */
private _enforceMaxPersistedMessages() {
  const limit = this.maxPersistedMessages;
  if (limit == null) return;
  const countRows = this.sql<{ cnt: number }>`
    select count(*) as cnt from cf_ai_chat_agent_messages
  `;
  const total = countRows?.[0]?.cnt ?? 0;
  const excess = total - limit;
  if (excess <= 0) return;
  // Oldest-first eviction.
  const victims = this.sql<{ id: string }>`
    select id from cf_ai_chat_agent_messages
    order by created_at asc
    limit ${excess}
  `;
  for (const victim of victims ?? []) {
    this.sql`delete from cf_ai_chat_agent_messages where id = ${victim.id}`;
    this._persistedMessageCache.delete(victim.id);
  }
}
/**
 * Enforces SQLite row size limits by compacting tool outputs and text parts
 * when a serialized message exceeds the safety threshold.
 *
 * Only fires in pathological cases (extremely large tool outputs or text);
 * returns the message unchanged when it already fits.
 *
 * Strategy:
 * 1. Replace tool outputs whose JSON exceeds 1KB with an LLM-friendly summary.
 * 2. If still over the limit, truncate text parts oldest-first.
 * 3. Record what was compacted in metadata so clients can detect it.
 *
 * @param message - The message to check
 * @returns The message, compacted if necessary
 */
private _enforceRowSizeLimit(message: ChatMessage): ChatMessage {
  const initialSize = AIChatAgent._byteLength(JSON.stringify(message));
  if (initialSize <= AIChatAgent.ROW_MAX_BYTES) return message;
  if (message.role !== "assistant") {
    // User/system messages have no tool outputs to compact; fall straight
    // through to text truncation as a last resort.
    console.warn(
      `[AIChatAgent] Non-assistant message ${message.id} is ${initialSize} bytes, ` +
        `exceeds row limit. Truncating text parts.`
    );
    return this._truncateTextParts(message);
  }
  console.warn(
    `[AIChatAgent] Message ${message.id} is ${initialSize} bytes, ` +
      `compacting tool outputs to fit SQLite row limit`
  );
  // Pass 1: compact every completed tool output whose JSON exceeds 1KB.
  const compactedToolCallIds: string[] = [];
  const rewrittenParts = message.parts.map((part) => {
    const isCompletedToolOutput =
      "output" in part &&
      "toolCallId" in part &&
      "state" in part &&
      part.state === "output-available";
    if (!isCompletedToolOutput) return part;
    const outputJson = JSON.stringify((part as { output: unknown }).output);
    if (outputJson.length <= 1000) return part;
    compactedToolCallIds.push(part.toolCallId as string);
    return {
      ...part,
      output:
        "This tool output was too large to persist in storage " +
        `(${outputJson.length} bytes). ` +
        "If the user asks about this data, suggest re-running the tool. " +
        `Preview: ${outputJson.slice(0, 500)}...`
    };
  }) as ChatMessage["parts"];
  // Tag compacted tool calls in metadata so clients can detect compaction.
  const compacted: ChatMessage =
    compactedToolCallIds.length > 0
      ? {
          ...message,
          parts: rewrittenParts,
          metadata: {
            ...(message.metadata ?? {}),
            compactedToolOutputs: compactedToolCallIds
          }
        }
      : { ...message, parts: rewrittenParts };
  const remainingSize = AIChatAgent._byteLength(JSON.stringify(compacted));
  if (remainingSize <= AIChatAgent.ROW_MAX_BYTES) return compacted;
  // Pass 2: still too big — truncate text parts.
  console.warn(
    `[AIChatAgent] Message ${message.id} still ${remainingSize} bytes after tool compaction, truncating text parts`
  );
  return this._truncateTextParts(compacted);
}
/**
 * Truncates oversized text parts so the serialized message fits the row size
 * limit. Works oldest-to-newest and stops as soon as the message fits, which
 * keeps the last (usually most relevant) text part as intact as possible.
 * Truncated part indices are recorded in metadata.compactedTextParts.
 */
private _truncateTextParts(message: ChatMessage): ChatMessage {
  const truncatedIndices: number[] = [];
  const workingParts = [...message.parts];
  for (let idx = 0; idx < workingParts.length; idx++) {
    const candidate = workingParts[idx];
    if (candidate.type !== "text" || !("text" in candidate)) continue;
    const fullText = (candidate as { text: string }).text;
    if (fullText.length <= 1000) continue;
    truncatedIndices.push(idx);
    workingParts[idx] = {
      ...candidate,
      text:
        `[Text truncated for storage (${fullText.length} chars). ` +
        `First 500 chars: ${fullText.slice(0, 500)}...]`
    } as ChatMessage["parts"][number];
    // Stop truncating once the serialized message fits.
    const snapshot = { ...message, parts: workingParts };
    if (
      AIChatAgent._byteLength(JSON.stringify(snapshot)) <=
      AIChatAgent.ROW_MAX_BYTES
    ) {
      break;
    }
  }
  if (truncatedIndices.length === 0) {
    return { ...message, parts: workingParts };
  }
  return {
    ...message,
    parts: workingParts,
    metadata: {
      ...(message.metadata ?? {}),
      compactedTextParts: truncatedIndices
    }
  };
}
/**
 * Shared helper for finding a tool part by toolCallId and applying an update.
 * Handles both streaming (in-memory) and persisted (SQLite) messages.
 *
 * Checks _streamingMessage first (tool results/approvals can arrive while
 * the AI is still streaming), then retries persisted messages with backoff
 * in case streaming completes between attempts.
 *
 * Update paths differ deliberately:
 * - streaming: the part is mutated in place; it will be persisted when the
 *   stream completes, so no SQL write happens here.
 * - persisted: the part is replaced immutably, re-sanitized, size-limited,
 *   written to SQLite, and this.messages is reloaded from the DB.
 *
 * @param toolCallId - The tool call ID to find
 * @param callerName - Name for log messages (e.g. "_applyToolResult")
 * @param matchStates - Which tool part states to match
 * @param applyUpdate - Mutation to apply to the matched part (streaming: in-place, persisted: spread)
 * @returns true if the update was applied, false if not found or state didn't match
 */
private async _findAndUpdateToolPart(
  toolCallId: string,
  callerName: string,
  matchStates: string[],
  applyUpdate: (part: Record<string, unknown>) => Record<string, unknown>
): Promise<boolean> {
  // Find the message containing this tool call.
  // Check streaming message first (in-memory, not yet persisted), then
  // retry persisted messages with backoff.
  let message: ChatMessage | undefined;
  if (this._streamingMessage) {
    for (const part of this._streamingMessage.parts) {
      if ("toolCallId" in part && part.toolCallId === toolCallId) {
        message = this._streamingMessage;
        break;
      }
    }
  }
  if (!message) {
    // Up to 10 attempts, 100ms apart (~1s total) — covers the window where
    // a stream finishes and its message lands in this.messages.
    for (let attempt = 0; attempt < 10; attempt++) {
      message = this._findMessageByToolCallId(toolCallId);
      if (message) break;
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  }
  if (!message) {
    console.warn(
      `[AIChatAgent] ${callerName}: Could not find message with toolCallId ${toolCallId} after retries`
    );
    return false;
  }
  // Identity comparison: the streaming message object is held by reference.
  const isStreamingMessage = message === this._streamingMessage;
  let updated = false;
  if (isStreamingMessage) {
    // Update in place -- the message will be persisted when streaming completes
    for (const part of message.parts) {
      if (
        "toolCallId" in part &&
        part.toolCallId === toolCallId &&
        "state" in part &&
        matchStates.includes(part.state as string)
      ) {
        const applied = applyUpdate(part as Record<string, unknown>);
        // Object.assign keeps the same part object so other references
        // into message.parts (e.g. the stream builder) stay valid.
        Object.assign(part, applied);
        updated = true;
        break;
      }
    }
  } else {
    // For persisted messages, create updated parts immutably
    const updatedParts = message.parts.map((part) => {
      if (
        "toolCallId" in part &&
        part.toolCallId === toolCallId &&
        "state" in part &&
        matchStates.includes(part.state as string)
      ) {
        updated = true;
        return applyUpdate(part as Record<string, unknown>);
      }
      return part;
    }) as ChatMessage["parts"];
    if (updated) {
      // Same sanitize -> size-limit pipeline as persistMessages, so the
      // stored row stays consistent with normal persistence.
      const updatedMessage: ChatMessage = this._sanitizeMessageForPersistence(
        { ...message, parts: updatedParts }
      );
      const safe = this._enforceRowSizeLimit(updatedMessage);
      const json = JSON.stringify(safe);
      this.sql`
        update cf_ai_chat_agent_messages
        set message = ${json}
        where id = ${message.id}
      `;
      this._persistedMessageCache.set(message.id, json);
      // Reload so this.messages reflects the DB (the source of truth).
      const persisted = this._loadMessagesFromDb();
      this.messages = autoTransformMessages(persisted);
    }
  }
  if (!updated) {
    console.warn(
      `[AIChatAgent] ${callerName}: Tool part with toolCallId ${toolCallId} not in expected state (expected: ${matchStates.join("|")})`
    );
    return false;
  }
  // Broadcast the update to all clients.
  // For persisted messages, re-fetch the latest state from this.messages.
  // For streaming messages, broadcast the in-memory snapshot so clients
  // get immediate confirmation that the tool result/approval was applied.
  if (isStreamingMessage) {
    this._broadcastChatMessage({
      type: MessageType.CF_AGENT_MESSAGE_UPDATED,
      message
    });
  } else {
    const broadcastMessage = this._findMessageByToolCallId(toolCallId);
    if (broadcastMessage) {
      this._broadcastChatMessage({
        type: MessageType.CF_AGENT_MESSAGE_UPDATED,
        message: broadcastMessage
      });
    }
  }
  return true;
}
/**
 * Applies a tool result to an existing assistant message.
 * Used when the client sends CF_AGENT_TOOL_RESULT for client-side tools.
 * The server is the source of truth: the message is updated here and the
 * change is broadcast to all clients.
 *
 * @param toolCallId - The tool call ID this result is for
 * @param _toolName - The name of the tool (unused, kept for API compat)
 * @param output - The output from the tool execution
 * @param overrideState - Optional state override ("output-error" to signal denial/failure)
 * @param errorText - Error message when overrideState is "output-error"
 * @returns true if the result was applied, false if the message was not found
 */
private async _applyToolResult(
  toolCallId: string,
  _toolName: string,
  output: unknown,
  overrideState?: "output-error",
  errorText?: string
): Promise<boolean> {
  const buildUpdatedPart = (
    part: Record<string, unknown>
  ): Record<string, unknown> => {
    if (overrideState === "output-error") {
      return {
        ...part,
        state: "output-error",
        errorText: errorText ?? "Tool execution denied by user"
      };
    }
    return { ...part, state: "output-available", output, preliminary: false };
  };
  return this._findAndUpdateToolPart(
    toolCallId,
    "_applyToolResult",
    ["input-available", "approval-requested", "approval-responded"],
    buildUpdatedPart
  );
}
/**
 * Streams an AI SDK v5 SSE response to clients, building the assistant
 * message incrementally and storing chunks for resumable replay.
 *
 * Decoding fixes vs the naive per-chunk approach:
 * - Uses a per-stream TextDecoder with `{ stream: true }` so multi-byte UTF-8
 *   sequences split across network reads decode correctly (a shared,
 *   non-streaming decoder would emit replacement characters).
 * - Buffers a trailing partial line between reads. SSE `data:` lines can be
 *   split across reads; parsing a partial line would throw in JSON.parse and
 *   the event would be silently dropped by the malformed-line catch.
 *
 * @param id - Request ID used in broadcast frames
 * @param streamId - Resumable-stream ID for chunk storage/completion
 * @param reader - Reader over the SSE response body
 * @param message - Assistant message being built in place
 * @param streamCompleted - Out-flag set when a done signal has been sent
 * @param continuation - Whether this is a tool-approval continuation stream
 * @param abortSignal - Cancels the reader loop (e.g. client pressed stop)
 */
private async _streamSSEReply(
  id: string,
  streamId: string,
  reader: ReadableStreamDefaultReader<Uint8Array>,
  message: ChatMessage,
  streamCompleted: { value: boolean },
  continuation = false,
  abortSignal?: AbortSignal
) {
  streamCompleted.value = false;
  // Cancel the reader when the abort signal fires (e.g. client pressed stop).
  // This ensures we stop broadcasting chunks even if the underlying stream
  // hasn't been connected to the abort signal (e.g. user forgot to pass it
  // to streamText).
  if (abortSignal && !abortSignal.aborted) {
    abortSignal.addEventListener(
      "abort",
      () => {
        reader.cancel().catch(() => {});
      },
      { once: true }
    );
  }
  // Per-stream decoder (streaming mode) + carry-over buffer for partial lines.
  const sseDecoder = new TextDecoder();
  let lineBuffer = "";
  // Processes one complete SSE line. Synchronous: the only async calls inside
  // (_findAndUpdateToolPart) are deliberately fire-and-forget.
  const processLine = (line: string) => {
    if (!line.startsWith("data: ") || line === "data: [DONE]") return;
    try {
      const data: UIMessageChunk = JSON.parse(line.slice(6));
      // Delegate message building to the shared parser.
      // It handles: text, reasoning, file, source, tool lifecycle,
      // step boundaries — all the part types needed for UIMessage.
      const handled = applyChunkToParts(message.parts, data);
      // When a tool enters approval-requested state, the stream is
      // paused waiting for user approval. Persist the streaming message
      // immediately so the approval UI survives page refresh. Without
      // this, a refresh would reload from SQLite where the tool part
      // is still in input-available state, showing "Running..." instead
      // of the Approve/Reject buttons.
      if (data.type === "tool-approval-request" && this._streamingMessage) {
        // Persist directly to SQLite without broadcasting.
        // The client already has this data from the SSE stream —
        // broadcasting would cause the approval UI to render twice.
        // We only need the SQL write so the state survives page refresh.
        const snapshot: ChatMessage = {
          ...this._streamingMessage,
          parts: [...this._streamingMessage.parts]
        };
        const sanitized = this._sanitizeMessageForPersistence(snapshot);
        const json = JSON.stringify(sanitized);
        this.sql`
          INSERT INTO cf_ai_chat_agent_messages (id, message)
          VALUES (${sanitized.id}, ${json})
          ON CONFLICT(id) DO UPDATE SET message = excluded.message
        `;
        // Track that we persisted early so stream completion can update
        // in place rather than appending a duplicate.
        this._approvalPersistedMessageId = sanitized.id;
      }
      // Cross-message tool output fallback:
      // When a tool with needsApproval is approved, the continuation
      // stream emits tool-output-available/tool-output-error for a
      // tool call that lives in a *previous* assistant message.
      // applyChunkToParts only searches the current message's parts,
      // so the update is silently skipped. Fall back to searching
      // this.messages and update the persisted message directly.
      // Note: checked independently of `handled` — applyChunkToParts
      // returns true for recognized chunk types even when it cannot
      // find the target part, so `handled` is not a reliable signal.
      if (
        (data.type === "tool-output-available" ||
          data.type === "tool-output-error") &&
        data.toolCallId
      ) {
        const foundInCurrentMessage = message.parts.some(
          (p) => "toolCallId" in p && p.toolCallId === data.toolCallId
        );
        if (!foundInCurrentMessage) {
          const updatableStates = [
            "input-available",
            "input-streaming",
            "approval-responded",
            "approval-requested"
          ];
          if (data.type === "tool-output-available") {
            this._findAndUpdateToolPart(
              data.toolCallId,
              "_streamSSEReply",
              updatableStates,
              (part) => ({
                ...part,
                state: "output-available",
                output: data.output,
                ...(data.preliminary !== undefined && {
                  preliminary: data.preliminary
                })
              })
            );
          } else {
            this._findAndUpdateToolPart(
              data.toolCallId,
              "_streamSSEReply",
              updatableStates,
              (part) => ({
                ...part,
                state: "output-error",
                errorText: data.errorText
              })
            );
          }
        }
      }
      // Handle server-specific chunk types not covered by the shared parser
      if (!handled) {
        switch (data.type) {
          case "start": {
            if (data.messageId != null) {
              message.id = data.messageId;
            }
            if (data.messageMetadata != null) {
              message.metadata = message.metadata
                ? { ...message.metadata, ...data.messageMetadata }
                : data.messageMetadata;
            }
            break;
          }
          case "finish":
          case "message-metadata": {
            if (data.messageMetadata != null) {
              message.metadata = message.metadata
                ? { ...message.metadata, ...data.messageMetadata }
                : data.messageMetadata;
            }
            break;
          }
          case "finish-step": {
            // No-op for message building (shared parser handles step-start)
            break;
          }
          case "error": {
            this._broadcastChatMessage({
              error: true,
              body: data.errorText ?? JSON.stringify(data),
              done: false,
              id,
              type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
            });
            break;
          }
        }
      }
      // Convert internal AI SDK stream events to valid UIMessageStreamPart format.
      // The "finish" event with "finishReason" is an internal LanguageModelV3StreamPart,
      // not a UIMessageStreamPart (which expects "messageMetadata" instead).
      // See: https://github.com/cloudflare/agents/issues/677
      let eventToSend: unknown = data;
      if (data.type === "finish" && "finishReason" in data) {
        const { finishReason, ...rest } = data as {
          finishReason: string;
          [key: string]: unknown;
        };
        eventToSend = {
          ...rest,
          type: "finish",
          messageMetadata: { finishReason }
        };
      }
      // Store chunk for replay and broadcast to clients
      const chunkBody = JSON.stringify(eventToSend);
      this._storeStreamChunk(streamId, chunkBody);
      this._broadcastChatMessage({
        body: chunkBody,
        done: false,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
    } catch (_error) {
      // Skip malformed JSON lines silently
    }
  };
  while (true) {
    if (abortSignal?.aborted) break;
    let readResult: ReadableStreamReadResult<Uint8Array>;
    try {
      readResult = await reader.read();
    } catch {
      // reader.read() throws after cancel() — treat as abort
      break;
    }
    const { done, value } = readResult;
    if (done) {
      // Flush any bytes held by the streaming decoder plus a trailing
      // unterminated line. (Well-formed SSE ends with a newline, so this
      // is usually empty.)
      const tail = lineBuffer + sseDecoder.decode();
      lineBuffer = "";
      if (tail.length > 0) {
        for (const line of tail.split("\n")) {
          processLine(line);
        }
      }
      this._completeStream(streamId);
      streamCompleted.value = true;
      this._broadcastChatMessage({
        body: "",
        done: true,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
      break;
    }
    // Streaming decode keeps partial multi-byte sequences inside the decoder;
    // any partial final line is carried over to the next read.
    lineBuffer += sseDecoder.decode(value, { stream: true });
    const lines = lineBuffer.split("\n");
    lineBuffer = lines.pop() ?? "";
    for (const line of lines) {
      processLine(line);
    }
  }
  // If we exited due to abort, send a done signal so clients know the stream ended
  if (!streamCompleted.value) {
    console.warn(
      "[AIChatAgent] Stream was still active when cancel was received. " +
        "Pass options.abortSignal to streamText() in your onChatMessage() " +
        "to cancel the upstream LLM call and avoid wasted work."
    );
    this._completeStream(streamId);
    streamCompleted.value = true;
    this._broadcastChatMessage({
      body: "",
      done: true,
      id,
      type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
      ...(continuation && { continuation: true })
    });
  }
}
/**
 * Handles plain text responses (e.g., from generateText): wraps the raw byte
 * stream in synthetic text-start / text-delta / text-end events and
 * accumulates everything into a single text part on the message.
 *
 * Uses a per-stream TextDecoder with `{ stream: true }` so multi-byte UTF-8
 * characters that straddle network chunk boundaries decode correctly — a
 * non-streaming per-chunk decode would corrupt them into replacement
 * characters. Any bytes still held by the decoder are flushed when the
 * stream ends.
 *
 * @param id - Request ID used in broadcast frames
 * @param streamId - Resumable-stream ID for chunk storage/completion
 * @param reader - Reader over the plain-text response body
 * @param message - Assistant message being built in place
 * @param streamCompleted - Out-flag set when a done signal has been sent
 * @param continuation - Whether this is a tool-approval continuation stream
 * @param abortSignal - Cancels the reader loop (e.g. client pressed stop)
 */
private async _sendPlaintextReply(
  id: string,
  streamId: string,
  reader: ReadableStreamDefaultReader<Uint8Array>,
  message: ChatMessage,
  streamCompleted: { value: boolean },
  continuation = false,
  abortSignal?: AbortSignal
) {
  // if not AI SDK SSE format, we need to inject text-start and text-end events ourselves
  this._broadcastTextEvent(
    streamId,
    { type: "text-start", id },
    continuation
  );
  // Use a single text part and accumulate into it, so the persisted message
  // has one text part regardless of how many network chunks the response spans.
  const textPart: TextUIPart = { type: "text", text: "", state: "streaming" };
  message.parts.push(textPart);
  // Cancel the reader when the abort signal fires
  if (abortSignal && !abortSignal.aborted) {
    abortSignal.addEventListener(
      "abort",
      () => {
        reader.cancel().catch(() => {});
      },
      { once: true }
    );
  }
  // Per-stream streaming decoder: holds partial multi-byte sequences between
  // reads instead of emitting replacement characters.
  const textDecoder = new TextDecoder();
  // Appends to the accumulated text part and broadcasts a delta (no-op for "").
  const emitDelta = (delta: string) => {
    if (delta.length === 0) return;
    textPart.text += delta;
    this._broadcastTextEvent(
      streamId,
      { type: "text-delta", id, delta },
      continuation
    );
  };
  while (true) {
    if (abortSignal?.aborted) break;
    let readResult: ReadableStreamReadResult<Uint8Array>;
    try {
      readResult = await reader.read();
    } catch {
      // reader.read() throws after cancel() — treat as abort
      break;
    }
    const { done, value } = readResult;
    if (done) {
      // Flush any bytes still buffered inside the streaming decoder.
      emitDelta(textDecoder.decode());
      textPart.state = "done";
      this._broadcastTextEvent(
        streamId,
        { type: "text-end", id },
        continuation
      );
      // Mark the stream as completed
      this._completeStream(streamId);
      streamCompleted.value = true;
      // Send final completion signal
      this._broadcastChatMessage({
        body: "",
        done: true,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
      break;
    }
    emitDelta(textDecoder.decode(value, { stream: true }));
  }
  // If we exited due to abort, send a done signal so clients know the stream ended
  if (!streamCompleted.value) {
    console.warn(
      "[AIChatAgent] Stream was still active when cancel was received. " +
        "Pass options.abortSignal to streamText() in your onChatMessage() " +
        "to cancel the upstream LLM call and avoid wasted work."
    );
    textPart.state = "done";
    this._broadcastTextEvent(
      streamId,
      { type: "text-end", id },
      continuation
    );
    this._completeStream(streamId);
    streamCompleted.value = true;
    this._broadcastChatMessage({
      body: "",
      done: true,
      id,
      type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
      ...(continuation && { continuation: true })
    });
  }
}
/**
 * Applies a tool approval response from the client, updating the persisted message.
 * Called when the client sends CF_AGENT_TOOL_APPROVAL for tools with needsApproval.
 *
 * - approved=true transitions the part to approval-responded
 * - approved=false transitions it to output-denied so convertToModelMessages
 *   emits a tool_result for providers (e.g. Anthropic) that require it.
 *
 * @param toolCallId - The tool call ID this approval is for
 * @param approved - Whether the tool execution was approved
 * @returns true if the approval was applied, false if the message was not found
 */
private async _applyToolApproval(
  toolCallId: string,
  approved: boolean
): Promise<boolean> {
  const nextState = approved ? "approval-responded" : "output-denied";
  return this._findAndUpdateToolPart(
    toolCallId,
    "_applyToolApproval",
    ["input-available", "approval-requested"],
    (part) => {
      // Merge with any existing approval data so the id field survives —
      // convertToModelMessages needs approval.id to produce a valid
      // tool-approval-request content part with approvalId.
      const priorApproval = part.approval as
        | Record<string, unknown>
        | undefined;
      return {
        ...part,
        state: nextState,
        approval: { ...priorApproval, approved }
      };
    }
  );
}
/**
 * Stream a model `Response` back to connected clients chunk-by-chunk, then
 * persist the assembled assistant message.
 *
 * Handles both the AI SDK v5 SSE format and plaintext bodies, tracks the
 * stream for resumability, and supports three persistence paths after the
 * stream ends: updating an early-persisted (approval) message in place,
 * appending parts to the last assistant message (continuation), or
 * appending a brand-new assistant message.
 *
 * @param id - Request id used to correlate broadcast chunks on the client
 * @param response - The model's streaming `Response` (SSE or plaintext)
 * @param excludeBroadcastIds - Connection ids excluded from the persistence broadcast
 * @param options - `continuation`: this stream extends a prior assistant turn
 *   (e.g. after tool approval); `chatMessageId`: originating message id used
 *   for abort-signal lookup and post-stream cleanup
 */
private async _reply(
  id: string,
  response: Response,
  excludeBroadcastIds: string[] = [],
  options: { continuation?: boolean; chatMessageId?: string } = {}
) {
  const { continuation = false, chatMessageId } = options;
  // Look up the abort signal for this request so we can cancel the reader
  // loop if the client sends a cancel message. This is a safety net —
  // users should also pass abortSignal to streamText for proper cancellation.
  const abortSignal = chatMessageId
    ? this._chatMessageAbortControllers.get(chatMessageId)?.signal
    : undefined;
  // Keep the DO alive during streaming to prevent idle eviction
  return this.keepAliveWhile(() =>
    this._tryCatchChat(async () => {
      if (!response.body) {
        // Send empty response if no body so the client resolves its pending
        // request instead of waiting forever.
        this._broadcastChatMessage({
          body: "",
          done: true,
          id,
          type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
          ...(continuation && { continuation: true })
        });
        return;
      }
      // Start tracking this stream for resumability
      const streamId = this._startStream(id);
      const reader = response.body.getReader();
      // Parsing state adapted from:
      // https://github.com/vercel/ai/blob/main/packages/ai/src/ui-message-stream/ui-message-chunks.ts#L295
      const message: ChatMessage = {
        id: `assistant_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`, // default
        role: "assistant",
        parts: []
      };
      // Track the streaming message so tool results can be applied before persistence
      this._streamingMessage = message;
      // Set up completion promise for tool continuation to wait on
      this._streamCompletionPromise = new Promise((resolve) => {
        this._streamCompletionResolve = resolve;
      });
      // Determine response format based on content-type
      const contentType = response.headers.get("content-type") || "";
      const isSSE = contentType.includes("text/event-stream"); // AI SDK v5 SSE format
      // Wrapped in an object so the helper streaming methods can flip it
      // and the catch/finally blocks below observe the change.
      const streamCompleted = { value: false };
      // Capture before try so it's available after finally.
      // _approvalPersistedMessageId is set inside _streamSSEReply when a
      // tool enters approval-requested state and the message is persisted early.
      let earlyPersistedId: string | null = null;
      try {
        if (isSSE) {
          // AI SDK v5 SSE format
          await this._streamSSEReply(
            id,
            streamId,
            reader,
            message,
            streamCompleted,
            continuation,
            abortSignal
          );
        } else {
          await this._sendPlaintextReply(
            id,
            streamId,
            reader,
            message,
            streamCompleted,
            continuation,
            abortSignal
          );
        }
      } catch (error) {
        // Mark stream as error if not already completed
        if (!streamCompleted.value) {
          this._markStreamError(streamId);
          // Notify clients of the error
          this._broadcastChatMessage({
            body: error instanceof Error ? error.message : "Stream error",
            done: true,
            error: true,
            id,
            type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
            ...(continuation && { continuation: true })
          });
          this._emit("message:error", {
            error: error instanceof Error ? error.message : String(error)
          });
        }
        // Re-throw so _tryCatchChat sees the failure; the finally block
        // below still runs its cleanup first.
        throw error;
      } finally {
        reader.releaseLock();
        // Always clear the streaming message reference and resolve completion
        // promise, even on error. Without this, tool continuations waiting on
        // _streamCompletionPromise would hang forever after a stream error.
        this._streamingMessage = null;
        // Capture and clear early-persist tracking. The persistence block
        // after the finally uses the local to update in place.
        earlyPersistedId = this._approvalPersistedMessageId;
        this._approvalPersistedMessageId = null;
        if (this._streamCompletionResolve) {
          this._streamCompletionResolve();
          this._streamCompletionResolve = null;
          this._streamCompletionPromise = null;
        }
        // Framework-level cleanup: always remove abort controller.
        // Only emit observability on success (not on error path).
        if (chatMessageId) {
          this._removeAbortController(chatMessageId);
          if (streamCompleted.value) {
            this._emit("message:response");
          }
        }
      }
      // Persistence runs only on the success path (the catch re-throws) and
      // only if the stream actually produced parts.
      if (message.parts.length > 0) {
        if (earlyPersistedId) {
          // Message already exists in this.messages from the early persist.
          // Update it in place with the final streaming state.
          // Note: early-persisted messages come from the initial stream
          // (before approval), which is never a continuation. The
          // continuation stream starts fresh after approval, so
          // earlyPersistedId will always be null for continuations.
          const updatedMessages = this.messages.map((msg) =>
            msg.id === earlyPersistedId ? message : msg
          );
          await this.persistMessages(updatedMessages, excludeBroadcastIds);
        } else if (continuation) {
          // Find the last assistant message and append parts to it
          let lastAssistantIdx = -1;
          for (let i = this.messages.length - 1; i >= 0; i--) {
            if (this.messages[i].role === "assistant") {
              lastAssistantIdx = i;
              break;
            }
          }
          if (lastAssistantIdx >= 0) {
            const lastAssistant = this.messages[lastAssistantIdx];
            const mergedMessage: ChatMessage = {
              ...lastAssistant,
              parts: [...lastAssistant.parts, ...message.parts]
            };
            const updatedMessages = [...this.messages];
            updatedMessages[lastAssistantIdx] = mergedMessage;
            await this.persistMessages(updatedMessages, excludeBroadcastIds);
          } else {
            // No assistant message to append to, create new one
            await this.persistMessages(
              [...this.messages, message],
              excludeBroadcastIds
            );
          }
        } else {
          await this.persistMessages(
            [...this.messages, message],
            excludeBroadcastIds
          );
        }
      }
    })
  );
}
/**
 * Return the AbortSignal associated with the given message id, lazily
 * creating and caching an AbortController for it on first use.
 *
 * @param id - The chat message id to look up
 * @returns The cached controller's signal, or undefined when `id` is not a string
 */
private _getAbortSignal(id: string): AbortSignal | undefined {
  // Defensive check, since we're coercing message types at the moment
  if (typeof id !== "string") {
    return undefined;
  }
  let controller = this._chatMessageAbortControllers.get(id);
  if (controller === undefined) {
    controller = new AbortController();
    this._chatMessageAbortControllers.set(id, controller);
  }
  return controller.signal;
}
/**
 * Drop the cached AbortController for a message id once its response
 * cycle is finished (Map.delete is a no-op for unknown ids).
 */
private _removeAbortController(id: string) {
  this._chatMessageAbortControllers.delete(id);
}
/**
 * Abort any in-flight request associated with the given message id.
 * Aborting fires the signal previously handed out by _getAbortSignal;
 * unknown ids are silently ignored.
 */
private _cancelChatRequest(id: string) {
  this._chatMessageAbortControllers.get(id)?.abort();
}
/**
 * Abort every pending request and empty the AbortController cache.
 * Used during teardown so no reader loop outlives the agent.
 */
private _destroyAbortControllers() {
  this._chatMessageAbortControllers.forEach((controller) => {
    controller?.abort();
  });
  this._chatMessageAbortControllers.clear();
}
/**
 * When the DO is destroyed, cancel all pending requests and clean up resources
 *
 * Order matters: local teardown (abort in-flight streams, release
 * resumable-stream state) runs before delegating to the base Agent's
 * destroy for its own cleanup.
 */
async destroy() {
  // Abort all in-flight chat requests so reader loops stop promptly.
  this._destroyAbortControllers();
  // Release resumable-stream resources.
  this._resumableStream.destroy();
  // Base Agent cleanup runs last.
  await super.destroy();
}
}