branch:
voice.test.ts
61673 bytesRaw
/**
 * Server-side VoiceAgent tests.
 *
 * Uses a TestVoiceAgent that stubs STT/TTS/VAD with deterministic results.
 * Tests cover: voice protocol, pipeline flow, conversation persistence,
 * interruption handling, text messages, and the beforeCallStart hook.
 */
import { env } from "cloudflare:workers";
import { createExecutionContext } from "cloudflare:test";
import { describe, expect, it } from "vitest";
import worker from "./worker";
// --- Helpers ---

async function connectWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

/** Collect messages until we find one matching the predicate, or timeout. */
function waitForMessageMatching(
  ws: WebSocket,
  predicate: (msg: unknown) => boolean,
  timeout = 5000
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error("Timeout waiting for matching message")),
      timeout
    );
    const handler = (e: MessageEvent) => {
      const msg = typeof e.data === "string" ? JSON.parse(e.data) : e.data;
      if (predicate(msg)) {
        clearTimeout(timer);
        ws.removeEventListener("message", handler);
        resolve(msg);
      }
    };
    ws.addEventListener("message", handler);
  });
}

function sendJSON(ws: WebSocket, msg: Record<string, unknown>) {
  ws.send(JSON.stringify(msg));
}

// Use unique instance names to avoid interference between tests
let instanceCounter = 0;
function uniquePath() {
  return `/agents/test-voice-agent/voice-test-${++instanceCounter}`;
}

// --- Tests ---

/** Wait for a voice status message with a specific status value. */
function waitForStatus(ws: WebSocket, status: string) {
  return waitForMessageMatching(
    ws,
    (m) =>
      typeof m === "object" &&
      m !== null &&
      (m as Record<string, unknown>).type === "status" &&
      (m as Record<string, unknown>).status === status
  );
}

describe("VoiceAgent — protocol", () => {
  it("sends idle status on connect", async () => {
    const { ws } = await connectWS(uniquePath());
    // Agent base class sends cf_agent_identity and cf_agent_mcp_servers first;
    // wait specifically for the voice idle status.
    const msg = await waitForStatus(ws, "idle");
    expect(msg).toEqual({ type: "status", status: "idle" });
    ws.close();
  });

  it("sends listening status on start_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    const msg = await waitForStatus(ws, "listening");
    expect(msg).toEqual({ type: "status", status: "listening" });
    ws.close();
  });

  it("sends idle status on end_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "end_call" });
    const msg = await waitForStatus(ws, "idle");
    expect(msg).toEqual({ type: "status", status: "idle" });
    ws.close();
  });
});

describe("VoiceAgent — reconnect during call", () => {
  it("resumes call on new connection to same instance (simulates PartySocket reconnect)", async () => {
    // Use a fixed path so both connections hit the same DO instance
    const path = uniquePath();

    // First connection: start a call
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    // Send some audio and get a transcript (proves call is working)
    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    );

    // Disconnect (simulates network drop)
    ws1.close();

    // Second connection: reconnect to the same instance
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");

    // Client would re-send start_call on reconnect
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Send audio on the new connection — should work normally
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("test transcript");
    ws2.close();
  });

  it("preserves conversation history across reconnect", async () => {
    const path = uniquePath();

    // First connection: have a conversation
    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });

    // Wait for assistant response (conversation saved to SQLite)
    await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    ws1.close();

    // Second connection: new call on the same instance
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Send another turn
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });

    // Should still produce a response (history preserved in SQLite)
    const transcript = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("Echo: test transcript");
    ws2.close();
  });
});

describe("VoiceAgent — audio pipeline", () => {
  it("processes audio and returns user transcript on end_of_speech", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio data (> minAudioBytes = 16000)
    ws.send(new ArrayBuffer(20000));

    // Trigger end of speech
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for the user transcript message
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("test transcript");
    ws.close();
  });

  it("returns assistant response after processing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for the assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: test transcript");
    ws.close();
  });

  it("sends pipeline metrics after processing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for metrics
    const metrics = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "metrics"
    )) as Record<string, unknown>;

    expect(metrics).toHaveProperty("vad_ms");
    expect(metrics).toHaveProperty("stt_ms");
    expect(metrics).toHaveProperty("llm_ms");
    expect(metrics).toHaveProperty("tts_ms");
    expect(metrics).toHaveProperty("first_audio_ms");
    expect(metrics).toHaveProperty("total_ms");

    ws.close();
  });
});

