// voice.ts
/**
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* !! WARNING: EXPERIMENTAL — DO NOT USE IN PRODUCTION !!
* !! !!
* !! This API is under active development and WILL break between !!
* !! releases. Method names, types, behavior, and the mixin signature !!
* !! are all subject to change without notice. !!
* !! !!
* !! If you use this, pin your agents version and expect to rewrite !!
* !! your code when upgrading. !!
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
*
* Experimental voice pipeline mixin for the Agents SDK.
*
* Usage:
* import { Agent } from "agents";
* import { withVoice } from "agents/experimental/voice";
*
* const VoiceAgent = withVoice(Agent);
*
* class MyAgent extends VoiceAgent<Env> {
* async onTurn(transcript: string, context: VoiceTurnContext) {
* const result = streamText({ ... });
* return result.textStream;
* }
* }
*
* This mixin adds the full voice pipeline: audio buffering, VAD, STT,
* streaming TTS, interruption handling, conversation persistence, and
* the WebSocket voice protocol.
*
* @experimental This API is not yet stable and may change.
*/
import type { Agent, Connection, WSMessage } from "agents";
import { SentenceChunker } from "./sentence-chunker";
import { iterateText, type TextSource } from "./text-stream";
import { VOICE_PROTOCOL_VERSION } from "./types";
import type {
VoiceStatus,
VoiceRole,
VoiceAudioFormat,
VoiceClientMessage,
VoiceServerMessage,
VoicePipelineMetrics,
STTProvider,
TTSProvider,
StreamingTTSProvider,
VADProvider,
StreamingSTTProvider
} from "./types";
import {
AudioConnectionManager,
sendVoiceJSON,
DEFAULT_VAD_THRESHOLD,
DEFAULT_MIN_AUDIO_BYTES,
DEFAULT_VAD_PUSHBACK_SECONDS,
DEFAULT_VAD_RETRY_MS
} from "./audio-pipeline";
import type { VoiceInputAgentOptions } from "./voice-input";
// Re-export SentenceChunker for direct use
export { SentenceChunker } from "./sentence-chunker";
// Re-export protocol version constant
export { VOICE_PROTOCOL_VERSION } from "./types";
// Re-export shared types so existing imports from "agents/experimental/voice" still work
export type {
VoiceStatus,
VoiceRole,
VoiceAudioFormat,
VoiceAudioInput,
VoiceTransport,
VoiceClientMessage,
VoiceServerMessage,
VoicePipelineMetrics,
TranscriptMessage,
STTProvider,
TTSProvider,
StreamingTTSProvider,
VADProvider,
StreamingSTTProvider,
StreamingSTTSession,
StreamingSTTSessionOptions
} from "./types";
// Re-export voice input mixin (STT-only, no TTS/LLM)
export { withVoiceInput } from "./voice-input";
export type { VoiceInputAgentOptions } from "./voice-input";
// Re-export text stream utility
export { iterateText, type TextSource } from "./text-stream";
// Re-export SFU utility functions
export {
decodeVarint,
encodeVarint,
extractPayloadFromProtobuf,
encodePayloadToProtobuf,
downsample48kStereoTo16kMono,
upsample16kMonoTo48kStereo,
sfuFetch,
createSFUSession,
addSFUTracks,
renegotiateSFUSession,
createSFUWebSocketAdapter
} from "./sfu-utils";
export type { SFUConfig } from "./sfu-utils";
// Re-export Workers AI providers and audio utility
export {
WorkersAISTT,
WorkersAITTS,
WorkersAIVAD,
WorkersAIFluxSTT,
pcmToWav
} from "./workers-ai-providers";
export type {
WorkersAISTTOptions,
WorkersAITTSOptions,
WorkersAIVADOptions,
WorkersAIFluxSTTOptions
} from "./workers-ai-providers";
// --- Public types ---
/** Result from a VAD (Voice Activity Detection) provider. */
export interface VADResult {
  /** True when the provider judged the user's turn to be complete. */
  isComplete: boolean;
  /** Provider confidence that speech has ended; compared against `vadThreshold`. */
  probability: number;
}
/** Context passed to the `onTurn()` hook. */
export interface VoiceTurnContext {
  /**
   * The WebSocket connection that sent the audio.
   * Useful for sending custom JSON messages (e.g. tool progress).
   * WARNING: sending raw binary on this connection will interleave with
   * the TTS audio stream. Use `connection.send(JSON.stringify(...))` only.
   */
  connection: Connection;
  /**
   * Conversation history from SQLite, oldest-first (chronological order).
   * Limited to the most recent `historyLimit` messages.
   */
  messages: Array<{ role: VoiceRole; content: string }>;
  /** AbortSignal — aborted if user interrupts or disconnects. */
  signal: AbortSignal;
}
/**
 * Configuration options for the voice mixin. Passed to `withVoice()`.
 * Audio/VAD tuning knobs (e.g. `minAudioBytes`, `vadThreshold`,
 * `vadPushbackSeconds`, `vadRetryMs`) are inherited from
 * {@link VoiceInputAgentOptions}.
 */
