/**
 * Server-side VoiceAgent tests.
 *
 * Uses a TestVoiceAgent that stubs STT/TTS/VAD with deterministic results.
 * Tests cover: voice protocol, pipeline flow, conversation persistence,
 * interruption handling, text messages, and the beforeCallStart hook.
 */
import { env } from "cloudflare:workers";
import { createExecutionContext } from "cloudflare:test";
import { describe, expect, it } from "vitest";
import worker from "./worker";

// --- Helpers ---

/**
 * Open a WebSocket connection to the given agent path and assert the
 * upgrade succeeded. Returns the accepted client-side socket.
 */
async function connectWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

/** Collect messages until we find one matching the predicate, or timeout. */
function waitForMessageMatching(
  ws: WebSocket,
  predicate: (msg: unknown) => boolean,
  timeout = 5000
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error("Timeout waiting for matching message")),
      timeout
    );
    const handler = (e: MessageEvent) => {
      const msg = typeof e.data === "string" ? JSON.parse(e.data) : e.data;
      if (predicate(msg)) {
        clearTimeout(timer);
        ws.removeEventListener("message", handler);
        resolve(msg);
      }
    };
    ws.addEventListener("message", handler);
  });
}

/** Serialize and send a JSON message over the socket. */
function sendJSON(ws: WebSocket, msg: Record<string, unknown>) {
  ws.send(JSON.stringify(msg));
}

// Use unique instance names to avoid interference between tests
let instanceCounter = 0;
function uniquePath() {
  return `/agents/test-voice-agent/voice-test-${++instanceCounter}`;
}

// --- Tests ---

/** Wait for a voice status message with a specific status value. */
function waitForStatus(ws: WebSocket, status: string) {
  return waitForMessageMatching(
    ws,
    (m) =>
      typeof m === "object" &&
      m !== null &&
      (m as Record<string, unknown>).type === "status" &&
      (m as Record<string, unknown>).status === status
  );
}

describe("VoiceAgent — protocol", () => {
  it("sends idle status on connect", async () => {
    const { ws } = await connectWS(uniquePath());
    // Agent base class sends cf_agent_identity and cf_agent_mcp_servers first;
    // wait specifically for the voice idle status.
    const msg = await waitForStatus(ws, "idle");
    expect(msg).toEqual({ type: "status", status: "idle" });
    ws.close();
  });

  it("sends listening status on start_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    const msg = await waitForStatus(ws, "listening");
    expect(msg).toEqual({ type: "status", status: "listening" });
    ws.close();
  });

  it("sends idle status on end_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");
    sendJSON(ws, { type: "end_call" });
    const msg = await waitForStatus(ws, "idle");
    expect(msg).toEqual({ type: "status", status: "idle" });
    ws.close();
  });
});

describe("VoiceAgent — reconnect during call", () => {
  it("resumes call on new connection to same instance (simulates PartySocket reconnect)", async () => {
    // Use a fixed path so both connections hit the same DO instance
    const path = uniquePath();

    // First connection: start a call
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    // Send some audio and get a transcript (proves call is working)
    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    );

    // Disconnect (simulates network drop)
    ws1.close();

    // Second connection: reconnect to the same instance
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");

    // Client would re-send start_call on reconnect
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Send audio on the new connection — should work normally
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });
    const transcript = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("test transcript");
    ws2.close();
  });

  it("preserves conversation history across reconnect", async () => {
    const path = uniquePath();

    // First connection: have a conversation
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");
    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });

    // Wait for assistant response (conversation saved to SQLite)
    await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );
    ws1.close();

    // Second connection: new call on the same instance
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Send another turn
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });

    // Should still produce a response (history preserved in SQLite)
    const transcript = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("Echo: test transcript");
    ws2.close();
  });
});