describe("VoiceAgent — text messages", () => {
  it("handles text_message without an active call (text-only response)", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "text_message", text: "hello" });

    // Wait for the assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: hello");

    // Should end with idle status (not listening, since no call is active)
    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;

    expect(idleStatus.status).toBe("idle");
    ws.close();
  });

  it("ignores text_message with missing text field", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send text_message without text field — should not crash
    sendJSON(ws, { type: "text_message" });

    // Prove connection is still alive by sending a valid message
    sendJSON(ws, { type: "text_message", text: "alive" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: alive");
    ws.close();
  });

  it("ignores empty text_message", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "text_message", text: "" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still works" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: still works");
    ws.close();
  });
});

describe("VoiceAgent — interruption", () => {
  it("returns to listening status after interrupt", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send audio and trigger pipeline
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Immediately interrupt
    sendJSON(ws, { type: "interrupt" });

    // Should eventually return to listening
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(listeningStatus.status).toBe("listening");
    ws.close();
  });
});

describe("VoiceAgent — non-voice messages", () => {
  it("does not crash on unknown JSON message types", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send a custom message type
    sendJSON(ws, { type: "custom_event", data: "hello" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still alive" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: still alive");
    ws.close();
  });

  it("does not crash on non-JSON string messages", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send non-JSON text
    ws.send("this is not json {{{");

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "works" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: works");
    ws.close();
  });
});

describe("VoiceAgent — beforeCallStart rejection", () => {
  it("does not transition to listening when beforeCallStart returns false", async () => {
    const path = uniquePath();
    const { ws } = await connectWS(path);
    await waitForStatus(ws, "idle");

    // Toggle beforeCallStart to false via a control message
    sendJSON(ws, { type: "_set_before_call_start", value: false });

    // Wait for the ack to ensure the flag is set
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_ack"
    );

    // Now try to start a call — should be rejected silently
    sendJSON(ws, { type: "start_call" });

    // Prove the agent is still functional and in idle state:
    // send a text_message which should work and return idle (not listening)
    sendJSON(ws, { type: "text_message", text: "still idle" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: still idle");

    // Should return to idle (not listening), proving start_call was rejected
    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;

    expect(idleStatus.status).toBe("idle");
    ws.close();
  });

  it("transitions to listening when beforeCallStart returns true (default)", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    const msg = await waitForStatus(ws, "listening");
    expect(msg).toEqual({ type: "status", status: "listening" });
    ws.close();
  });
});

describe("VoiceAgent — audio_config", () => {
  it("sends audio_config message on start_call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });

    // audio_config should arrive before listening status
    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;

    expect(config.format).toBe("mp3");

    // listening should follow
    await waitForStatus(ws, "listening");
    ws.close();
  });
});

describe("VoiceAgent — format negotiation", () => {
  it("sends configured format even when client requests a different one", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Client requests pcm16, but the test agent is configured for mp3
    sendJSON(ws, { type: "start_call", preferred_format: "pcm16" });

    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;

    // Server always sends its configured format
    expect(config.format).toBe("mp3");
    await waitForStatus(ws, "listening");
    ws.close();
  });

  it("sends configured format when client does not request one", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });

    const config = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "audio_config"
    )) as Record<string, unknown>;

    expect(config.format).toBe("mp3");
    await waitForStatus(ws, "listening");
    ws.close();
  });
});

describe("VoiceAgent — audio buffer limits", () => {
  it("does not process audio shorter than minAudioBytes", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send very short audio (less than 16000 bytes)
    ws.send(new ArrayBuffer(100));
    sendJSON(ws, { type: "end_of_speech" });

    // Should get listening status back (not thinking/processing)
    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");
    ws.close();
  });

  it("ignores audio chunks when not in a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send audio without start_call — should be silently ignored
    ws.send(new ArrayBuffer(20000));

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "alive" });

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: alive");
    ws.close();
  });
});

