import { PartySocket } from "partysocket"; import { VOICE_PROTOCOL_VERSION } from "./types"; function camelCaseToKebabCase(str: string): string { if (str === str.toUpperCase() && str !== str.toLowerCase()) { return str.toLowerCase().replace(/_/g, "-"); } let kebabified = str.replace( /[A-Z]/g, (letter) => `-${letter.toLowerCase()}` ); kebabified = kebabified.startsWith("-") ? kebabified.slice(1) : kebabified; return kebabified.replace(/_/g, "-").replace(/-$/, ""); } import type { VoiceStatus, VoiceRole, VoiceAudioFormat, VoiceAudioInput, VoiceTransport, TranscriptMessage, VoicePipelineMetrics } from "./types"; // Re-export shared types for consumers importing from this module export { VOICE_PROTOCOL_VERSION } from "./types"; export type { VoiceStatus, VoiceRole, VoiceAudioFormat, VoiceAudioInput, VoiceTransport, VoicePipelineMetrics, TranscriptMessage } from "./types"; export interface VoiceClientOptions { /** Agent name (matches the server-side Durable Object class). */ agent: string; /** Instance name for the agent. @default "default" */ name?: string; // Connection options (optional — defaults work for same-origin) /** Host to connect to. @default window.location.host */ host?: string; /** * Custom transport for sending/receiving data. * Defaults to a WebSocket transport via PartySocket. * Provide a custom implementation for WebRTC, SFU, or other transports. */ transport?: VoiceTransport; /** * Custom audio input source. When provided, VoiceClient does NOT * use its built-in AudioWorklet mic capture. The audio input is * responsible for capturing and routing audio to the server. * It must report audio levels via `onAudioLevel` for silence and * interrupt detection to work. */ audioInput?: VoiceAudioInput; /** * Preferred audio format for server responses. Sent in `start_call` * as a hint — the server may ignore it if it cannot produce that format. * The actual format is declared in the server's `audio_config` message. 
   */
  preferredFormat?: VoiceAudioFormat;

  // Tuning knobs with sensible defaults
  /** RMS threshold below which audio is considered silence. @default 0.04 */
  silenceThreshold?: number;
  /** How long silence must last before sending end_of_speech (ms). @default 500 */
  silenceDurationMs?: number;
  /** RMS threshold for detecting user speech during agent playback. @default 0.05 */
  interruptThreshold?: number;
  /** Consecutive high-RMS chunks needed to trigger an interrupt. @default 2 */
  interruptChunks?: number;
  /** Maximum transcript messages to keep in memory. @default 200 */
  maxTranscriptMessages?: number;
}

/** Maps each event name to the data type passed to its listeners. */
export interface VoiceClientEventMap {
  /** New call status after a server status update or local reset. */
  statuschange: VoiceStatus;
  /** Full transcript array after a message is added, appended to, or trimmed. */
  transcriptchange: TranscriptMessage[];
  /** Partial streaming-STT text; null once the final transcript arrives. */
  interimtranscript: string | null;
  /** Per-turn pipeline timings from the server; null when reset at call start. */
  metricschange: VoicePipelineMetrics | null;
  /** Latest mic RMS level (0 while muted or after reset). */
  audiolevelchange: number;
  /** True when the transport opens, false when it closes/disconnects. */
  connectionchange: boolean;
  /** Error message for display, or null when the error clears. */
  error: string | null;
  /** New mute state after toggleMute(). */
  mutechange: boolean;
  /** Any server JSON message that is not part of the voice protocol. */
  custommessage: unknown;
}

export type VoiceClientEvent = keyof VoiceClientEventMap;

// --- Audio helpers (not exported) ---

