import { describe, it, expect, vi, beforeEach } from "vitest"; import type { UIMessage as ChatMessage, UIMessageChunk } from "ai"; import { WebSocketChatTransport } from "../ws-chat-transport"; import { MessageType } from "../types"; /** * Minimal mock of the AgentConnection interface. * Supports both addEventListener listeners AND direct handleStreamResuming calls. */ function createMockAgent() { const sent: string[] = []; const target = new EventTarget(); return { sent, target, send(data: string) { sent.push(data); }, addEventListener( type: string, listener: (event: MessageEvent) => void, options?: { signal?: AbortSignal } ) { target.addEventListener(type, listener as EventListener, options); }, removeEventListener(type: string, listener: (event: MessageEvent) => void) { target.removeEventListener(type, listener as EventListener); }, /** Simulate a message arriving from the server */ dispatch(data: Record) { target.dispatchEvent( new MessageEvent("message", { data: JSON.stringify(data) }) ); } }; } describe("WebSocketChatTransport reconnectToStream + handleStreamResuming", () => { let agent: ReturnType; let activeRequestIds: Set; let transport: WebSocketChatTransport; beforeEach(() => { agent = createMockAgent(); activeRequestIds = new Set(); transport = new WebSocketChatTransport({ agent, activeRequestIds }); }); // ── handleStreamResuming basics ────────────────────────────────────── it("handleStreamResuming returns false when no reconnectToStream is pending", () => { expect(transport.handleStreamResuming({ id: "req-1" })).toBe(false); }); // ── reconnectToStream sends RESUME_REQUEST ─────────────────────────── it("sends CF_AGENT_STREAM_RESUME_REQUEST immediately", async () => { // Start reconnectToStream (don't await — it's waiting for handleStreamResuming) const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Verify the request was sent expect(agent.sent).toHaveLength(1); const msg = JSON.parse(agent.sent[0]); expect(msg.type).toBe(MessageType.CF_AGENT_STREAM_RESUME_REQUEST); // Resolve by calling handleStreamResuming transport.handleStreamResuming({ id: "req-1" }); const result = await promise; expect(result).toBeInstanceOf(ReadableStream); }); // ── handleStreamResuming resolves reconnectToStream ────────────────── it("resolves with ReadableStream when handleStreamResuming is called", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Simulate onAgentMessage calling handleStreamResuming const handled = transport.handleStreamResuming({ id: "req-abc" }); expect(handled).toBe(true); const stream = await promise; expect(stream).toBeInstanceOf(ReadableStream); }); it("sends ACK when handleStreamResuming is called", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-42" }); await promise; // First message is RESUME_REQUEST, second is ACK expect(agent.sent).toHaveLength(2); const ack = JSON.parse(agent.sent[1]); expect(ack.type).toBe(MessageType.CF_AGENT_STREAM_RESUME_ACK); expect(ack.id).toBe("req-42"); }); it("adds requestId to activeRequestIds when handleStreamResuming is called", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-tracked" }); await promise; expect(activeRequestIds.has("req-tracked")).toBe(true); }); // ── handleStreamResumeNone basics ──────────────────────────────────── it("handleStreamResumeNone returns false when no reconnectToStream is pending", () => { expect(transport.handleStreamResumeNone()).toBe(false); }); it("handleStreamResumeNone resolves reconnectToStream with null immediately", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); const handled = transport.handleStreamResumeNone(); expect(handled).toBe(true); const result = await promise; expect(result).toBeNull(); }); it("handleStreamResumeNone clears both resolvers so subsequent calls return false", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResumeNone(); await promise; expect(transport.handleStreamResumeNone()).toBe(false); expect(transport.handleStreamResuming({ id: "late" })).toBe(false); }); it("handleStreamResuming after handleStreamResumeNone does not double-resolve", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // RESUME_NONE arrives first transport.handleStreamResumeNone(); const result = await promise; expect(result).toBeNull(); // Late STREAM_RESUMING should be ignored expect(transport.handleStreamResuming({ id: "req-late" })).toBe(false); }); // ── Timeout behavior ───────────────────────────────────────────────── it("resolves null after timeout when no handleStreamResuming is called", async () => { vi.useFakeTimers(); try { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Advance past the 5s timeout vi.advanceTimersByTime(5001); const result = await promise; expect(result).toBeNull(); } finally { vi.useRealTimers(); } }); it("clears _resumeResolver after timeout so handleStreamResuming returns false", async () => { vi.useFakeTimers(); try { const promise = transport.reconnectToStream({ chatId: "chat-1" }); vi.advanceTimersByTime(5001); await promise; // After timeout, resolver is cleared expect(transport.handleStreamResuming({ id: "late" })).toBe(false); } finally { vi.useRealTimers(); } }); // ── Idempotency / double-call safety ───────────────────────────────── it("only the latest reconnectToStream's resolver is active (React strict mode)", async () => { vi.useFakeTimers(); try { // Simulate React strict mode: effect runs twice const promise1 = transport.reconnectToStream({ chatId: "chat-1" }); const promise2 = transport.reconnectToStream({ chatId: "chat-1" }); // handleStreamResuming only triggers the LATEST resolver const handled = transport.handleStreamResuming({ id: "req-sm" }); expect(handled).toBe(true); // Second call resolves with a stream const stream2 = await promise2; expect(stream2).toBeInstanceOf(ReadableStream); // First call's resolver was overwritten — it times out vi.advanceTimersByTime(5001); const stream1 = await promise1; expect(stream1).toBeNull(); } finally { vi.useRealTimers(); } }); it("handleStreamResuming returns false after resolver is consumed", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // First call succeeds expect(transport.handleStreamResuming({ id: "req-1" })).toBe(true); await promise; // Second call — resolver was cleared expect(transport.handleStreamResuming({ id: "req-2" })).toBe(false); }); // ── Chunk reception via _createResumeStream ────────────────────────── it("returns a stream that receives chunks via addEventListener", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-chunks" }); const stream = await promise; expect(stream).toBeInstanceOf(ReadableStream); const reader = stream!.getReader(); // Simulate chunk arriving over WebSocket agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-chunks", body: '{"type":"text-start","id":"t1"}', done: false, replay: true }); const { value, done } = await reader.read(); expect(done).toBe(false); expect((value as UIMessageChunk).type).toBe("text-start"); }); it("stream closes when done:true chunk arrives", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-done" }); const stream = await promise; const reader = stream!.getReader(); // Send a chunk then done agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-done", body: '{"type":"text-start","id":"t1"}', done: false }); agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-done", body: '{"type":"text-delta","id":"t1","delta":"Hello"}', done: false }); agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-done", body: "", done: true }); const chunks: UIMessageChunk[] = []; while (true) { const { value, done } = await reader.read(); if (done) break; chunks.push(value); } expect(chunks).toHaveLength(2); expect(chunks[0].type).toBe("text-start"); expect(chunks[1].type).toBe("text-delta"); }); it("removes requestId from activeRequestIds when stream completes", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-cleanup" }); const stream = await promise; const reader = stream!.getReader(); expect(activeRequestIds.has("req-cleanup")).toBe(true); // Complete the stream agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-cleanup", body: "", done: true }); await reader.read(); // { done: true } expect(activeRequestIds.has("req-cleanup")).toBe(false); }); it("stream ignores chunks for different request IDs", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-A" }); const stream = await promise; const reader = stream!.getReader(); // This chunk is for a different request — should be ignored agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-B", body: '{"type":"text-start","id":"t1"}', done: false }); // This chunk is for our request — should be received agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-A", body: '{"type":"text-delta","id":"t1","delta":"correct"}', done: false }); const { value } = await reader.read(); expect((value as { type: string; delta?: string }).delta).toBe("correct"); }); // ── Error handling ─────────────────────────────────────────────────── it("stream errors when error chunk arrives", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-err" }); const stream = await promise; const reader = stream!.getReader(); agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-err", body: "Something went wrong", error: true }); await expect(reader.read()).rejects.toThrow("Something went wrong"); }); it("stream errors with fallback message when error body is empty", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); transport.handleStreamResuming({ id: "req-err2" }); const stream = await promise; const reader = stream!.getReader(); agent.dispatch({ type: "cf_agent_use_chat_response", id: "req-err2", body: "", error: true }); await expect(reader.read()).rejects.toThrow("Stream error"); }); // ── send() failure tolerance ───────────────────────────────────────── it("reconnectToStream does not throw when send() throws", async () => { const failAgent = createMockAgent(); failAgent.send = () => { throw new Error("WebSocket closed"); }; const failTransport = new WebSocketChatTransport({ agent: failAgent }); vi.useFakeTimers(); try { const promise = failTransport.reconnectToStream({ chatId: "chat-1" }); // Should not throw — the try/catch handles it vi.advanceTimersByTime(5001); const result = await promise; expect(result).toBeNull(); } finally { vi.useRealTimers(); } }); // ── No activeRequestIds (optional) ─────────────────────────────────── // ── Double STREAM_RESUMING (server sends from onConnect + RESUME_REQUEST) ── it("activeRequestIds contains the ID after handleStreamResuming so caller can dedupe", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // First STREAM_RESUMING — transport handles it expect(transport.handleStreamResuming({ id: "req-dup" })).toBe(true); await promise; // requestId is now in activeRequestIds expect(activeRequestIds.has("req-dup")).toBe(true); // Second STREAM_RESUMING with same ID — transport returns false // (resolver consumed), but the caller can check activeRequestIds // to skip the fallback and avoid a duplicate ACK. expect(transport.handleStreamResuming({ id: "req-dup" })).toBe(false); expect(activeRequestIds.has("req-dup")).toBe(true); }); it("works without activeRequestIds", async () => { const noIdsTransport = new WebSocketChatTransport({ agent }); const promise = noIdsTransport.reconnectToStream({ chatId: "chat-1" }); const handled = noIdsTransport.handleStreamResuming({ id: "req-noids" }); expect(handled).toBe(true); const stream = await promise; expect(stream).toBeInstanceOf(ReadableStream); }); // ── Singleton transport: agent update survives resolver ─────────── it("agent property can be updated after construction", () => { const newAgent = createMockAgent(); transport.agent = newAgent; expect(transport.agent).toBe(newAgent); }); it("resolver survives agent swap — handleStreamResuming works on same transport", async () => { // Start reconnectToStream on the original agent const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Simulate _pk change: swap agent to a new socket const newAgent = createMockAgent(); transport.agent = newAgent; // handleStreamResuming still finds the resolver (same transport instance) const handled = transport.handleStreamResuming({ id: "req-swap" }); expect(handled).toBe(true); const stream = await promise; expect(stream).toBeInstanceOf(ReadableStream); }); it("ACK is sent via the NEW agent after agent swap", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Swap to new agent before resolving const newAgent = createMockAgent(); transport.agent = newAgent; transport.handleStreamResuming({ id: "req-ack-swap" }); await promise; // ACK should go to the NEW agent, not the old one expect(newAgent.sent).toHaveLength(1); const ack = JSON.parse(newAgent.sent[0]); expect(ack.type).toBe(MessageType.CF_AGENT_STREAM_RESUME_ACK); expect(ack.id).toBe("req-ack-swap"); // Old agent only received RESUME_REQUEST (no ACK) expect(agent.sent).toHaveLength(1); const req = JSON.parse(agent.sent[0]); expect(req.type).toBe(MessageType.CF_AGENT_STREAM_RESUME_REQUEST); }); it("resumed chunk listener attaches to the agent at resolve time", async () => { const promise = transport.reconnectToStream({ chatId: "chat-1" }); // Swap to new agent before resolving const newAgent = createMockAgent(); transport.agent = newAgent; transport.handleStreamResuming({ id: "req-listen" }); const stream = await promise; const reader = stream!.getReader(); // Dispatch chunk on the NEW agent — should be received newAgent.dispatch({ type: "cf_agent_use_chat_response", id: "req-listen", body: '{"type":"text-delta","id":"t1","delta":"from-new"}', done: false }); const { value } = await reader.read(); expect((value as { type: string; delta?: string }).delta).toBe("from-new"); }); it("full agent-swap scenario: reconnect on old socket, resume on new socket", async () => { // 1. Start reconnectToStream on old agent (socket may be closed) const promise = transport.reconnectToStream({ chatId: "chat-1" }); // 2. _pk changes — swap to new agent const newAgent = createMockAgent(); transport.agent = newAgent; // 3. Server sends STREAM_RESUMING on the new socket // (onAgentMessage calls handleStreamResuming on the same transport) transport.handleStreamResuming({ id: "req-full" }); const stream = await promise; // 4. ACK goes to new agent expect(newAgent.sent).toHaveLength(1); expect(JSON.parse(newAgent.sent[0]).type).toBe( MessageType.CF_AGENT_STREAM_RESUME_ACK ); // 5. Chunks arrive on new agent's EventTarget const reader = stream!.getReader(); newAgent.dispatch({ type: "cf_agent_use_chat_response", id: "req-full", body: '{"type":"text-start","id":"t1"}', done: false }); newAgent.dispatch({ type: "cf_agent_use_chat_response", id: "req-full", body: '{"type":"text-delta","id":"t1","delta":"hello"}', done: false }); newAgent.dispatch({ type: "cf_agent_use_chat_response", id: "req-full", body: "", done: true }); const chunks: UIMessageChunk[] = []; while (true) { const { value, done } = await reader.read(); if (done) break; chunks.push(value); } expect(chunks).toHaveLength(2); expect(chunks[0].type).toBe("text-start"); expect(chunks[1].type).toBe("text-delta"); // 6. Request ID cleaned up expect(activeRequestIds.has("req-full")).toBe(false); }); });