describe("VoiceAgent — audio pipeline", () => {
  it("processes audio and returns user transcript on end_of_speech", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio data (> minAudioBytes = 16000)
    ws.send(new ArrayBuffer(20000));

    // Trigger end of speech
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for the user transcript message
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("test transcript");
    ws.close();
  });

  it("returns assistant response after processing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for the assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: test transcript");
    ws.close();
  });

  it("sends pipeline metrics after processing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for metrics
    const metrics = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "metrics"
    )) as Record<string, unknown>;
    expect(metrics).toHaveProperty("vad_ms");
    expect(metrics).toHaveProperty("stt_ms");
    expect(metrics).toHaveProperty("llm_ms");
    expect(metrics).toHaveProperty("tts_ms");
    expect(metrics).toHaveProperty("first_audio_ms");
    expect(metrics).toHaveProperty("total_ms");
    ws.close();
  });
});

describe("VoiceAgent — text messages", () => {
  it("handles text_message without an active call (text-only response)", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "text_message", text: "hello" });

    // Wait for the assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: hello");

    // Should end with idle status (not listening, since no call is active)
    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;
    expect(idleStatus.status).toBe("idle");
    ws.close();
  });

  it("ignores text_message with missing text field", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send text_message without text field — should not crash
    sendJSON(ws, { type: "text_message" });

    // Prove connection is still alive by sending a valid message
    sendJSON(ws, { type: "text_message", text: "alive" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: alive");
    ws.close();
  });

  it("ignores empty text_message", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "text_message", text: "" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still works" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: still works");
    ws.close();
  });
});

describe("VoiceAgent — interruption", () => {
  it("returns to listening status after interrupt", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send audio and trigger pipeline
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Immediately interrupt
    sendJSON(ws, { type: "interrupt" });

    // Should eventually return to listening
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;
    expect(listeningStatus.status).toBe("listening");
    ws.close();
  });
});

describe("VoiceAgent — non-voice messages", () => {
  it("does not crash on unknown JSON message types", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send a custom message type
    sendJSON(ws, { type: "custom_event", data: "hello" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still alive" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: still alive");
    ws.close();
  });

  it("does not crash on non-JSON string messages", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send non-JSON text
    ws.send("this is not json {{{");

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "works" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: works");
    ws.close();
  });
});

describe("VoiceAgent — beforeCallStart rejection", () => {
  it("does not transition to listening when beforeCallStart returns false", async () => {
    const path = uniquePath();
    const { ws } = await connectWS(path);
    await waitForStatus(ws, "idle");

    // Toggle beforeCallStart to false via a control message
    sendJSON(ws, { type: "_set_before_call_start", value: false });

    // Wait for the ack to ensure the flag is set
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_ack"
    );

    // Now try to start a call — should be rejected silently
    sendJSON(ws, { type: "start_call" });

    // Prove the agent is still functional and in idle state:
    // send a text_message which should work and return idle (not listening)
    sendJSON(ws, { type: "text_message", text: "still idle" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: still idle");

    // Should return to idle (not listening), proving start_call was rejected
    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;
    expect(idleStatus.status).toBe("idle");
    ws.close();
  });

  it("transitions to listening when beforeCallStart returns true (default)", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    const msg = await waitForStatus(ws, "listening");
    expect(msg).toEqual({ type: "status", status: "listening" });
    ws.close();
  });
});

describe("VoiceAgent — audio_config", () => {
  it("sends audio_config message on start_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });

    // audio_config should arrive before listening status
    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;
    expect(config.format).toBe("mp3");

    // listening should follow
    await waitForStatus(ws, "listening");
    ws.close();
  });
});

describe("VoiceAgent — format negotiation", () => {
  it("sends configured format even when client requests a different one", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Client requests pcm16, but the test agent is configured for mp3
    sendJSON(ws, { type: "start_call", preferred_format: "pcm16" });

    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;
    // Server always sends its configured format
    expect(config.format).toBe("mp3");

    await waitForStatus(ws, "listening");
    ws.close();
  });

  it("sends configured format when client does not request one", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });

    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;
    expect(config.format).toBe("mp3");

    await waitForStatus(ws, "listening");
    ws.close();
  });
});