// NOTE: this is runtime JS source, compiled in the browser as an AudioWorklet
// module — its content must not be altered by formatting.
const WORKLET_PROCESSOR = ` class AudioCaptureProcessor extends AudioWorkletProcessor { constructor() { super(); this.buffer = []; this.sampleRate = sampleRate; this.targetRate = 16000; this.ratio = this.sampleRate / this.targetRate; } process(inputs) { const input = inputs[0]; if (!input || !input[0]) return true; const channelData = input[0]; // Linear interpolation resampling (e.g. 48kHz → 16kHz). // Nearest-neighbor (picking every Nth sample) introduces aliasing // artifacts, especially on sibilants (s, f, th). Linear interpolation // blends adjacent samples, acting as a basic low-pass filter. 
for (let i = 0; i < channelData.length; i += this.ratio) { const idx = Math.floor(i); const frac = i - idx; if (idx + 1 < channelData.length) { this.buffer.push(channelData[idx] * (1 - frac) + channelData[idx + 1] * frac); } else if (idx < channelData.length) { this.buffer.push(channelData[idx]); } } if (this.buffer.length >= 1600) { const chunk = new Float32Array(this.buffer); this.port.postMessage({ type: 'audio', samples: chunk }, [chunk.buffer]); this.buffer = []; } return true; } } registerProcessor('audio-capture-processor', AudioCaptureProcessor); `;

/**
 * Convert float samples in [-1, 1] to 16-bit little-endian PCM.
 * Values are clamped first; negative and positive halves scale to the
 * asymmetric int16 range (-32768 … 32767).
 */
function floatTo16BitPCM(samples: Float32Array): ArrayBuffer {
  const buffer = new ArrayBuffer(samples.length * 2);
  const view = new DataView(buffer);
  for (let i = 0; i < samples.length; i++) {
    const s = Math.max(-1, Math.min(1, samples[i]));
    view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7fff, true);
  }
  return buffer;
}

/**
 * Root-mean-square amplitude of a sample buffer (0 = silence).
 * FIX: returns 0 for an empty buffer instead of NaN (sqrt(0/0)), which
 * would poison the silence/interrupt threshold comparisons downstream.
 */
function computeRMS(samples: Float32Array): number {
  if (samples.length === 0) return 0;
  let sum = 0;
  for (let i = 0; i < samples.length; i++) {
    sum += samples[i] * samples[i];
  }
  return Math.sqrt(sum / samples.length);
}

// --- Default WebSocket transport ---