export interface VoiceAgentOptions extends VoiceInputAgentOptions {
  /** Max conversation history messages loaded for context. @default 20 */
  historyLimit?: number;
  /** Audio format used for binary audio payloads sent to the client. @default "mp3" */
  audioFormat?: VoiceAudioFormat;
  /** Max conversation messages to keep in SQLite. Oldest are pruned. @default 1000 */
  maxMessageCount?: number;
}
// --- Default option values (voice-specific, not in audio-pipeline) ---
const DEFAULT_HISTORY_LIMIT = 20;
const DEFAULT_MAX_MESSAGE_COUNT = 1000;
// --- Mixin ---
// oxlint-disable-next-line @typescript-eslint/no-explicit-any -- mixin constructor constraint
type Constructor<T = object> = new (...args: any[]) => T;
// Minimal structural surface the mixin requires from the base Agent class:
// tagged-template SQLite access, connection enumeration, and the
// hibernation-safe per-connection flag accessors.
type AgentLike = Constructor<
  Pick<
    Agent<Cloudflare.Env>,
    | "sql"
    | "getConnections"
    | "_unsafe_getConnectionFlag"
    | "_unsafe_setConnectionFlag"
  >
>;
/**
 * Voice pipeline mixin. Adds the full voice pipeline to an Agent class:
 * audio buffering, VAD, STT (batch or streaming), streaming TTS,
 * interruption handling, conversation persistence, and the WebSocket
 * voice protocol.
 *
 * Subclasses must set `stt` and `tts` provider properties. VAD is optional.
 *
 * @param Base - The Agent class to extend (e.g. `Agent`).
 * @param voiceOptions - Optional pipeline configuration.
 *
 * @example
 * ```typescript
 * import { Agent } from "agents";
 * import { withVoice, WorkersAISTT, WorkersAITTS, WorkersAIVAD } from "agents/experimental/voice";
 *
 * const VoiceAgent = withVoice(Agent);
 *
 * class MyAgent extends VoiceAgent<Env> {
 *   stt = new WorkersAISTT(this.env.AI);
 *   tts = new WorkersAITTS(this.env.AI);
 *   vad = new WorkersAIVAD(this.env.AI);
 *
 *   async onTurn(transcript, context) {
 *     return "Hello! I heard you say: " + transcript;
 *   }
 * }
 * ```
 */