describe("VoiceAgent — audio buffer limits", () => {
  it("does not process audio shorter than minAudioBytes", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send very short audio (less than 16000 bytes)
    ws.send(new ArrayBuffer(100));
    sendJSON(ws, { type: "end_of_speech" });

    // Should get listening status back (not thinking/processing)
    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;
    expect(msg.status).toBe("listening");
    ws.close();
  });

  it("ignores audio chunks when not in a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send audio without start_call — should be silently ignored
    ws.send(new ArrayBuffer(20000));

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "alive" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: alive");
    ws.close();
  });
});

describe("VoiceAgent — protocol versioning", () => {
  it("sends welcome message with protocol_version on connect", async () => {
    const { ws } = await connectWS(uniquePath());
    const welcome = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "welcome"
    )) as Record<string, unknown>;
    expect(welcome.type).toBe("welcome");
    expect(typeof welcome.protocol_version).toBe("number");
    ws.close();
  });

  it("accepts hello message without crashing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send hello (like VoiceClient does on connect)
    sendJSON(ws, { type: "hello", protocol_version: 1 });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "after hello" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: after hello");
    ws.close();
  });
});

describe("VoiceAgent — lifecycle hook counting", () => {
  it("increments onCallStart and onCallEnd counters", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");
    sendJSON(ws, { type: "end_call" });
    await waitForStatus(ws, "idle");

    // Query the hook counters
    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;
    expect(counts.callStart).toBe(1);
    expect(counts.callEnd).toBe(1);
    expect(counts.interrupt).toBe(0);
    ws.close();
  });

  it("increments onInterrupt counter", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for pipeline to start
    await waitForStatus(ws, "thinking");
    sendJSON(ws, { type: "interrupt" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;
    expect(counts.interrupt).toBe(1);
    ws.close();
  });
});

describe("VoiceAgent — forceEndCall", () => {
  it("ends call and returns to idle when forceEndCall is triggered", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Trigger forceEndCall via control message
    sendJSON(ws, { type: "_force_end_call" });

    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;
    expect(idleStatus.status).toBe("idle");

    // Verify onCallEnd was called
    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;
    expect(counts.callStart).toBe(1);
    expect(counts.callEnd).toBe(1);
    ws.close();
  });

  it("is a no-op when not in a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // forceEndCall when not in a call — should not crash
    sendJSON(ws, { type: "_force_end_call" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still alive" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: still alive");
    ws.close();
  });
});

describe("VoiceAgent — binary TTS audio", () => {
  it("sends binary audio data after assistant response", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for binary audio data from server
    const audioData = await new Promise<ArrayBuffer>((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error("Timeout waiting for binary audio")),
        5000
      );
      const handler = (e: MessageEvent) => {
        if (e.data instanceof ArrayBuffer) {
          clearTimeout(timer);
          ws.removeEventListener("message", handler);
          resolve(e.data);
        }
      };
      ws.addEventListener("message", handler);
    });

    // TestTTS encodes text as bytes, so audio should be non-empty
    expect(audioData.byteLength).toBeGreaterThan(0);
    ws.close();
  });
});

describe("VoiceAgent — conversation persistence", () => {
  it("accumulates messages in SQLite across turns", async () => {
    const path = uniquePath();
    const { ws } = await connectWS(path);
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    // Wait for listening status (pipeline complete)
    await waitForStatus(ws, "listening");

    // Second turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    // Check message count: 2 user + 2 assistant = 4
    sendJSON(ws, { type: "_get_message_count" });
    const countMsg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_message_count"
    )) as Record<string, unknown>;
    expect(countMsg.count).toBe(4);
    ws.close();
  });
});

describe("VoiceAgent — text_message during active call", () => {
  it("returns to listening (not idle) after text_message during a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send text message while in a call
    sendJSON(ws, { type: "text_message", text: "mid-call text" });

    // Wait for assistant response
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: mid-call text");

    // Should return to listening (not idle) since call is still active
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        ((m as Record<string, unknown>).status === "listening" ||
          (m as Record<string, unknown>).status === "idle")
    )) as Record<string, unknown>;
    expect(listeningStatus.status).toBe("listening");
    ws.close();
  });
});

