branch:
index.ts
8823 bytesRaw
import type {
  StreamingSTTProvider,
  StreamingSTTSession,
  StreamingSTTSessionOptions
} from "@cloudflare/voice";

export interface DeepgramStreamingSTTOptions {
  /** Deepgram API key. */
  apiKey: string;
  /** Deepgram model. @default "nova-3" */
  model?: string;
  /** Language code. @default "en" */
  language?: string;
  /** Enable smart formatting (numbers, dates, etc.). @default true */
  smartFormat?: boolean;
  /** Enable punctuation. @default true */
  punctuate?: boolean;
  /** Enable filler words (um, uh). @default false */
  fillerWords?: boolean;
  /**
   * Encoding of the audio being sent.
   * The voice pipeline sends 16-bit PCM at 16kHz mono.
   * @default "linear16"
   */
  encoding?: string;
  /** Sample rate in Hz. @default 16000 */
  sampleRate?: number;
  /** Number of audio channels. @default 1 */
  channels?: number;
}

const DEEPGRAM_WS_URL = "wss://api.deepgram.com/v1/listen";

/**
 * Deepgram streaming speech-to-text provider for the Agents voice pipeline.
 *
 * Creates per-utterance WebSocket sessions to Deepgram's real-time API.
 * Audio is streamed incrementally as it arrives, producing interim and
 * final transcript results in real time.
 *
 * @example
 * ```typescript
 * import { Agent } from "agents";
 * import { withVoice } from "@cloudflare/voice";
 * import { DeepgramStreamingSTT } from "@cloudflare/voice-deepgram";
 *
 * const VoiceAgent = withVoice(Agent);
 *
 * export class MyAgent extends VoiceAgent<Env> {
 *   streamingStt = new DeepgramStreamingSTT({
 *     apiKey: this.env.DEEPGRAM_API_KEY
 *   });
 *
 *   async onTurn(transcript, context) { ... }
 * }
 * ```
 */
export class DeepgramStreamingSTT implements StreamingSTTProvider {
  #apiKey: string;
  #model: string;
  #language: string;
  #smartFormat: boolean;
  #punctuate: boolean;
  #fillerWords: boolean;
  #encoding: string;
  #sampleRate: number;
  #channels: number;

  constructor(options: DeepgramStreamingSTTOptions) {
    this.#apiKey = options.apiKey;
    this.#model = options.model ?? "nova-3";
    this.#language = options.language ?? "en";
    this.#smartFormat = options.smartFormat ?? true;
    this.#punctuate = options.punctuate ?? true;
    this.#fillerWords = options.fillerWords ?? false;
    this.#encoding = options.encoding ?? "linear16";
    this.#sampleRate = options.sampleRate ?? 16000;
    this.#channels = options.channels ?? 1;
  }

