/** * Server-side VoiceInput mixin tests. * * Uses test agents that stub STT providers with deterministic results. * Tests cover: voice protocol, consumer lifecycle passthrough, message * routing, batch STT pipeline, streaming STT pipeline, provider-driven * EOT, onTranscript hook, beforeCallStart rejection, and interrupt handling. */ import { env } from "cloudflare:workers"; import { createExecutionContext } from "cloudflare:test"; import { describe, expect, it } from "vitest"; import worker from "./worker"; // --- Helpers --- async function connectWS(path: string) { const ctx = createExecutionContext(); const req = new Request(`http://example.com${path}`, { headers: { Upgrade: "websocket" } }); const res = await worker.fetch(req, env, ctx); expect(res.status).toBe(101); const ws = res.webSocket as WebSocket; expect(ws).toBeDefined(); ws.accept(); return { ws, ctx }; } function waitForMessageMatching( ws: WebSocket, predicate: (msg: unknown) => boolean, timeout = 5000 ): Promise { return new Promise((resolve, reject) => { const timer = setTimeout( () => reject(new Error("Timeout waiting for matching message")), timeout ); const handler = (e: MessageEvent) => { const msg = typeof e.data === "string" ? JSON.parse(e.data) : e.data; if (predicate(msg)) { clearTimeout(timer); ws.removeEventListener("message", handler); resolve(msg); } }; ws.addEventListener("message", handler); }); } function sendJSON(ws: WebSocket, msg: Record) { ws.send(JSON.stringify(msg)); } function waitForStatus(ws: WebSocket, status: string) { return waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "status" && (m as Record).status === status ); } function waitForType(ws: WebSocket, type: string) { return waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === type ); } /** Create a chunk of silent PCM audio (zeros). */ function makeSilentAudio(bytes: number): ArrayBuffer { return new ArrayBuffer(bytes); } let instanceCounter = 0; function uniquePath(agent: string) { return `/agents/${agent}/voice-input-test-${++instanceCounter}`; } /** Request internal state from a test agent. */ async function getAgentState(ws: WebSocket) { sendJSON(ws, { type: "_get_state" }); const msg = (await waitForType(ws, "_state")) as Record; return msg; } // --- Tests --- describe("VoiceInput — protocol basics", () => { it("sends welcome and idle status on connect", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); const welcome = (await waitForType(ws, "welcome")) as Record< string, unknown >; expect(welcome.protocol_version).toBeDefined(); const status = (await waitForStatus(ws, "idle")) as Record; expect(status.status).toBe("idle"); ws.close(); }); it("transitions to listening on start_call", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); const status = (await waitForStatus(ws, "listening")) as Record< string, unknown >; expect(status.status).toBe("listening"); ws.close(); }); it("transitions to idle on end_call", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); sendJSON(ws, { type: "end_call" }); const status = (await waitForStatus(ws, "idle")) as Record; expect(status.status).toBe("idle"); ws.close(); }); }); describe("VoiceInput — consumer lifecycle passthrough", () => { it("calls consumer onConnect", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); const state = await getAgentState(ws); expect(state.connectCount).toBe(1); ws.close(); }); it("calls consumer onClose", async () => { // Connect two clients to the same instance so we can query state after close const instancePath = uniquePath("test-voice-input-agent"); const { ws: ws1 } = await connectWS(instancePath); await waitForStatus(ws1, "idle"); // Close first connection ws1.close(); // Give the DO time to process the close await new Promise((resolve) => setTimeout(resolve, 200)); // Connect second client to same instance and query state const { ws: ws2 } = await connectWS(instancePath); await waitForStatus(ws2, "idle"); const state = await getAgentState(ws2); // Two connects (ws1 + ws2), one close (ws1) expect(state.connectCount).toBe(2); expect(state.closeCount).toBe(1); ws2.close(); }); it("forwards non-voice messages to consumer onMessage", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); // Send a custom (non-voice) message sendJSON(ws, { type: "_custom", data: "hello from client" }); await waitForType(ws, "_ack"); const state = await getAgentState(ws); expect(state.customMessages).toEqual(["hello from client"]); ws.close(); }); it("does NOT forward voice protocol messages to consumer onMessage", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); // Send voice protocol messages — these should be intercepted sendJSON(ws, { type: "hello" }); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send a custom message to verify consumer onMessage works sendJSON(ws, { type: "_custom", data: "after voice msgs" }); await waitForType(ws, "_ack"); const state = await getAgentState(ws); // Only the _custom message should have reached onMessage expect(state.customMessages).toEqual(["after voice msgs"]); ws.close(); }); }); describe("VoiceInput — batch STT pipeline", () => { it("transcribes audio and calls onTranscript", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send enough audio to exceed minAudioBytes (default 16000) ws.send(makeSilentAudio(20000)); // Signal end of speech sendJSON(ws, { type: "end_of_speech" }); // Should get a transcript message const transcript = (await waitForType(ws, "transcript")) as Record< string, unknown >; expect(transcript.role).toBe("user"); expect(transcript.text).toBe("test input transcript"); // Should return to listening await waitForStatus(ws, "listening"); // Verify onTranscript was called const state = await getAgentState(ws); expect(state.transcripts).toEqual(["test input transcript"]); ws.close(); }); it("discards audio that is too short", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send audio below minAudioBytes threshold ws.send(makeSilentAudio(100)); sendJSON(ws, { type: "end_of_speech" }); // Should return to listening without a transcript await waitForStatus(ws, "listening"); // Verify no transcript was generated const state = await getAgentState(ws); expect(state.transcripts).toEqual([]); ws.close(); }); it("handles multiple utterances", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // First utterance ws.send(makeSilentAudio(20000)); sendJSON(ws, { type: "end_of_speech" }); await waitForType(ws, "transcript"); await waitForStatus(ws, "listening"); // Second utterance ws.send(makeSilentAudio(20000)); sendJSON(ws, { type: "end_of_speech" }); await waitForType(ws, "transcript"); await waitForStatus(ws, "listening"); const state = await getAgentState(ws); expect(state.transcripts).toEqual([ "test input transcript", "test input transcript" ]); ws.close(); }); }); describe("VoiceInput — streaming STT pipeline", () => { it("transcribes with streaming STT and emits interim transcripts", async () => { const { ws } = await connectWS( uniquePath("test-streaming-voice-input-agent") ); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Start speech to create streaming session sendJSON(ws, { type: "start_of_speech" }); // Send audio in chunks ws.send(makeSilentAudio(10000)); ws.send(makeSilentAudio(10000)); // Should get interim transcripts const interim = (await waitForType(ws, "transcript_interim")) as Record< string, unknown >; expect(interim.text).toBeDefined(); // End speech to flush sendJSON(ws, { type: "end_of_speech" }); // Should get final transcript const transcript = (await waitForType(ws, "transcript")) as Record< string, unknown >; expect(transcript.role).toBe("user"); expect(typeof transcript.text).toBe("string"); expect((transcript.text as string).includes("streaming input")).toBe(true); await waitForStatus(ws, "listening"); const state = await getAgentState(ws); expect(state.transcripts).toHaveLength(1); ws.close(); }); }); describe("VoiceInput — provider-driven EOT", () => { it("emits transcript on provider EOT without end_of_speech", async () => { const { ws } = await connectWS(uniquePath("test-eot-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); sendJSON(ws, { type: "start_of_speech" }); // Send enough audio to trigger EOT (>= 20000 bytes) for (let i = 0; i < 5; i++) { ws.send(makeSilentAudio(5000)); } // Provider fires onEndOfTurn — should get transcript without end_of_speech const transcript = (await waitForType(ws, "transcript")) as Record< string, unknown >; expect(transcript.role).toBe("user"); expect((transcript.text as string).includes("eot input")).toBe(true); await waitForStatus(ws, "listening"); const state = await getAgentState(ws); expect(state.transcripts).toHaveLength(1); ws.close(); }); }); describe("VoiceInput — interrupt", () => { it("aborts in-flight STT on interrupt", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send audio ws.send(makeSilentAudio(20000)); // Interrupt before end_of_speech sendJSON(ws, { type: "interrupt" }); await waitForStatus(ws, "listening"); // No transcript should have been generated const state = await getAgentState(ws); expect(state.transcripts).toEqual([]); ws.close(); }); }); describe("VoiceInput — beforeCallStart rejection", () => { it("does not start call when beforeCallStart returns false", async () => { const { ws } = await connectWS( uniquePath("test-reject-call-voice-input-agent") ); await waitForStatus(ws, "idle"); // start_call should be rejected — no listening status should appear sendJSON(ws, { type: "start_call" }); // Send a second start_call to confirm the agent is still in idle state // (if it were listening, we'd see a status change) sendJSON(ws, { type: "start_call" }); // Give a moment for any messages to arrive await new Promise((resolve) => setTimeout(resolve, 300)); // The agent should still be in idle — no "listening" status was sent // We verify by connecting and checking no transition happened ws.close(); }); }); describe("VoiceInput — double start_call", () => { it("does not reset audio buffer on duplicate start_call", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send audio ws.send(makeSilentAudio(20000)); // Send duplicate start_call — should NOT reset the buffer sendJSON(ws, { type: "start_call" }); // Wait for the second listening status from the duplicate start_call await waitForStatus(ws, "listening"); // End speech — should still have the audio from before the duplicate start_call sendJSON(ws, { type: "end_of_speech" }); const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; // Audio should still be present (not lost by duplicate start_call) expect(transcript.text).toBe("test input transcript"); ws.close(); }); }); describe("VoiceInput — interrupt before start_call", () => { it("does not create phantom in-call state", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); // Send interrupt before start_call — should not create phantom state sendJSON(ws, { type: "interrupt" }); // Now send audio — it should be silently dropped (not buffered) ws.send(makeSilentAudio(20000)); // Give a moment for processing await new Promise((resolve) => setTimeout(resolve, 200)); // Now start a proper call — should work normally sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); // Send audio and end_of_speech — should produce a transcript ws.send(makeSilentAudio(20000)); sendJSON(ws, { type: "end_of_speech" }); const transcript = (await waitForMessageMatching( ws, (m) => typeof m === "object" && m !== null && (m as Record).type === "transcript" && (m as Record).role === "user" )) as Record; // Should produce a normal transcript (audio after start_call was processed) expect(transcript.text).toBe("test input transcript"); ws.close(); }); }); describe("VoiceInput — no TTS or response generation", () => { it("does not send audio_config or agent transcript", async () => { const { ws } = await connectWS(uniquePath("test-voice-input-agent")); await waitForStatus(ws, "idle"); sendJSON(ws, { type: "start_call" }); await waitForStatus(ws, "listening"); ws.send(makeSilentAudio(20000)); sendJSON(ws, { type: "end_of_speech" }); // Collect all messages until we get back to listening const messages: Record[] = []; await new Promise((resolve, reject) => { const timer = setTimeout( () => reject(new Error("Timeout collecting messages")), 5000 ); const handler = (e: MessageEvent) => { if (typeof e.data === "string") { const msg = JSON.parse(e.data); messages.push(msg); if (msg.type === "status" && msg.status === "listening") { clearTimeout(timer); ws.removeEventListener("message", handler); resolve(); } } }; ws.addEventListener("message", handler); }); // Should NOT have audio_config, agent transcript, or TTS audio const types = messages.map((m) => m.type); expect(types).not.toContain("audio_config"); expect( messages.filter((m) => m.type === "transcript" && m.role === "agent") ).toHaveLength(0); // Should have user transcript expect( messages.filter((m) => m.type === "transcript" && m.role === "user") ).toHaveLength(1); ws.close(); }); });