describe("VoiceAgent — multiple sequential turns", () => {
  it("handles two audio turns in the same call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    const t1 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(t1.text).toBe("test transcript");

    // Wait for pipeline to finish (listening again)
    await waitForStatus(ws, "listening");

    // Second turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    const t2 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(t2.text).toBe("test transcript");
    ws.close();
  });
});

describe("VoiceAgent — multiple connections", () => {
  it("allows two clients to start calls on the same DO instance", async () => {
    const path = uniquePath();
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    // Second connection to the same DO
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Both connections should process audio independently
    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });
    const t1 = (await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(t1.text).toBe("test transcript");

    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });
    const t2 = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(t2.text).toBe("test transcript");

    ws1.close();
    ws2.close();
  });

  it("one client ending call does not affect the other", async () => {
    const path = uniquePath();
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Client 1 ends their call
    sendJSON(ws1, { type: "end_call" });
    await waitForStatus(ws1, "idle");

    // Client 2 should still be able to process audio
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });
    const t2 = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(t2.text).toBe("test transcript");

    ws1.close();
    ws2.close();
  });
});

// --- VAD retry recovery tests ---

/** Connect to the VAD-retry test agent. */
async function connectVadRetryWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

let vadRetryCounter = 0;
function uniqueVadRetryPath() {
  return `/agents/test-vad-retry-voice-agent/vad-retry-${++vadRetryCounter}`;
}

describe("VoiceAgent — VAD retry recovery", () => {
  it("processes audio after VAD rejects and retry timer fires", async () => {
    const { ws } = await connectVadRetryWS(uniqueVadRetryPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio to pass minAudioBytes
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // VAD rejects first time → server sends "listening" (not "thinking")
    // Then retry timer fires after 200ms → processes without VAD
    // We should eventually see the user transcript
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("test transcript");

    // And the assistant response
    const assistantEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(assistantEnd.text).toBe("Echo: test transcript");
    ws.close();
  });

  it("cancels VAD retry timer when user speaks again", async () => {
    const { ws } = await connectVadRetryWS(uniqueVadRetryPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First end_of_speech — VAD rejects, retry timer starts
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Before the 200ms retry timer fires, send a new end_of_speech
    // This clears the old timer and triggers a fresh pipeline run.
    // The second VAD call accepts.
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("test transcript");
    ws.close();
  });
});

// --- Streaming STT tests ---

/** Connect to the streaming STT test agent. */
async function connectStreamingWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

let streamingInstanceCounter = 0;
function uniqueStreamingPath() {
  return `/agents/test-streaming-voice-agent/streaming-test-${++streamingInstanceCounter}`;
}

/**
 * Collect all messages matching a type until another message type arrives or timeout.
 */
function collectMessages(
  ws: WebSocket,
  type: string,
  timeout = 3000
): Promise<unknown[]> {
  return new Promise((resolve) => {
    const collected: unknown[] = [];
    const timer = setTimeout(() => {
      // Detach both listeners so nothing leaks past resolution.
      ws.removeEventListener("message", handler);
      ws.removeEventListener("message", doneHandler);
      resolve(collected);
    }, timeout);
    const handler = (e: MessageEvent) => {
      if (typeof e.data !== "string") return;
      try {
        const msg = JSON.parse(e.data);
        if (msg.type === type) {
          collected.push(msg);
        }
      } catch {
        // ignore
      }
    };
    ws.addEventListener("message", handler);
    // Also resolve when we get a transcript (user) which means pipeline finished STT
    const doneHandler = (e: MessageEvent) => {
      if (typeof e.data !== "string") return;
      try {
        const msg = JSON.parse(e.data);
        if (msg.type === "transcript" && msg.role === "user") {
          clearTimeout(timer);
          ws.removeEventListener("message", handler);
          ws.removeEventListener("message", doneHandler);
          resolve(collected);
        }
      } catch {
        // ignore
      }
    };
    ws.addEventListener("message", doneHandler);
  });
}

describe("Streaming STT — basic pipeline", () => {
  it("produces transcript via streaming STT (no batch stt needed)", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio for minAudioBytes (> 16000)
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for user transcript
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    // TestStreamingSTTSession.finish() returns "streaming transcript (N bytes)"
    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });

  it("sends transcript_interim messages during audio streaming", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Start collecting interim messages before sending audio
    const interimPromise = collectMessages(ws, "transcript_interim");

    // Send audio in multiple chunks to trigger interim callbacks
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(10000));
    sendJSON(ws, { type: "end_of_speech" });

    const interims = await interimPromise;
    // Should have received at least one interim message
    expect(interims.length).toBeGreaterThan(0);
    // Each interim should have a text field
    for (const interim of interims) {
      expect(interim).toHaveProperty("type", "transcript_interim");
      expect(interim).toHaveProperty("text");
    }
    ws.close();
  });

  it("returns assistant response after streaming STT", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;
    expect(transcriptEnd.text).toBe("Echo: streaming transcript (20000 bytes)");
    ws.close();
  });

  it("sends pipeline metrics after streaming STT processing", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const metrics = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "metrics"
    )) as Record<string, unknown>;
    expect(metrics).toHaveProperty("vad_ms");
    expect(metrics).toHaveProperty("stt_ms");
    expect(metrics).toHaveProperty("llm_ms");
    expect(metrics).toHaveProperty("tts_ms");
    expect(metrics).toHaveProperty("first_audio_ms");
    expect(metrics).toHaveProperty("total_ms");
    // STT should be very fast — it's just a flush, not full transcription
    expect(metrics.stt_ms).toBeLessThan(100);
    ws.close();
  });
});