describe("VoiceAgent — protocol versioning", () => {
  it("sends welcome message with protocol_version on connect", async () => {
    const { ws } = await connectWS(uniquePath());

    const welcome = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "welcome"
    )) as Record<string, unknown>;

    expect(welcome.type).toBe("welcome");
    expect(typeof welcome.protocol_version).toBe("number");
    ws.close();
  });

  it("accepts hello message without crashing", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // Send hello (like VoiceClient does on connect)
    sendJSON(ws, { type: "hello", protocol_version: 1 });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "after hello" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: after hello");
    ws.close();
  });
});

describe("VoiceAgent — lifecycle hook counting", () => {
  it("increments onCallStart and onCallEnd counters", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "end_call" });
    await waitForStatus(ws, "idle");

    // Query the hook counters
    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;

    expect(counts.callStart).toBe(1);
    expect(counts.callEnd).toBe(1);
    expect(counts.interrupt).toBe(0);
    ws.close();
  });

  it("increments onInterrupt counter", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for pipeline to start
    await waitForStatus(ws, "thinking");

    sendJSON(ws, { type: "interrupt" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;

    expect(counts.interrupt).toBe(1);
    ws.close();
  });
});

describe("VoiceAgent — forceEndCall", () => {
  it("ends call and returns to idle when forceEndCall is triggered", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Trigger forceEndCall via control message
    sendJSON(ws, { type: "_force_end_call" });

    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;

    expect(idleStatus.status).toBe("idle");

    // Verify onCallEnd was called
    sendJSON(ws, { type: "_get_counts" });
    const counts = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_counts"
    )) as Record<string, unknown>;

    expect(counts.callStart).toBe(1);
    expect(counts.callEnd).toBe(1);
    ws.close();
  });

  it("is a no-op when not in a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    // forceEndCall when not in a call — should not crash
    sendJSON(ws, { type: "_force_end_call" });

    // Prove connection is still alive
    sendJSON(ws, { type: "text_message", text: "still alive" });
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: still alive");
    ws.close();
  });
});

describe("VoiceAgent — binary TTS audio", () => {
  it("sends binary audio data after assistant response", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for binary audio data from server
    const audioData = await new Promise<ArrayBuffer>((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error("Timeout waiting for binary audio")),
        5000
      );
      const handler = (e: MessageEvent) => {
        if (e.data instanceof ArrayBuffer) {
          clearTimeout(timer);
          ws.removeEventListener("message", handler);
          resolve(e.data);
        }
      };
      ws.addEventListener("message", handler);
    });

    // TestTTS encodes text as bytes, so audio should be non-empty
    expect(audioData.byteLength).toBeGreaterThan(0);
    ws.close();
  });
});

describe("VoiceAgent — conversation persistence", () => {
  it("accumulates messages in SQLite across turns", async () => {
    const path = uniquePath();
    const { ws } = await connectWS(path);
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    // Wait for listening status (pipeline complete)
    await waitForStatus(ws, "listening");

    // Second turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    // Check message count: 2 user + 2 assistant = 4
    sendJSON(ws, { type: "_get_message_count" });
    const countMsg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_message_count"
    )) as Record<string, unknown>;

    expect(countMsg.count).toBe(4);
    ws.close();
  });
});

describe("VoiceAgent — text_message during active call", () => {
  it("returns to listening (not idle) after text_message during a call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send text message while in a call
    sendJSON(ws, { type: "text_message", text: "mid-call text" });

    // Wait for assistant response
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: mid-call text");

    // Should return to listening (not idle) since call is still active
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        ((m as Record<string, unknown>).status === "listening" ||
          (m as Record<string, unknown>).status === "idle")
    )) as Record<string, unknown>;

    expect(listeningStatus.status).toBe("listening");
    ws.close();
  });
});

