Test PR #4
Adds a chat request queue to AIChatAgent so simultaneous incoming messages are processed one at a time: each onChatMessage → _reply cycle completes before the next begins. Replaces the previous stream-completion promise (and its 500ms race-condition fallback) used by tool-result continuations.
Files changed
4 files changed
+330
-153
A .changeset/polite-shirts-shop.md
@@ -0,0 +1,5 @@
| 1 | +--- | |
| 2 | +"@cloudflare/ai-chat": patch | |
| 3 | +--- | |
| 4 | + | |
| 5 | +Add queues to AIChatAgent to support simultaneous input messages |
M packages/ai-chat/src/index.ts
@@ -216,12 +216,14 @@
| 216 | 216 | private _approvalPersistedMessageId: string | null = null; |
| 217 | 217 | |
| 218 | 218 | /** |
| 219 | - * Promise that resolves when the current stream completes. | |
| 220 | - * Used to wait for message persistence before continuing after tool results. | |
| 219 | + * Queue for serializing chat requests so that only one | |
| 220 | + * onChatMessage → _reply cycle is in-flight at a time. | |
| 221 | + * Prevents races when multiple messages arrive before the first | |
| 222 | + * response completes. | |
| 221 | 223 | * @internal |
| 222 | 224 | */ |
| 223 | - private _streamCompletionPromise: Promise<void> | null = null; | |
| 224 | - private _streamCompletionResolve: (() => void) | null = null; | |
| 225 | + private _chatRequestQueue: Array<() => Promise<void>> = []; | |
| 226 | + private _chatRequestInFlight = false; | |
| 225 | 227 | |
| 226 | 228 | /** |
| 227 | 229 | * Set of connection IDs that are pending stream resume. |
@@ -417,51 +419,63 @@
| 417 | 419 | const chatMessageId = data.id; |
| 418 | 420 | const abortSignal = this._getAbortSignal(chatMessageId); |
| 419 | 421 | |
| 420 | - return this._tryCatchChat(async () => { | |
| 421 | - // Wrap in agentContext.run() to propagate connection context to onChatMessage | |
| 422 | - // This ensures getCurrentAgent() returns the connection inside tool execute functions | |
| 423 | - return agentContext.run( | |
| 424 | - { agent: this, connection, request: undefined, email: undefined }, | |
| 425 | - async () => { | |
| 426 | - const response = await this.onChatMessage( | |
| 427 | - async (_finishResult) => { | |
| 428 | - // User-provided hook. Cleanup is now handled by _reply, | |
| 429 | - // so this is optional for the user to pass to streamText. | |
| 430 | - }, | |
| 431 | - { | |
| 432 | - requestId: chatMessageId, | |
| 433 | - abortSignal, | |
| 434 | - clientTools, | |
| 435 | - body: this._lastBody | |
| 436 | - } | |
| 437 | - ); | |
| 438 | - | |
| 439 | - if (response) { | |
| 440 | - await this._reply(data.id, response, [connection.id], { | |
| 441 | - chatMessageId | |
| 442 | - }); | |
| 443 | - } else { | |
| 444 | - console.warn( | |
| 445 | - `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}` | |
| 446 | - ); | |
| 447 | - this._broadcastChatMessage( | |
| 422 | + // Enqueue the LLM call so concurrent messages are | |
| 423 | + // processed one at a time. The handler returns | |
| 424 | + // immediately; the queue drains in the background. | |
| 425 | + this._enqueueChatRequest(async () => { | |
| 426 | + await this._tryCatchChat(async () => { | |
| 427 | + // Wrap in agentContext.run() to propagate connection context to onChatMessage | |
| 428 | + // This ensures getCurrentAgent() returns the connection inside tool execute functions | |
| 429 | + return agentContext.run( | |
| 430 | + { | |
| 431 | + agent: this, | |
| 432 | + connection, | |
| 433 | + request: undefined, | |
| 434 | + email: undefined | |
| 435 | + }, | |
| 436 | + async () => { | |
| 437 | + const response = await this.onChatMessage( | |
| 438 | + async (_finishResult) => { | |
| 439 | + // User-provided hook. Cleanup is now handled by _reply, | |
| 440 | + // so this is optional for the user to pass to streamText. | |
| 441 | + }, | |
| 448 | 442 | { |
| 449 | - body: "No response was generated by the agent.", | |
| 450 | - done: true, | |
| 451 | - id: data.id, | |
| 452 | - type: MessageType.CF_AGENT_USE_CHAT_RESPONSE | |
| 453 | - }, | |
| 454 | - [connection.id] | |
| 443 | + requestId: chatMessageId, | |
| 444 | + abortSignal, | |
| 445 | + clientTools, | |
| 446 | + body: this._lastBody | |
| 447 | + } | |
| 455 | 448 | ); |
| 449 | + | |
| 450 | + if (response) { | |
| 451 | + await this._reply(data.id, response, [connection.id], { | |
| 452 | + chatMessageId | |
| 453 | + }); | |
| 454 | + } else { | |
| 455 | + console.warn( | |
| 456 | + `[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}` | |
| 457 | + ); | |
| 458 | + this._broadcastChatMessage( | |
| 459 | + { | |
| 460 | + body: "No response was generated by the agent.", | |
| 461 | + done: true, | |
| 462 | + id: data.id, | |
| 463 | + type: MessageType.CF_AGENT_USE_CHAT_RESPONSE | |
| 464 | + }, | |
| 465 | + [connection.id] | |
| 466 | + ); | |
| 467 | + } | |
| 456 | 468 | } |
| 457 | - } | |
| 458 | - ); | |
| 469 | + ); | |
| 470 | + }); | |
| 459 | 471 | }); |
| 472 | + return; | |
| 460 | 473 | } |
| 461 | 474 | |
| 462 | 475 | // Handle clear chat |
| 463 | 476 | if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) { |
| 464 | 477 | this._destroyAbortControllers(); |
| 478 | + this._chatRequestQueue.length = 0; | |
| 465 | 479 | this.sql`delete from cf_ai_chat_agent_messages`; |
| 466 | 480 | this._resumableStream.clearAll(); |
| 467 | 481 | this._pendingResumeConnections.clear(); |
@@ -558,72 +572,54 @@
| 558 | 572 | // This mimics server-executed tool behavior where the LLM |
| 559 | 573 | // automatically continues after seeing tool results |
| 560 | 574 | if (applied && autoContinue) { |
| 561 | - // Wait for the original stream to complete and message to be persisted | |
| 562 | - // before calling onChatMessage, so this.messages includes the tool result | |
| 563 | - const waitForStream = async () => { | |
| 564 | - if (this._streamCompletionPromise) { | |
| 565 | - await this._streamCompletionPromise; | |
| 566 | - } else { | |
| 567 | - // TODO: The completion promise can be null if the stream finished | |
| 568 | - // before the tool result arrived (race between stream end and tool | |
| 569 | - // apply). The 500ms fallback is a pragmatic workaround — consider | |
| 570 | - // a more deterministic signal (e.g. always setting the promise). | |
| 571 | - await new Promise((resolve) => setTimeout(resolve, 500)); | |
| 572 | - } | |
| 573 | - }; | |
| 575 | + // Enqueue the continuation — the queue guarantees the | |
| 576 | + // current stream finishes (and messages are persisted) | |
| 577 | + // before this callback runs. | |
| 578 | + this._enqueueChatRequest(async () => { | |
| 579 | + const continuationId = nanoid(); | |
| 580 | + const abortSignal = this._getAbortSignal(continuationId); | |
| 574 | 581 | |
| 575 | - waitForStream() | |
| 576 | - .then(() => { | |
| 577 | - const continuationId = nanoid(); | |
| 578 | - const abortSignal = this._getAbortSignal(continuationId); | |
| 582 | + await this._tryCatchChat(async () => { | |
| 583 | + return agentContext.run( | |
| 584 | + { | |
| 585 | + agent: this, | |
| 586 | + connection, | |
| 587 | + request: undefined, | |
| 588 | + email: undefined | |
| 589 | + }, | |
| 590 | + async () => { | |
| 591 | + const response = await this.onChatMessage( | |
| 592 | + async (_finishResult) => { | |
| 593 | + // User-provided hook. Cleanup handled by _reply. | |
| 594 | + }, | |
| 595 | + { | |
| 596 | + requestId: continuationId, | |
| 597 | + abortSignal, | |
| 598 | + clientTools: clientTools ?? this._lastClientTools, | |
| 599 | + body: this._lastBody | |
| 600 | + } | |
| 601 | + ); | |
| 579 | 602 | |
| 580 | - return this._tryCatchChat(async () => { | |
| 581 | - return agentContext.run( | |
| 582 | - { | |
| 583 | - agent: this, | |
| 584 | - connection, | |
| 585 | - request: undefined, | |
| 586 | - email: undefined | |
| 587 | - }, | |
| 588 | - async () => { | |
| 589 | - const response = await this.onChatMessage( | |
| 590 | - async (_finishResult) => { | |
| 591 | - // User-provided hook. Cleanup handled by _reply. | |
| 592 | - }, | |
| 603 | + if (response) { | |
| 604 | + // Pass continuation flag to merge parts into last assistant message | |
| 605 | + // Note: We pass an empty excludeBroadcastIds array because the sender | |
| 606 | + // NEEDS to receive the continuation stream. Unlike regular chat requests | |
| 607 | + // where aiFetch handles the response, tool continuations have no listener | |
| 608 | + // waiting - the client relies on the broadcast. | |
| 609 | + await this._reply( | |
| 610 | + continuationId, | |
| 611 | + response, | |
| 612 | + [], // Don't exclude sender - they need the continuation | |
| 593 | 613 | { |
| 594 | - requestId: continuationId, | |
| 595 | - abortSignal, | |
| 596 | - clientTools: clientTools ?? this._lastClientTools, | |
| 597 | - body: this._lastBody | |
| 614 | + continuation: true, | |
| 615 | + chatMessageId: continuationId | |
| 598 | 616 | } |
| 599 | 617 | ); |
| 600 | - | |
| 601 | - if (response) { | |
| 602 | - // Pass continuation flag to merge parts into last assistant message | |
| 603 | - // Note: We pass an empty excludeBroadcastIds array because the sender | |
| 604 | - // NEEDS to receive the continuation stream. Unlike regular chat requests | |
| 605 | - // where aiFetch handles the response, tool continuations have no listener | |
| 606 | - // waiting - the client relies on the broadcast. | |
| 607 | - await this._reply( | |
| 608 | - continuationId, | |
| 609 | - response, | |
| 610 | - [], // Don't exclude sender - they need the continuation | |
| 611 | - { | |
| 612 | - continuation: true, | |
| 613 | - chatMessageId: continuationId | |
| 614 | - } | |
| 615 | - ); | |
| 616 | - } | |
| 617 | 618 | } |
| 618 | - ); | |
| 619 | - }); | |
| 620 | - }) | |
| 621 | - .catch((error) => { | |
| 622 | - console.error( | |
| 623 | - "[AIChatAgent] Tool continuation failed:", | |
| 624 | - error | |
| 619 | + } | |
| 625 | 620 | ); |
| 626 | 621 | }); |
| 622 | + }); | |
| 627 | 623 | } |
| 628 | 624 | }); |
| 629 | 625 | return; |
@@ -636,54 +632,39 @@
| 636 | 632 | // Auto-continue for both approvals and rejections so the LLM |
| 637 | 633 | // sees the tool_result and can respond accordingly. |
| 638 | 634 | if (applied && autoContinue) { |
| 639 | - const waitForStream = async () => { | |
| 640 | - if (this._streamCompletionPromise) { | |
| 641 | - await this._streamCompletionPromise; | |
| 642 | - } else { | |
| 643 | - await new Promise((resolve) => setTimeout(resolve, 500)); | |
| 644 | - } | |
| 645 | - }; | |
| 646 | - | |
| 647 | - waitForStream() | |
| 648 | - .then(() => { | |
| 649 | - const continuationId = nanoid(); | |
| 650 | - const abortSignal = this._getAbortSignal(continuationId); | |
| 651 | - | |
| 652 | - return this._tryCatchChat(async () => { | |
| 653 | - return agentContext.run( | |
| 654 | - { | |
| 655 | - agent: this, | |
| 656 | - connection, | |
| 657 | - request: undefined, | |
| 658 | - email: undefined | |
| 659 | - }, | |
| 660 | - async () => { | |
| 661 | - const response = await this.onChatMessage( | |
| 662 | - async (_finishResult) => {}, | |
| 663 | - { | |
| 664 | - requestId: continuationId, | |
| 665 | - abortSignal, | |
| 666 | - clientTools: this._lastClientTools, | |
| 667 | - body: this._lastBody | |
| 668 | - } | |
| 669 | - ); | |
| 635 | + this._enqueueChatRequest(async () => { | |
| 636 | + const continuationId = nanoid(); | |
| 637 | + const abortSignal = this._getAbortSignal(continuationId); | |
| 670 | 638 | |
| 671 | - if (response) { | |
| 672 | - await this._reply(continuationId, response, [], { | |
| 673 | - continuation: true, | |
| 674 | - chatMessageId: continuationId | |
| 675 | - }); | |
| 639 | + await this._tryCatchChat(async () => { | |
| 640 | + return agentContext.run( | |
| 641 | + { | |
| 642 | + agent: this, | |
| 643 | + connection, | |
| 644 | + request: undefined, | |
| 645 | + email: undefined | |
| 646 | + }, | |
| 647 | + async () => { | |
| 648 | + const response = await this.onChatMessage( | |
| 649 | + async (_finishResult) => {}, | |
| 650 | + { | |
| 651 | + requestId: continuationId, | |
| 652 | + abortSignal, | |
| 653 | + clientTools: this._lastClientTools, | |
| 654 | + body: this._lastBody | |
| 676 | 655 | } |
| 656 | + ); | |
| 657 | + | |
| 658 | + if (response) { | |
| 659 | + await this._reply(continuationId, response, [], { | |
| 660 | + continuation: true, | |
| 661 | + chatMessageId: continuationId | |
| 662 | + }); | |
| 677 | 663 | } |
| 678 | - ); | |
| 679 | - }); | |
| 680 | - }) | |
| 681 | - .catch((error) => { | |
| 682 | - console.error( | |
| 683 | - "[AIChatAgent] Tool approval continuation failed:", | |
| 684 | - error | |
| 664 | + } | |
| 685 | 665 | ); |
| 686 | 666 | }); |
| 667 | + }); | |
| 687 | 668 | } |
| 688 | 669 | }); |
| 689 | 670 | return; |
@@ -831,6 +812,39 @@
| 831 | 812 | } |
| 832 | 813 | |
| 833 | 814 | /** |
| 815 | + * Enqueue a chat request for serialized execution. | |
| 816 | + * If nothing is in-flight the request runs immediately; | |
| 817 | + * otherwise it waits until every earlier request's | |
| 818 | + * onChatMessage → _reply cycle has completed. | |
| 819 | + * @internal | |
| 820 | + */ | |
| 821 | + private _enqueueChatRequest(execute: () => Promise<void>) { | |
| 822 | + this._chatRequestQueue.push(execute); | |
| 823 | + if (!this._chatRequestInFlight) { | |
| 824 | + this._drainChatRequestQueue(); | |
| 825 | + } | |
| 826 | + } | |
| 827 | + | |
| 828 | + /** | |
| 829 | + * Process queued chat requests one at a time. | |
| 830 | + * Errors in individual requests are logged but do not | |
| 831 | + * prevent subsequent requests from executing. | |
| 832 | + * @internal | |
| 833 | + */ | |
| 834 | + private async _drainChatRequestQueue() { | |
| 835 | + this._chatRequestInFlight = true; | |
| 836 | + while (this._chatRequestQueue.length > 0) { | |
| 837 | + const next = this._chatRequestQueue.shift()!; | |
| 838 | + try { | |
| 839 | + await next(); | |
| 840 | + } catch (error) { | |
| 841 | + console.error("[AIChatAgent] Queued chat request failed:", error); | |
| 842 | + } | |
| 843 | + } | |
| 844 | + this._chatRequestInFlight = false; | |
| 845 | + } | |
| 846 | + | |
| 847 | + /** | |
| 834 | 848 | * Restore _lastBody and _lastClientTools from SQLite. |
| 835 | 849 | * Called in the constructor so these values survive DO hibernation. |
| 836 | 850 | * @internal |
@@ -2147,10 +2161,6 @@
| 2147 | 2161 | }; |
| 2148 | 2162 | // Track the streaming message so tool results can be applied before persistence |
| 2149 | 2163 | this._streamingMessage = message; |
| 2150 | - // Set up completion promise for tool continuation to wait on | |
| 2151 | - this._streamCompletionPromise = new Promise((resolve) => { | |
| 2152 | - this._streamCompletionResolve = resolve; | |
| 2153 | - }); | |
| 2154 | 2164 | |
| 2155 | 2165 | // Determine response format based on content-type |
| 2156 | 2166 | const contentType = response.headers.get("content-type") || ""; |
@@ -2202,19 +2212,12 @@
| 2202 | 2212 | } finally { |
| 2203 | 2213 | reader.releaseLock(); |
| 2204 | 2214 | |
| 2205 | - // Always clear the streaming message reference and resolve completion | |
| 2206 | - // promise, even on error. Without this, tool continuations waiting on | |
| 2207 | - // _streamCompletionPromise would hang forever after a stream error. | |
| 2215 | + // Always clear the streaming message reference, even on error. | |
| 2208 | 2216 | this._streamingMessage = null; |
| 2209 | 2217 | // Capture and clear early-persist tracking. The persistence block |
| 2210 | 2218 | // after the finally uses the local to update in place. |
| 2211 | 2219 | earlyPersistedId = this._approvalPersistedMessageId; |
| 2212 | 2220 | this._approvalPersistedMessageId = null; |
| 2213 | - if (this._streamCompletionResolve) { | |
| 2214 | - this._streamCompletionResolve(); | |
| 2215 | - this._streamCompletionResolve = null; | |
| 2216 | - this._streamCompletionPromise = null; | |
| 2217 | - } | |
| 2218 | 2221 | |
| 2219 | 2222 | // Framework-level cleanup: always remove abort controller. |
| 2220 | 2223 | // Only emit observability on success (not on error path). |
@@ -2335,6 +2338,7 @@
| 2335 | 2338 | */ |
| 2336 | 2339 | async destroy() { |
| 2337 | 2340 | this._destroyAbortControllers(); |
| 2341 | + this._chatRequestQueue.length = 0; | |
| 2338 | 2342 | this._resumableStream.destroy(); |
| 2339 | 2343 | await super.destroy(); |
| 2340 | 2344 | } |
A packages/ai-chat/src/tests/concurrent-messages.test.ts
@@ -0,0 +1,163 @@
| 1 | +import { describe, it, expect } from "vitest"; | |
| 2 | +import type { UIMessage as ChatMessage } from "ai"; | |
| 3 | +import { MessageType } from "../types"; | |
| 4 | +import { connectChatWS, isUseChatResponseMessage } from "./test-utils"; | |
| 5 | + | |
| 6 | +describe("Concurrent Message Handling", () => { | |
| 7 | + it("processes multiple simultaneous messages in order without interleaving", async () => { | |
| 8 | + const room = crypto.randomUUID(); | |
| 9 | + const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`); | |
| 10 | + | |
| 11 | + const receivedResponses: Array<{ | |
| 12 | + id: string; | |
| 13 | + body: string; | |
| 14 | + done: boolean; | |
| 15 | + timestamp: number; | |
| 16 | + }> = []; | |
| 17 | + | |
| 18 | + // Track when all 3 responses are complete | |
| 19 | + let completedCount = 0; | |
| 20 | + let resolveAll: () => void; | |
| 21 | + const allDone = new Promise<void>((res) => { | |
| 22 | + resolveAll = res; | |
| 23 | + }); | |
| 24 | + | |
| 25 | + const timeout = setTimeout(() => { | |
| 26 | + throw new Error( | |
| 27 | + `Timeout: only received ${completedCount}/3 complete responses` | |
| 28 | + ); | |
| 29 | + }, 10000); | |
| 30 | + | |
| 31 | + ws.addEventListener("message", (e: MessageEvent) => { | |
| 32 | + const data = JSON.parse(e.data as string); | |
| 33 | + if (isUseChatResponseMessage(data)) { | |
| 34 | + receivedResponses.push({ | |
| 35 | + id: data.id, | |
| 36 | + body: data.body, | |
| 37 | + done: data.done, | |
| 38 | + timestamp: Date.now() | |
| 39 | + }); | |
| 40 | + if (data.done) { | |
| 41 | + completedCount++; | |
| 42 | + if (completedCount === 3) { | |
| 43 | + clearTimeout(timeout); | |
| 44 | + resolveAll(); | |
| 45 | + } | |
| 46 | + } | |
| 47 | + } | |
| 48 | + }); | |
| 49 | + | |
| 50 | + // Send 3 messages as fast as possible (simulating rapid user input) | |
| 51 | + const messages = ["first", "second", "third"]; | |
| 52 | + for (const text of messages) { | |
| 53 | + const userMessage: ChatMessage = { | |
| 54 | + id: `msg-${text}`, | |
| 55 | + role: "user", | |
| 56 | + parts: [{ type: "text", text }] | |
| 57 | + }; | |
| 58 | + | |
| 59 | + ws.send( | |
| 60 | + JSON.stringify({ | |
| 61 | + type: MessageType.CF_AGENT_USE_CHAT_REQUEST, | |
| 62 | + id: `req-${text}`, | |
| 63 | + init: { | |
| 64 | + method: "POST", | |
| 65 | + body: JSON.stringify({ messages: [userMessage] }) | |
| 66 | + } | |
| 67 | + }) | |
| 68 | + ); | |
| 69 | + } | |
| 70 | + | |
| 71 | + // Wait for all responses to complete | |
| 72 | + await allDone; | |
| 73 | + | |
| 74 | + // Verify we got exactly 3 complete responses | |
| 75 | + const doneResponses = receivedResponses.filter((r) => r.done); | |
| 76 | + expect(doneResponses).toHaveLength(3); | |
| 77 | + | |
| 78 | + // Verify responses completed in order (first, second, third) | |
| 79 | + // This proves the queue serialized the requests — if they ran concurrently, | |
| 80 | + // order would be non-deterministic | |
| 81 | + const doneIds = doneResponses.map((r) => r.id); | |
| 82 | + expect(doneIds).toEqual(["req-first", "req-second", "req-third"]); | |
| 83 | + | |
| 84 | + ws.close(); | |
| 85 | + }); | |
| 86 | + | |
| 87 | + it("queued messages are cleared when chat history is cleared", async () => { | |
| 88 | + const room = crypto.randomUUID(); | |
| 89 | + const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`); | |
| 90 | + | |
| 91 | + const receivedResponses: Array<{ id: string; done: boolean }> = []; | |
| 92 | + | |
| 93 | + ws.addEventListener("message", (e: MessageEvent) => { | |
| 94 | + const data = JSON.parse(e.data as string); | |
| 95 | + if (isUseChatResponseMessage(data)) { | |
| 96 | + receivedResponses.push({ id: data.id, done: data.done }); | |
| 97 | + } | |
| 98 | + }); | |
| 99 | + | |
| 100 | + // Send first message | |
| 101 | + const firstMessage: ChatMessage = { | |
| 102 | + id: "msg-1", | |
| 103 | + role: "user", | |
| 104 | + parts: [{ type: "text", text: "first" }] | |
| 105 | + }; | |
| 106 | + ws.send( | |
| 107 | + JSON.stringify({ | |
| 108 | + type: MessageType.CF_AGENT_USE_CHAT_REQUEST, | |
| 109 | + id: "req-1", | |
| 110 | + init: { | |
| 111 | + method: "POST", | |
| 112 | + body: JSON.stringify({ messages: [firstMessage] }) | |
| 113 | + } | |
| 114 | + }) | |
| 115 | + ); | |
| 116 | + | |
| 117 | + // Immediately send clear command | |
| 118 | + ws.send( | |
| 119 | + JSON.stringify({ | |
| 120 | + type: MessageType.CF_AGENT_CHAT_CLEAR | |
| 121 | + }) | |
| 122 | + ); | |
| 123 | + | |
| 124 | + // Send second message after clear | |
| 125 | + const secondMessage: ChatMessage = { | |
| 126 | + id: "msg-2", | |
| 127 | + role: "user", | |
| 128 | + parts: [{ type: "text", text: "second" }] | |
| 129 | + }; | |
| 130 | + ws.send( | |
| 131 | + JSON.stringify({ | |
| 132 | + type: MessageType.CF_AGENT_USE_CHAT_REQUEST, | |
| 133 | + id: "req-2", | |
| 134 | + init: { | |
| 135 | + method: "POST", | |
| 136 | + body: JSON.stringify({ messages: [secondMessage] }) | |
| 137 | + } | |
| 138 | + }) | |
| 139 | + ); | |
| 140 | + | |
| 141 | + // Wait for second response to complete | |
| 142 | + await new Promise<void>((resolve) => { | |
| 143 | + const checkInterval = setInterval(() => { | |
| 144 | + const doneResponses = receivedResponses.filter((r) => r.done); | |
| 145 | + // We expect at least one done response (the second one) | |
| 146 | + if (doneResponses.some((r) => r.id === "req-2")) { | |
| 147 | + clearInterval(checkInterval); | |
| 148 | + resolve(); | |
| 149 | + } | |
| 150 | + }, 50); | |
| 151 | + setTimeout(() => { | |
| 152 | + clearInterval(checkInterval); | |
| 153 | + resolve(); | |
| 154 | + }, 5000); | |
| 155 | + }); | |
| 156 | + | |
| 157 | + // The second message should have completed | |
| 158 | + const doneResponses = receivedResponses.filter((r) => r.done); | |
| 159 | + expect(doneResponses.some((r) => r.id === "req-2")).toBe(true); | |
| 160 | + | |
| 161 | + ws.close(); | |
| 162 | + }); | |
| 163 | +}); |
M packages/ai-chat/src/tests/worker.ts
@@ -64,6 +64,11 @@
| 64 | 64 | // It's a nested async function called from within onChatMessage |
| 65 | 65 | await this._simulateToolExecute(); |
| 66 | 66 | |
| 67 | + // Random delay to simulate variable LLM response times. | |
| 68 | + // This proves queue serialization works — without it, concurrent | |
| 69 | + // messages with random delays would complete in random order. | |
| 70 | + await new Promise((r) => setTimeout(r, Math.random() * 50)); | |
| 71 | + | |
| 67 | 72 | // Simple echo response for testing |
| 68 | 73 | return new Response("Hello from chat agent!", { |
| 69 | 74 | headers: { "Content-Type": "text/plain" } |
wut