describe("Streaming STT — start_of_speech", () => {
  it("handles explicit start_of_speech message", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Explicitly signal speech start (like new VoiceClient would)
    sendJSON(ws, { type: "start_of_speech" });

    // Send audio
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });

  it("returns to listening without start_of_speech (no auto-creation)", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // No start_of_speech — audio is buffered but no STT session created.
    // end_of_speech falls through to the batch STT path which gracefully
    // returns to listening when no batch STT provider is configured.
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;
    expect(msg.status).toBe("listening");
    ws.close();
  });
});

describe("Streaming STT — interruption", () => {
  it("aborts streaming STT session on interrupt", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send audio to create a session
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    // Interrupt before end_of_speech
    sendJSON(ws, { type: "interrupt" });

    // Should return to listening
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;
    expect(listeningStatus.status).toBe("listening");

    // Now send new audio — should create a fresh session and work normally
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(25000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;
    expect(transcript.text).toBe("streaming transcript (25000 bytes)");
    ws.close();
  });

  it("aborts streaming STT session on end_call", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");
    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_call" });

    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;
expect(idleStatus.status).toBe("idle"); ws.close(); }); }); describe("Streaming STT — short audio rejection", () => { it("aborts session and returns to listening on short audio", async () => { const { ws } = await connectStreamingWS(uniqueStreamingPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send very short audio (< minAudioBytes = 16000) ws.send(new ArrayBuffer(100)); sendJSON(ws, { type: "end_of_speech" }); // Should get listening status back (session aborted, no processing) const msg = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === "listening" )) as Record; expect(msg.status).toBe("listening"); ws.close(); }); }); // --- Provider-driven EOT tests --- /** Connect to the EOT test agent. */ async function connectEOTWS(path: string) { const ctx = createExecutionContext(); const req = new Request(`http://example.com${path}`, { headers: { Upgrade: "websocket" } }); const res = await worker.fetch(req, env, ctx); expect(res.status).toBe(101); const ws = res.webSocket as WebSocket; expect(ws).toBeDefined(); ws.accept(); return { ws, ctx }; } let eotInstanceCounter = 0; function uniqueEOTPath() { return `/agents/test-eot-voice-agent/eot-test-${++eotInstanceCounter}`; } describe("Provider-driven EOT — basic pipeline", () => { it("triggers pipeline immediately when provider fires onEndOfTurn", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send 20000 bytes — TestEOTStreamingSTTSession fires onEndOfTurn at this threshold sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); // Pipeline should start WITHOUT sending end_of_speech // Wait for user transcript const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m 
as Record).type === "transcript" && (m as Record).role === "user" )) as Record; expect(transcript.text).toBe("eot transcript (20000 bytes)"); ws.close(); }); it("returns assistant response after provider-driven EOT", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const transcriptEnd = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript_end" )) as Record; expect(transcriptEnd.text).toBe("Echo: eot transcript (20000 bytes)"); ws.close(); }); it("sends binary TTS audio after provider-driven EOT", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const audioData = await new Promise((resolve, reject) => { const timer = setTimeout( () => reject(new Error("Timeout waiting for binary audio")), 5000 ); const handler = (e: MessageEvent) => { if (e.data instanceof ArrayBuffer) { clearTimeout(timer); ws.removeEventListener("message", handler); resolve(e.data); } }; ws.addEventListener("message", handler); }); expect(audioData.byteLength).toBeGreaterThan(0); ws.close(); }); it("sends pipeline metrics with zero VAD and STT times", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const metrics = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "metrics" )) as Record; expect(metrics).toHaveProperty("vad_ms", 0); expect(metrics).toHaveProperty("stt_ms", 0); 
expect(metrics).toHaveProperty("llm_ms"); expect(metrics).toHaveProperty("tts_ms"); expect(metrics).toHaveProperty("first_audio_ms"); expect(metrics).toHaveProperty("total_ms"); ws.close(); }); }); describe("Provider-driven EOT — late end_of_speech", () => { it("ignores late end_of_speech after provider-driven EOT", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send enough audio to trigger EOT sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); // Wait for the pipeline to start (user transcript confirms it) await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" ); // Now send a late end_of_speech — this should be ignored sendJSON(ws, { type: "end_of_speech" }); // Wait for the pipeline to complete normally const transcriptEnd = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript_end" )) as Record; expect(transcriptEnd.text).toBe("Echo: eot transcript (20000 bytes)"); // Should return to listening (not double-process) await waitForStatus(ws, "listening"); ws.close(); }); }); describe("Provider-driven EOT — multiple turns", () => { it("handles two EOT-driven turns in the same call", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // First turn: send audio to trigger EOT sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const t1 = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; expect(t1.text).toBe("eot transcript (20000 bytes)"); // Wait for pipeline to finish (listening again) await 
waitForStatus(ws, "listening"); // Second turn: send audio to trigger another EOT sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const t2 = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; expect(t2.text).toBe("eot transcript (20000 bytes)"); ws.close(); }); it("accumulates messages in SQLite across EOT-driven turns", async () => { const path = uniqueEOTPath(); const { ws } = await connectEOTWS(path); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // First turn sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript_end" ); await waitForStatus(ws, "listening"); // Second turn sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript_end" ); // Check message count: 2 user + 2 assistant = 4 sendJSON(ws, { type: "_get_message_count" }); const countMsg = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "_message_count" )) as Record; expect(countMsg.count).toBe(4); ws.close(); }); }); describe("Provider-driven EOT — interruption", () => { it("handles interrupt during EOT-triggered pipeline", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Trigger EOT sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); // Wait for pipeline to start processing await waitForStatus(ws, "speaking"); // Interrupt during processing sendJSON(ws, { type: "interrupt" }); // Should return to listening const listeningStatus = 
(await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === "listening" )) as Record; expect(listeningStatus.status).toBe("listening"); // New turn should work normally sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; expect(transcript.text).toBe("eot transcript (20000 bytes)"); ws.close(); }); }); describe("Provider-driven EOT — sub-threshold audio", () => { it("does not trigger EOT when audio is below provider threshold", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send less than 20000 bytes — EOT won't fire sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(10000)); // Now send end_of_speech manually — should be processed via // normal streaming STT path (finish() returns the transcript) sendJSON(ws, { type: "end_of_speech" }); // minAudioBytes is 16000, we only sent 10000 — too short // Should return to listening without processing const msg = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === "listening" )) as Record; expect(msg.status).toBe("listening"); ws.close(); }); it("processes via finish() when above minAudioBytes but below EOT threshold", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send 18000 bytes — above minAudioBytes (16000) but below EOT threshold (20000) sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(18000)); // EOT won't fire. 
Use end_of_speech to trigger finish() sendJSON(ws, { type: "end_of_speech" }); // Should process via streaming STT finish() path const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; // TestEOTStreamingSTTSession.finish() returns "eot transcript (N bytes)" expect(transcript.text).toBe("eot transcript (18000 bytes)"); ws.close(); }); }); describe("Provider-driven EOT — interim transcripts", () => { it("sends transcript_interim messages before EOT triggers", async () => { const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Start collecting interim messages const interimPromise = collectMessages(ws, "transcript_interim"); // Send audio in chunks — each feed() fires onInterim sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(5000)); ws.send(new ArrayBuffer(5000)); ws.send(new ArrayBuffer(5000)); ws.send(new ArrayBuffer(5000)); // Total 20000 — triggers EOT const interims = await interimPromise; // Should have received interim messages before the final transcript expect(interims.length).toBeGreaterThan(0); for (const interim of interims) { expect(interim).toHaveProperty("type", "transcript_interim"); expect(interim).toHaveProperty("text"); } ws.close(); }); }); // --- Regression tests for streaming-only graceful fallback --- describe("Streaming STT — graceful fallback without batch STT", () => { it("returns to listening when end_of_speech arrives without streaming session (streaming-only agent)", async () => { // Regression: when only streamingStt is configured (no batch stt), // end_of_speech without a preceding start_of_speech should return // to listening instead of throwing "No STT provider configured". 
const { ws } = await connectStreamingWS(uniqueStreamingPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send audio without start_of_speech ws.send(new ArrayBuffer(20000)); sendJSON(ws, { type: "end_of_speech" }); // Should gracefully return to listening (no crash) const msg = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === "listening" )) as Record; expect(msg.status).toBe("listening"); // Prove agent is still functional after the fallback sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); sendJSON(ws, { type: "end_of_speech" }); const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; expect(transcript.text).toBe("streaming transcript (20000 bytes)"); ws.close(); }); it("EOT agent returns to listening when end_of_speech arrives without streaming session", async () => { // Same regression for EOT agent (no batch stt, no vad) const { ws } = await connectEOTWS(uniqueEOTPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); ws.send(new ArrayBuffer(20000)); sendJSON(ws, { type: "end_of_speech" }); const msg = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === "listening" )) as Record; expect(msg.status).toBe("listening"); ws.close(); }); }); describe("Streaming STT — no phantom sessions after end_of_speech", () => { it("does not create phantom sessions when audio continues after end_of_speech", async () => { const { ws } = await connectStreamingWS(uniqueStreamingPath()); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // First turn: normal flow 
sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); sendJSON(ws, { type: "end_of_speech" }); await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript_end" ); await waitForStatus(ws, "listening"); // Simulate audio continuing after end_of_speech (like ScriptProcessorNode). // Without auto-creation, this should NOT create a phantom STT session. ws.send(new ArrayBuffer(20000)); // Now do a second proper turn — should work normally sendJSON(ws, { type: "start_of_speech" }); ws.send(new ArrayBuffer(20000)); sendJSON(ws, { type: "end_of_speech" }); const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; // Should only reflect the second turn's audio (20000 bytes), // not accumulated phantom audio expect(transcript.text).toBe("streaming transcript (20000 bytes)"); ws.close(); }); });