branch:
index.ts
11759 bytesRaw
/**
 * Twilio Media Streams adapter for the Agents voice pipeline.
 *
 * Bridges Twilio's bidirectional Media Streams WebSocket protocol
 * to VoiceAgent's binary PCM + JSON voice protocol.
 *
 * Twilio sends: mulaw 8kHz base64-encoded audio in JSON messages
 * VoiceAgent expects: 16kHz 16-bit PCM as binary WebSocket frames + JSON control messages
 *
 * This adapter handles:
 * - Decoding base64 mulaw 8kHz audio from Twilio → resampling to 16kHz PCM for VoiceAgent
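 * - Encoding VoiceAgent's 16kHz 16-bit PCM TTS output → resampling to 8kHz → mulaw base64 for Twilio
 *   (MP3 TTS output is NOT converted; the agent must be configured to emit raw PCM)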
 * - Translating Twilio lifecycle events (connected, start, stop) to VoiceAgent protocol (start_call, end_call)
 * - Forwarding VoiceAgent JSON messages (status, transcript, etc.) to the caller via marks
 *
 * @example
 * ```typescript
 * import { withVoice } from "@cloudflare/voice";
 * import { TwilioAdapter } from "@cloudflare/voice-twilio";
 *
 * class MyAgent extends VoiceAgent<Env> {
 *   async onTurn(transcript, context) { ... }
 * }
 *
 * export default {
 *   async fetch(request, env) {
 *     // Twilio sends WebSocket connections to your stream URL
 *     if (new URL(request.url).pathname === "/twilio") {
 *       return TwilioAdapter.handleRequest(request, env, "MyAgent");
 *     }
 *     return routeAgentRequest(request, env);
 *   }
 * };
 * ```
 */

// --- Audio conversion utilities ---

/**
 * Lookup table for mu-law expansion: index it with a mu-law byte (0-255) to
 * obtain the corresponding 16-bit linear PCM sample.
 *
 * Each entry is derived by inverting the byte, splitting it into
 * sign (bit 7), exponent (bits 4-6) and mantissa (bits 0-3), and then
 * rebuilding the magnitude with the 0x84 bias added before the shift and
 * removed afterwards.
 */
const MULAW_DECODE_TABLE = Int16Array.from({ length: 256 }, (_unused, code) => {
  // mu-law bytes are stored bit-inverted.
  const inverted = code ^ 0xff;
  const isNegative = (inverted & 0x80) !== 0;
  const exponent = (inverted & 0x70) >> 4;
  const mantissa = inverted & 0x0f;
  const magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84;
  return isNegative ? -magnitude : magnitude;
});

/** Bias added to the sample magnitude before the segment (exponent) search. */
const MULAW_BIAS = 0x84;
/** Largest magnitude accepted before the bias is added; larger inputs are clipped. */
const MULAW_CLIP = 32635;

/**
 * Compress one linear PCM sample into a single mu-law byte.
 *
 * The magnitude is clipped to `MULAW_CLIP`, biased by `MULAW_BIAS`, and then
 * normalised so that its leading bit sits at bit 14; the number of shifts
 * needed determines the 3-bit exponent and the next four bits become the
 * mantissa. The assembled byte is bit-inverted before it is returned.
 *
 * Exported so other modules can reuse it; within this file it is called when
 * converting agent audio for Twilio.
 *
 * @param sample - Linear PCM sample (values beyond the clip level are clamped)
 * @returns The mu-law encoded byte, in the range 0-255
 */
export function encodeMulaw(sample: number): number {
  const negative = sample < 0;
  let magnitude = negative ? -sample : sample;
  magnitude = (magnitude > MULAW_CLIP ? MULAW_CLIP : magnitude) + MULAW_BIAS;

  // Shift left until bit 14 is set, lowering the exponent once per shift.
  let exponent = 7;
  while (exponent > 0 && (magnitude & 0x4000) === 0) {
    magnitude <<= 1;
    exponent -= 1;
  }

  const mantissa = (magnitude >> 10) & 0x0f;
  const signBit = negative ? 0x80 : 0;
  return ~(signBit | (exponent << 4) | mantissa) & 0xff;
}

/**
 * Expand a buffer of mu-law bytes into 16-bit linear PCM samples.
 *
 * Produces exactly one output sample per input byte via MULAW_DECODE_TABLE,
 * so the sample rate of the audio is unchanged.
 *
 * @param mulawData - mu-law encoded bytes
 * @returns A new Int16Array with one PCM sample per input byte
 */
function decodeMulawToPCM(mulawData: Uint8Array): Int16Array {
  return Int16Array.from(mulawData, (code) => MULAW_DECODE_TABLE[code]);
}

/**
 * Change the sample rate of 16-bit PCM audio by linear interpolation.
 *
 * Every output sample is taken at position `i * (fromRate / toRate)` in the
 * input and blended from the two surrounding input samples. The neighbour
 * index is clamped to the last input sample at the end of the buffer.
 *
 * @param input - Source PCM samples
 * @param fromRate - Sample rate of `input`
 * @param toRate - Desired sample rate
 * @returns `input` itself when the rates are equal, otherwise a new Int16Array
 *          of length `floor(input.length / (fromRate / toRate))`
 */
function resamplePCM(
  input: Int16Array,
  fromRate: number,
  toRate: number
): Int16Array {
  if (fromRate === toRate) return input;

  const step = fromRate / toRate;
  const lastIndex = input.length - 1;
  const outputLength = Math.floor(input.length / step);

  return new Int16Array(outputLength).map((_unused, outIndex) => {
    const position = outIndex * step;
    const left = Math.floor(position);
    const weight = position - left;
    const leftSample = input[left] ?? 0;
    const rightSample = input[Math.min(left + 1, lastIndex)] ?? 0;
    return Math.round(leftSample + weight * (rightSample - leftSample));
  });
}

/**
 * Serialise 16-bit samples into a freshly allocated ArrayBuffer.
 *
 * Each sample occupies two bytes and is written little-endian regardless of
 * the host byte order.
 *
 * @param samples - PCM samples to serialise
 * @returns A new ArrayBuffer of `samples.length * 2` bytes
 */
function int16ToArrayBuffer(samples: Int16Array): ArrayBuffer {
  const BYTES_PER_SAMPLE = 2;
  const out = new ArrayBuffer(samples.length * BYTES_PER_SAMPLE);
  const writer = new DataView(out);
  samples.forEach((value, index) => {
    writer.setInt16(index * BYTES_PER_SAMPLE, value, true);
  });
  return out;
}

// --- Twilio protocol types ---

/**
 * Shape of the Twilio `start` event as this adapter models it.
 *
 * In this file only the top-level `streamSid` and `start.callSid` are read;
 * the other fields are declared but not consumed by the visible code.
 */
interface TwilioStartMessage {
  /** Discriminator for this message type. */
  event: "start";
  /** Stream identifier; stored by the adapter and echoed on every message sent back to Twilio. */
  streamSid: string;
  /** Metadata describing the stream that is starting. */
  start: {
    streamSid: string;
    accountSid: string;
    /** Call identifier; used as the agent instance name when no `instanceName` option is given. */
    callSid: string;
    tracks: string[];
    customParameters: Record<string, string>;
    /** Audio format description (not inspected by this file; mulaw 8kHz is assumed). */
    mediaFormat: {
      encoding: string;
      sampleRate: number;
      channels: number;
    };
  };
}

/**
 * Shape of the Twilio `media` event as this adapter models it.
 *
 * In this file only `media.track` and `media.payload` are read.
 */
interface TwilioMediaMessage {
  /** Discriminator for this message type. */
  event: "media";
  streamSid: string;
  /** The audio chunk and its bookkeeping fields. */
  media: {
    /** Track name; the adapter forwards audio only when this equals "inbound". */
    track: string;
    chunk: string;
    timestamp: string;
    /** Base64-encoded mulaw audio. */
    payload: string; // base64 mulaw
  };
}

// --- Adapter ---

/** Optional configuration accepted by `TwilioAdapter.handleRequest()`. */
export interface TwilioAdapterOptions {
  /**
   * Instance name for the VoiceAgent Durable Object.
   * If not provided, uses the Twilio Call SID (each call gets its own agent instance);
   * if the Call SID is also unavailable, the literal name "default" is used.
   */
  instanceName?: string;
}

/**
 * Bridges Twilio Media Streams to a VoiceAgent Durable Object.
 *
 * Use `TwilioAdapter.handleRequest()` in your Worker's fetch handler
 * to accept Twilio WebSocket connections and forward them to your VoiceAgent.
 */
export class TwilioAdapter {
  /**
   * Handle an incoming Twilio Media Streams WebSocket connection.
   * Routes the audio to a VoiceAgent Durable Object.
   *
   * Lifecycle handled here:
   * - Twilio `start`  → connect to the agent and send `start_call`
   * - Twilio `media`  → inbound mulaw 8kHz is decoded, resampled to 16kHz PCM and
   *                     sent to the agent as a binary frame
   * - Twilio `dtmf`   → forwarded to the agent unchanged as JSON
   * - Twilio `stop` / socket close → `end_call` is sent and the agent socket closed
   * - Agent connection failure or agent socket close → the Twilio socket is closed
   *   so the stream does not stay open with nothing behind it
   *
   * Malformed Twilio frames are logged and skipped; they never surface as
   * unhandled promise rejections.
   *
   * @param request - The incoming WebSocket upgrade request from Twilio
   * @param env - The Worker environment (must contain the agent's DO namespace)
   * @param agentName - The name of the VoiceAgent DO binding in env (e.g., "MyAgent")
   * @param options - Optional adapter configuration
   * @returns A 101 response carrying the client end of the WebSocket pair, or a
   *          426 response when the request is not a WebSocket upgrade
   *
   * @example
   * ```typescript
   * export default {
   *   async fetch(request, env) {
   *     if (new URL(request.url).pathname === "/twilio") {
   *       return TwilioAdapter.handleRequest(request, env, "MyAgent");
   *     }
   *     return routeAgentRequest(request, env);
   *   }
   * };
   * ```
   */
  static handleRequest(
    request: Request,
    env: Record<string, unknown>,
    agentName: string,
    options?: TwilioAdapterOptions
  ): Response {
    const upgradeHeader = request.headers.get("Upgrade");
    if (!upgradeHeader || upgradeHeader.toLowerCase() !== "websocket") {
      return new Response("Expected WebSocket upgrade", { status: 426 });
    }

    const { 0: twilioSocket, 1: serverSocket } = new WebSocketPair();

    serverSocket.accept();

    let streamSid: string | null = null;
    let agentSocket: WebSocket | null = null;
    // Set once a `start` event has triggered a connection attempt, so a
    // duplicate `start` cannot open a second agent socket.
    let connectStarted = false;
    // Set once Twilio has sent `stop` or closed its socket. Checked after the
    // asynchronous connect so a late-arriving agent socket is not leaked.
    let callEnded = false;

    // Close the Twilio side if it is still open; never throws.
    const closeTwilioSocket = (code: number, reason: string): void => {
      if (serverSocket.readyState !== WebSocket.OPEN) return;
      try {
        serverSocket.close(code, reason);
      } catch (err) {
        console.error("[TwilioAdapter] Failed to close Twilio socket", err);
      }
    };

    // Tell the agent the call is over and close its socket (if it is open).
    const endAgentCall = (): void => {
      callEnded = true;
      const agent = agentSocket;
      if (agent && agent.readyState === WebSocket.OPEN) {
        agent.send(JSON.stringify({ type: "end_call" }));
        agent.close();
      }
    };

    // Connect to the VoiceAgent DO
    const connectToAgent = async (instanceId: string): Promise<void> => {
      const namespace = env[agentName] as DurableObjectNamespace | undefined;
      if (!namespace) {
        console.error(
          `[TwilioAdapter] DO namespace "${agentName}" not found in env`
        );
        closeTwilioSocket(1011, "Agent unavailable");
        return;
      }

      const id = namespace.idFromName(instanceId);
      const stub = namespace.get(id);

      // Create a WebSocket connection to the agent
      // NOTE(review): the URL scheme is rewritten to ws(s) as in the original
      // implementation — confirm the Durable Object stub accepts that scheme.
      const agentUrl = new URL(request.url);
      agentUrl.pathname = `/agents/${agentName.toLowerCase()}/${instanceId}`;
      agentUrl.protocol = agentUrl.protocol.replace("http", "ws");

      let ws: WebSocket | null;
      try {
        const agentResp = await stub.fetch(
          new Request(agentUrl.toString(), {
            headers: { Upgrade: "websocket" }
          })
        );
        ws = agentResp.webSocket;
      } catch (err) {
        console.error("[TwilioAdapter] Failed to connect to agent", err);
        closeTwilioSocket(1011, "Agent unavailable");
        return;
      }

      if (!ws) {
        console.error("[TwilioAdapter] Failed to get WebSocket from agent");
        closeTwilioSocket(1011, "Agent unavailable");
        return;
      }

      ws.accept();

      // Twilio hung up while we were connecting: do not start a call that has
      // already ended, and do not leave the agent socket open.
      if (callEnded) {
        ws.close();
        return;
      }

      agentSocket = ws;

      // Forward agent messages back to Twilio
      ws.addEventListener("message", (event) => {
        if (!streamSid) return;
        if (serverSocket.readyState !== WebSocket.OPEN) return;

        if (typeof event.data === "string") {
          // JSON messages from agent — we can use Twilio marks to track them.
          // Forward as a mark so the Twilio side can correlate events.
          try {
            const msg = JSON.parse(event.data) as { type?: unknown } | null;
            const type = msg?.type;
            if (
              type === "transcript" ||
              type === "transcript_end" ||
              type === "status"
            ) {
              serverSocket.send(
                JSON.stringify({
                  event: "mark",
                  streamSid,
                  mark: { name: JSON.stringify(msg) }
                })
              );
            }
          } catch {
            // ignore non-JSON
          }
        } else if (event.data instanceof ArrayBuffer) {
          // Audio from agent. This is expected to be 16kHz 16-bit mono PCM.
          //
          // IMPORTANT: The default Workers AI TTS returns MP3, which cannot
          // be decoded to PCM on Workers (no AudioContext). For Twilio, the
          // VoiceAgent MUST be configured with a TTS provider that outputs
          // raw PCM (e.g., ElevenLabs with output_format "pcm_16000", or a
          // custom synthesize() that returns 16kHz 16-bit PCM).
          const payload = TwilioAdapter.agentAudioToTwilioPayload(event.data);
          if (payload === null) return;

          serverSocket.send(
            JSON.stringify({
              event: "media",
              streamSid,
              media: { payload }
            })
          );
        }
      });

      // Without an agent there is nothing left to bridge, so end the stream.
      ws.addEventListener("close", () => {
        closeTwilioSocket(1000, "Agent disconnected");
      });

      // Send start_call to agent
      ws.send(JSON.stringify({ type: "start_call" }));
    };

    serverSocket.addEventListener("message", async (event) => {
      if (typeof event.data !== "string") return;

      let parsed: unknown;
      try {
        parsed = JSON.parse(event.data);
      } catch {
        return;
      }
      // JSON such as `null` or `42` parses fine but is not a Twilio message.
      if (typeof parsed !== "object" || parsed === null) return;
      const msg = parsed as { event?: unknown };

      try {
        switch (msg.event) {
          case "connected":
            break;

          case "start": {
            if (connectStarted) break;
            connectStarted = true;

            const startMsg = msg as Partial<TwilioStartMessage>;
            streamSid = startMsg.streamSid ?? startMsg.start?.streamSid ?? null;

            const instanceId =
              options?.instanceName ?? startMsg.start?.callSid ?? "default";
            await connectToAgent(instanceId);
            break;
          }

          case "media": {
            const media = (msg as Partial<TwilioMediaMessage>).media;
            if (!media || media.track !== "inbound") break;
            if (typeof media.payload !== "string") break;

            // Skip the conversion work entirely when nobody can receive it.
            const agent = agentSocket;
            if (!agent || agent.readyState !== WebSocket.OPEN) break;

            // Decode base64 mulaw → PCM 8kHz → resample to 16kHz → send as binary
            const mulawBytes = Uint8Array.from(atob(media.payload), (c) =>
              c.charCodeAt(0)
            );
            const pcm8k = decodeMulawToPCM(mulawBytes);
            const pcm16k = resamplePCM(pcm8k, 8000, 16000);
            agent.send(int16ToArrayBuffer(pcm16k));
            break;
          }

          case "stop": {
            endAgentCall();
            break;
          }

          case "dtmf": {
            // Forward DTMF tones as non-voice messages
            const agent = agentSocket;
            if (agent && agent.readyState === WebSocket.OPEN) {
              agent.send(JSON.stringify(msg));
            }
            break;
          }
        }
      } catch (err) {
        // e.g. invalid base64 in a media payload; skip the frame, keep the call.
        console.error(
          `[TwilioAdapter] Error handling Twilio "${String(msg.event)}" event`,
          err
        );
      }
    });

    serverSocket.addEventListener("close", () => {
      endAgentCall();
    });

    return new Response(null, {
      status: 101,
      webSocket: twilioSocket
    });
  }

  /**
   * Convert one binary audio frame from the agent (16kHz 16-bit PCM, read in
   * host byte order) into the base64 mulaw 8kHz payload Twilio expects.
   *
   * A trailing odd byte is ignored rather than letting the Int16Array
   * constructor throw on a length that is not a multiple of two.
   *
   * @returns The base64 payload, or null when the frame holds no audio to send
   */
  private static agentAudioToTwilioPayload(data: ArrayBuffer): string | null {
    const sampleCount = Math.floor(data.byteLength / 2);
    if (sampleCount === 0) return null;

    // Convert: 16kHz PCM → resample to 8kHz → encode mulaw → base64
    const pcm16k = new Int16Array(data, 0, sampleCount);
    const pcm8k = resamplePCM(pcm16k, 16000, 8000);
    if (pcm8k.length === 0) return null;

    let binary = "";
    for (let i = 0; i < pcm8k.length; i++) {
      binary += String.fromCharCode(encodeMulaw(pcm8k[i]));
    }
    return btoa(binary);
  }
}