describe("VoiceAgent — multiple sequential turns", () => {
  it("handles two audio turns in the same call", async () => {
    const { ws } = await connectWS(uniquePath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const t1 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t1.text).toBe("test transcript");

    // Wait for pipeline to finish (listening again)
    await waitForStatus(ws, "listening");

    // Second turn
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const t2 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t2.text).toBe("test transcript");
    ws.close();
  });
});

describe("VoiceAgent — multiple connections", () => {
  it("allows two clients to start calls on the same DO instance", async () => {
    const path = uniquePath();

    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    // Second connection to the same DO
    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Both connections should process audio independently
    ws1.send(new ArrayBuffer(20000));
    sendJSON(ws1, { type: "end_of_speech" });

    const t1 = (await waitForMessageMatching(
      ws1,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t1.text).toBe("test transcript");

    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });

    const t2 = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t2.text).toBe("test transcript");

    ws1.close();
    ws2.close();
  });

  it("one client ending call does not affect the other", async () => {
    const path = uniquePath();

    const { ws: ws1 } = await connectWS(path);
    await waitForStatus(ws1, "idle");
    sendJSON(ws1, { type: "start_call" });
    await waitForStatus(ws1, "listening");

    const { ws: ws2 } = await connectWS(path);
    await waitForStatus(ws2, "idle");
    sendJSON(ws2, { type: "start_call" });
    await waitForStatus(ws2, "listening");

    // Client 1 ends their call
    sendJSON(ws1, { type: "end_call" });
    await waitForStatus(ws1, "idle");

    // Client 2 should still be able to process audio
    ws2.send(new ArrayBuffer(20000));
    sendJSON(ws2, { type: "end_of_speech" });

    const t2 = (await waitForMessageMatching(
      ws2,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t2.text).toBe("test transcript");

    ws1.close();
    ws2.close();
  });
});

// --- VAD retry recovery tests ---

/** Connect to the VAD-retry test agent. */
async function connectVadRetryWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

let vadRetryCounter = 0;
function uniqueVadRetryPath() {
  return `/agents/test-vad-retry-voice-agent/vad-retry-${++vadRetryCounter}`;
}

