/**
 * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 * !!  WARNING: EXPERIMENTAL — DO NOT USE IN PRODUCTION                  !!
 * !!                                                                    !!
 * !!  This API is under active development and WILL break between      !!
 * !!  releases. Method names, types, behavior, and the mixin signature  !!
 * !!  are all subject to change without notice.                         !!
 * !!                                                                    !!
 * !!  If you use this, pin your `agents` version and expect to rewrite  !!
 * !!  your code when upgrading.                                         !!
 * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 *
 * Experimental voice pipeline mixin for the Agents SDK.
 *
 * Usage:
 *   import { Agent } from "agents";
 *   import { withVoice } from "agents/experimental/voice";
 *
 *   const VoiceAgent = withVoice(Agent);
 *
 *   class MyAgent extends VoiceAgent {
 *     async onTurn(transcript: string, context: VoiceTurnContext) {
 *       const result = streamText({ ... });
 *       return result.textStream;
 *     }
 *   }
 *
 * This mixin adds the full voice pipeline: audio buffering, VAD, STT,
 * streaming TTS, interruption handling, conversation persistence, and
 * the WebSocket voice protocol.
 *
 * @experimental This API is not yet stable and may change.
 */
import type { Agent, Connection, WSMessage } from "agents";
import { SentenceChunker } from "./sentence-chunker";
import { iterateText, type TextSource } from "./text-stream";
import { VOICE_PROTOCOL_VERSION } from "./types";
import type {
  VoiceStatus,
  VoiceRole,
  VoiceAudioFormat,
  VoiceClientMessage,
  VoiceServerMessage,
  VoicePipelineMetrics,
  STTProvider,
  TTSProvider,
  StreamingTTSProvider,
  VADProvider,
  StreamingSTTProvider
} from "./types";
import {
  AudioConnectionManager,
  sendVoiceJSON,
  DEFAULT_VAD_THRESHOLD,
  DEFAULT_MIN_AUDIO_BYTES,
  DEFAULT_VAD_PUSHBACK_SECONDS,
  DEFAULT_VAD_RETRY_MS
} from "./audio-pipeline";
import type { VoiceInputAgentOptions } from "./voice-input";

// Re-export SentenceChunker for direct use
export { SentenceChunker } from "./sentence-chunker";

// Re-export protocol version constant
export { VOICE_PROTOCOL_VERSION } from "./types";

// Re-export shared types so existing imports from "agents/experimental/voice" still work
export type {
  VoiceStatus,
  VoiceRole,
  VoiceAudioFormat,
  VoiceAudioInput,
  VoiceTransport,
  VoiceClientMessage,
  VoiceServerMessage,
  VoicePipelineMetrics,
  TranscriptMessage,
  STTProvider,
  TTSProvider,
  StreamingTTSProvider,
  VADProvider,
  StreamingSTTProvider,
  StreamingSTTSession,
  StreamingSTTSessionOptions
} from "./types";

// Re-export voice input mixin (STT-only, no TTS/LLM)
export { withVoiceInput } from "./voice-input";
export type { VoiceInputAgentOptions } from "./voice-input";

// Re-export text stream utility
export { iterateText, type TextSource } from "./text-stream";

// Re-export SFU utility functions
export {
  decodeVarint,
  encodeVarint,
  extractPayloadFromProtobuf,
  encodePayloadToProtobuf,
  downsample48kStereoTo16kMono,
  upsample16kMonoTo48kStereo,
  sfuFetch,
  createSFUSession,
  addSFUTracks,
  renegotiateSFUSession,
  createSFUWebSocketAdapter
} from "./sfu-utils";
export type { SFUConfig } from "./sfu-utils";

// Re-export Workers AI providers and audio utility
export {
  WorkersAISTT,
  WorkersAITTS,
  WorkersAIVAD,
  WorkersAIFluxSTT,
  pcmToWav
} from "./workers-ai-providers";
export type {
  WorkersAISTTOptions,
  WorkersAITTSOptions,
  WorkersAIVADOptions,
  WorkersAIFluxSTTOptions
} from "./workers-ai-providers";

// --- Public types ---

/**
 * Result from a VAD (Voice Activity Detection) provider.
 *
 * The voice pipeline proceeds with a turn when `isComplete` is true or when
 * `probability` exceeds the configured `vadThreshold`.
 */
export interface VADResult {
  /** Whether the provider considers the user's turn finished. */
  isComplete: boolean;
  /** End-of-turn probability; compared against `vadThreshold`. */
  probability: number;
}

/** Context passed to the `onTurn()` hook. */
export interface VoiceTurnContext {
  /**
   * The WebSocket connection that sent the audio.
   * Useful for sending custom JSON messages (e.g. tool progress).
   * WARNING: sending raw binary on this connection will interleave with
   * the TTS audio stream. Use `connection.send(JSON.stringify(...))` only.
   */
  connection: Connection;
  /** Conversation history from SQLite (chronological order, up to `historyLimit`). */
  messages: Array<{ role: VoiceRole; content: string }>;
  /** AbortSignal — aborted if user interrupts or disconnects. */
  signal: AbortSignal;
}