/**
 * Default VoiceTransport backed by PartySocket (reconnecting WebSocket).
 * Created automatically when no custom transport is provided.
*/ export class WebSocketVoiceTransport implements VoiceTransport { #socket: PartySocket | null = null; #options: { agent: string; name?: string; host?: string }; onopen: (() => void) | null = null; onclose: (() => void) | null = null; onerror: ((error?: unknown) => void) | null = null; onmessage: ((data: string | ArrayBuffer | Blob) => void) | null = null; constructor(options: { agent: string; name?: string; host?: string }) { this.#options = options; } get connected(): boolean { return this.#socket?.readyState === WebSocket.OPEN; } sendJSON(data: Record): void { if (this.#socket?.readyState === WebSocket.OPEN) { this.#socket.send(JSON.stringify(data)); } } sendBinary(data: ArrayBuffer): void { if (this.#socket?.readyState === WebSocket.OPEN) { this.#socket.send(data); } } connect(): void { if (this.#socket) return; const agentNamespace = camelCaseToKebabCase(this.#options.agent); const socket = new PartySocket({ party: agentNamespace, room: this.#options.name ?? "default", host: this.#options.host ?? 
window.location.host, prefix: "agents" }); socket.onopen = () => this.onopen?.(); socket.onclose = () => this.onclose?.(); socket.onerror = () => this.onerror?.(); socket.onmessage = (event: MessageEvent) => { this.onmessage?.(event.data); }; this.#socket = socket; } disconnect(): void { this.#socket?.close(); this.#socket = null; } } // --- VoiceClient --- // oxlint-disable-next-line @typescript-eslint/no-explicit-any -- generic listener storage type AnyListener = (data: any) => void; export class VoiceClient { // Internal state #status: VoiceStatus = "idle"; #transcript: TranscriptMessage[] = []; #metrics: VoicePipelineMetrics | null = null; #audioLevel = 0; #isMuted = false; #connected = false; #error: string | null = null; #lastCustomMessage: unknown = null; #audioFormat: VoiceAudioFormat | null = null; #interimTranscript: string | null = null; #serverProtocolVersion: number | null = null; #inCall = false; // Options (with defaults applied) #silenceThreshold: number; #silenceDurationMs: number; #interruptThreshold: number; #interruptChunks: number; #maxTranscriptMessages: number; // Transport #transport: VoiceTransport | null = null; #options: VoiceClientOptions; // Audio refs #audioContext: AudioContext | null = null; #workletRegistered = false; #workletNode: AudioWorkletNode | null = null; #stream: MediaStream | null = null; #silenceTimer: ReturnType | null = null; #isSpeaking = false; #playbackQueue: ArrayBuffer[] = []; #isPlaying = false; #activeSource: AudioBufferSourceNode | null = null; #interruptChunkCount = 0; // Event listeners #listeners = new Map>(); constructor(options: VoiceClientOptions) { this.#options = options; this.#silenceThreshold = options.silenceThreshold ?? 0.04; this.#silenceDurationMs = options.silenceDurationMs ?? 500; this.#interruptThreshold = options.interruptThreshold ?? 0.05; this.#interruptChunks = options.interruptChunks ?? 2; this.#maxTranscriptMessages = options.maxTranscriptMessages ?? 
200; } // --- Public getters --- get status(): VoiceStatus { return this.#status; } get transcript(): TranscriptMessage[] { return this.#transcript; } get metrics(): VoicePipelineMetrics | null { return this.#metrics; } get audioLevel(): number { return this.#audioLevel; } get isMuted(): boolean { return this.#isMuted; } get connected(): boolean { return this.#connected; } get error(): string | null { return this.#error; } /** * The current interim (partial) transcript from streaming STT. * Updates in real time as the user speaks. Cleared when the final * transcript is produced. null when no interim text is available. */ get interimTranscript(): string | null { return this.#interimTranscript; } /** * The protocol version reported by the server. * null until the server sends its welcome message. */ get serverProtocolVersion(): number | null { return this.#serverProtocolVersion; } // --- Event system --- addEventListener( event: K, listener: (data: VoiceClientEventMap[K]) => void ): void { let set = this.#listeners.get(event); if (!set) { set = new Set(); this.#listeners.set(event, set); } set.add(listener as AnyListener); } removeEventListener( event: K, listener: (data: VoiceClientEventMap[K]) => void ): void { this.#listeners.get(event)?.delete(listener as AnyListener); } #emit( event: K, data: VoiceClientEventMap[K] ): void { const set = this.#listeners.get(event); if (set) { for (const listener of set) { listener(data); } } } #trimTranscript(): void { if (this.#transcript.length > this.#maxTranscriptMessages) { this.#transcript = this.#transcript.slice(-this.#maxTranscriptMessages); } } // --- Connection --- connect(): void { if (this.#transport) return; const transport = this.#options.transport ?? 
new WebSocketVoiceTransport({ agent: this.#options.agent, name: this.#options.name, host: this.#options.host }); transport.onopen = () => { this.#connected = true; this.#error = null; // Announce our protocol version to the server transport.sendJSON({ type: "hello", protocol_version: VOICE_PROTOCOL_VERSION }); this.#emit("connectionchange", true); this.#emit("error", null); // Reconnect recovery: if we were in a call when the connection // dropped, re-establish it on the new connection. The mic is // still running (not stopped on disconnect), so audio resumes // flowing as soon as the server processes start_call. if (this.#inCall) { transport.sendJSON({ type: "start_call" }); } }; transport.onclose = () => { this.#connected = false; this.#emit("connectionchange", false); }; transport.onerror = () => { this.#error = "Connection lost. Reconnecting..."; this.#emit("error", this.#error); }; transport.onmessage = (data: string | ArrayBuffer | Blob) => { if (typeof data === "string") { this.#handleJSONMessage(data); } else if (data instanceof Blob) { data.arrayBuffer().then((buffer) => { this.#playbackQueue.push(buffer); this.#processPlaybackQueue(); }); } else if (data instanceof ArrayBuffer) { this.#playbackQueue.push(data); this.#processPlaybackQueue(); } }; this.#transport = transport; transport.connect(); } disconnect(): void { this.endCall(); this.#transport?.disconnect(); this.#transport = null; this.#connected = false; this.#emit("connectionchange", false); } // --- Public actions --- async startCall(): Promise { if (!this.#transport?.connected) { this.#error = "Cannot start call: not connected. 
Call connect() first."; this.#emit("error", this.#error); return; } this.#inCall = true; this.#error = null; this.#metrics = null; this.#emit("error", null); this.#emit("metricschange", null); const startMsg: Record = { type: "start_call" }; if (this.#options.preferredFormat) { startMsg.preferred_format = this.#options.preferredFormat; } this.#transport.sendJSON(startMsg); if (this.#options.audioInput) { this.#options.audioInput.onAudioLevel = (rms) => this.#processAudioLevel(rms); this.#options.audioInput.onAudioData = (pcm) => { if (this.#transport?.connected && !this.#isMuted) { this.#transport.sendBinary(pcm); } }; await this.#options.audioInput.start(); } else { await this.#startMic(); } } endCall(): void { this.#inCall = false; if (this.#transport?.connected) { this.#transport.sendJSON({ type: "end_call" }); } if (this.#options.audioInput) { this.#options.audioInput.stop(); this.#options.audioInput.onAudioLevel = null; this.#options.audioInput.onAudioData = null; } else { this.#stopMic(); } this.#activeSource?.stop(); this.#activeSource = null; this.#playbackQueue = []; this.#isPlaying = false; this.#closeAudioContext(); this.#resetDetection(); this.#status = "idle"; this.#emit("statuschange", "idle"); } toggleMute(): void { this.#isMuted = !this.#isMuted; // Reset audio level so the UI shows silence while muted. if (this.#isMuted) { this.#audioLevel = 0; this.#emit("audiolevelchange", 0); } // If muting while speaking, flush the current utterance so the server // processes accumulated audio instead of waiting forever (deadlock: // muted → no audio frames → silence timer never starts → no end_of_speech). if (this.#isMuted && this.#isSpeaking) { this.#isSpeaking = false; if (this.#silenceTimer) { clearTimeout(this.#silenceTimer); this.#silenceTimer = null; } if (this.#transport?.connected) { this.#transport.sendJSON({ type: "end_of_speech" }); } } this.#emit("mutechange", this.#isMuted); } /** * Send a text message to the agent. 
The agent processes it through * `onTurn()` (bypassing STT) and responds with text transcript and * TTS audio (if in a call) or text-only (if not). */ sendText(text: string): void { if (this.#transport?.connected) { this.#transport.sendJSON({ type: "text_message", text }); } } /** * Send arbitrary JSON to the agent. Use this for app-level messages * that are not part of the voice protocol (e.g. `{ type: "kick_speaker" }`). * The server receives these in the consumer's `onMessage()` handler. */ sendJSON(data: Record): void { if (this.#transport?.connected) { this.#transport.sendJSON(data); } } /** * The last custom (non-voice-protocol) message received from the server. * Listen for the `"custommessage"` event to be notified when this changes. */ get lastCustomMessage(): unknown { return this.#lastCustomMessage; } /** * The audio format the server declared for binary payloads. * Set when the server sends `audio_config` at call start. */ get audioFormat(): VoiceAudioFormat | null { return this.#audioFormat; } // --- Voice protocol handler --- #handleJSONMessage(data: string): void { let msg: Record; try { msg = JSON.parse(data); } catch { // Not JSON — ignore (e.g. 
state sync binary frames) return; } switch (msg.type) { case "welcome": this.#serverProtocolVersion = msg.protocol_version as number; if (msg.protocol_version !== VOICE_PROTOCOL_VERSION) { console.warn( `[VoiceClient] Protocol version mismatch: client=${VOICE_PROTOCOL_VERSION}, server=${msg.protocol_version}` ); } break; case "audio_config": this.#audioFormat = msg.format as VoiceAudioFormat; break; case "status": this.#status = msg.status as VoiceStatus; if (msg.status === "listening" || msg.status === "idle") { this.#error = null; this.#emit("error", null); } this.#emit("statuschange", this.#status); break; case "transcript_interim": this.#interimTranscript = msg.text as string; this.#emit("interimtranscript", this.#interimTranscript); break; case "transcript": // Final transcript arrived — clear interim this.#interimTranscript = null; this.#emit("interimtranscript", null); this.#transcript = [ ...this.#transcript, { role: msg.role as VoiceRole, text: msg.text as string, timestamp: Date.now() } ]; this.#trimTranscript(); this.#emit("transcriptchange", this.#transcript); break; case "transcript_start": this.#transcript = [ ...this.#transcript, { role: "assistant", text: "", timestamp: Date.now() } ]; this.#trimTranscript(); this.#emit("transcriptchange", this.#transcript); break; case "transcript_delta": { if (this.#transcript.length === 0) break; const updated = [...this.#transcript]; const last = updated[updated.length - 1]; if (last.role === "assistant") { updated[updated.length - 1] = { ...last, text: last.text + (msg.text as string) }; this.#transcript = updated; this.#emit("transcriptchange", this.#transcript); } break; } case "transcript_end": { if (this.#transcript.length === 0) break; const updated = [...this.#transcript]; const last = updated[updated.length - 1]; if (last.role === "assistant") { updated[updated.length - 1] = { ...last, text: msg.text as string }; this.#transcript = updated; this.#emit("transcriptchange", this.#transcript); } break; } 
case "metrics": this.#metrics = { vad_ms: msg.vad_ms as number, stt_ms: msg.stt_ms as number, llm_ms: msg.llm_ms as number, tts_ms: msg.tts_ms as number, first_audio_ms: msg.first_audio_ms as number, total_ms: msg.total_ms as number }; this.#emit("metricschange", this.#metrics); break; case "error": this.#error = msg.message as string; this.#emit("error", this.#error); break; default: // App-level custom message — surface via event this.#lastCustomMessage = msg; this.#emit("custommessage", msg); break; } } // --- Audio context management --- /** Get or create the shared AudioContext. */ async #getAudioContext(): Promise { if (!this.#audioContext) { this.#audioContext = new AudioContext({ sampleRate: 48000 }); } if (this.#audioContext.state === "suspended") { await this.#audioContext.resume(); } return this.#audioContext; } /** Close the AudioContext and release resources. */ #closeAudioContext(): void { if (this.#audioContext) { this.#audioContext.close().catch(() => {}); this.#audioContext = null; this.#workletRegistered = false; } } // --- Audio playback --- async #playAudio(audioData: ArrayBuffer): Promise { try { const ctx = await this.#getAudioContext(); let audioBuffer: AudioBuffer; if (this.#audioFormat === "pcm16") { // Raw 16-bit LE mono PCM at 16kHz — manually construct AudioBuffer const int16 = new Int16Array(audioData); audioBuffer = ctx.createBuffer(1, int16.length, 16000); const channel = audioBuffer.getChannelData(0); for (let i = 0; i < int16.length; i++) { channel[i] = int16[i] / 32768; } } else { // mp3, wav, opus — let the browser decode audioBuffer = await ctx.decodeAudioData(audioData.slice(0)); } const source = ctx.createBufferSource(); source.buffer = audioBuffer; source.connect(ctx.destination); this.#activeSource = source; return new Promise((resolve) => { source.onended = () => { if (this.#activeSource === source) { this.#activeSource = null; } resolve(); }; source.start(); }); } catch (err) { console.error("[VoiceClient] Audio playback 
error:", err); } } async #processPlaybackQueue(): Promise { if (this.#isPlaying || this.#playbackQueue.length === 0) return; this.#isPlaying = true; while (this.#playbackQueue.length > 0) { const audioData = this.#playbackQueue.shift()!; await this.#playAudio(audioData); } this.#isPlaying = false; } // --- Mic capture --- async #startMic(): Promise { try { const stream = await navigator.mediaDevices.getUserMedia({ audio: { sampleRate: { ideal: 48000 }, channelCount: 1, echoCancellation: true, noiseSuppression: true, autoGainControl: true } }); this.#stream = stream; const ctx = await this.#getAudioContext(); // Only register the worklet processor once per AudioContext. // Calling addModule twice with the same processor name throws. if (!this.#workletRegistered) { const blob = new Blob([WORKLET_PROCESSOR], { type: "application/javascript" }); const workletUrl = URL.createObjectURL(blob); await ctx.audioWorklet.addModule(workletUrl); URL.revokeObjectURL(workletUrl); this.#workletRegistered = true; } const source = ctx.createMediaStreamSource(stream); const workletNode = new AudioWorkletNode(ctx, "audio-capture-processor"); this.#workletNode = workletNode; workletNode.port.onmessage = (event: MessageEvent) => { if (event.data.type === "audio" && !this.#isMuted) { const samples = event.data.samples as Float32Array; const rms = computeRMS(samples); // Send PCM to agent const pcm = floatTo16BitPCM(samples); if (this.#transport?.connected) { this.#transport.sendBinary(pcm); } this.#processAudioLevel(rms); } }; source.connect(workletNode); workletNode.connect(ctx.destination); } catch (err) { console.error("[VoiceClient] Mic error:", err); this.#error = "Microphone access denied. 
Please allow microphone access and try again."; this.#emit("error", this.#error); } } #stopMic(): void { this.#workletNode?.disconnect(); this.#workletNode = null; this.#stream?.getTracks().forEach((track) => track.stop()); this.#stream = null; this.#resetDetection(); } // --- Audio level processing (shared between built-in mic and custom audioInput) --- #processAudioLevel(rms: number): void { // When muted, ignore incoming audio levels. This prevents false // speech detection when a custom audioInput keeps reporting levels. // The built-in mic already gates on !#isMuted before calling here, // but audioInput implementations don't know about mute state. if (this.#isMuted) return; this.#audioLevel = rms; this.#emit("audiolevelchange", rms); // Interruption detection: user speaking during agent playback if (this.#isPlaying && rms > this.#interruptThreshold) { this.#interruptChunkCount++; if (this.#interruptChunkCount >= this.#interruptChunks) { this.#activeSource?.stop(); this.#activeSource = null; this.#playbackQueue = []; this.#isPlaying = false; this.#interruptChunkCount = 0; if (this.#transport?.connected) { this.#transport.sendJSON({ type: "interrupt" }); } } } else { this.#interruptChunkCount = 0; } // Silence detection if (rms > this.#silenceThreshold) { if (!this.#isSpeaking) { this.#isSpeaking = true; // Notify server that speech started (for streaming STT) if (this.#transport?.connected) { this.#transport.sendJSON({ type: "start_of_speech" }); } } if (this.#silenceTimer) { clearTimeout(this.#silenceTimer); this.#silenceTimer = null; } } else if (this.#isSpeaking) { if (!this.#silenceTimer) { this.#silenceTimer = setTimeout(() => { this.#isSpeaking = false; this.#silenceTimer = null; if (this.#transport?.connected) { this.#transport.sendJSON({ type: "end_of_speech" }); } }, this.#silenceDurationMs); } } } #resetDetection(): void { if (this.#silenceTimer) { clearTimeout(this.#silenceTimer); this.#silenceTimer = null; } this.#isSpeaking = false; 
    // Tail of #resetDetection: zero the interrupt counter, then report a
    // silent level so any UI meter drops to zero immediately.
    this.#interruptChunkCount = 0;
    this.#audioLevel = 0;
    this.#emit("audiolevelchange", 0);
  }
}