  createSession(options?: StreamingSTTSessionOptions): StreamingSTTSession {
    const params = new URLSearchParams({
      model: this.#model,
      language: options?.language ?? this.#language,
      encoding: this.#encoding,
      sample_rate: String(this.#sampleRate),
      channels: String(this.#channels),
      interim_results: "true",
      punctuate: String(this.#punctuate),
      smart_format: String(this.#smartFormat),
      filler_words: String(this.#fillerWords),
      // endpointing disabled — we control turn boundaries ourselves
      endpointing: "false"
    });

    const url = `${DEEPGRAM_WS_URL}?${params}`;
    return new DeepgramStreamingSTTSession(url, this.#apiKey, options);
  }
}

/**
 * A single streaming STT session backed by a Deepgram WebSocket connection.
 *
 * Lifecycle: created at start-of-speech, receives audio via feed(),
 * flushed via finish() at end-of-speech, or aborted on interrupt.
 */
class DeepgramStreamingSTTSession implements StreamingSTTSession {
  #onInterim: ((text: string) => void) | undefined;
  #onFinal: ((text: string) => void) | undefined;

  #ws: WebSocket | null = null;
  #connected = false;
  #aborted = false;

  // Audio chunks queued before the WebSocket is open
  #pendingChunks: ArrayBuffer[] = [];

  // Accumulates finalized transcript segments from Deepgram
  #finalizedSegments: string[] = [];

  // Resolves when Deepgram sends the final close-acknowledgement
  // after we send CloseStream, or when we see the last is_final.
  #finishResolve: ((transcript: string) => void) | null = null;
  #finishPromise: Promise<string> | null = null;

  // Whether finish() has been called
  #finishing = false;

  constructor(
    url: string,
    apiKey: string,
    options?: StreamingSTTSessionOptions
  ) {
    this.#onInterim = options?.onInterim;
    this.#onFinal = options?.onFinal;

    if (options?.signal?.aborted) {
      this.#aborted = true;
      return;
    }

    options?.signal?.addEventListener(
      "abort",
      () => {
        this.abort();
      },
      { once: true }
    );

    this.#connect(url, apiKey);
  }

  async #connect(url: string, apiKey: string): Promise<void> {
    try {
      // Workers outbound WebSocket — use fetch with Upgrade header
      const resp = await fetch(url, {
        headers: {
          Upgrade: "websocket",
          Authorization: `Token ${apiKey}`
        }
      });

      if (this.#aborted) {
        // Aborted while connecting
        const ws = (resp as unknown as { webSocket?: WebSocket }).webSocket;
        if (ws) {
          ws.accept();
          ws.close();
        }
        return;
      }

      const ws = (resp as unknown as { webSocket?: WebSocket }).webSocket;
      if (!ws) {
        console.error("[DeepgramSTT] Failed to establish WebSocket connection");
        this.#resolveFinish();
        return;
      }

      ws.accept();
      this.#ws = ws;
      this.#connected = true;

      ws.addEventListener("message", (event: MessageEvent) => {
        this.#handleMessage(event);
      });

      ws.addEventListener("close", () => {
        this.#connected = false;
        this.#resolveFinish();
      });

      ws.addEventListener("error", (event: Event) => {
        console.error("[DeepgramSTT] WebSocket error:", event);
        this.#connected = false;
        this.#resolveFinish();
      });

      // Flush any audio chunks that arrived before the WS was open
      for (const chunk of this.#pendingChunks) {
        ws.send(chunk);
      }
      this.#pendingChunks = [];

      // If finish() was called while we were connecting, close now
      if (this.#finishing) {
        this.#sendCloseStream();
      }
    } catch (err) {
      console.error("[DeepgramSTT] Connection error:", err);
      this.#resolveFinish();
    }
  }

  feed(chunk: ArrayBuffer): void {
    if (this.#aborted || this.#finishing) return;

    if (this.#connected && this.#ws) {
      this.#ws.send(chunk);
    } else {
      // Queue until connected
      this.#pendingChunks.push(chunk);
    }
  }

  async finish(): Promise<string> {
    if (this.#aborted) return "";

    this.#finishing = true;

    // Create the promise that will resolve when Deepgram closes
    if (!this.#finishPromise) {
      this.#finishPromise = new Promise<string>((resolve) => {
        this.#finishResolve = resolve;
      });
    }

    if (this.#connected && this.#ws) {
      this.#sendCloseStream();
    }
    // else: #connect() will call #sendCloseStream() when it opens

    return this.#finishPromise;
  }

  abort(): void {
    if (this.#aborted) return;
    this.#aborted = true;
    this.#pendingChunks = [];

    if (this.#ws) {
      try {
        this.#ws.close();
      } catch {
        // ignore close errors
      }
      this.#ws = null;
    }

    this.#resolveFinish();
  }

  #sendCloseStream(): void {
    if (this.#ws && this.#connected) {
      try {
        this.#ws.send(JSON.stringify({ type: "CloseStream" }));
      } catch {
        // Connection may have dropped
        this.#resolveFinish();
      }
    }
  }

  #resolveFinish(): void {
    if (this.#finishResolve) {
      const transcript = this.#finalizedSegments.join(" ").trim();
      this.#finishResolve(transcript);
      this.#finishResolve = null;
    }
  }

  #handleMessage(event: MessageEvent): void {
    if (this.#aborted) return;

    try {
      const data =
        typeof event.data === "string" ? JSON.parse(event.data) : null;

      if (!data) return;

      // Deepgram response schema
      if (data.type === "Results") {
        const alt = data.channel?.alternatives?.[0];
        if (!alt) return;

        const transcript: string = alt.transcript ?? "";

        if (data.is_final) {
          // Finalized segment — stable, will not change
          if (transcript) {
            this.#finalizedSegments.push(transcript);
            this.#onFinal?.(transcript);
          }
        } else {
          // Interim result — unstable, may change
          if (transcript) {
            this.#onInterim?.(transcript);
          }
        }
      }

      // Metadata message (connection opened) — ignore
      // Error messages
      if (data.type === "Error") {
        console.error(
          `[DeepgramSTT] Error: ${data.description ?? data.message ?? JSON.stringify(data)}`
        );
      }
    } catch {
      // Ignore non-JSON or malformed messages
    }
  }
}