describe("VoiceAgent — VAD retry recovery", () => {
  it("processes audio after VAD rejects and retry timer fires", async () => {
    const { ws } = await connectVadRetryWS(uniqueVadRetryPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio to pass minAudioBytes
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // VAD rejects first time → server sends "listening" (not "thinking")
    // Then retry timer fires after 200ms → processes without VAD
    // We should eventually see the user transcript
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("test transcript");

    // And the assistant response
    const assistantEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(assistantEnd.text).toBe("Echo: test transcript");
    ws.close();
  });

  it("cancels VAD retry timer when user speaks again", async () => {
    const { ws } = await connectVadRetryWS(uniqueVadRetryPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First end_of_speech — VAD rejects, retry timer starts
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Before the 200ms retry timer fires, send a new end_of_speech
    // This clears the old timer and triggers a fresh pipeline run.
    // The second VAD call accepts.
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("test transcript");
    ws.close();
  });
});

// --- Streaming STT tests ---

/** Connect to the streaming STT test agent. */
async function connectStreamingWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

let streamingInstanceCounter = 0;
function uniqueStreamingPath() {
  return `/agents/test-streaming-voice-agent/streaming-test-${++streamingInstanceCounter}`;
}

/** Collect all messages matching a type until another message type arrives or timeout. */
function collectMessages(
  ws: WebSocket,
  type: string,
  timeout = 3000
): Promise<unknown[]> {
  return new Promise((resolve) => {
    const collected: unknown[] = [];
    const timer = setTimeout(() => {
      ws.removeEventListener("message", handler);
      resolve(collected);
    }, timeout);
    const handler = (e: MessageEvent) => {
      if (typeof e.data !== "string") return;
      try {
        const msg = JSON.parse(e.data);
        if (msg.type === type) {
          collected.push(msg);
        }
      } catch {
        // ignore
      }
    };
    ws.addEventListener("message", handler);
    // Also resolve when we get a transcript (user) which means pipeline finished STT
    const doneHandler = (e: MessageEvent) => {
      if (typeof e.data !== "string") return;
      try {
        const msg = JSON.parse(e.data);
        if (msg.type === "transcript" && msg.role === "user") {
          clearTimeout(timer);
          ws.removeEventListener("message", handler);
          ws.removeEventListener("message", doneHandler);
          resolve(collected);
        }
      } catch {
        // ignore
      }
    };
    ws.addEventListener("message", doneHandler);
  });
}

describe("Streaming STT — basic pipeline", () => {
  it("produces transcript via streaming STT (no batch stt needed)", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio for minAudioBytes (> 16000)
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for user transcript
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    // TestStreamingSTTSession.finish() returns "streaming transcript (N bytes)"
    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });

  it("sends transcript_interim messages during audio streaming", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Start collecting interim messages before sending audio
    const interimPromise = collectMessages(ws, "transcript_interim");

    // Send audio in multiple chunks to trigger interim callbacks
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(10000));
    sendJSON(ws, { type: "end_of_speech" });

    const interims = await interimPromise;

    // Should have received at least one interim message
    expect(interims.length).toBeGreaterThan(0);

    // Each interim should have a text field
    for (const interim of interims) {
      expect(interim).toHaveProperty("type", "transcript_interim");
      expect(interim).toHaveProperty("text");
    }

    ws.close();
  });

  it("returns assistant response after streaming STT", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for assistant transcript_end
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: streaming transcript (20000 bytes)");
    ws.close();
  });

  it("sends pipeline metrics after streaming STT processing", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const metrics = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "metrics"
    )) as Record<string, unknown>;

    expect(metrics).toHaveProperty("vad_ms");
    expect(metrics).toHaveProperty("stt_ms");
    expect(metrics).toHaveProperty("llm_ms");
    expect(metrics).toHaveProperty("tts_ms");
    expect(metrics).toHaveProperty("first_audio_ms");
    expect(metrics).toHaveProperty("total_ms");

    // STT should be very fast — it's just a flush, not full transcription
    expect(metrics.stt_ms).toBeLessThan(100);

    ws.close();
  });
});

describe("Streaming STT — start_of_speech", () => {
  it("handles explicit start_of_speech message", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Explicitly signal speech start (like new VoiceClient would)
    sendJSON(ws, { type: "start_of_speech" });

    // Send audio
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });

  it("returns to listening without start_of_speech (no auto-creation)", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // No start_of_speech — audio is buffered but no STT session created.
    // end_of_speech falls through to the batch STT path which gracefully
    // returns to listening when no batch STT provider is configured.
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");
    ws.close();
  });
});

describe("Streaming STT — interruption", () => {
  it("aborts streaming STT session on interrupt", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send audio to create a session
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    // Interrupt before end_of_speech
    sendJSON(ws, { type: "interrupt" });

    // Should return to listening
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(listeningStatus.status).toBe("listening");

    // Now send new audio — should create a fresh session and work normally
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(25000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("streaming transcript (25000 bytes)");
    ws.close();
  });

  it("aborts streaming STT session on end_call", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));

    sendJSON(ws, { type: "end_call" });

    const idleStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "idle"
    )) as Record<string, unknown>;

    expect(idleStatus.status).toBe("idle");
    ws.close();
  });
});

describe("Streaming STT — short audio rejection", () => {
  it("aborts session and returns to listening on short audio", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send very short audio (< minAudioBytes = 16000)
    ws.send(new ArrayBuffer(100));
    sendJSON(ws, { type: "end_of_speech" });

    // Should get listening status back (session aborted, no processing)
    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");
    ws.close();
  });
});

// --- Provider-driven EOT tests ---

/** Connect to the EOT test agent. */
async function connectEOTWS(path: string) {
  const ctx = createExecutionContext();
  const req = new Request(`http://example.com${path}`, {
    headers: { Upgrade: "websocket" }
  });
  const res = await worker.fetch(req, env, ctx);
  expect(res.status).toBe(101);
  const ws = res.webSocket as WebSocket;
  expect(ws).toBeDefined();
  ws.accept();
  return { ws, ctx };
}