export function withVoice<TBase extends AgentLike>(
  Base: TBase,
  voiceOptions?: VoiceAgentOptions
) {
  console.log(
    "[@cloudflare/voice] Note: The voice API is experimental and may change between releases. Pin your version to avoid surprises."
  );
  const opts = voiceOptions ?? {};
  // Read a configured option with a typed, non-nullable fallback.
  function opt<K extends keyof VoiceAgentOptions>(
    key: K,
    fallback: NonNullable<VoiceAgentOptions[K]>
  ): NonNullable<VoiceAgentOptions[K]> {
    return (opts[key] ?? fallback) as NonNullable<VoiceAgentOptions[K]>;
  }
  class VoiceAgentMixin extends Base {
    // --- Provider properties (set by subclass) ---
    /** Speech-to-text provider (batch). Required unless streamingStt is set. */
    stt?: STTProvider;
    /** Streaming speech-to-text provider. Optional — if set, used instead of batch `stt`. */
    streamingStt?: StreamingSTTProvider;
    /** Text-to-speech provider. Required. May also implement StreamingTTSProvider. */
    tts?: TTSProvider & Partial<StreamingTTSProvider>;
    /** Voice activity detection provider. Optional — if unset, every end_of_speech is treated as confirmed. */
    vad?: VADProvider;
    // Shared per-connection audio state manager
    #cm = new AudioConnectionManager("VoiceAgent");
    // Voice protocol message types handled internally
    static #VOICE_MESSAGES = new Set([
      "hello",
      "start_call",
      "end_call",
      "start_of_speech",
      "end_of_speech",
      "interrupt",
      "text_message"
    ]);
    // --- Hibernation helpers ---
    // Persist the in-call flag on the connection so it survives DO
    // hibernation. `undefined` clears the flag when the call ends.
    #setCallState(connection: Connection, inCall: boolean) {
      this._unsafe_setConnectionFlag(
        connection,
        "_cf_voiceInCall",
        inCall || undefined
      );
    }
    #getCallState(connection: Connection): boolean {
      return (
        this._unsafe_getConnectionFlag(connection, "_cf_voiceInCall") === true
      );
    }
    /**
     * Restore in-memory call state after hibernation wake.
     * Called when we receive a message for a connection that the state
     * says is in a call, but we have no in-memory buffer for it.
     */
    #restoreCallState(connection: Connection) {
      this.#cm.initConnection(connection.id);
    }
    // --- Agent lifecycle ---
    #schemaReady = false;
    // Lazily create the conversation table; runs at most once per instance.
    #ensureSchema() {
      if (this.#schemaReady) return;
      this.sql`
        CREATE TABLE IF NOT EXISTS cf_voice_messages (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          role TEXT NOT NULL,
          text TEXT NOT NULL,
          timestamp INTEGER NOT NULL
        )
      `;
      this.#schemaReady = true;
    }
    // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- mixin constructor must accept any args
    constructor(...args: any[]) {
      super(...args);
      // Capture the consumer's lifecycle methods (defined on the subclass
      // prototype) and wrap them so voice logic always runs first.
      // This is the same pattern used by withVoiceInput, Agent, and PartyServer.
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onConnect = (this as any).onConnect?.bind(this);
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onClose = (this as any).onClose?.bind(this);
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- binding consumer methods
      const _onMessage = (this as any).onMessage?.bind(this);
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onConnect = (
        connection: Connection,
        ...rest: unknown[]
      ) => {
        // Announce the protocol version and initial idle status on connect.
        this.#sendJSON(connection, {
          type: "welcome",
          protocol_version: VOICE_PROTOCOL_VERSION
        });
        this.#sendJSON(connection, { type: "status", status: "idle" });
        return _onConnect?.(connection, ...rest);
      };
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onClose = (connection: Connection, ...rest: unknown[]) => {
        this.#cm.cleanup(connection.id);
        this.#setCallState(connection, false);
        return _onClose?.(connection, ...rest);
      };
      // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- overwriting lifecycle
      (this as any).onMessage = (
        connection: Connection,
        message: WSMessage
      ) => {
        // Restore in-memory state if DO woke from hibernation
        if (
          !this.#cm.isInCall(connection.id) &&
          this.#getCallState(connection)
        ) {
          this.#restoreCallState(connection);
        }
        // Binary audio — always handled by voice, never forwarded
        if (message instanceof ArrayBuffer) {
          this.#cm.bufferAudio(connection.id, message);
          return;
        }
        if (typeof message !== "string") {
          return _onMessage?.(connection, message);
        }
        // Try to parse as voice protocol
        let parsed: { type: string };
        try {
          parsed = JSON.parse(message);
        } catch {
          // Not JSON — forward to consumer
          return _onMessage?.(connection, message);
        }
        // Voice protocol message — handle internally
        if (VoiceAgentMixin.#VOICE_MESSAGES.has(parsed.type)) {
          switch (parsed.type) {
            case "hello":
              // Client announced its protocol version — log for diagnostics.
              break;
            case "start_call":
              this.#handleStartCall(
                connection,
                (parsed as { preferred_format?: string }).preferred_format
              );
              break;
            case "end_call":
              this.#handleEndCall(connection);
              break;
            case "start_of_speech":
              this.#handleStartOfSpeech(connection);
              break;
            case "end_of_speech":
              this.#cm.clearVadRetry(connection.id);
              this.#handleEndOfSpeech(connection);
              break;
            case "interrupt":
              this.#handleInterrupt(connection);
              break;
            case "text_message": {
              const text = (parsed as unknown as { text?: string }).text;
              if (typeof text === "string") {
                this.#handleTextMessage(connection, text);
              }
              break;
            }
          }
          return;
        }
        // Not a voice message — forward to consumer
        return _onMessage?.(connection, message);
      };
    }
    // --- User-overridable hooks ---
    /**
     * Produce the assistant's reply for a user turn. Subclasses MUST
     * override this. Return a string, AsyncIterable<string>, or
     * ReadableStream; streaming sources are synthesized sentence by
     * sentence as tokens arrive.
     */
    onTurn(
      _transcript: string,
      _context: VoiceTurnContext
    ): Promise<TextSource> {
      throw new Error(
        "VoiceAgent subclass must implement onTurn(). Return a string, AsyncIterable<string>, or ReadableStream."
      );
    }
    /** Return false to reject a start_call request (auth, rate limits, …). */
    beforeCallStart(_connection: Connection): boolean | Promise<boolean> {
      return true;
    }
    /** Called after a call started and the client was notified. */
    onCallStart(_connection: Connection): void | Promise<void> {}
    /** Called after a call ended and server-side state was cleaned up. */
    onCallEnd(_connection: Connection): void | Promise<void> {}
    /** Called after the user interrupted the assistant mid-response. */
    onInterrupt(_connection: Connection): void | Promise<void> {}
    // --- Pipeline hooks ---
    /** Pre-STT hook. Return null to drop the audio and skip the turn. */
    beforeTranscribe(
      audio: ArrayBuffer,
      _connection: Connection
    ): ArrayBuffer | null | Promise<ArrayBuffer | null> {
      return audio;
    }
    /** Post-STT hook. Return null to discard the transcript and skip the turn. */
    afterTranscribe(
      transcript: string,
      _connection: Connection
    ): string | null | Promise<string | null> {
      return transcript;
    }
    /** Pre-TTS hook. Return null to skip synthesis for this text. */
    beforeSynthesize(
      text: string,
      _connection: Connection
    ): string | null | Promise<string | null> {
      return text;
    }
    /** Post-TTS hook. Return null to drop the audio instead of sending it. */
    afterSynthesize(
      audio: ArrayBuffer | null,
      _text: string,
      _connection: Connection
    ): ArrayBuffer | null | Promise<ArrayBuffer | null> {
      return audio;
    }
    // --- Streaming STT session management ---
    // Open a streaming STT session on start_of_speech (no-op without a
    // streaming provider or active call).
    #handleStartOfSpeech(connection: Connection) {
      if (!this.streamingStt) return; // no streaming provider — ignore
      if (this.#cm.hasSTTSession(connection.id)) return; // already active
      if (!this.#cm.isInCall(connection.id)) return; // not in a call
      // Clear EOT flag from any previous turn
      this.#cm.clearEOT(connection.id);
      // Accumulate finalized segments for the full transcript
      let accumulated = "";
      this.#cm.startSTTSession(connection.id, this.streamingStt, {
        onFinal: (text: string) => {
          accumulated += (accumulated ? " " : "") + text;
          // Send interim update with the accumulated final text
          this.#sendJSON(connection, {
            type: "transcript_interim",
            text: accumulated
          });
        },
        onInterim: (text: string) => {
          // Show accumulated finals + current interim to the client
          const display = accumulated ? accumulated + " " + text : text;
          this.#sendJSON(connection, {
            type: "transcript_interim",
            text: display
          });
        },
        // Provider-driven end-of-turn: start LLM+TTS immediately
        // without waiting for the client to send end_of_speech.
        onEndOfTurn: (transcript: string) => {
          // Guard against double-fire
          if (this.#cm.isEOTTriggered(connection.id)) return;
          this.#cm.setEOTTriggered(connection.id);
          // Remove the session — this turn is done
          this.#cm.removeSTTSession(connection.id);
          // Clear audio buffer — no batch STT needed
          this.#cm.clearAudioBuffer(connection.id);
          // Clear any pending VAD retry
          this.#cm.clearVadRetry(connection.id);
          // Start the pipeline immediately with the stable transcript
          this.#runPipeline(connection, transcript);
        }
      });
    }
    // Narrow `tts` to non-optional, throwing a descriptive error if unset.
    #requireTTS(): TTSProvider & Partial<StreamingTTSProvider> {
      if (!this.tts) {
        throw new Error(
          "No TTS provider configured. Set 'tts' on your VoiceAgent subclass."
        );
      }
      return this.tts;
    }
    // --- Conversation persistence ---
    /**
     * Append a message to the SQLite conversation log, pruning the oldest
     * rows beyond `maxMessageCount`.
     */
    saveMessage(role: "user" | "assistant", text: string) {
      this.#ensureSchema();
      this.sql`
        INSERT INTO cf_voice_messages (role, text, timestamp)
        VALUES (${role}, ${text}, ${Date.now()})
      `;
      const maxMessages = opt("maxMessageCount", DEFAULT_MAX_MESSAGE_COUNT);
      this.sql`
        DELETE FROM cf_voice_messages
        WHERE id NOT IN (
          SELECT id FROM cf_voice_messages
          ORDER BY id DESC LIMIT ${maxMessages}
        )
      `;
    }
    /**
     * Load the most recent messages (default `historyLimit`), returned in
     * chronological (oldest-first) order.
     */
    getConversationHistory(
      limit?: number
    ): Array<{ role: VoiceRole; content: string }> {
      this.#ensureSchema();
      const historyLimit = limit ?? opt("historyLimit", DEFAULT_HISTORY_LIMIT);
      const rows = this.sql<{ role: VoiceRole; text: string }>`
        SELECT role, text FROM cf_voice_messages
        ORDER BY id DESC LIMIT ${historyLimit}
      `;
      return rows.reverse().map((row) => ({
        role: row.role,
        content: row.text
      }));
    }
    // --- Convenience methods ---
    /**
     * Programmatically end a call for a specific connection.
     * Cleans up server-side state (audio buffers, pipelines, STT sessions,
     * keepalives) and sends the idle status to the client.
     * Use this to kick a speaker or enforce call limits.
     */
    forceEndCall(connection: Connection): void {
      if (!this.#cm.isInCall(connection.id)) return; // not in a call
      this.#handleEndCall(connection);
    }
    /**
     * Speak arbitrary text to one connection: sends transcript + status
     * protocol messages, synthesizes audio, and persists the message
     * (unless interrupted mid-synthesis).
     */
    async speak(connection: Connection, text: string): Promise<void> {
      const signal = this.#cm.createPipelineAbort(connection.id);
      try {
        this.#sendJSON(connection, { type: "status", status: "speaking" });
        this.#sendJSON(connection, {
          type: "transcript_start",
          role: "assistant"
        });
        this.#sendJSON(connection, { type: "transcript_end", text });
        const audio = await this.#synthesizeWithHooks(text, connection, signal);
        if (audio && !signal.aborted) {
          connection.send(audio);
        }
        if (!signal.aborted) {
          this.saveMessage("assistant", text);
          this.#sendJSON(connection, { type: "status", status: "listening" });
        }
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }
    /**
     * Speak arbitrary text to every connected client, sequentially.
     * The message is persisted once, before synthesis begins.
     */
    async speakAll(text: string): Promise<void> {
      this.saveMessage("assistant", text);
      const connections = [...this.getConnections()];
      if (connections.length === 0) {
        return;
      }
      for (const connection of connections) {
        const signal = this.#cm.createPipelineAbort(connection.id);
        try {
          this.#sendJSON(connection, { type: "status", status: "speaking" });
          this.#sendJSON(connection, {
            type: "transcript_start",
            role: "assistant"
          });
          this.#sendJSON(connection, { type: "transcript_end", text });
          const audio = await this.#synthesizeWithHooks(
            text,
            connection,
            signal
          );
          if (audio && !signal.aborted) {
            connection.send(audio);
          }
          if (!signal.aborted) {
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
          }
        } finally {
          this.#cm.clearPipelineAbort(connection.id);
        }
      }
    }
    // Run beforeSynthesize → TTS → afterSynthesize for a single text.
    async #synthesizeWithHooks(
      text: string,
      connection: Connection,
      signal?: AbortSignal
    ): Promise<ArrayBuffer | null> {
      const textToSpeak = await this.beforeSynthesize(text, connection);
      if (!textToSpeak) return null;
      const rawAudio = await this.#requireTTS().synthesize(textToSpeak, signal);
      return this.afterSynthesize(rawAudio, textToSpeak, connection);
    }
    // --- Internal: call lifecycle ---
    async #handleStartCall(connection: Connection, _preferredFormat?: string) {
      const allowed = await this.beforeCallStart(connection);
      if (!allowed) return;
      this.#cm.initConnection(connection.id);
      this.#setCallState(connection, true);
      const configuredFormat = opt("audioFormat", "mp3") as VoiceAudioFormat;
      this.#sendJSON(connection, {
        type: "audio_config",
        format: configuredFormat
      });
      this.#sendJSON(connection, { type: "status", status: "listening" });
      await this.onCallStart(connection);
    }
    #handleEndCall(connection: Connection) {
      this.#cm.cleanup(connection.id);
      this.#setCallState(connection, false);
      this.#sendJSON(connection, { type: "status", status: "idle" });
      this.onCallEnd(connection);
    }
    // Abort everything in flight and return to listening.
    #handleInterrupt(connection: Connection) {
      this.#cm.abortPipeline(connection.id);
      this.#cm.abortSTTSession(connection.id);
      this.#cm.clearVadRetry(connection.id);
      this.#cm.clearEOT(connection.id);
      this.#cm.clearAudioBuffer(connection.id);
      this.#sendJSON(connection, { type: "status", status: "listening" });
      this.onInterrupt(connection);
    }
    // --- Internal: text message handling ---
    // Typed (non-audio) user turn: skips STT/VAD; streams TTS only when
    // the connection is in a call, otherwise text-only deltas.
    async #handleTextMessage(connection: Connection, text: string) {
      if (!text || text.trim().length === 0) return;
      const userText = text.trim();
      const signal = this.#cm.createPipelineAbort(connection.id);
      const pipelineStart = Date.now();
      this.#sendJSON(connection, { type: "status", status: "thinking" });
      this.saveMessage("user", userText);
      this.#sendJSON(connection, {
        type: "transcript",
        role: "user",
        text: userText
      });
      try {
        const context: VoiceTurnContext = {
          connection,
          messages: this.getConversationHistory(),
          signal
        };
        const llmStart = Date.now();
        const turnResult = await this.onTurn(userText, context);
        if (signal.aborted) return;
        const isInCall = this.#cm.isInCall(connection.id);
        if (isInCall) {
          this.#sendJSON(connection, { type: "status", status: "speaking" });
          const { text: fullText } = await this.#streamResponse(
            connection,
            turnResult,
            llmStart,
            pipelineStart,
            signal
          );
          if (signal.aborted) return;
          this.saveMessage("assistant", fullText);
          this.#sendJSON(connection, { type: "status", status: "listening" });
        } else {
          this.#sendJSON(connection, {
            type: "transcript_start",
            role: "assistant"
          });
          let fullText = "";
          for await (const token of iterateText(turnResult)) {
            if (signal.aborted) break;
            fullText += token;
            this.#sendJSON(connection, {
              type: "transcript_delta",
              text: token
            });
          }
          this.#sendJSON(connection, {
            type: "transcript_end",
            text: fullText
          });
          this.saveMessage("assistant", fullText);
          this.#sendJSON(connection, { type: "status", status: "idle" });
        }
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Text pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Text pipeline failed"
        });
        this.#sendJSON(connection, {
          type: "status",
          status: this.#cm.isInCall(connection.id) ? "listening" : "idle"
        });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }
    // --- Internal: audio pipeline ---
    async #handleEndOfSpeech(connection: Connection, skipVad = false) {
      // If the pipeline was already triggered by provider-driven EOT,
      // this end_of_speech from the client is late — ignore it.
      if (this.#cm.isEOTTriggered(connection.id)) {
        this.#cm.clearEOT(connection.id);
        return;
      }
      const audioData = this.#cm.getAndClearAudio(connection.id);
      if (!audioData) {
        return;
      }
      const hasStreamingSession = this.#cm.hasSTTSession(connection.id);
      const minAudioBytes = opt("minAudioBytes", DEFAULT_MIN_AUDIO_BYTES);
      if (audioData.byteLength < minAudioBytes) {
        // Too short — abort the streaming session if any
        this.#cm.abortSTTSession(connection.id);
        this.#sendJSON(connection, { type: "status", status: "listening" });
        return;
      }
      let vadMs = 0;
      if (this.vad && !skipVad) {
        const vadStart = Date.now();
        const vadResult = await this.vad.checkEndOfTurn(audioData);
        vadMs = Date.now() - vadStart;
        const vadThreshold = opt("vadThreshold", DEFAULT_VAD_THRESHOLD);
        const shouldProceed =
          vadResult.isComplete || vadResult.probability > vadThreshold;
        if (!shouldProceed) {
          const pushbackSeconds = opt(
            "vadPushbackSeconds",
            DEFAULT_VAD_PUSHBACK_SECONDS
          );
          // Pushback size assumes 16 kHz mono 16-bit PCM (2 bytes/sample).
          const maxPushbackBytes = pushbackSeconds * 16000 * 2;
          const pushback =
            audioData.byteLength > maxPushbackBytes
              ? audioData.slice(audioData.byteLength - maxPushbackBytes)
              : audioData;
          this.#cm.pushbackAudio(connection.id, pushback);
          // Keep the streaming STT session alive — VAD rejected but user
          // may still be speaking. The session continues accumulating.
          this.#sendJSON(connection, { type: "status", status: "listening" });
          // Schedule a retry that skips VAD. If the user stays silent,
          // the client won't send another end_of_speech (its #isSpeaking
          // is already false), so we'd deadlock without this timer.
          this.#cm.scheduleVadRetry(
            connection.id,
            () => this.#handleEndOfSpeech(connection, true),
            opt("vadRetryMs", DEFAULT_VAD_RETRY_MS) as number
          );
          return;
        }
      }
      // --- STT phase ---
      const signal = this.#cm.createPipelineAbort(connection.id);
      const sttStart = Date.now();
      this.#sendJSON(connection, { type: "status", status: "thinking" });
      try {
        let userText: string | null;
        let sttMs: number;
        if (hasStreamingSession) {
          // --- Streaming STT path ---
          // The session has been receiving audio all along.
          // finish() flushes and returns the final transcript (~50ms).
          // beforeTranscribe is skipped — audio was already fed incrementally.
          const rawTranscript = await this.#cm.flushSTTSession(connection.id);
          sttMs = Date.now() - sttStart;
          if (signal.aborted) return;
          if (!rawTranscript || rawTranscript.trim().length === 0) {
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
            return;
          }
          userText = await this.afterTranscribe(rawTranscript, connection);
        } else {
          // --- Batch STT path (original) ---
          if (!this.stt) {
            // No batch STT provider and no streaming session — this can
            // happen when onEndOfTurn already consumed the session.
            // Just return to listening.
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
            return;
          }
          const processedAudio = await this.beforeTranscribe(
            audioData,
            connection
          );
          if (!processedAudio || signal.aborted) {
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
            return;
          }
          const rawTranscript = await this.stt.transcribe(
            processedAudio,
            signal
          );
          sttMs = Date.now() - sttStart;
          if (signal.aborted) return;
          if (!rawTranscript || rawTranscript.trim().length === 0) {
            this.#sendJSON(connection, {
              type: "status",
              status: "listening"
            });
            return;
          }
          userText = await this.afterTranscribe(rawTranscript, connection);
        }
        if (!userText || signal.aborted) {
          this.#sendJSON(connection, { type: "status", status: "listening" });
          return;
        }
        // Hand off to the shared pipeline (LLM + TTS)
        await this.#runPipelineInner(
          connection,
          userText,
          sttStart,
          vadMs,
          sttMs,
          signal
        );
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Voice pipeline failed"
        });
        this.#sendJSON(connection, { type: "status", status: "listening" });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }
    /**
     * Start the voice pipeline from a stable transcript.
     * Called by provider-driven EOT (onEndOfTurn callback).
     * Handles: abort controller setup, LLM, TTS, metrics, persistence.
     */
    async #runPipeline(connection: Connection, transcript: string) {
      const signal = this.#cm.createPipelineAbort(connection.id);
      const pipelineStart = Date.now();
      try {
        const userText = await this.afterTranscribe(transcript, connection);
        if (!userText || signal.aborted) {
          this.#sendJSON(connection, { type: "status", status: "listening" });
          return;
        }
        await this.#runPipelineInner(
          connection,
          userText,
          pipelineStart,
          0, // vadMs — no VAD with provider-driven EOT
          0, // sttMs — transcript was delivered instantly by EOT
          signal
        );
      } catch (error) {
        if (signal.aborted) return;
        console.error("[VoiceAgent] Pipeline error:", error);
        this.#sendJSON(connection, {
          type: "error",
          message:
            error instanceof Error ? error.message : "Voice pipeline failed"
        });
        this.#sendJSON(connection, { type: "status", status: "listening" });
      } finally {
        this.#cm.clearPipelineAbort(connection.id);
      }
    }
    /**
     * Shared inner pipeline: save transcript, run LLM, stream TTS, emit metrics.
     * Used by both #handleEndOfSpeech (after STT) and #runPipeline (after provider EOT).
     */
    async #runPipelineInner(
      connection: Connection,
      userText: string,
      pipelineStart: number,
      vadMs: number,
      sttMs: number,
      signal: AbortSignal
    ) {
      this.saveMessage("user", userText);
      this.#sendJSON(connection, {
        type: "transcript",
        role: "user",
        text: userText
      });
      this.#sendJSON(connection, { type: "status", status: "speaking" });
      const context: VoiceTurnContext = {
        connection,
        messages: this.getConversationHistory(),
        signal
      };
      const llmStart = Date.now();
      const turnResult = await this.onTurn(userText, context);
      if (signal.aborted) return;
      const {
        text: fullText,
        llmMs,
        ttsMs,
        firstAudioMs
      } = await this.#streamResponse(
        connection,
        turnResult,
        llmStart,
        pipelineStart,
        signal
      );
      if (signal.aborted) return;
      const totalMs = Date.now() - pipelineStart;
      this.#sendJSON(connection, {
        type: "metrics",
        vad_ms: vadMs,
        stt_ms: sttMs,
        llm_ms: llmMs,
        tts_ms: ttsMs,
        first_audio_ms: firstAudioMs,
        total_ms: totalMs
      });
      this.saveMessage("assistant", fullText);
      this.#sendJSON(connection, { type: "status", status: "listening" });
    }
    // --- Internal: streaming TTS pipeline ---
    // Dispatch on the turn result: a plain string is synthesized in one
    // shot; any streaming source goes through the sentence-chunked pipeline.
    async #streamResponse(
      connection: Connection,
      response: TextSource,
      llmStart: number,
      pipelineStart: number,
      signal: AbortSignal
    ): Promise<{
      text: string;
      llmMs: number;
      ttsMs: number;
      firstAudioMs: number;
    }> {
      if (typeof response === "string") {
        const llmMs = Date.now() - llmStart;
        this.#sendJSON(connection, {
          type: "transcript_start",
          role: "assistant"
        });
        this.#sendJSON(connection, {
          type: "transcript_end",
          text: response
        });
        const ttsStart = Date.now();
        // FIX: propagate the abort signal so an interrupt cancels in-flight
        // TTS synthesis (previously omitted here; speak() and the streaming
        // path both pass it).
        const audio = await this.#synthesizeWithHooks(
          response,
          connection,
          signal
        );
        const ttsMs = Date.now() - ttsStart;
        if (audio && !signal.aborted) {
          connection.send(audio);
        }
        const firstAudioMs = Date.now() - pipelineStart;
        return { text: response, llmMs, ttsMs, firstAudioMs };
      }
      return this.#streamingTTSPipeline(
        connection,
        iterateText(response),
        llmStart,
        pipelineStart,
        signal
      );
    }
    // Sentence-chunk the token stream and synthesize each sentence as soon
    // as it completes, while a concurrent drain loop sends audio in order.
    async #streamingTTSPipeline(
      connection: Connection,
      tokenStream: AsyncIterable<string>,
      llmStart: number,
      pipelineStart: number,
      signal: AbortSignal
    ): Promise<{
      text: string;
      llmMs: number;
      ttsMs: number;
      firstAudioMs: number;
    }> {
      const chunker = new SentenceChunker();
      const ttsQueue: AsyncIterable<ArrayBuffer>[] = [];
      let fullText = "";
      let firstAudioSentAt: number | null = null;
      let cumulativeTtsMs = 0;
      let streamComplete = false;
      let drainNotify: (() => void) | null = null;
      let drainPending = false;
      // Wake the drain loop; remember the wakeup if it isn't waiting yet.
      const notifyDrain = () => {
        if (drainNotify) {
          const resolve = drainNotify;
          drainNotify = null;
          resolve();
        } else {
          drainPending = true;
        }
      };
      const tts = this.#requireTTS();
      const hasStreamingTTS = typeof tts.synthesizeStream === "function";
      // Drain loop: sends each sentence's audio chunks in queue order so
      // audio never interleaves, even though synthesis runs eagerly.
      const drainPromise = (async () => {
        let i = 0;
        while (true) {
          while (i >= ttsQueue.length) {
            if (streamComplete && i >= ttsQueue.length) return;
            if (drainPending) {
              drainPending = false;
              continue;
            }
            await new Promise<void>((r) => {
              drainNotify = r;
            });
            if (streamComplete && i >= ttsQueue.length) return;
          }
          if (signal.aborted) return;
          try {
            for await (const chunk of ttsQueue[i]) {
              if (signal.aborted) return;
              connection.send(chunk);
              if (!firstAudioSentAt) {
                firstAudioSentAt = Date.now();
              }
            }
          } catch (err) {
            // One failed sentence shouldn't kill the rest of the response.
            console.error("[VoiceAgent] TTS error for sentence:", err);
            this.#sendJSON(connection, {
              type: "error",
              message:
                err instanceof Error ? err.message : "TTS failed for a sentence"
            });
          }
          i++;
        }
      })();
      // Build an eagerly-running TTS iterable for one sentence.
      const makeSentenceTTS = (
        sentence: string
      ): AsyncIterable<ArrayBuffer> => {
        const self = this;
        async function* generate() {
          const ttsStart = Date.now();
          const text = await self.beforeSynthesize(sentence, connection);
          if (!text) return;
          if (hasStreamingTTS) {
            for await (const chunk of tts.synthesizeStream!(text, signal)) {
              const processed = await self.afterSynthesize(
                chunk,
                text,
                connection
              );
              if (processed) yield processed;
            }
          } else {
            const rawAudio = await tts.synthesize(text, signal);
            const processed = await self.afterSynthesize(
              rawAudio,
              text,
              connection
            );
            if (processed) yield processed;
          }
          cumulativeTtsMs += Date.now() - ttsStart;
        }
        return eagerAsyncIterable(generate());
      };
      const enqueueSentence = (sentence: string) => {
        ttsQueue.push(makeSentenceTTS(sentence));
        notifyDrain();
      };
      this.#sendJSON(connection, {
        type: "transcript_start",
        role: "assistant"
      });
      for await (const token of tokenStream) {
        if (signal.aborted) break;
        fullText += token;
        this.#sendJSON(connection, { type: "transcript_delta", text: token });
        const sentences = chunker.add(token);
        for (const sentence of sentences) {
          enqueueSentence(sentence);
        }
      }
      const llmMs = Date.now() - llmStart;
      const remaining = chunker.flush();
      for (const sentence of remaining) {
        enqueueSentence(sentence);
      }
      streamComplete = true;
      notifyDrain();
      this.#sendJSON(connection, { type: "transcript_end", text: fullText });
      await drainPromise;
      const firstAudioMs = firstAudioSentAt
        ? firstAudioSentAt - pipelineStart
        : 0;
      return { text: fullText, llmMs, ttsMs: cumulativeTtsMs, firstAudioMs };
    }
    // --- Internal: protocol helpers ---
    // Send a protocol JSON message; transcript_delta is high-volume, so it
    // is flagged for quieter logging inside sendVoiceJSON.
    #sendJSON(connection: Connection, data: unknown) {
      const parsed = data as Record<string, unknown>;
      sendVoiceJSON(
        connection,
        data,
        "VoiceAgent",
        parsed.type === "transcript_delta"
      );
    }
  }
  return VoiceAgentMixin;
}
// --- Eager async iterable ---
/**
 * Wrap an async iterable so the source is consumed eagerly: items are
 * pulled and buffered as fast as the producer yields them, independent of
 * consumer pace. Each call to [Symbol.asyncIterator] replays the buffer
 * from the start. If the source throws, the error is rethrown to the
 * consumer as soon as it is observed.
 */
function eagerAsyncIterable<T>(source: AsyncIterable<T>): AsyncIterable<T> {
  const items: T[] = [];
  let done = false;
  let failure: unknown = null;
  let pending: (() => void) | null = null;
  // Resolve the single parked consumer, if any.
  const wake = () => {
    if (pending === null) return;
    const resume = pending;
    pending = null;
    resume();
  };
  // Fire-and-forget producer: drain the source into the buffer immediately.
  void (async () => {
    try {
      for await (const item of source) {
        items.push(item);
        wake();
      }
    } catch (err) {
      failure = err;
    } finally {
      done = true;
      wake();
    }
  })();
  return {
    [Symbol.asyncIterator]() {
      let cursor = 0;
      return {
        async next(): Promise<IteratorResult<T>> {
          // Park until the producer buffers more items or terminates.
          while (cursor >= items.length && !done) {
            await new Promise<void>((resolve) => {
              pending = resolve;
            });
          }
          if (failure) {
            throw failure;
          }
          if (cursor >= items.length) {
            return { done: true, value: undefined };
          }
          return { done: false, value: items[cursor++] };
        }
      };
    }
  };
}