// File: audio-pipeline.ts (6365 bytes)
/**
* Shared audio pipeline utilities and per-connection state management.
* Used internally by both withVoice and withVoiceInput mixins.
*/
import type {
StreamingSTTProvider,
StreamingSTTSession,
StreamingSTTSessionOptions
} from "./types";
// --- Audio utilities ---
/**
 * Join a list of ArrayBuffers into one contiguous ArrayBuffer.
 *
 * The inputs are copied in order and are not modified. An empty list
 * yields a zero-length buffer.
 *
 * @param buffers - Chunks to join, in the order they should appear.
 * @returns A newly allocated buffer holding all bytes back to back.
 */
export function concatenateBuffers(buffers: ArrayBuffer[]): ArrayBuffer {
  // First pass: size the destination.
  let byteCount = 0;
  for (const part of buffers) {
    byteCount += part.byteLength;
  }

  // Second pass: copy each chunk at its running write position.
  const merged = new Uint8Array(byteCount);
  let cursor = 0;
  buffers.forEach((part) => {
    merged.set(new Uint8Array(part), cursor);
    cursor += part.byteLength;
  });

  return merged.buffer;
}
// --- Default option values ---
/**
 * Default voice-activity-detection threshold.
 * NOTE(review): presumably a 0..1 score cutoff; confirm against the VAD consumer.
 */
export const DEFAULT_VAD_THRESHOLD = 0.5;
/** Default minimum audio size in bytes: 0.5 s of 16 kHz mono 16-bit audio. */
export const DEFAULT_MIN_AUDIO_BYTES = 16_000;
/**
 * Default VAD pushback window, in seconds.
 * NOTE(review): how the window is applied is decided by the mixins; not visible here.
 */
export const DEFAULT_VAD_PUSHBACK_SECONDS = 2;
/** Default delay before a VAD retry, in milliseconds. */
export const DEFAULT_VAD_RETRY_MS = 3_000;
/** Per-connection audio buffer cap: 30 s of 16 kHz mono 16-bit audio = 960 KB. */
export const MAX_AUDIO_BUFFER_BYTES = 960_000;
// --- Protocol helper ---
/**
 * Serialize a protocol message as JSON and send it on a connection.
 *
 * @param connection - Any object exposing a `send` method for string or binary data.
 * @param data - Value to serialize with `JSON.stringify`.
 * @param _logPrefix - Accepted for call-site compatibility; not used in this function.
 * @param _skipLog - Accepted for call-site compatibility; not used in this function.
 */
export function sendVoiceJSON(
  connection: { send(data: string | ArrayBuffer): void },
  data: unknown,
  _logPrefix: string,
  _skipLog = false
): void {
  connection.send(JSON.stringify(data));
}
// --- Connection audio state manager ---
/**
* Manages per-connection audio pipeline state for voice mixins.
* Owns the Maps/Sets for audio buffers, STT sessions, timers, and abort controllers.
* Does not own pipeline orchestration — that stays in each mixin.
*/
export class AudioConnectionManager {
  /** Buffered audio chunks per connection; presence of a key means the connection is "in call". */
  readonly #audioBuffers = new Map<string, ArrayBuffer[]>();
  /** Active streaming STT session per connection. */
  readonly #sttSessions = new Map<string, StreamingSTTSession>();
  /** Pending VAD retry timer per connection. */
  readonly #vadRetryTimers = new Map<string, ReturnType<typeof setTimeout>>();
  /** Connections for which end-of-turn has been flagged. */
  readonly #eotTriggered = new Set<string>();
  /** AbortController of the in-flight pipeline per connection. */
  readonly #activePipeline = new Map<string, AbortController>();

  /**
   * @param _logPrefix - Accepted for call-site compatibility; not used by this class.
   */
  constructor(_logPrefix: string) {}

  // --- Connection lifecycle ---

  /** Create an empty audio buffer for the connection unless one already exists. */
  initConnection(connectionId: string): void {
    if (!this.#audioBuffers.has(connectionId)) {
      this.#audioBuffers.set(connectionId, []);
    }
  }

  /** Whether an audio buffer currently exists for the connection. */
  isInCall(connectionId: string): boolean {
    return this.#audioBuffers.has(connectionId);
  }

  /**
   * Release everything held for a connection: aborts the in-flight pipeline,
   * drops the audio buffer, aborts the STT session, cancels the VAD retry
   * timer, and clears the EOT flag.
   */
  cleanup(connectionId: string): void {
    this.abortPipeline(connectionId);
    this.#audioBuffers.delete(connectionId);
    this.abortSTTSession(connectionId);
    this.clearVadRetry(connectionId);
    this.#eotTriggered.delete(connectionId);
  }

  // --- Audio buffering ---

  /**
   * Append a chunk to the connection's buffer and forward it to the active
   * STT session, if any. Does nothing when the connection has no buffer.
   *
   * When the buffered total exceeds MAX_AUDIO_BUFFER_BYTES, the oldest chunks
   * are dropped; the newest chunk is always kept, even if it alone is over
   * the cap.
   */
  bufferAudio(connectionId: string, chunk: ArrayBuffer): void {
    const chunks = this.#audioBuffers.get(connectionId);
    if (!chunks) return;

    chunks.push(chunk);

    let totalBytes = 0;
    for (const buffered of chunks) totalBytes += buffered.byteLength;

    // Trim from the front (oldest audio) until within the cap.
    while (totalBytes > MAX_AUDIO_BUFFER_BYTES && chunks.length > 1) {
      const dropped = chunks.shift();
      if (dropped === undefined) break;
      totalBytes -= dropped.byteLength;
    }

    // Feed to streaming STT session if active.
    this.#sttSessions.get(connectionId)?.feed(chunk);
  }