let eotInstanceCounter = 0;
function uniqueEOTPath() {
  return `/agents/test-eot-voice-agent/eot-test-${++eotInstanceCounter}`;
}

describe("Provider-driven EOT — basic pipeline", () => {
  it("triggers pipeline immediately when provider fires onEndOfTurn", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send 20000 bytes — TestEOTStreamingSTTSession fires onEndOfTurn at this threshold
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    // Pipeline should start WITHOUT sending end_of_speech
    // Wait for user transcript
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("eot transcript (20000 bytes)");
    ws.close();
  });

  it("returns assistant response after provider-driven EOT", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: eot transcript (20000 bytes)");
    ws.close();
  });

  it("sends binary TTS audio after provider-driven EOT", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const audioData = await new Promise<ArrayBuffer>((resolve, reject) => {
      const timer = setTimeout(
        () => reject(new Error("Timeout waiting for binary audio")),
        5000
      );
      const handler = (e: MessageEvent) => {
        if (e.data instanceof ArrayBuffer) {
          clearTimeout(timer);
          ws.removeEventListener("message", handler);
          resolve(e.data);
        }
      };
      ws.addEventListener("message", handler);
    });

    expect(audioData.byteLength).toBeGreaterThan(0);
    ws.close();
  });

  it("sends pipeline metrics with zero VAD and STT times", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const metrics = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "metrics"
    )) as Record<string, unknown>;

    expect(metrics).toHaveProperty("vad_ms", 0);
    expect(metrics).toHaveProperty("stt_ms", 0);
    expect(metrics).toHaveProperty("llm_ms");
    expect(metrics).toHaveProperty("tts_ms");
    expect(metrics).toHaveProperty("first_audio_ms");
    expect(metrics).toHaveProperty("total_ms");

    ws.close();
  });
});

describe("Provider-driven EOT — late end_of_speech", () => {
  it("ignores late end_of_speech after provider-driven EOT", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send enough audio to trigger EOT
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    // Wait for the pipeline to start (user transcript confirms it)
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    );

    // Now send a late end_of_speech — this should be ignored
    sendJSON(ws, { type: "end_of_speech" });

    // Wait for the pipeline to complete normally
    const transcriptEnd = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    )) as Record<string, unknown>;

    expect(transcriptEnd.text).toBe("Echo: eot transcript (20000 bytes)");

    // Should return to listening (not double-process)
    await waitForStatus(ws, "listening");
    ws.close();
  });
});

describe("Provider-driven EOT — multiple turns", () => {
  it("handles two EOT-driven turns in the same call", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn: send audio to trigger EOT
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const t1 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t1.text).toBe("eot transcript (20000 bytes)");

    // Wait for pipeline to finish (listening again)
    await waitForStatus(ws, "listening");

    // Second turn: send audio to trigger another EOT
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const t2 = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(t2.text).toBe("eot transcript (20000 bytes)");
    ws.close();
  });

  it("accumulates messages in SQLite across EOT-driven turns", async () => {
    const path = uniqueEOTPath();
    const { ws } = await connectEOTWS(path);
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );
    await waitForStatus(ws, "listening");

    // Second turn
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    // Check message count: 2 user + 2 assistant = 4
    sendJSON(ws, { type: "_get_message_count" });
    const countMsg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "_message_count"
    )) as Record<string, unknown>;

    expect(countMsg.count).toBe(4);
    ws.close();
  });
});

describe("Provider-driven EOT — interruption", () => {
  it("handles interrupt during EOT-triggered pipeline", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Trigger EOT
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    // Wait for pipeline to start processing
    await waitForStatus(ws, "speaking");

    // Interrupt during processing
    sendJSON(ws, { type: "interrupt" });

    // Should return to listening
    const listeningStatus = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(listeningStatus.status).toBe("listening");

    // New turn should work normally
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("eot transcript (20000 bytes)");
    ws.close();
  });
});

