// think.ts
/**
* Think — a unified Agent base class for chat sessions.
*
* Works as both a **top-level agent** (speaking the `cf_agent_chat_*`
* WebSocket protocol to browser clients) and a **sub-agent** (called
* via `chat()` over RPC from a parent agent).
*
* Each instance gets its own SQLite storage and runs the full chat
* lifecycle:
* store user message → assemble context → call LLM → stream events → persist response
*
* Uses SessionManager for message persistence, giving you branching and
* compaction support for free.
*
* Override points:
* - getModel() — return the LanguageModel to use
* - getSystemPrompt() — return the system prompt
* - getTools() — return the ToolSet for the agentic loop
* - getMaxSteps() — max tool-call rounds per turn (default: 10)
* - assembleContext() — customize context assembly from this.messages
* - onChatMessage() — full control over inference (override the agentic loop)
* - onChatError() — customize error handling
*
* Production features:
* - WebSocket chat protocol (compatible with useAgentChat / useChat)
* - Multi-session management (create, switch, list, delete, rename)
* - Sub-agent RPC streaming via StreamCallback
* - Abort/cancel support via AbortSignal
* - Error handling with partial message persistence
* - Message sanitization (strips OpenAI ephemeral metadata)
* - Row size enforcement (compacts large tool outputs)
* - Configurable storage bounds (maxPersistedMessages)
* - Incremental persistence (skips unchanged messages)
* - Richer input (accepts UIMessage or string)
*
* @experimental Requires the `"experimental"` compatibility flag.
*
* @example
* ```typescript
* import { Think } from "@cloudflare/think";
* import { createWorkersAI } from "workers-ai-provider";
* import { createWorkspaceTools } from "@cloudflare/think/tools/workspace";
* import { Workspace } from "@cloudflare/shell";
*
* export class ChatSession extends Think<Env> {
* workspace = new Workspace(this);
*
* getModel() {
* return createWorkersAI({ binding: this.env.AI })("@cf/moonshotai/kimi-k2.5");
* }
*
* getTools() {
* return createWorkspaceTools(this.workspace);
* }
* }
* ```
*/
import type { LanguageModel, ModelMessage, ToolSet, UIMessage } from "ai";
import {
convertToModelMessages,
pruneMessages,
stepCountIs,
streamText
} from "ai";
import {
Agent,
__DO_NOT_USE_WILL_BREAK__agentContext as agentContext
} from "agents";
import type { Connection, WSMessage } from "agents";
import type { Workspace } from "@cloudflare/shell";
import { withFibers } from "agents/experimental/forever";
import type { FiberMethods } from "agents/experimental/forever";
import { SessionManager } from "./session/index";
import type { Session } from "./session/index";
import { applyChunkToParts } from "./message-builder";
import type { StreamChunkData } from "./message-builder";
import { sanitizeMessage, enforceRowSizeLimit } from "./sanitize";
export type { Session } from "./session/index";
export type {
FiberState,
FiberRecoveryContext,
FiberContext,
FiberCompleteContext,
FiberMethods
} from "agents/experimental/forever";
// ── Fiber base class ──────────────────────────────────────────────────
// Think extends withFibers(Agent) so fiber methods (spawnFiber, etc.)
// are always available on the prototype. The `fibers` flag controls
// whether interrupted fibers are recovered on start.
//
// The type cast preserves Agent's generic constructor while adding
// FiberMethods to the instance type, avoiding unsafe interface merging.
type ThinkBaseConstructor = {
  new <
    Env extends Cloudflare.Env = Cloudflare.Env,
    State = unknown,
    Props extends Record<string, unknown> = Record<string, unknown>
  >(
    ctx: DurableObjectState,
    env: Env
  ): Agent<Env, State, Props> & FiberMethods;
};
// withFibers() returns a mixin class whose generic constructor signature
// TypeScript cannot express directly, hence the two-step cast through unknown.
const ThinkBase = withFibers(Agent) as unknown as ThinkBaseConstructor;
// ── Wire protocol constants ────────────────────────────────────────
// These string values are wire-compatible with @cloudflare/ai-chat's
// MessageType enum. Defined locally to avoid a circular dependency.
const MSG_CHAT_MESSAGES = "cf_agent_chat_messages"; // broadcast of the full message list
const MSG_CHAT_REQUEST = "cf_agent_use_chat_request"; // client → agent: start a chat turn
const MSG_CHAT_RESPONSE = "cf_agent_use_chat_response"; // agent → client: streamed chunks / done / error
const MSG_CHAT_CLEAR = "cf_agent_chat_clear"; // clear the current session's messages
const MSG_CHAT_CANCEL = "cf_agent_chat_request_cancel"; // cancel an in-flight request by id
/**
* Callback interface for streaming chat events from a Think.
*
* Designed to work across the sub-agent RPC boundary — implement as
* an RpcTarget in the parent agent and pass to `chat()`.
*
* Methods may return a Promise for async RPC callbacks.
*/
export interface StreamCallback {
  /** Called for each UIMessageChunk event during streaming. */
  onEvent(json: string): void | Promise<void>;
  /** Called when the stream completes successfully (not called on abort). */
  onDone(): void | Promise<void>;
  /**
   * Called when an error occurs during streaming.
   * If omitted, `chat()` rethrows the (possibly wrapped) error instead.
   */
  onError?(error: string): void | Promise<void>;
}
/**
* Minimal interface for the result of `onChatMessage()`.
* Must provide a `toUIMessageStream()` method that returns an
* async-iterable stream of UI message chunks.
*
* The AI SDK's `streamText()` result satisfies this interface.
*/
export interface StreamableResult {
  // Yields UIMessageChunk-shaped objects; consumers treat each item as
  // StreamChunkData when building the assistant message.
  toUIMessageStream(): AsyncIterable<unknown>;
}
/**
* Options for a chat turn (sub-agent RPC entry point).
*/
export interface ChatOptions {
  /** AbortSignal — fires when the caller wants to cancel the turn. */
  signal?: AbortSignal;
  /**
   * Extra tools to merge with getTools() for this turn only.
   * Merged after the base tools, so same-named entries override them.
   */
  tools?: ToolSet;
}
/**
* Options passed to the onChatMessage handler.
*/
export interface ChatMessageOptions {
  // NOTE(review): same shape as ChatOptions — presumably kept as a separate
  // type so the handler-facing API can evolve independently; confirm intent.
  /** AbortSignal for cancelling the request */
  signal?: AbortSignal;
  /**
   * Extra tools to merge with getTools() for this turn only.
   * Merged after the base tools, so same-named entries override them.
   */
  tools?: ToolSet;
}
/**
* A unified Agent base class for chat sessions.
*
* Works as both a top-level agent (WebSocket chat protocol) and a
* sub-agent (RPC streaming via `chat()`).
*
* @experimental Requires the `"experimental"` compatibility flag.
*/
export class Think<
  Env extends Cloudflare.Env = Cloudflare.Env,
  Config = Record<string, unknown>