  /**
   * Concatenate and clear the audio buffer for a connection.
   * Returns null if no audio is buffered or the buffer doesn't exist.
   */
  getAndClearAudio(connectionId: string): ArrayBuffer | null {
    const chunks = this.#audioBuffers.get(connectionId);
    if (!chunks || chunks.length === 0) return null;
    const audio = concatenateBuffers(chunks);
    this.#audioBuffers.set(connectionId, []);
    return audio;
  }

  /** Reset the buffer to empty; a connection without a buffer stays without one. */
  clearAudioBuffer(connectionId: string): void {
    if (this.#audioBuffers.has(connectionId)) {
      this.#audioBuffers.set(connectionId, []);
    }
  }

  /**
   * Put audio back at the front of the connection's buffer.
   *
   * If the connection has no buffer, one is created holding just this audio,
   * so the connection will then report as in-call. The size cap is not
   * applied here; it is enforced on the next bufferAudio call.
   */
  pushbackAudio(connectionId: string, audio: ArrayBuffer): void {
    const chunks = this.#audioBuffers.get(connectionId);
    if (chunks) {
      chunks.unshift(audio);
    } else {
      this.#audioBuffers.set(connectionId, [audio]);
    }
  }

  // --- STT sessions ---

  /** Whether a streaming STT session is registered for the connection. */
  hasSTTSession(connectionId: string): boolean {
    return this.#sttSessions.has(connectionId);
  }

  /**
   * Create a streaming STT session via the provider and register it.
   *
   * Any session already registered for the connection is aborted first, so
   * it is not left orphaned once its map entry is replaced (same policy as
   * createPipelineAbort applies to a previous controller).
   */
  startSTTSession(
    connectionId: string,
    provider: StreamingSTTProvider,
    options: StreamingSTTSessionOptions
  ): void {
    this.abortSTTSession(connectionId);
    const session = provider.createSession(options);
    this.#sttSessions.set(connectionId, session);
  }

  /**
   * Finish the connection's STT session and return its transcript.
   * Returns an empty string when no session is registered.
   *
   * The session is unregistered whether finish() resolves or rejects, so a
   * failed session does not keep receiving audio. The entry is only removed
   * if it still refers to the session being flushed, so a session started
   * while finish() was pending is left in place. A rejection from finish()
   * propagates to the caller.
   */
  async flushSTTSession(connectionId: string): Promise<string> {
    const session = this.#sttSessions.get(connectionId);
    if (!session) return "";
    try {
      return await session.finish();
    } finally {
      if (this.#sttSessions.get(connectionId) === session) {
        this.#sttSessions.delete(connectionId);
      }
    }
  }

  /** Abort and unregister the connection's STT session, if any. */
  abortSTTSession(connectionId: string): void {
    const session = this.#sttSessions.get(connectionId);
    if (session) {
      session.abort();
      this.#sttSessions.delete(connectionId);
    }
  }

  /** Remove the STT session without aborting (used after provider-driven EOT). */
  removeSTTSession(connectionId: string): void {
    this.#sttSessions.delete(connectionId);
  }

  // --- EOT tracking ---

  /** Whether the EOT flag is set for the connection. */
  isEOTTriggered(connectionId: string): boolean {
    return this.#eotTriggered.has(connectionId);
  }

  /** Set the EOT flag for the connection. */
  setEOTTriggered(connectionId: string): void {
    this.#eotTriggered.add(connectionId);
  }

  /** Clear the EOT flag for the connection. */
  clearEOT(connectionId: string): void {
    this.#eotTriggered.delete(connectionId);
  }

  // --- Pipeline abort ---

  /**
   * Abort any in-flight pipeline and create a new AbortController.
   * Returns the new AbortSignal.
   */
  createPipelineAbort(connectionId: string): AbortSignal {
    this.abortPipeline(connectionId);
    const controller = new AbortController();
    this.#activePipeline.set(connectionId, controller);
    return controller.signal;
  }

  /** Abort the connection's pipeline controller, if any, and forget it. */
  abortPipeline(connectionId: string): void {
    this.#activePipeline.get(connectionId)?.abort();
    this.#activePipeline.delete(connectionId);
  }

  /** Forget the connection's pipeline controller without aborting it. */
  clearPipelineAbort(connectionId: string): void {
    this.#activePipeline.delete(connectionId);
  }

  // --- VAD retry ---

  /**
   * Schedule `callback` to run after `retryMs` milliseconds, replacing any
   * retry already pending for the connection. The timer entry is removed
   * just before the callback runs.
   */
  scheduleVadRetry(
    connectionId: string,
    callback: () => void,
    retryMs: number
  ): void {
    this.clearVadRetry(connectionId);
    this.#vadRetryTimers.set(
      connectionId,
      setTimeout(() => {
        this.#vadRetryTimers.delete(connectionId);
        callback();
      }, retryMs)
    );
  }

  /** Cancel the connection's pending VAD retry, if any. */
  clearVadRetry(connectionId: string): void {
    const timer = this.#vadRetryTimers.get(connectionId);
    // Compare against undefined rather than truthiness so any handle value is cleared.
    if (timer !== undefined) {
      clearTimeout(timer);
      this.#vadRetryTimers.delete(connectionId);
    }
  }
}