Test PR #4

Closed
resumability main
opened 3 days ago by deathbyknowledge

foo bar 1234

Files changed

4 files changed +330 -153
A .changeset/polite-shirts-shop.md
@@ -0,0 +1,5 @@
1+---
2+"@cloudflare/ai-chat": patch
3+---
4+
5+Add queues to AIChatAgent to support simultaneous input messages
M packages/ai-chat/src/index.ts
@@ -216,12 +216,14 @@
216216 private _approvalPersistedMessageId: string | null = null;
217217
218218 /**
219- * Promise that resolves when the current stream completes.
220- * Used to wait for message persistence before continuing after tool results.
219+ * Queue for serializing chat requests so that only one
220+ * onChatMessage → _reply cycle is in-flight at a time.
221+ * Prevents races when multiple messages arrive before the first
222+ * response completes.
221223 * @internal
222224 */
223- private _streamCompletionPromise: Promise<void> | null = null;
224- private _streamCompletionResolve: (() => void) | null = null;
225+ private _chatRequestQueue: Array<() => Promise<void>> = [];
226+ private _chatRequestInFlight = false;
225227
226228 /**
227229 * Set of connection IDs that are pending stream resume.
@@ -417,51 +419,63 @@
417419 const chatMessageId = data.id;
418420 const abortSignal = this._getAbortSignal(chatMessageId);
419421
420- return this._tryCatchChat(async () => {
421- // Wrap in agentContext.run() to propagate connection context to onChatMessage
422- // This ensures getCurrentAgent() returns the connection inside tool execute functions
423- return agentContext.run(
424- { agent: this, connection, request: undefined, email: undefined },
425- async () => {
426- const response = await this.onChatMessage(
427- async (_finishResult) => {
428- // User-provided hook. Cleanup is now handled by _reply,
429- // so this is optional for the user to pass to streamText.
430- },
431- {
432- requestId: chatMessageId,
433- abortSignal,
434- clientTools,
435- body: this._lastBody
436- }
437- );
438-
439- if (response) {
440- await this._reply(data.id, response, [connection.id], {
441- chatMessageId
442- });
443- } else {
444- console.warn(
445- `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
446- );
447- this._broadcastChatMessage(
422+ // Enqueue the LLM call so concurrent messages are
423+ // processed one at a time. The handler returns
424+ // immediately; the queue drains via ctx.waitUntil.
425+ this._enqueueChatRequest(async () => {
426+ await this._tryCatchChat(async () => {
427+ // Wrap in agentContext.run() to propagate connection context to onChatMessage
428+ // This ensures getCurrentAgent() returns the connection inside tool execute functions
429+ return agentContext.run(
430+ {
431+ agent: this,
432+ connection,
433+ request: undefined,
434+ email: undefined
435+ },
436+ async () => {
437+ const response = await this.onChatMessage(
438+ async (_finishResult) => {
439+ // User-provided hook. Cleanup is now handled by _reply,
440+ // so this is optional for the user to pass to streamText.
441+ },
448442 {
449- body: "No response was generated by the agent.",
450- done: true,
451- id: data.id,
452- type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
453- },
454- [connection.id]
443+ requestId: chatMessageId,
444+ abortSignal,
445+ clientTools,
446+ body: this._lastBody
447+ }
455448 );
449+
450+ if (response) {
451+ await this._reply(data.id, response, [connection.id], {
452+ chatMessageId
453+ });
454+ } else {
455+ console.warn(
456+ `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
457+ );
458+ this._broadcastChatMessage(
459+ {
460+ body: "No response was generated by the agent.",
461+ done: true,
462+ id: data.id,
463+ type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
464+ },
465+ [connection.id]
466+ );
467+ }
456468 }
457- }
458- );
469+ );
470+ });
459471 });
472+ return;
460473 }
461474
462475 // Handle clear chat
463476 if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) {
464477 this._destroyAbortControllers();
478+ this._chatRequestQueue.length = 0;
465479 this.sql`delete from cf_ai_chat_agent_messages`;
466480 this._resumableStream.clearAll();
467481 this._pendingResumeConnections.clear();
@@ -558,72 +572,54 @@
558572 // This mimics server-executed tool behavior where the LLM
559573 // automatically continues after seeing tool results
560574 if (applied && autoContinue) {
561- // Wait for the original stream to complete and message to be persisted
562- // before calling onChatMessage, so this.messages includes the tool result
563- const waitForStream = async () => {
564- if (this._streamCompletionPromise) {
565- await this._streamCompletionPromise;
566- } else {
567- // TODO: The completion promise can be null if the stream finished
568- // before the tool result arrived (race between stream end and tool
569- // apply). The 500ms fallback is a pragmatic workaround — consider
570- // a more deterministic signal (e.g. always setting the promise).
571- await new Promise((resolve) => setTimeout(resolve, 500));
572- }
573- };
575+ // Enqueue the continuation — the queue guarantees the
576+ // current stream finishes (and messages are persisted)
577+ // before this callback runs.
578+ this._enqueueChatRequest(async () => {
579+ const continuationId = nanoid();
580+ const abortSignal = this._getAbortSignal(continuationId);
574581
575- waitForStream()
576- .then(() => {
577- const continuationId = nanoid();
578- const abortSignal = this._getAbortSignal(continuationId);
582+ await this._tryCatchChat(async () => {
583+ return agentContext.run(
584+ {
585+ agent: this,
586+ connection,
587+ request: undefined,
588+ email: undefined
589+ },
590+ async () => {
591+ const response = await this.onChatMessage(
592+ async (_finishResult) => {
593+ // User-provided hook. Cleanup handled by _reply.
594+ },
595+ {
596+ requestId: continuationId,
597+ abortSignal,
598+ clientTools: clientTools ?? this._lastClientTools,
599+ body: this._lastBody
600+ }
601+ );
579602
580- return this._tryCatchChat(async () => {
581- return agentContext.run(
582- {
583- agent: this,
584- connection,
585- request: undefined,
586- email: undefined
587- },
588- async () => {
589- const response = await this.onChatMessage(
590- async (_finishResult) => {
591- // User-provided hook. Cleanup handled by _reply.
592- },
603+ if (response) {
604+ // Pass continuation flag to merge parts into last assistant message
605+ // Note: We pass an empty excludeBroadcastIds array because the sender
606+ // NEEDS to receive the continuation stream. Unlike regular chat requests
607+ // where aiFetch handles the response, tool continuations have no listener
608+ // waiting - the client relies on the broadcast.
609+ await this._reply(
610+ continuationId,
611+ response,
612+ [], // Don't exclude sender - they need the continuation
593613 {
594- requestId: continuationId,
595- abortSignal,
596- clientTools: clientTools ?? this._lastClientTools,
597- body: this._lastBody
614+ continuation: true,
615+ chatMessageId: continuationId
598616 }
599617 );
600-
601- if (response) {
602- // Pass continuation flag to merge parts into last assistant message
603- // Note: We pass an empty excludeBroadcastIds array because the sender
604- // NEEDS to receive the continuation stream. Unlike regular chat requests
605- // where aiFetch handles the response, tool continuations have no listener
606- // waiting - the client relies on the broadcast.
607- await this._reply(
608- continuationId,
609- response,
610- [], // Don't exclude sender - they need the continuation
611- {
612- continuation: true,
613- chatMessageId: continuationId
614- }
615- );
616- }
617618 }
618- );
619- });
620- })
621- .catch((error) => {
622- console.error(
623- "[AIChatAgent] Tool continuation failed:",
624- error
619+ }
625620 );
626621 });
622+ });
627623 }
628624 });
629625 return;
@@ -636,54 +632,39 @@
636632 // Auto-continue for both approvals and rejections so the LLM
637633 // sees the tool_result and can respond accordingly.
638634 if (applied && autoContinue) {
639- const waitForStream = async () => {
640- if (this._streamCompletionPromise) {
641- await this._streamCompletionPromise;
642- } else {
643- await new Promise((resolve) => setTimeout(resolve, 500));
644- }
645- };
646-
647- waitForStream()
648- .then(() => {
649- const continuationId = nanoid();
650- const abortSignal = this._getAbortSignal(continuationId);
651-
652- return this._tryCatchChat(async () => {
653- return agentContext.run(
654- {
655- agent: this,
656- connection,
657- request: undefined,
658- email: undefined
659- },
660- async () => {
661- const response = await this.onChatMessage(
662- async (_finishResult) => {},
663- {
664- requestId: continuationId,
665- abortSignal,
666- clientTools: this._lastClientTools,
667- body: this._lastBody
668- }
669- );
635+ this._enqueueChatRequest(async () => {
636+ const continuationId = nanoid();
637+ const abortSignal = this._getAbortSignal(continuationId);
670638
671- if (response) {
672- await this._reply(continuationId, response, [], {
673- continuation: true,
674- chatMessageId: continuationId
675- });
639+ await this._tryCatchChat(async () => {
640+ return agentContext.run(
641+ {
642+ agent: this,
643+ connection,
644+ request: undefined,
645+ email: undefined
646+ },
647+ async () => {
648+ const response = await this.onChatMessage(
649+ async (_finishResult) => {},
650+ {
651+ requestId: continuationId,
652+ abortSignal,
653+ clientTools: this._lastClientTools,
654+ body: this._lastBody
676655 }
656+ );
657+
658+ if (response) {
659+ await this._reply(continuationId, response, [], {
660+ continuation: true,
661+ chatMessageId: continuationId
662+ });
677663 }
678- );
679- });
680- })
681- .catch((error) => {
682- console.error(
683- "[AIChatAgent] Tool approval continuation failed:",
684- error
664+ }
685665 );
686666 });
667+ });
687668 }
688669 });
689670 return;
@@ -831,6 +812,39 @@
831812 }
832813
833814 /**
815+ * Enqueue a chat request for serialized execution.
816+ * If nothing is in-flight the request runs immediately;
817+ * otherwise it waits until every earlier request's
818+ * onChatMessage → _reply cycle has completed.
819+ * @internal
820+ */
821+ private _enqueueChatRequest(execute: () => Promise<void>) {
822+ this._chatRequestQueue.push(execute);
823+ if (!this._chatRequestInFlight) {
824+ this._drainChatRequestQueue();
825+ }
826+ }
827+
828+ /**
829+ * Process queued chat requests one at a time.
830+ * Errors in individual requests are logged but do not
831+ * prevent subsequent requests from executing.
832+ * @internal
833+ */
834+ private async _drainChatRequestQueue() {
835+ this._chatRequestInFlight = true;
836+ while (this._chatRequestQueue.length > 0) {
837+ const next = this._chatRequestQueue.shift()!;
838+ try {
839+ await next();
840+ } catch (error) {
841+ console.error("[AIChatAgent] Queued chat request failed:", error);
842+ }
843+ }
844+ this._chatRequestInFlight = false;
845+ }
846+
847+ /**
834848 * Restore _lastBody and _lastClientTools from SQLite.
835849 * Called in the constructor so these values survive DO hibernation.
836850 * @internal
@@ -2147,10 +2161,6 @@
21472161 };
21482162 // Track the streaming message so tool results can be applied before persistence
21492163 this._streamingMessage = message;
2150- // Set up completion promise for tool continuation to wait on
2151- this._streamCompletionPromise = new Promise((resolve) => {
2152- this._streamCompletionResolve = resolve;
2153- });
21542164
21552165 // Determine response format based on content-type
21562166 const contentType = response.headers.get("content-type") || "";
@@ -2202,19 +2212,12 @@
22022212 } finally {
22032213 reader.releaseLock();
22042214
2205- // Always clear the streaming message reference and resolve completion
2206- // promise, even on error. Without this, tool continuations waiting on
2207- // _streamCompletionPromise would hang forever after a stream error.
2215+ // Always clear the streaming message reference, even on error.
22082216 this._streamingMessage = null;
22092217 // Capture and clear early-persist tracking. The persistence block
22102218 // after the finally uses the local to update in place.
22112219 earlyPersistedId = this._approvalPersistedMessageId;
22122220 this._approvalPersistedMessageId = null;
2213- if (this._streamCompletionResolve) {
2214- this._streamCompletionResolve();
2215- this._streamCompletionResolve = null;
2216- this._streamCompletionPromise = null;
2217- }
22182221
22192222 // Framework-level cleanup: always remove abort controller.
22202223 // Only emit observability on success (not on error path).
@@ -2335,6 +2338,7 @@
23352338 */
23362339 async destroy() {
23372340 this._destroyAbortControllers();
2341+ this._chatRequestQueue.length = 0;
23382342 this._resumableStream.destroy();
23392343 await super.destroy();
23402344 }
A packages/ai-chat/src/tests/concurrent-messages.test.ts
@@ -0,0 +1,163 @@
1+import { describe, it, expect } from "vitest";
2+import type { UIMessage as ChatMessage } from "ai";
3+import { MessageType } from "../types";
4+import { connectChatWS, isUseChatResponseMessage } from "./test-utils";
5+
6+describe("Concurrent Message Handling", () => {
7+ it("processes multiple simultaneous messages in order without interleaving", async () => {
8+ const room = crypto.randomUUID();
9+ const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
10+
11+ const receivedResponses: Array<{
12+ id: string;
13+ body: string;
14+ done: boolean;
15+ timestamp: number;
16+ }> = [];
17+
18+ // Track when all 3 responses are complete
19+ let completedCount = 0;
20+ let resolveAll: () => void;
21+ const allDone = new Promise<void>((res) => {
22+ resolveAll = res;
23+ });
24+
25+ const timeout = setTimeout(() => {
26+ throw new Error(
27+ `Timeout: only received ${completedCount}/3 complete responses`
28+ );
29+ }, 10000);
30+
31+ ws.addEventListener("message", (e: MessageEvent) => {
32+ const data = JSON.parse(e.data as string);
33+ if (isUseChatResponseMessage(data)) {
34+ receivedResponses.push({
35+ id: data.id,
36+ body: data.body,
37+ done: data.done,
38+ timestamp: Date.now()
39+ });
40+ if (data.done) {
41+ completedCount++;
42+ if (completedCount === 3) {
43+ clearTimeout(timeout);
44+ resolveAll();
45+ }
46+ }
47+ }
48+ });
49+
50+ // Send 3 messages as fast as possible (simulating rapid user input)
51+ const messages = ["first", "second", "third"];
52+ for (const text of messages) {
53+ const userMessage: ChatMessage = {
54+ id: `msg-${text}`,
55+ role: "user",
56+ parts: [{ type: "text", text }]
57+ };
58+
59+ ws.send(
60+ JSON.stringify({
61+ type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
62+ id: `req-${text}`,
63+ init: {
64+ method: "POST",
65+ body: JSON.stringify({ messages: [userMessage] })
66+ }
67+ })
68+ );
69+ }
70+
71+ // Wait for all responses to complete
72+ await allDone;
73+
74+ // Verify we got exactly 3 complete responses
75+ const doneResponses = receivedResponses.filter((r) => r.done);
76+ expect(doneResponses).toHaveLength(3);
77+
78+ // Verify responses completed in order (first, second, third)
79+ // This proves the queue serialized the requests — if they ran concurrently,
80+ // order would be non-deterministic
81+ const doneIds = doneResponses.map((r) => r.id);
82+ expect(doneIds).toEqual(["req-first", "req-second", "req-third"]);
83+
84+ ws.close();
85+ });
86+
87+ it("queued messages are cleared when chat history is cleared", async () => {
88+ const room = crypto.randomUUID();
89+ const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
90+
91+ const receivedResponses: Array<{ id: string; done: boolean }> = [];
92+
93+ ws.addEventListener("message", (e: MessageEvent) => {
94+ const data = JSON.parse(e.data as string);
95+ if (isUseChatResponseMessage(data)) {
96+ receivedResponses.push({ id: data.id, done: data.done });
97+ }
98+ });
99+
100+ // Send first message
101+ const firstMessage: ChatMessage = {
102+ id: "msg-1",
103+ role: "user",
104+ parts: [{ type: "text", text: "first" }]
105+ };
106+ ws.send(
107+ JSON.stringify({
108+ type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
109+ id: "req-1",
110+ init: {
111+ method: "POST",
112+ body: JSON.stringify({ messages: [firstMessage] })
113+ }
114+ })
115+ );
116+
117+ // Immediately send clear command
118+ ws.send(
119+ JSON.stringify({
120+ type: MessageType.CF_AGENT_CHAT_CLEAR
121+ })
122+ );
123+
124+ // Send second message after clear
125+ const secondMessage: ChatMessage = {
126+ id: "msg-2",
127+ role: "user",
128+ parts: [{ type: "text", text: "second" }]
129+ };
130+ ws.send(
131+ JSON.stringify({
132+ type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
133+ id: "req-2",
134+ init: {
135+ method: "POST",
136+ body: JSON.stringify({ messages: [secondMessage] })
137+ }
138+ })
139+ );
140+
141+ // Wait for second response to complete
142+ await new Promise<void>((resolve) => {
143+ const checkInterval = setInterval(() => {
144+ const doneResponses = receivedResponses.filter((r) => r.done);
145+ // We expect at least one done response (the second one)
146+ if (doneResponses.some((r) => r.id === "req-2")) {
147+ clearInterval(checkInterval);
148+ resolve();
149+ }
150+ }, 50);
151+ setTimeout(() => {
152+ clearInterval(checkInterval);
153+ resolve();
154+ }, 5000);
155+ });
156+
157+ // The second message should have completed
158+ const doneResponses = receivedResponses.filter((r) => r.done);
159+ expect(doneResponses.some((r) => r.id === "req-2")).toBe(true);
160+
161+ ws.close();
162+ });
163+});
M packages/ai-chat/src/tests/worker.ts
@@ -64,6 +64,11 @@
6464 // It's a nested async function called from within onChatMessage
6565 await this._simulateToolExecute();
6666
67+ // Random delay to simulate variable LLM response times.
68+ // This proves queue serialization works — without it, concurrent
69+ // messages with random delays would complete in random order.
70+ await new Promise((r) => setTimeout(r, Math.random() * 50));
71+
6772 // Simple echo response for testing
6873 return new Response("Hello from chat agent!", {
6974 headers: { "Content-Type": "text/plain" }
deathbyknowledge 3 days ago

wut