> extends (ThinkBase as ThinkBaseConstructor)<Env> {
  /** Session manager — persistence layer with branching and compaction. */
  sessions!: SessionManager;
  /** In-memory messages for the current conversation. Authoritative after load. */
  messages: UIMessage[] = [];
  /**
   * Enable durable fiber recovery on start. Set to `true` to
   * automatically recover interrupted fibers when the DO restarts.
   *
   * Fiber methods (`spawnFiber()`, `stashFiber()`, etc.) are always
   * available — this flag only controls automatic recovery.
   *
   * @experimental
   */
  fibers = false;
  /**
   * Maximum number of messages to keep in storage per session.
   * When exceeded, oldest messages are deleted after each persist.
   * Set to `undefined` (default) for no limit.
   *
   * This controls storage only — it does not affect what's sent to the LLM.
   * Use `pruneMessages()` in `assembleContext()` to control LLM context.
   */
  maxPersistedMessages: number | undefined = undefined;
  /**
   * Cache of last-persisted JSON for each message ID.
   * Used for incremental persistence: skip SQL writes for unchanged messages.
   * @internal
   */
  private _persistedMessageCache: Map<string, string> = new Map();
  // Active session ID, or null until the first session is created/loaded.
  private _sessionId: string | null = null;
  // AbortControllers for in-flight chat requests, keyed by request ID.
  private _abortControllers = new Map<string, AbortController>();
  // Bumped on every clear. Streams capture this value when they start and
  // skip persisting their assistant message if it changed mid-stream.
  private _clearGeneration = 0;
  // ── Dynamic config ──────────────────────────────────────────────
  #configTableReady = false;
  #configCache: Config | null = null;
  // Lazily create the config table; idempotent per instance lifetime.
  private _ensureConfigTable(): void {
    if (this.#configTableReady) return;
    this.sql`
      CREATE TABLE IF NOT EXISTS _think_config (
        key TEXT PRIMARY KEY, value TEXT NOT NULL
      )
    `;
    this.#configTableReady = true;
  }
  /**
   * Persist a typed configuration object.
   * Stored in SQLite so it survives restarts and hibernation.
   */
  configure(config: Config): void {
    this._ensureConfigTable();
    const json = JSON.stringify(config);
    this.sql`
      INSERT OR REPLACE INTO _think_config (key, value) VALUES ('config', ${json})
    `;
    this.#configCache = config;
  }
  /**
   * Read the persisted configuration, or null if never configured.
   */
  getConfig(): Config | null {
    // NOTE(review): truthiness check — a falsy cached value (possible only if
    // Config is instantiated with a primitive type) is re-read from SQLite on
    // every call. Harmless, but `!== null` would be more precise; confirm.
    if (this.#configCache) return this.#configCache;
    this._ensureConfigTable();
    const rows = this.sql<{ value: string }>`
      SELECT value FROM _think_config WHERE key = 'config'
    `;
    if (rows.length > 0) {
      this.#configCache = JSON.parse(rows[0].value) as Config;
      return this.#configCache;
    }
    return null;
  }
  /**
   * Initialize the session manager, load history from the first listed
   * session (if any), install the chat protocol handlers, and optionally
   * kick off fiber recovery.
   */
  onStart() {
    this.sessions = new SessionManager(this, {
      exec: (query, ...values) => {
        this.ctx.storage.sql.exec(query, ...values);
      }
    });
    // Adopt the first listed session so conversation history survives restarts.
    const existing = this.sessions.list();
    if (existing.length > 0) {
      this._sessionId = existing[0].id;
      this.messages = this.sessions.getHistory(this._sessionId);
      this._rebuildPersistenceCache();
    }
    this._setupProtocolHandlers();
    if (this.fibers) {
      // Fire-and-forget: recovery of interrupted fibers runs in the background.
      void this.checkFibers();
    }
  }
  // ── Override points ──────────────────────────────────────────────
  /**
   * Return the language model to use for inference.
   * Must be overridden by subclasses that rely on the default
   * `onChatMessage` implementation (the agentic loop).
   */
  getModel(): LanguageModel {
    throw new Error(
      "Override getModel() to return a LanguageModel, or override onChatMessage() for full control."
    );
  }
  /**
   * Return the system prompt for the assistant.
   * Override to customize instructions.
   */
  getSystemPrompt(): string {
    return "You are a helpful assistant.";
  }
  /**
   * Return the tools available to the assistant.
   * Override to provide workspace tools, custom tools, etc.
   */
  getTools(): ToolSet {
    return {};
  }
  /**
   * Return the maximum number of tool-call steps per turn.
   */
  getMaxSteps(): number {
    return 10;
  }
  /**
   * Return the workspace instance for this session, or null if none.
   *
   * Override in subclasses that create a Workspace. Used by
   * HostBridgeLoopback to provide workspace access to extension Workers.
   */
  getWorkspace(): Workspace | null {
    return null;
  }
  // ── Workspace proxy methods (called by HostBridgeLoopback via RPC) ──
  /** Proxy to workspace readFile; throws if no workspace is configured. */
  async _hostReadFile(path: string): Promise<string | null> {
    const ws = this.getWorkspace();
    if (!ws) throw new Error("No workspace available on this agent");
    return ws.readFile(path);
  }
  /** Proxy to workspace writeFile; throws if no workspace is configured. */
  async _hostWriteFile(path: string, content: string): Promise<void> {
    const ws = this.getWorkspace();
    if (!ws) throw new Error("No workspace available on this agent");
    await ws.writeFile(path, content);
  }
  /** Proxy to workspace deleteFile; throws if no workspace is configured. */
  async _hostDeleteFile(path: string): Promise<boolean> {
    const ws = this.getWorkspace();
    if (!ws) throw new Error("No workspace available on this agent");
    return ws.deleteFile(path);
  }
  /** Proxy to workspace readDir; throws if no workspace is configured. */
  async _hostListFiles(
    dir: string
  ): Promise<
    Array<{ name: string; type: string; size: number; path: string }>
  > {
    const ws = this.getWorkspace();
    if (!ws) throw new Error("No workspace available on this agent");
    return ws.readDir(dir);
  }
  /**
   * Assemble the model messages from the current conversation history.
   * Override to customize context assembly (e.g. inject memory,
   * project context, or apply compaction).
   */
  async assembleContext(): Promise<ModelMessage[]> {
    return pruneMessages({
      messages: await convertToModelMessages(this.messages),
      toolCalls: "before-last-2-messages"
    });
  }
  /**
   * Handle a chat turn and return the streaming result.
   *
   * The default implementation runs the agentic loop:
   * 1. Assemble context from `this.messages`
   * 2. Call `streamText` with the model, system prompt, tools, and step limit
   *
   * Override for full control over inference (e.g. different models per turn,
   * RAG pipelines, routing to specialized sub-agents, etc.).
   *
   * When this is called, `this.messages` already contains the user's
   * latest message persisted to the current session.
   *
   * @returns A result with `toUIMessageStream()` — AI SDK's `streamText()`
   * return value satisfies this interface.
   */
  async onChatMessage(options?: ChatMessageOptions): Promise<StreamableResult> {
    const baseTools = this.getTools();
    // Per-turn tools are spread last, so they override same-named base tools.
    const tools = options?.tools
      ? { ...baseTools, ...options.tools }
      : baseTools;
    return streamText({
      model: this.getModel(),
      system: this.getSystemPrompt(),
      messages: await this.assembleContext(),
      tools,
      stopWhen: stepCountIs(this.getMaxSteps()),
      abortSignal: options?.signal
    });
  }
  /**
   * Handle an error that occurred during a chat turn.
   * Override to customize error handling (e.g. logging, metrics).
   *
   * @param error The error that occurred
   * @returns The error (or a wrapped version) to propagate
   */
  onChatError(error: unknown): unknown {
    return error;
  }
  // ── Sub-agent RPC entry point ───────────────────────────────────
  /**
   * Run a chat turn: persist the user message, run the agentic loop,
   * stream UIMessageChunk events via callback, and persist the
   * assistant's response.
   *
   * On error or abort, the partial assistant message is still persisted
   * so the user doesn't lose context.
   *
   * @param userMessage The user's message (string or UIMessage for multi-modal)
   * @param callback Streaming callback (typically an RpcTarget from the parent)
   * @param options Optional chat options (e.g. AbortSignal)
   */
  async chat(
    userMessage: string | UIMessage,
    callback: StreamCallback,
    options?: ChatOptions
  ): Promise<void> {
    // Ensure a session exists
    if (!this._sessionId) {
      const session = this.sessions.create("default");
      this._sessionId = session.id;
    }
    // Persist user message; plain strings are wrapped in a single text part.
    const userMsg: UIMessage =
      typeof userMessage === "string"
        ? {
            id: crypto.randomUUID(),
            role: "user",
            parts: [{ type: "text", text: userMessage }]
          }
        : userMessage;
    this.sessions.append(this._sessionId, userMsg);
    this.messages = this.sessions.getHistory(this._sessionId);
    // Build assistant message from stream chunks
    const assistantMsg: UIMessage = {
      id: crypto.randomUUID(),
      role: "assistant",
      parts: []
    };
    try {
      // Run the agentic loop (or custom override)
      const result = await this.onChatMessage({
        signal: options?.signal,
        tools: options?.tools
      });
      // Stream UIMessageChunk events via callback
      let aborted = false;
      for await (const chunk of result.toUIMessageStream()) {
        // Abort is checked between chunks; the in-flight chunk still lands.
        if (options?.signal?.aborted) {
          aborted = true;
          break;
        }
        applyChunkToParts(
          assistantMsg.parts,
          chunk as unknown as StreamChunkData
        );
        await callback.onEvent(JSON.stringify(chunk));
      }
      // Persist assistant message (sanitized + size-enforced)
      this._persistAssistantMessage(assistantMsg);
      // Only signal completion if not aborted
      if (!aborted) {
        await callback.onDone();
      }
    } catch (error) {
      // Persist partial assistant message so context isn't lost
      if (assistantMsg.parts.length > 0) {
        this._persistAssistantMessage(assistantMsg);
      }
      const wrapped = this.onChatError(error);
      const errorMessage =
        wrapped instanceof Error ? wrapped.message : String(wrapped);
      if (callback.onError) {
        await callback.onError(errorMessage);
      } else {
        // Re-throw if no error callback — caller must handle it
        throw wrapped;
      }
    }
  }
  // ── Session management ─────────────────────────────────────────
  /** List all sessions. */
  getSessions(): Session[] {
    return this.sessions.list();
  }
  /** Create a new session, make it current, and broadcast the empty list. */
  createSession(name: string): Session {
    const session = this.sessions.create(name);
    this._sessionId = session.id;
    this.messages = [];
    this._broadcastMessages();
    return session;
  }
  /**
   * Switch to an existing session and return its history.
   * @throws if the session does not exist.
   */
  switchSession(sessionId: string): UIMessage[] {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    this._sessionId = sessionId;
    this.messages = this.sessions.getHistory(sessionId);
    this._broadcastMessages();
    return this.messages;
  }
  /**
   * Delete a session. If it was the current one, reset in-memory state
   * and notify clients.
   * @throws if the session does not exist.
   */
  deleteSession(sessionId: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    this.sessions.delete(sessionId);
    if (this._sessionId === sessionId) {
      this._sessionId = null;
      this.messages = [];
      this._broadcastMessages();
    }
  }
  /**
   * Rename a session.
   * @throws if the session does not exist.
   */
  renameSession(sessionId: string, name: string): void {
    const session = this.sessions.get(sessionId);
    if (!session) {
      throw new Error(`Session not found: ${sessionId}`);
    }
    this.sessions.rename(sessionId, name);
  }
  /** ID of the current session, or null if none is active. */
  getCurrentSessionId(): string | null {
    return this._sessionId;
  }
  // ── Message access ───────────────────────────────────────────────
  /**
   * Get the current session info, or null if no session exists yet.
   */
  getSession(): Session | null {
    if (!this._sessionId) return null;
    return this.sessions.get(this._sessionId);
  }
  /**
   * Get the conversation history as UIMessage[].
   */
  getHistory(): UIMessage[] {
    if (!this._sessionId) return [];
    return this.sessions.getHistory(this._sessionId);
  }
  /**
   * Get the total message count for this session.
   */
  getMessageCount(): number {
    if (!this._sessionId) return 0;
    return this.sessions.getMessageCount(this._sessionId);
  }
  /**
   * Clear all messages from this session (preserves the session itself).
   */
  clearMessages(): void {
    if (!this._sessionId) return;
    this.sessions.clearMessages(this._sessionId);
    this.messages = [];
    this._persistedMessageCache.clear();
  }
  // ── WebSocket protocol ──────────────────────────────────────────
  /**
   * Wrap onMessage and onRequest to intercept the chat protocol.
   * Unrecognized messages are forwarded to the user's handlers.
   *
   * Assigning to `this.onMessage` / `this.onRequest` creates own properties
   * that shadow any prototype definitions; the originals are captured (bound)
   * first and forwarded to.
   * @internal
   */
  private _setupProtocolHandlers() {
    const _onMessage = this.onMessage.bind(this);
    this.onMessage = async (connection: Connection, message: WSMessage) => {
      if (typeof message === "string") {
        try {
          const data = JSON.parse(message) as Record<string, unknown>;
          if (await this._handleProtocol(connection, data)) return;
        } catch {
          // Not JSON — fall through to user handler
        }
      }
      return _onMessage(connection, message);
    };
    const _onRequest = this.onRequest.bind(this);
    this.onRequest = async (request: Request) => {
      const url = new URL(request.url);
      if (
        url.pathname === "/get-messages" ||
        url.pathname.endsWith("/get-messages")
      ) {
        // Optional ?sessionId= selects a specific session's history;
        // otherwise the current in-memory messages are returned.
        const sessionId = url.searchParams.get("sessionId");
        if (sessionId) {
          const session = this.sessions.get(sessionId);
          if (!session) {
            return Response.json(
              { error: "Session not found" },
              { status: 404 }
            );
          }
          return Response.json(this.sessions.getHistory(sessionId));
        }
        return Response.json(this.messages);
      }
      return _onRequest(request);
    };
  }
  /**
   * Route an incoming WebSocket message to the appropriate handler.
   * Returns true if the message was handled by the protocol.
   * @internal
   */
  private async _handleProtocol(
    connection: Connection,
    data: Record<string, unknown>
  ): Promise<boolean> {
    const type = data.type as string;
    if (type === MSG_CHAT_REQUEST) {
      // Only POST-shaped requests start a chat turn; anything else falls
      // through to the user's onMessage handler.
      const init = data.init as { method?: string; body?: string } | undefined;
      if (init?.method === "POST") {
        await this._handleChatRequest(connection, data);
        return true;
      }
    }
    if (type === MSG_CHAT_CLEAR) {
      this._handleClear();
      return true;
    }
    if (type === MSG_CHAT_CANCEL) {
      this._handleCancel(data.id as string);
      return true;
    }
    return false;
  }
  /**
   * Handle CF_AGENT_USE_CHAT_REQUEST:
   * 1. Parse incoming messages
   * 2. Ensure a session exists
   * 3. Persist user messages to session
   * 4. Call onChatMessage
   * 5. Stream response back to clients
   * 6. Persist assistant message to session
   * @internal
   */
  private async _handleChatRequest(
    connection: Connection,
    data: Record<string, unknown>
  ) {
    const init = data.init as { body?: string };
    if (!init?.body) return;
    let parsed: { messages?: UIMessage[] };
    try {
      parsed = JSON.parse(init.body) as { messages?: UIMessage[] };
    } catch {
      // Malformed body — silently ignore the request.
      return;
    }
    const incomingMessages = parsed.messages;
    if (!Array.isArray(incomingMessages)) return;
    // Ensure a session exists
    if (!this._sessionId) {
      const session = this.sessions.create("New Chat");
      this._sessionId = session.id;
    }
    // Persist incoming messages to session (idempotent via INSERT OR IGNORE)
    this.sessions.appendAll(this._sessionId, incomingMessages);
    // Reload from session (authoritative)
    this.messages = this.sessions.getHistory(this._sessionId);
    // Broadcast updated messages to other connections (not the sender).
    this._broadcastMessages([connection.id]);
    // Set up abort controller
    const requestId = data.id as string;
    const abortController = new AbortController();
    this._abortControllers.set(requestId, abortController);
    try {
      // keepAliveWhile prevents the DO from hibernating mid-stream.
      await this.keepAliveWhile(async () => {
        const result = await agentContext.run(
          { agent: this, connection, request: undefined, email: undefined },
          () =>
            this.onChatMessage({
              signal: abortController.signal
            })
        );
        if (result) {
          await this._streamResult(requestId, result, abortController.signal);
        } else {
          this._broadcast({
            type: MSG_CHAT_RESPONSE,
            id: requestId,
            body: "No response was generated.",
            done: true
          });
        }
      });
    } catch (error) {
      this._broadcast({
        type: MSG_CHAT_RESPONSE,
        id: requestId,
        body: error instanceof Error ? error.message : "Error",
        done: true,
        error: true
      });
    } finally {
      this._abortControllers.delete(requestId);
    }
  }
  /**
   * Handle CF_AGENT_CHAT_CLEAR: abort streams, clear current session messages.
   * @internal
   */
  private _handleClear() {
    for (const controller of this._abortControllers.values()) {
      controller.abort();
    }
    this._abortControllers.clear();
    if (this._sessionId) {
      this.sessions.clearMessages(this._sessionId);
    }
    this.messages = [];
    this._persistedMessageCache.clear();
    // Invalidate any in-flight stream's persistence (see _streamResult).
    this._clearGeneration++;
    this._broadcast({ type: MSG_CHAT_CLEAR });
  }
  /**
   * Handle CF_AGENT_CHAT_REQUEST_CANCEL: abort a specific request.
   * @internal
   */
  private _handleCancel(requestId: string) {
    const controller = this._abortControllers.get(requestId);
    if (controller) {
      controller.abort();
    }
  }
  /**
   * Iterate a StreamableResult, broadcast chunks to clients,
   * build a UIMessage, and persist it to the session.
   * @internal
   */
  private async _streamResult(
    requestId: string,
    result: StreamableResult,
    abortSignal?: AbortSignal
  ) {
    // Snapshot the clear generation; if a clear lands mid-stream we must
    // not persist the (now orphaned) assistant message afterwards.
    const clearGen = this._clearGeneration;
    const message: UIMessage = {
      id: crypto.randomUUID(),
      role: "assistant",
      parts: []
    };
    // Guards against sending more than one `done: true` frame per request.
    let doneSent = false;
    try {
      for await (const chunk of result.toUIMessageStream()) {
        if (abortSignal?.aborted) break;
        const data = chunk as StreamChunkData;
        // Build UIMessage from stream events
        const handled = applyChunkToParts(message.parts, data);
        if (!handled) {
          // Handle metadata events that applyChunkToParts doesn't cover
          switch (data.type) {
            case "start": {
              if (data.messageId != null) {
                message.id = data.messageId;
              }
              if (data.messageMetadata != null) {
                message.metadata = message.metadata
                  ? { ...message.metadata, ...data.messageMetadata }
                  : data.messageMetadata;
              }
              break;
            }
            case "finish":
            case "message-metadata": {
              if (data.messageMetadata != null) {
                message.metadata = message.metadata
                  ? { ...message.metadata, ...data.messageMetadata }
                  : data.messageMetadata;
              }
              break;
            }
            case "error": {
              // Error chunks are relayed as error frames (done stays false)
              // and are NOT re-broadcast as ordinary chunks below.
              this._broadcast({
                type: MSG_CHAT_RESPONSE,
                id: requestId,
                body: data.errorText ?? JSON.stringify(data),
                done: false,
                error: true
              });
              continue;
            }
          }
        }
        // Broadcast chunk to clients
        this._broadcast({
          type: MSG_CHAT_RESPONSE,
          id: requestId,
          body: JSON.stringify(chunk),
          done: false
        });
      }
      this._broadcast({
        type: MSG_CHAT_RESPONSE,
        id: requestId,
        body: "",
        done: true
      });
      doneSent = true;
    } catch (error) {
      if (!doneSent) {
        this._broadcast({
          type: MSG_CHAT_RESPONSE,
          id: requestId,
          body: error instanceof Error ? error.message : "Stream error",
          done: true,
          error: true
        });
        doneSent = true;
      }
    } finally {
      if (!doneSent) {
        this._broadcast({
          type: MSG_CHAT_RESPONSE,
          id: requestId,
          body: "",
          done: true
        });
      }
    }
    // Persist the assistant message to the session (sanitized + size-enforced).
    // Skip if a clear happened during this stream (clearGeneration changed).
    // Wrapped in try-catch: the stream done message was already sent above,
    // so a persistence error must not propagate to the outer catch (which
    // would broadcast a second done message).
    if (
      message.parts.length > 0 &&
      this._sessionId &&
      this._clearGeneration === clearGen
    ) {
      try {
        this._persistAssistantMessage(message);
        this._broadcastMessages();
      } catch (e) {
        console.error("Failed to persist assistant message:", e);
      }
    }
  }
  // ── Persistence internals ────────────────────────────────────────
  /**
   * Persist an assistant message with sanitization, size enforcement,
   * and incremental persistence.
   * @internal
   */
  private _persistAssistantMessage(msg: UIMessage): void {
    if (!this._sessionId) return;
    const sanitized = sanitizeMessage(msg);
    const safe = enforceRowSizeLimit(sanitized);
    const json = JSON.stringify(safe);
    // Skip SQL write if unchanged (incremental persistence)
    if (this._persistedMessageCache.get(safe.id) !== json) {
      this.sessions.upsert(this._sessionId, safe);
      this._persistedMessageCache.set(safe.id, json);
    }
    // Enforce storage bounds
    if (this.maxPersistedMessages != null) {
      this._enforceMaxPersistedMessages();
    }
    // Reload so this.messages reflects sanitization/truncation/trimming.
    this.messages = this.sessions.getHistory(this._sessionId);
  }
  /**
   * Rebuild the persistence cache from current messages.
   * Called on startup to enable incremental persistence.
   * @internal
   */
  private _rebuildPersistenceCache(): void {
    this._persistedMessageCache.clear();
    for (const msg of this.messages) {
      this._persistedMessageCache.set(msg.id, JSON.stringify(msg));
    }
  }
  /**
   * Delete oldest messages on the current branch when count exceeds
   * maxPersistedMessages. Uses path-based count (not total across all
   * branches) and individual deletes to preserve branch structure.
   * @internal
   */
  private _enforceMaxPersistedMessages(): void {
    if (this.maxPersistedMessages == null || !this._sessionId) return;
    // Use current branch history, not total message count across all branches
    const history = this.sessions.getHistory(this._sessionId);
    if (history.length <= this.maxPersistedMessages) return;
    const excess = history.length - this.maxPersistedMessages;
    const toRemove = history.slice(0, excess);
    // Delete individual messages — preserves branch structure
    this.sessions.deleteMessages(toRemove.map((m) => m.id));
    for (const msg of toRemove) {
      this._persistedMessageCache.delete(msg.id);
    }
  }
  /**
   * Broadcast a JSON message to all connected clients.
   * @param exclude Connection IDs to skip.
   * @internal
   */
  private _broadcast(message: Record<string, unknown>, exclude?: string[]) {
    this.broadcast(JSON.stringify(message), exclude);
  }
  /**
   * Broadcast the current message list to all connected clients.
   * @param exclude Connection IDs to skip.
   * @internal
   */
  private _broadcastMessages(exclude?: string[]) {
    this._broadcast(
      { type: MSG_CHAT_MESSAGES, messages: this.messages },
      exclude
    );
  }
}