import { describe, it, expect } from "vitest";
import type { UIMessage as ChatMessage, UIMessageChunk } from "ai";
import { connectChatWS } from "./test-utils";
import { WebSocketChatTransport } from "../ws-chat-transport";

/**
 * Opens a chat WebSocket against the slow-stream agent for the given room.
 *
 * @param room - Room identifier appended to the agent path.
 * @returns Whatever `connectChatWS` returns for that path.
 */
function connectSlowStream(room: string) {
  return connectChatWS(`/agents/slow-stream-agent/${room}`);
}

/**
 * Races `promise` against a timer so a hung operation fails the test quickly
 * instead of stalling until the test runner's own timeout.
 *
 * @param promise - The operation to wait for.
 * @param timeoutMs - Milliseconds to wait before giving up.
 * @returns A promise that settles like `promise` if it settles first.
 * @throws Error with message "Timed out" if the timer fires first.
 */
function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutId = setTimeout(() => reject(new Error("Timed out")), timeoutMs);
  });
  return Promise.race([promise, timeoutPromise]).finally(() => {
    // Always clear the timer so it cannot fire after the race has settled.
    // Compare against undefined rather than relying on handle truthiness.
    if (timeoutId !== undefined) clearTimeout(timeoutId);
  }) as Promise<T>;
}

/**
 * Drains `reader` until the stream ends, either by reporting `done` or by
 * rejecting a read with an error whose `name` is "AbortError".
 *
 * @param reader - Reader of the stream under test; chunk values are discarded.
 * @param timeoutMs - Total time budget across all reads.
 * @throws Error if the budget is exhausted, or any non-abort read error.
 */
async function readUntilDoneOrAbort(
  reader: ReadableStreamDefaultReader<UIMessageChunk>,
  timeoutMs: number
) {
  const deadline = Date.now() + timeoutMs;
  while (true) {
    // The deadline is shared by every read, so each read only gets what is left.
    const remaining = deadline - Date.now();
    if (remaining <= 0) throw new Error("Timed out waiting for stream to end");
    try {
      const result = await withTimeout(reader.read(), remaining);
      if (result.done) return;
    } catch (err) {
      // An AbortError counts as a clean termination; anything else
      // (including the "Timed out" error from withTimeout) is rethrown.
      if (err && typeof err === "object" && "name" in err) {
        if ((err as { name: unknown }).name === "AbortError") return;
      }
      throw err;
    }
  }
}

/**
 * Reads `stream` to completion and returns every chunk in arrival order.
 *
 * @param stream - The stream to consume; it is locked by this call.
 * @param timeoutMs - Total time budget across all reads.
 * @returns The collected chunks.
 * @throws Error if the budget is exhausted, or any read error.
 */
async function collectChunks(
  stream: ReadableStream<UIMessageChunk>,
  timeoutMs: number
): Promise<UIMessageChunk[]> {
  const reader = stream.getReader();
  const chunks: UIMessageChunk[] = [];
  const deadline = Date.now() + timeoutMs;
  while (true) {
    const remaining = deadline - Date.now();
    if (remaining <= 0) throw new Error("Timed out collecting chunks");
    const result = await withTimeout(reader.read(), remaining);
    if (result.done) break;
    chunks.push(result.value);
  }
  return chunks;
}

/** Minimal user message sent in every test. */
const userMessage: ChatMessage = {
  id: "msg1",
  role: "user",
  parts: [{ type: "text", text: "Hello" }]
};