/** Configuration options for the voice mixin. Passed to `withVoice()`. */
export interface VoiceAgentOptions extends VoiceInputAgentOptions {
  /** Max conversation history messages loaded for context. @default 20 */
  historyLimit?: number;
  /** Audio format used for binary audio payloads sent to the client. @default "mp3" */
  audioFormat?: VoiceAudioFormat;
  /** Max conversation messages to keep in SQLite. Oldest are pruned. @default 1000 */
  maxMessageCount?: number;
}

// --- Default option values (voice-specific, not in audio-pipeline) ---

/** Default for `VoiceAgentOptions.historyLimit`. */
const DEFAULT_HISTORY_LIMIT = 20;
/** Default for `VoiceAgentOptions.maxMessageCount`. */
const DEFAULT_MAX_MESSAGE_COUNT = 1000;

// --- Mixin ---

/** Constructor type used to constrain mixin base classes. */
// oxlint-disable-next-line @typescript-eslint/no-explicit-any -- mixin constructor constraint
type Constructor<T = object> = new (...args: any[]) => T;

/** The minimal `Agent` surface the voice mixin relies on. */
type AgentLike = Constructor<
  Pick<
    Agent,
    | "sql"
    | "getConnections"
    | "_unsafe_getConnectionFlag"
    | "_unsafe_setConnectionFlag"
  >
>;

/**
 * Voice pipeline mixin. Adds the full voice pipeline to an Agent class.
 *
 * Subclasses must set a `tts` provider and either `stt` or `streamingStt`. VAD is optional.
*
 * @param Base - The Agent class to extend (e.g. `Agent`).
 * @param voiceOptions - Optional pipeline configuration.
 * @returns A subclass of `Base` whose `onConnect`, `onClose`, and
 *   `onMessage` are wrapped so voice handling runs before the consumer's
 *   own handlers.
 *
 * @example
 * ```typescript
 * import { Agent } from "agents";
 * import { withVoice, WorkersAISTT, WorkersAITTS, WorkersAIVAD } from "agents/experimental/voice";
 *
 * const VoiceAgent = withVoice(Agent);
 *
 * class MyAgent extends VoiceAgent {
 *   stt = new WorkersAISTT(this.env.AI);
 *   tts = new WorkersAITTS(this.env.AI);
 *   vad = new WorkersAIVAD(this.env.AI);
 *
 *   async onTurn(transcript, context) {
 *     return "Hello! I heard you say: " + transcript;
 *   }
 * }
 * ```
 */