describe("Provider-driven EOT — sub-threshold audio", () => {
  it("does not trigger EOT when audio is below provider threshold", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send less than 20000 bytes — EOT won't fire
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(10000));

    // Now send end_of_speech manually — should be processed via
    // normal streaming STT path (finish() returns the transcript)
    sendJSON(ws, { type: "end_of_speech" });

    // minAudioBytes is 16000, we only sent 10000 — too short
    // Should return to listening without processing
    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");
    ws.close();
  });

  it("processes via finish() when above minAudioBytes but below EOT threshold", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send 18000 bytes — above minAudioBytes (16000) but below EOT threshold (20000)
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(18000));

    // EOT won't fire. Use end_of_speech to trigger finish()
    sendJSON(ws, { type: "end_of_speech" });

    // Should process via streaming STT finish() path
    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    // TestEOTStreamingSTTSession.finish() returns "eot transcript (N bytes)"
    expect(transcript.text).toBe("eot transcript (18000 bytes)");
    ws.close();
  });
});

describe("Provider-driven EOT — interim transcripts", () => {
  it("sends transcript_interim messages before EOT triggers", async () => {
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Start collecting interim messages
    const interimPromise = collectMessages(ws, "transcript_interim");

    // Send audio in chunks — each feed() fires onInterim
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(5000));
    ws.send(new ArrayBuffer(5000)); // Total 20000 — triggers EOT

    const interims = await interimPromise;

    // Should have received interim messages before the final transcript
    expect(interims.length).toBeGreaterThan(0);

    for (const interim of interims) {
      expect(interim).toHaveProperty("type", "transcript_interim");
      expect(interim).toHaveProperty("text");
    }

    ws.close();
  });
});

// --- Regression tests for streaming-only graceful fallback ---

describe("Streaming STT — graceful fallback without batch STT", () => {
  it("returns to listening when end_of_speech arrives without streaming session (streaming-only agent)", async () => {
    // Regression: when only streamingStt is configured (no batch stt),
    // end_of_speech without a preceding start_of_speech should return
    // to listening instead of throwing "No STT provider configured".
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // Send audio without start_of_speech
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    // Should gracefully return to listening (no crash)
    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");

    // Prove agent is still functional after the fallback
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });

  it("EOT agent returns to listening when end_of_speech arrives without streaming session", async () => {
    // Same regression for EOT agent (no batch stt, no vad)
    const { ws } = await connectEOTWS(uniqueEOTPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const msg = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "status" &&
        (m as Record<string, unknown>).status === "listening"
    )) as Record<string, unknown>;

    expect(msg.status).toBe("listening");
    ws.close();
  });
});

describe("Streaming STT — no phantom sessions after end_of_speech", () => {
  it("does not create phantom sessions when audio continues after end_of_speech", async () => {
    const { ws } = await connectStreamingWS(uniqueStreamingPath());
    await waitForStatus(ws, "idle");

    sendJSON(ws, { type: "start_call" });
    await waitForStatus(ws, "listening");

    // First turn: normal flow
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript_end"
    );

    await waitForStatus(ws, "listening");

    // Simulate audio continuing after end_of_speech (like ScriptProcessorNode).
    // Without auto-creation, this should NOT create a phantom STT session.
    ws.send(new ArrayBuffer(20000));

    // Now do a second proper turn — should work normally
    sendJSON(ws, { type: "start_of_speech" });
    ws.send(new ArrayBuffer(20000));
    sendJSON(ws, { type: "end_of_speech" });

    const transcript = (await waitForMessageMatching(
      ws,
      (m) =>
        typeof m === "object" &&
        m !== null &&
        (m as Record<string, unknown>).type === "transcript" &&
        (m as Record<string, unknown>).role === "user"
    )) as Record<string, unknown>;

    // Should only reflect the second turn's audio (20000 bytes),
    // not accumulated phantom audio
    expect(transcript.text).toBe("streaming transcript (20000 bytes)");
    ws.close();
  });
});