describe("WebSocketChatTransport abort", () => {
  it("terminates the stream when abortSignal fires mid-stream", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectSlowStream(room);
    await new Promise((r) => setTimeout(r, 50));

    try {
      const transport = new WebSocketChatTransport({ agent: ws });
      const abortController = new AbortController();

      const stream = await transport.sendMessages({
        chatId: "chat",
        messages: [userMessage],
        abortSignal: abortController.signal,
        trigger: "submit-message",
        body: { format: "plaintext", chunkCount: 20, chunkDelayMs: 50 }
      });

      const reader = stream.getReader();

      // Abort mid-stream. Without proper stream termination, this would hang.
      setTimeout(() => abortController.abort(), 200);

      await expect(readUntilDoneOrAbort(reader, 5000)).resolves.toBeUndefined();
    } finally {
      ws.close(1000);
    }
  });

  it("terminates immediately when abortSignal is already aborted", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectSlowStream(room);
    await new Promise((r) => setTimeout(r, 50));

    try {
      const transport = new WebSocketChatTransport({ agent: ws });
      const abortController = new AbortController();
      abortController.abort();

      const stream = await transport.sendMessages({
        chatId: "chat",
        messages: [userMessage],
        abortSignal: abortController.signal,
        trigger: "submit-message",
        body: { format: "plaintext", chunkCount: 20, chunkDelayMs: 50 }
      });

      const reader = stream.getReader();

      // Should terminate immediately — no waiting for chunks
      await expect(readUntilDoneOrAbort(reader, 1000)).resolves.toBeUndefined();
    } finally {
      ws.close(1000);
    }
  });

  it("stream.cancel() sends cancel to server and terminates", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectSlowStream(room);
    await new Promise((r) => setTimeout(r, 50));

    try {
      const transport = new WebSocketChatTransport({ agent: ws });

      const stream = await transport.sendMessages({
        chatId: "chat",
        messages: [userMessage],
        abortSignal: undefined,
        trigger: "submit-message",
        body: { format: "plaintext", chunkCount: 20, chunkDelayMs: 50 }
      });

      // Let a few chunks arrive
      await new Promise((r) => setTimeout(r, 200));

      // cancel() should not hang — it sends CF_AGENT_CHAT_REQUEST_CANCEL
      // and terminates the stream
      await expect(withTimeout(stream.cancel(), 2000)).resolves.toBeUndefined();
    } finally {
      ws.close(1000);
    }
  });

  it("keeps requestId in activeRequestIds after abort until server sends done", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectSlowStream(room);
    await new Promise((r) => setTimeout(r, 50));

    try {
      // NOTE(review): element type assumed to be string — confirm against the
      // `activeRequestIds` option declared in ws-chat-transport.
      const activeRequestIds = new Set<string>();
      const transport = new WebSocketChatTransport({
        agent: ws,
        activeRequestIds
      });
      const abortController = new AbortController();

      const stream = await transport.sendMessages({
        chatId: "chat",
        messages: [userMessage],
        abortSignal: abortController.signal,
        trigger: "submit-message",
        body: { format: "plaintext", chunkCount: 20, chunkDelayMs: 50 }
      });

      const reader = stream.getReader();

      // Wait for a few chunks to arrive
      await new Promise((r) => setTimeout(r, 200));

      // Verify requestId is tracked before abort
      expect(activeRequestIds.size).toBe(1);
      const requestId = [...activeRequestIds][0];

      // Abort mid-stream
      abortController.abort();

      // After abort, requestId must still be in activeRequestIds so that
      // onAgentMessage skips in-flight server chunks (issue #1100).
      expect(activeRequestIds.has(requestId)).toBe(true);

      // Drain the stream (it errors with AbortError)
      await readUntilDoneOrAbort(reader, 5000);

      // ID is still kept — caller (onAgentMessage in react.tsx) cleans it up
      // when it receives the server's done:true broadcast.
      expect(activeRequestIds.has(requestId)).toBe(true);
    } finally {
      ws.close(1000);
    }
  });

  it("completes normally when stream finishes without abort", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectSlowStream(room);
    await new Promise((r) => setTimeout(r, 50));

    try {
      const transport = new WebSocketChatTransport({ agent: ws });

      const stream = await transport.sendMessages({
        chatId: "chat",
        messages: [userMessage],
        abortSignal: undefined,
        trigger: "submit-message",
        body: { format: "plaintext", chunkCount: 3, chunkDelayMs: 10 }
      });

      const chunks = await collectChunks(stream, 5000);
      expect(chunks.length).toBeGreaterThanOrEqual(3);
    } finally {
      ws.close(1000);
    }
  });
});