export function withVoice<TBase extends AgentLike>(
  Base: TBase,
  voiceOptions?: VoiceAgentOptions
) {
  console.log(
    "[@cloudflare/voice] Note: The voice API is experimental and may change between releases. Pin your version to avoid surprises."
  );

  const opts = voiceOptions ?? {};

  /** Read a pipeline option, falling back to `fallback` when it is null/undefined. */
  function opt<K extends keyof VoiceAgentOptions>(
    key: K,
    fallback: NonNullable<VoiceAgentOptions[K]>
  ): NonNullable<VoiceAgentOptions[K]> {
    return (opts[key] ?? fallback) as NonNullable<VoiceAgentOptions[K]>;
  }

  class VoiceAgentMixin extends Base {
    // --- Provider properties (set by subclass) ---

    /** Speech-to-text provider (batch). Required unless streamingStt is set. */
    stt?: STTProvider;
    /** Streaming speech-to-text provider. Optional — if set, used instead of batch `stt`. */
    streamingStt?: StreamingSTTProvider;
    /** Text-to-speech provider. Required. May also implement StreamingTTSProvider. */
    tts?: TTSProvider & Partial<StreamingTTSProvider>;
    /** Voice activity detection provider. Optional — if unset, every end_of_speech is treated as confirmed. */
    vad?: VADProvider;

    // Shared per-connection audio state manager
    #cm = new AudioConnectionManager("VoiceAgent");

    // Voice protocol message types handled internally (never forwarded).
    static #VOICE_MESSAGES = new Set<string>([
      "hello",
      "start_call",
      "end_call",
      "start_of_speech",
      "end_of_speech",
      "interrupt",
      "text_message"
    ]);

    // --- Hibernation helpers ---

    /**
     * Store the in-call flag on the connection so it can be read back after
     * a hibernation wake. `false` is stored as `undefined`.
     */
    #setCallState(connection: Connection, inCall: boolean) {
      this._unsafe_setConnectionFlag(
        connection,
        "_cf_voiceInCall",
        inCall || undefined
      );
    }

    /** Read the persisted in-call flag for a connection. */
    #getCallState(connection: Connection): boolean {
      return (
        this._unsafe_getConnectionFlag(connection, "_cf_voiceInCall") === true
      );
    }

    /**
     * Restore in-memory call state after hibernation wake.
     * Called when we receive a message for a connection that the state
     * says is in a call, but we have no in-memory buffer for it.
     */
    #restoreCallState(connection: Connection) {
      this.#cm.initConnection(connection.id);
    }

    /**
     * Observe a fire-and-forget task so a rejection is logged instead of
     * surfacing as an unhandled promise rejection. Plain (non-promise)
     * values, e.g. a hook that returned `void`, are accepted too.
     */
    #detach(label: string, task: unknown): void {
      Promise.resolve(task).catch((error: unknown) => {
        console.error(`[VoiceAgent] ${label} failed:`, error);
      });
    }

    // --- Agent lifecycle ---

    #schemaReady = false;

    /** Lazily create the `cf_voice_messages` table, once per instance. */
    #ensureSchema() {
      if (this.#schemaReady) return;
      this.sql`
        CREATE TABLE IF NOT EXISTS cf_voice_messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          role TEXT NOT NULL,
          text TEXT NOT NULL,
          timestamp INTEGER NOT NULL
        )
      `;
      this.#schemaReady = true;
    }

    // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- mixin constructor must accept any args
    constructor(...args: any[]) {
      super(...args);

      // Capture the consumer's lifecycle methods (defined on the subclass
      // prototype) and wrap them so voice logic always runs first.
      // This is the same pattern used by withVoiceInput, Agent, and PartyServer.
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onConnect = (this as any).onConnect?.bind(this);
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onClose = (this as any).onClose?.bind(this);
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onMessage = (this as any).onMessage?.bind(this);

      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onConnect = (
        connection: Connection,
        ...rest: unknown[]
      ) => {
        this.#sendJSON(connection, {
          type: "welcome",
          protocol_version: VOICE_PROTOCOL_VERSION
        });
        this.#sendJSON(connection, { type: "status", status: "idle" });
        return _onConnect?.(connection, ...rest);
      };

      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onClose = (connection: Connection, ...rest: unknown[]) => {
        // NOTE(review): onCallEnd is not invoked when a socket closes mid-call;
        // only explicit end_call / forceEndCall trigger it.
        this.#cm.cleanup(connection.id);
        this.#setCallState(connection, false);
        return _onClose?.(connection, ...rest);
      };

      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onMessage = (
        connection: Connection,
        message: WSMessage
      ) => {
        // Restore in-memory state if DO woke from hibernation
        if (
          !this.#cm.isInCall(connection.id) &&
          this.#getCallState(connection)
        ) {
          this.#restoreCallState(connection);
        }

        // Binary audio — always handled by voice, never forwarded
        if (message instanceof ArrayBuffer) {
          this.#cm.bufferAudio(connection.id, message);
          return;
        }

        if (typeof message !== "string") {
          return _onMessage?.(connection, message);
        }

        // Try to parse as voice protocol; non-JSON goes to the consumer.
        let parsed: unknown;
        try {
          parsed = JSON.parse(message);
        } catch {
          return _onMessage?.(connection, message);
        }

        // Only objects carrying a known string `type` are voice messages.
        // This also keeps JSON such as `null` from throwing on `.type`.
        const msg =
          typeof parsed === "object" && parsed !== null
            ? (parsed as {
                type?: unknown;
                preferred_format?: string;
                text?: unknown;
              })
            : null;
        if (
          !msg ||
          typeof msg.type !== "string" ||
          !VoiceAgentMixin.#VOICE_MESSAGES.has(msg.type)
        ) {
          return _onMessage?.(connection, message);
        }

        switch (msg.type) {
          case "hello":
            // Client announced its protocol version; currently a no-op.
            break;
          case "start_call":
            this.#detach(
              "start_call",
              this.#handleStartCall(connection, msg.preferred_format)
            );
            break;
          case "end_call":
            this.#handleEndCall(connection);
            break;
          case "start_of_speech":
            this.#handleStartOfSpeech(connection);
            break;
          case "end_of_speech":
            this.#cm.clearVadRetry(connection.id);
            this.#detach("end_of_speech", this.#handleEndOfSpeech(connection));
            break;
          case "interrupt":
            this.#handleInterrupt(connection);
            break;
          case "text_message":
            if (typeof msg.text === "string") {
              this.#detach(
                "text_message",
                this.#handleTextMessage(connection, msg.text)
              );
            }
            break;
        }
      };
    }

    // --- User-overridable hooks ---

    /**
     * Produce the assistant's reply for a user turn. Must be overridden.
     * A string is synthesized in one request; other text sources are
     * streamed through sentence-chunked TTS.
     */
    onTurn(
      _transcript: string,
      _context: VoiceTurnContext
    ): Promise<TextSource> {
      throw new Error(
        "VoiceAgent subclass must implement onTurn(). Return a string, AsyncIterable, or ReadableStream."
      );
    }

    /** Gate for `start_call`; return false to ignore the request. */
    beforeCallStart(_connection: Connection): boolean | Promise<boolean> {
      return true;
    }

    /** Called after a call starts and `audio_config` / `listening` were sent. */
    onCallStart(_connection: Connection): void | Promise<void> {}

    /** Called after `end_call` / `forceEndCall` cleaned up and sent `idle`. */
    onCallEnd(_connection: Connection): void | Promise<void> {}

    /** Called after an `interrupt` aborted the pipeline and sent `listening`. */
    onInterrupt(_connection: Connection): void | Promise<void> {}

    // --- Pipeline hooks ---

    /** Batch STT path only: transform audio before transcription; null drops the turn. */
    beforeTranscribe(
      audio: ArrayBuffer,
      _connection: Connection
    ): ArrayBuffer | null | Promise<ArrayBuffer | null> {
      return audio;
    }

    /** Transform a transcript (batch or streaming STT); null drops the turn. */
    afterTranscribe(
      transcript: string,
      _connection: Connection
    ): string | null | Promise<string | null> {
      return transcript;
    }

    /** Transform text before synthesis (per sentence when streaming); null skips it. */
    beforeSynthesize(
      text: string,
      _connection: Connection
    ): string | null | Promise<string | null> {
      return text;
    }

    /** Transform synthesized audio before it is sent; null skips the chunk. */
    afterSynthesize(
      audio: ArrayBuffer | null,
      _text: string,
      _connection: Connection
    ): ArrayBuffer | null | Promise<ArrayBuffer | null> {
      return audio;
    }

    // --- Streaming STT session management ---

    /**
     * Open a streaming STT session for the connection if a streaming provider
     * is configured, the connection is in a call, and no session is active.
     */
    #handleStartOfSpeech(connection: Connection) {
      if (!this.streamingStt) return; // no streaming provider — ignore
      if (this.#cm.hasSTTSession(connection.id)) return; // already active
      if (!this.#cm.isInCall(connection.id)) return; // not in a call

      // Clear EOT flag from any previous turn
      this.#cm.clearEOT(connection.id);

      // Accumulate finalized segments for the full transcript
      let accumulated = "";

      this.#cm.startSTTSession(connection.id, this.streamingStt, {
        onFinal: (text: string) => {
          accumulated += (accumulated ? " " : "") + text;
          // Send interim update with the accumulated final text
          this.#sendJSON(connection, {
            type: "transcript_interim",
            text: accumulated
          });
        },
        onInterim: (text: string) => {
          // Show accumulated finals + current interim to the client
          const display = accumulated ? accumulated + " " + text : text;
          this.#sendJSON(connection, {
            type: "transcript_interim",
            text: display
          });
        },
        // Provider-driven end-of-turn: start LLM+TTS immediately
        // without waiting for the client to send end_of_speech.
        onEndOfTurn: (transcript: string) => {
          // Guard against double-fire
          if (this.#cm.isEOTTriggered(connection.id)) return;
          this.#cm.setEOTTriggered(connection.id);
          // Remove the session — this turn is done
          this.#cm.removeSTTSession(connection.id);
          // Clear audio buffer — no batch STT needed
          this.#cm.clearAudioBuffer(connection.id);
          // Clear any pending VAD retry
          this.#cm.clearVadRetry(connection.id);
          // Start the pipeline immediately with the stable transcript
          this.#detach("pipeline", this.#runPipeline(connection, transcript));
        }
      });
    }

    /** Return the configured TTS provider or throw a descriptive error. */
    #requireTTS(): TTSProvider & Partial<StreamingTTSProvider> {
      if (!this.tts) {
        throw new Error(
          "No TTS provider configured. Set 'tts' on your VoiceAgent subclass."
        );
      }
      return this.tts;
    }

    // --- Conversation persistence ---

    /** Append a message and prune to the newest `maxMessageCount` rows. */
    saveMessage(role: "user" | "assistant", text: string) {
      this.#ensureSchema();
      this.sql`
        INSERT INTO cf_voice_messages (role, text, timestamp)
        VALUES (${role}, ${text}, ${Date.now()})
      `;

      const maxMessages = opt("maxMessageCount", DEFAULT_MAX_MESSAGE_COUNT);
      this.sql`
        DELETE FROM cf_voice_messages
        WHERE id NOT IN (
          SELECT id FROM cf_voice_messages
          ORDER BY id DESC LIMIT ${maxMessages}
        )
      `;
    }

    /**
     * Return the most recent `limit` messages (default `historyLimit`) in
     * chronological order.
     */
    getConversationHistory(
      limit?: number
    ): Array<{ role: VoiceRole; content: string }> {
      this.#ensureSchema();
      const historyLimit = limit ?? opt("historyLimit", DEFAULT_HISTORY_LIMIT);
      const rows = this.sql<{ role: VoiceRole; text: string }>`
        SELECT role, text FROM cf_voice_messages
        ORDER BY id DESC LIMIT ${historyLimit}
      `;
      // Rows come back newest-first; flip to chronological order.
      return rows.reverse().map((row) => ({
        role: row.role,
        content: row.text
      }));
    }

    // --- Convenience methods ---

    /**
     * Programmatically end a call for a specific connection.
     * Cleans up the connection's server-side voice state, clears the
     * persisted in-call flag, sends `idle`, and invokes `onCallEnd`.
     * No-op if the connection is not in a call.
     * Use this to kick a speaker or enforce call limits.
     */
    forceEndCall(connection: Connection): void {
      if (!this.#cm.isInCall(connection.id)) return; // not in a call
      this.#handleEndCall(connection);
    }

    /**
     * Synthesize `text` and send it to one connection. The message is
     * persisted only if the speech was not interrupted.
     */
    async speak(connection: Connection, text: string): Promise<void> {
      const signal = this.#cm.createPipelineAbort(connection.id);

      try {
        this.#sendJSON(connection, { type: "status", status: "speaking" });
        this.#sendJSON(connection, {
          type: "transcript_start",
          role: "assistant"
        });
        this.#sendJSON(connection, { type: "transcript_end", text });

        const audio = await this.#synthesizeWithHooks(text, connection, signal);
        if (audio && !signal.aborted) {
          connection.send(audio);
        }

        if (!signal.aborted) {
          this.saveMessage("assistant", text);
          this.#sendJSON(connection, { type: "status", status: "listening" });
        }
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }

    /**
     * Persist `text` once, then synthesize and send it to every connection
     * in turn (hooks run per connection).
     */
    async speakAll(text: string): Promise<void> {
      this.saveMessage("assistant", text);

      const connections = [...this.getConnections()];
      if (connections.length === 0) {
        return;
      }

      for (const connection of connections) {
        const signal = this.#cm.createPipelineAbort(connection.id);

        try {
          this.#sendJSON(connection, { type: "status", status: "speaking" });
          this.#sendJSON(connection, {
            type: "transcript_start",
            role: "assistant"
          });
          this.#sendJSON(connection, { type: "transcript_end", text });

          const audio = await this.#synthesizeWithHooks(
            text,
            connection,
            signal
          );
          if (audio && !signal.aborted) {
            connection.send(audio);
          }

          if (!signal.aborted) {
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
          }
        } finally {
          this.#cm.clearPipelineAbort(connection.id);
        }
      }
    }

    /** Run beforeSynthesize → TTS → afterSynthesize for a whole text. */
    async #synthesizeWithHooks(
      text: string,
      connection: Connection,
      signal?: AbortSignal
    ): Promise<ArrayBuffer | null> {
      const textToSpeak = await this.beforeSynthesize(text, connection);
      if (!textToSpeak) return null;
      const rawAudio = await this.#requireTTS().synthesize(textToSpeak, signal);
      return this.afterSynthesize(rawAudio, textToSpeak, connection);
    }

    // --- Internal: call lifecycle ---

    async #handleStartCall(connection: Connection, _preferredFormat?: string) {
      const allowed = await this.beforeCallStart(connection);
      if (!allowed) return;

      this.#cm.initConnection(connection.id);
      this.#setCallState(connection, true);

      const configuredFormat = opt("audioFormat", "mp3") as VoiceAudioFormat;
      this.#sendJSON(connection, {
        type: "audio_config",
        format: configuredFormat
      });

      this.#sendJSON(connection, { type: "status", status: "listening" });
      await this.onCallStart(connection);
    }

    #handleEndCall(connection: Connection) {
      this.#cm.cleanup(connection.id);
      this.#setCallState(connection, false);
      this.#sendJSON(connection, { type: "status", status: "idle" });
      this.#detach("onCallEnd", this.onCallEnd(connection));
    }

    #handleInterrupt(connection: Connection) {
      this.#cm.abortPipeline(connection.id);
      this.#cm.abortSTTSession(connection.id);
      this.#cm.clearVadRetry(connection.id);
      this.#cm.clearEOT(connection.id);
      this.#cm.clearAudioBuffer(connection.id);
      this.#sendJSON(connection, { type: "status", status: "listening" });
      this.#detach("onInterrupt", this.onInterrupt(connection));
    }

    // --- Internal: text message handling ---

    /**
     * Handle a typed user message. In a call the reply is spoken; otherwise
     * it is streamed as transcript deltas only.
     */
    async #handleTextMessage(connection: Connection, text: string) {
      if (!text || text.trim().length === 0) return;

      const userText = text.trim();

      const signal = this.#cm.createPipelineAbort(connection.id);
      const pipelineStart = Date.now();

      this.#sendJSON(connection, { type: "status", status: "thinking" });

      this.saveMessage("user", userText);
      this.#sendJSON(connection, {
        type: "transcript",
        role: "user",
        text: userText
      });

      try {
        const context: VoiceTurnContext = {
          connection,
          messages: this.getConversationHistory(),
          signal
        };

        const llmStart = Date.now();
        const turnResult = await this.onTurn(userText, context);

        if (signal.aborted) return;

        const isInCall = this.#cm.isInCall(connection.id);

        if (isInCall) {
          this.#sendJSON(connection, { type: "status", status: "speaking" });

          const { text: fullText } = await this.#streamResponse(
            connection,
            turnResult,
            llmStart,
            pipelineStart,
            signal
          );

          if (signal.aborted) return;

          this.saveMessage("assistant", fullText);
          this.#sendJSON(connection, { type: "status", status: "listening" });
        } else {
          this.#sendJSON(connection, {
            type: "transcript_start",
            role: "assistant"
          });

          let fullText = "";
          for await (const token of iterateText(turnResult)) {
            if (signal.aborted) break;
            fullText += token;
            this.#sendJSON(connection, {
              type: "transcript_delta",
              text: token
            });
          }

          this.#sendJSON(connection, {
            type: "transcript_end",
            text: fullText
          });

          // Interrupted: close the transcript but, like the in-call path,
          // don't persist the partial reply or override the interrupt status.
          if (signal.aborted) return;

          this.saveMessage("assistant", fullText);
          this.#sendJSON(connection, { type: "status", status: "idle" });
        }
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Text pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Text pipeline failed"
        });
        this.#sendJSON(connection, {
          type: "status",
          status: this.#cm.isInCall(connection.id) ? "listening" : "idle"
        });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }

    // --- Internal: audio pipeline ---

    /**
     * Client-signalled end of speech: optional VAD gate, then STT (streaming
     * session flush or batch), then the shared LLM+TTS pipeline.
     * `skipVad` is set by the scheduled VAD retry.
     */
    async #handleEndOfSpeech(connection: Connection, skipVad = false) {
      // If the pipeline was already triggered by provider-driven EOT,
      // this end_of_speech from the client is late — ignore it.
      if (this.#cm.isEOTTriggered(connection.id)) {
        this.#cm.clearEOT(connection.id);
        return;
      }

      const audioData = this.#cm.getAndClearAudio(connection.id);
      if (!audioData) {
        return;
      }

      const hasStreamingSession = this.#cm.hasSTTSession(connection.id);

      const minAudioBytes = opt("minAudioBytes", DEFAULT_MIN_AUDIO_BYTES);
      if (audioData.byteLength < minAudioBytes) {
        // Too short — abort the streaming session if any
        this.#cm.abortSTTSession(connection.id);
        this.#sendJSON(connection, { type: "status", status: "listening" });
        return;
      }

      let vadMs = 0;
      if (this.vad && !skipVad) {
        const vadStart = Date.now();
        // NOTE(review): a VAD failure rejects here before any status is sent;
        // the dispatcher only logs it.
        const vadResult = await this.vad.checkEndOfTurn(audioData);
        vadMs = Date.now() - vadStart;
        const vadThreshold = opt("vadThreshold", DEFAULT_VAD_THRESHOLD);
        const shouldProceed =
          vadResult.isComplete || vadResult.probability > vadThreshold;

        if (!shouldProceed) {
          const pushbackSeconds = opt(
            "vadPushbackSeconds",
            DEFAULT_VAD_PUSHBACK_SECONDS
          );
          // seconds * 16000 * 2 bytes: presumably 16 kHz, 16-bit mono PCM — confirm against the client.
          const maxPushbackBytes = pushbackSeconds * 16000 * 2;
          const pushback =
            audioData.byteLength > maxPushbackBytes
              ? audioData.slice(audioData.byteLength - maxPushbackBytes)
              : audioData;
          this.#cm.pushbackAudio(connection.id, pushback);
          // Keep the streaming STT session alive — VAD rejected but user
          // may still be speaking. The session continues accumulating.
          this.#sendJSON(connection, { type: "status", status: "listening" });

          // Schedule a retry that skips VAD. If the user stays silent,
          // the client won't send another end_of_speech (its #isSpeaking
          // is already false), so we'd deadlock without this timer.
          this.#cm.scheduleVadRetry(
            connection.id,
            () => this.#handleEndOfSpeech(connection, true),
            opt("vadRetryMs", DEFAULT_VAD_RETRY_MS) as number
          );
          return;
        }
      }

      // --- STT phase ---
      const signal = this.#cm.createPipelineAbort(connection.id);
      const sttStart = Date.now();

      this.#sendJSON(connection, { type: "status", status: "thinking" });

      try {
        let userText: string | null;
        let sttMs: number;

        if (hasStreamingSession) {
          // --- Streaming STT path ---
          // The session has been receiving audio all along; flushing it
          // returns the final transcript. beforeTranscribe is skipped —
          // audio was already fed incrementally.
          const rawTranscript = await this.#cm.flushSTTSession(connection.id);
          sttMs = Date.now() - sttStart;

          if (signal.aborted) return;

          if (!rawTranscript || rawTranscript.trim().length === 0) {
            this.#sendJSON(connection, { type: "status", status: "listening" });
            return;
          }

          userText = await this.afterTranscribe(rawTranscript, connection);
        } else {
          // --- Batch STT path ---
          if (!this.stt) {
            // No batch STT provider and no streaming session — this can
            // happen when onEndOfTurn already consumed the session.
            // Just return to listening.
            this.#sendJSON(connection, { type: "status", status: "listening" });
            return;
          }

          const processedAudio = await this.beforeTranscribe(
            audioData,
            connection
          );
          if (!processedAudio || signal.aborted) {
            this.#sendJSON(connection, { type: "status", status: "listening" });
            return;
          }

          const rawTranscript = await this.stt.transcribe(
            processedAudio,
            signal
          );
          sttMs = Date.now() - sttStart;

          if (signal.aborted) return;

          if (!rawTranscript || rawTranscript.trim().length === 0) {
            this.#sendJSON(connection, { type: "status", status: "listening" });
            return;
          }

          userText = await this.afterTranscribe(rawTranscript, connection);
        }

        if (!userText || signal.aborted) {
          this.#sendJSON(connection, { type: "status", status: "listening" });
          return;
        }

        // Hand off to the shared pipeline (LLM + TTS)
        await this.#runPipelineInner(
          connection,
          userText,
          sttStart,
          vadMs,
          sttMs,
          signal
        );
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Voice pipeline failed"
        });
        this.#sendJSON(connection, { type: "status", status: "listening" });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }

    /**
     * Start the voice pipeline from a stable transcript.
     * Called by provider-driven EOT (onEndOfTurn callback).
     * Handles: abort controller setup, LLM, TTS, metrics, persistence.
     */
    async #runPipeline(connection: Connection, transcript: string) {
      const signal = this.#cm.createPipelineAbort(connection.id);
      const pipelineStart = Date.now();

      try {
        const userText = await this.afterTranscribe(transcript, connection);
        if (!userText || signal.aborted) {
          this.#sendJSON(connection, { type: "status", status: "listening" });
          return;
        }

        await this.#runPipelineInner(
          connection,
          userText,
          pipelineStart,
          0, // vadMs — no VAD with provider-driven EOT
          0, // sttMs — transcript was delivered by the EOT callback
          signal
        );
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Voice pipeline failed"
        });
        this.#sendJSON(connection, { type: "status", status: "listening" });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }

    /**
     * Shared inner pipeline: save transcript, run LLM, stream TTS, emit metrics.
     * Used by both #handleEndOfSpeech (after STT) and #runPipeline (after provider EOT).
     */
    async #runPipelineInner(
      connection: Connection,
      userText: string,
      pipelineStart: number,
      vadMs: number,
      sttMs: number,
      signal: AbortSignal
    ) {
      this.saveMessage("user", userText);
      this.#sendJSON(connection, {
        type: "transcript",
        role: "user",
        text: userText
      });

      this.#sendJSON(connection, { type: "status", status: "speaking" });

      const context: VoiceTurnContext = {
        connection,
        messages: this.getConversationHistory(),
        signal
      };

      const llmStart = Date.now();
      const turnResult = await this.onTurn(userText, context);

      if (signal.aborted) return;

      const {
        text: fullText,
        llmMs,
        ttsMs,
        firstAudioMs
      } = await this.#streamResponse(
        connection,
        turnResult,
        llmStart,
        pipelineStart,
        signal
      );

      if (signal.aborted) return;

      const totalMs = Date.now() - pipelineStart;
      this.#sendJSON(connection, {
        type: "metrics",
        vad_ms: vadMs,
        stt_ms: sttMs,
        llm_ms: llmMs,
        tts_ms: ttsMs,
        first_audio_ms: firstAudioMs,
        total_ms: totalMs
      });

      this.saveMessage("assistant", fullText);
      this.#sendJSON(connection, { type: "status", status: "listening" });
    }

    // --- Internal: streaming TTS pipeline ---

    /**
     * Speak an `onTurn` result. A plain string is synthesized in one request;
     * anything else is tokenized and streamed through the sentence pipeline.
     */
    async #streamResponse(
      connection: Connection,
      response: TextSource,
      llmStart: number,
      pipelineStart: number,
      signal: AbortSignal
    ): Promise<{
      text: string;
      llmMs: number;
      ttsMs: number;
      firstAudioMs: number;
    }> {
      if (typeof response === "string") {
        const llmMs = Date.now() - llmStart;

        this.#sendJSON(connection, {
          type: "transcript_start",
          role: "assistant"
        });
        this.#sendJSON(connection, { type: "transcript_end", text: response });

        const ttsStart = Date.now();
        // Pass the signal so an interrupt can cancel the TTS request itself.
        const audio = await this.#synthesizeWithHooks(
          response,
          connection,
          signal
        );
        const ttsMs = Date.now() - ttsStart;

        if (audio && !signal.aborted) {
          connection.send(audio);
        }

        const firstAudioMs = Date.now() - pipelineStart;
        return { text: response, llmMs, ttsMs, firstAudioMs };
      }

      return this.#streamingTTSPipeline(
        connection,
        iterateText(response),
        llmStart,
        pipelineStart,
        signal
      );
    }

    /**
     * Stream LLM tokens to the client as transcript deltas while chunking
     * them into sentences. Each sentence's TTS starts eagerly; a single
     * drain loop sends the resulting audio strictly in sentence order.
     */
    async #streamingTTSPipeline(
      connection: Connection,
      tokenStream: AsyncIterable<string>,
      llmStart: number,
      pipelineStart: number,
      signal: AbortSignal
    ): Promise<{
      text: string;
      llmMs: number;
      ttsMs: number;
      firstAudioMs: number;
    }> {
      const chunker = new SentenceChunker();
      const ttsQueue: AsyncIterable<ArrayBuffer>[] = [];
      let fullText = "";

      let firstAudioSentAt: number | null = null;
      let cumulativeTtsMs = 0;

      let streamComplete = false;
      // Wake-up handshake between producer and drain loop. `drainPending`
      // records a notification that arrived while the drain wasn't waiting.
      let drainNotify: (() => void) | null = null;
      let drainPending = false;

      const notifyDrain = () => {
        if (drainNotify) {
          const resolve = drainNotify;
          drainNotify = null;
          resolve();
        } else {
          drainPending = true;
        }
      };

      const tts = this.#requireTTS();
      const hasStreamingTTS = typeof tts.synthesizeStream === "function";

      const drainPromise = (async () => {
        let i = 0;
        while (true) {
          while (i >= ttsQueue.length) {
            if (streamComplete && i >= ttsQueue.length) return;
            if (drainPending) {
              drainPending = false;
              continue;
            }
            await new Promise<void>((r) => {
              drainNotify = r;
            });
            if (streamComplete && i >= ttsQueue.length) return;
          }

          if (signal.aborted) return;

          try {
            for await (const chunk of ttsQueue[i]) {
              if (signal.aborted) return;
              connection.send(chunk);
              if (!firstAudioSentAt) {
                firstAudioSentAt = Date.now();
              }
            }
          } catch (err) {
            console.error("[VoiceAgent] TTS error for sentence:", err);
            this.#sendJSON(connection, {
              type: "error",
              message:
                err instanceof Error ? err.message : "TTS failed for a sentence"
            });
          }

          i++;
        }
      })();

      const makeSentenceTTS = (
        sentence: string
      ): AsyncIterable<ArrayBuffer> => {
        // Generator functions cannot be arrows, so capture `this` explicitly.
        const self = this;
        async function* generate() {
          const ttsStart = Date.now();
          const text = await self.beforeSynthesize(sentence, connection);
          if (!text) return;

          if (hasStreamingTTS) {
            for await (const chunk of tts.synthesizeStream!(text, signal)) {
              const processed = await self.afterSynthesize(
                chunk,
                text,
                connection
              );
              if (processed) yield processed;
            }
          } else {
            const rawAudio = await tts.synthesize(text, signal);
            const processed = await self.afterSynthesize(
              rawAudio,
              text,
              connection
            );
            if (processed) yield processed;
          }
          cumulativeTtsMs += Date.now() - ttsStart;
        }
        // Start synthesis now, even though the drain may still be busy
        // sending earlier sentences.
        return eagerAsyncIterable(generate());
      };

      const enqueueSentence = (sentence: string) => {
        ttsQueue.push(makeSentenceTTS(sentence));
        notifyDrain();
      };

      this.#sendJSON(connection, {
        type: "transcript_start",
        role: "assistant"
      });

      for await (const token of tokenStream) {
        if (signal.aborted) break;

        fullText += token;
        this.#sendJSON(connection, { type: "transcript_delta", text: token });

        for (const sentence of chunker.add(token)) {
          enqueueSentence(sentence);
        }
      }

      const llmMs = Date.now() - llmStart;

      for (const sentence of chunker.flush()) {
        enqueueSentence(sentence);
      }

      streamComplete = true;
      notifyDrain();

      this.#sendJSON(connection, { type: "transcript_end", text: fullText });

      await drainPromise;

      const firstAudioMs = firstAudioSentAt
        ? firstAudioSentAt - pipelineStart
        : 0;

      return { text: fullText, llmMs, ttsMs: cumulativeTtsMs, firstAudioMs };
    }

    // --- Internal: protocol helpers ---

    /** Send a protocol JSON message; `transcript_delta` frames are flagged to sendVoiceJSON. */
    #sendJSON(connection: Connection, data: unknown) {
      const parsed = data as Record<string, unknown>;
      sendVoiceJSON(
        connection,
        data,
        "VoiceAgent",
        parsed.type === "transcript_delta"
      );
    }
  }

  return VoiceAgentMixin;
}

// --- Eager async iterable ---

/**
 * Start consuming `source` immediately and buffer everything it yields, so
 * work (e.g. TTS requests) proceeds before anyone iterates the result.
 *
 * Items are replayed in order. If the source throws, every item buffered
 * before the failure is delivered first and the error is rethrown
 * afterwards, including falsy thrown values. Supports a single consumer:
 * only one pending `next()` waiter is tracked.
 */
function eagerAsyncIterable<T>(source: AsyncIterable<T>): AsyncIterable<T> {
  const buffer: T[] = [];
  let finished = false;
  let failed = false;
  let failure: unknown;
  let waitResolve: (() => void) | null = null;

  const notify = () => {
    if (waitResolve) {
      const resolve = waitResolve;
      waitResolve = null;
      resolve();
    }
  };

  void (async () => {
    try {
      for await (const item of source) {
        buffer.push(item);
        notify();
      }
    } catch (err) {
      failed = true;
      failure = err;
    } finally {
      finished = true;
      notify();
    }
  })();

  return {
    [Symbol.asyncIterator]() {
      let index = 0;
      return {
        async next(): Promise<IteratorResult<T>> {
          while (index >= buffer.length && !finished) {
            await new Promise<void>((r) => {
              waitResolve = r;
            });
          }
          // Drain what was produced before surfacing any failure.
          if (index < buffer.length) {
            return { done: false, value: buffer[index++] };
          }
          if (failed) {
            throw failure;
          }
          return { done: true, value: undefined };
        }
      };
    }
  };
}