branch:
sfu-utils.ts
6838 bytesRaw
/**
 * Pure utility functions for the Cloudflare Realtime SFU integration.
 *
 * Extracted from sfu.ts for testability. These handle:
 * - Protobuf varint encoding/decoding
 * - SFU WebSocket adapter protobuf packet encoding/decoding
 * - Audio format conversion (48kHz stereo ↔ 16kHz mono)
 */

// --- Protobuf helpers ---
// The SFU WebSocket adapter uses a simple protobuf message:
//   message Packet {
//     uint32 sequenceNumber = 1;
//     uint32 timestamp = 2;
//     bytes payload = 5;
//   }

/**
 * Decode a protobuf base-128 varint starting at `offset` in `buf`.
 *
 * Each byte contributes its low 7 bits, least-significant group first; a byte
 * with the high bit clear terminates the varint.
 *
 * The value is accumulated arithmetically instead of with `<<` / `|=`:
 * JavaScript bitwise operators work on signed 32-bit integers, so a uint32
 * with its top bit set would come back negative and shifts of 32 or more
 * would wrap. The result is exact up to `Number.MAX_SAFE_INTEGER`.
 *
 * @param buf - Buffer containing the varint.
 * @param offset - Index of the varint's first byte.
 * @returns The decoded value and the number of bytes consumed. `bytesRead` is
 *   0 when `offset` is at or past the end of `buf`. If the buffer ends before
 *   a terminating byte is seen, the partial value and the bytes consumed so
 *   far are returned.
 */
export function decodeVarint(
  buf: Uint8Array,
  offset: number
): { value: number; bytesRead: number } {
  // A valid protobuf varint is at most 10 bytes. Extra continuation bytes are
  // still consumed (so callers skip the same span) but are not accumulated.
  const MAX_VARINT_BYTES = 10;

  let value = 0;
  let scale = 1; // 128 ** (number of groups accumulated so far)
  let bytesRead = 0;
  while (offset + bytesRead < buf.length) {
    const byte = buf[offset + bytesRead];
    if (bytesRead < MAX_VARINT_BYTES) {
      value += (byte & 0x7f) * scale;
      scale *= 0x80;
    }
    bytesRead++;
    if ((byte & 0x80) === 0) break;
  }
  return { value, bytesRead };
}

/**
 * Encode a non-negative integer as a protobuf base-128 varint.
 *
 * The value is split with `%` and `Math.floor` rather than `&` and `>>>`,
 * because JavaScript bitwise operators truncate their operands to 32 bits and
 * would silently corrupt anything at or above 2^32.
 *
 * @param value - Integer to encode. A fractional part is discarded. A negative
 *   value is reinterpreted as its unsigned 32-bit bit pattern (so -1 encodes
 *   as 4294967295).
 * @returns The varint bytes, least-significant 7-bit group first.
 * @throws RangeError if `value` is `NaN` or infinite.
 */
export function encodeVarint(value: number): Uint8Array {
  if (!Number.isFinite(value)) {
    throw new RangeError(`Cannot varint-encode non-finite value: ${value}`);
  }

  let remaining = Math.trunc(value);
  if (remaining < 0) {
    remaining >>>= 0; // two's-complement int32 -> uint32
  }

  const bytes: number[] = [];
  while (remaining > 0x7f) {
    // Low 7 bits with the continuation flag set.
    bytes.push((remaining % 0x80) | 0x80);
    remaining = Math.floor(remaining / 0x80);
  }
  bytes.push(remaining);
  return new Uint8Array(bytes);
}

/**
 * Extract the PCM payload (field 5, length-delimited) from a protobuf Packet
 * message.
 *
 * Walks the message field by field. Varint, fixed64, fixed32 and other
 * length-delimited fields are skipped; the first field 5 with wire type 2 is
 * returned as a copy of its bytes.
 *
 * @param data - The raw protobuf message.
 * @returns The payload bytes, or `null` when the message has no payload field
 *   or is malformed (truncated field, a declared length that is negative or
 *   runs past the end of the buffer, or an unsupported wire type).
 */
export function extractPayloadFromProtobuf(
  data: ArrayBuffer
): Uint8Array | null {
  const buf = new Uint8Array(data);
  let offset = 0;

  while (offset < buf.length) {
    const { value: tag, bytesRead: tagBytes } = decodeVarint(buf, offset);
    offset += tagBytes;

    const fieldNumber = tag >>> 3;
    const wireType = tag & 0x07;

    switch (wireType) {
      case 0: {
        // Varint
        const { bytesRead } = decodeVarint(buf, offset);
        if (bytesRead === 0) return null; // tag with no value: truncated
        offset += bytesRead;
        break;
      }
      case 1:
        // 64-bit fixed-width
        offset += 8;
        break;
      case 2: {
        // Length-delimited (bytes)
        const { value: length, bytesRead: lenBytes } = decodeVarint(
          buf,
          offset
        );
        if (lenBytes === 0) return null; // tag with no length: truncated
        offset += lenBytes;

        // A negative length would move `offset` backwards (and can loop
        // forever); an over-long one would yield a silently truncated payload.
        if (length < 0 || length > buf.length - offset) return null;

        if (fieldNumber === 5) {
          // payload field
          return buf.slice(offset, offset + length);
        }
        offset += length;
        break;
      }
      case 5:
        // 32-bit fixed-width
        offset += 4;
        break;
      default:
        // Group or invalid wire type: the field size is unknown, so stop.
        return null;
    }
  }

  return null;
}

/**
 * Wrap a PCM payload in a protobuf Packet message that carries only the
 * payload field (used for ingest/buffer mode).
 *
 * Layout: tag varint for field 5 / wire type 2, then the payload length as a
 * varint, then the payload bytes.
 *
 * @param payload - Raw PCM bytes to wrap.
 * @returns A new buffer holding the encoded message.
 */
export function encodePayloadToProtobuf(payload: Uint8Array): ArrayBuffer {
  // (field 5 << 3) | wire type 2 (length-delimited) === 42
  const tag = encodeVarint((5 << 3) | 2);
  const size = encodeVarint(payload.length);
  const payloadStart = tag.length + size.length;

  const message = new Uint8Array(payloadStart + payload.length);
  message.set(tag);
  message.set(size, tag.length);
  message.set(payload, payloadStart);

  return message.buffer;
}

// --- Audio conversion ---

/**
 * Convert 48kHz stereo interleaved PCM to 16kHz mono PCM (16-bit LE in and out).
 *
 * Every third stereo frame is kept; its left and right samples are averaged
 * (rounded with `Math.round`) into one mono sample. Frames in between are
 * dropped, as are trailing bytes that do not make up a full group of three
 * frames.
 *
 * @param stereo48k - Interleaved L/R samples, 4 bytes per stereo frame.
 * @returns A new buffer with one 16-bit sample per three input frames.
 */
export function downsample48kStereoTo16kMono(
  stereo48k: Uint8Array
): ArrayBuffer {
  const BYTES_PER_STEREO_FRAME = 4; // 2 channels x 16-bit
  const DECIMATION = 3; // 48000 / 16000
  const stride = BYTES_PER_STEREO_FRAME * DECIMATION;

  const source = new DataView(
    stereo48k.buffer,
    stereo48k.byteOffset,
    stereo48k.byteLength
  );
  const monoCount = Math.floor(stereo48k.byteLength / stride);
  const output = new ArrayBuffer(monoCount * 2);
  const target = new DataView(output);

  for (let n = 0, pos = 0; n < monoCount; n++, pos += stride) {
    const l = source.getInt16(pos, true);
    const r = source.getInt16(pos + 2, true);
    target.setInt16(n * 2, Math.round((l + r) / 2), true);
  }

  return output;
}

/**
 * Upsample 16kHz mono PCM to 48kHz stereo interleaved PCM (both 16-bit LE).
 *
 * Each input sample is written three times, to both the left and right
 * channel (plain sample duplication, no interpolation).
 *
 * @param mono16k - Mono samples, 2 bytes each. A trailing odd byte that does
 *   not form a whole sample is ignored.
 * @returns A new byte array holding 3 stereo frames (12 bytes) per input sample.
 */
export function upsample16kMonoTo48kStereo(mono16k: ArrayBuffer): Uint8Array {
  const inputView = new DataView(mono16k);
  // Floor so an odd byte length cannot produce a fractional sample count,
  // which would make the loop read past the end of the buffer and throw.
  const inputSamples = Math.floor(mono16k.byteLength / 2);
  const outputSamples = inputSamples * 3; // 3x upsample
  const output = new ArrayBuffer(outputSamples * 4); // stereo = 4 bytes per pair
  const outputView = new DataView(output);

  for (let i = 0; i < inputSamples; i++) {
    const sample = inputView.getInt16(i * 2, true);
    // Write 3 stereo samples (simple sample duplication)
    for (let j = 0; j < 3; j++) {
      const outOffset = (i * 3 + j) * 4;
      outputView.setInt16(outOffset, sample, true); // left
      outputView.setInt16(outOffset + 2, sample, true); // right
    }
  }

  return new Uint8Array(output);
}

// --- SFU API helpers ---

/** Credentials and app identity used by every SFU API helper in this module. */
export interface SFUConfig {
  /** App identifier; interpolated into the request path as `/apps/{appId}`. */
  appId: string;
  /** API token; sent as `Authorization: Bearer {apiToken}`. */
  apiToken: string;
}

/** Base URL (no trailing slash) that every SFU request URL in this module is built from. */
const SFU_API_BASE = "https://rtc.live.cloudflare.com/v1";

/**
 * POST a JSON body to an app-scoped SFU API endpoint.
 *
 * @param config - App id and API token for the request.
 * @param path - Path appended after `/apps/{appId}`; should start with `/`.
 * @param body - Value serialized with `JSON.stringify` as the request body.
 * @returns The parsed JSON response body.
 * @throws Error containing the HTTP status and response text when the
 *   response status is not OK.
 */
export async function sfuFetch(
  config: SFUConfig,
  path: string,
  body: unknown
): Promise<unknown> {
  const headers = {
    Authorization: `Bearer ${config.apiToken}`,
    "Content-Type": "application/json"
  };
  const res = await fetch(`${SFU_API_BASE}/apps/${config.appId}${path}`, {
    method: "POST",
    headers,
    body: JSON.stringify(body)
  });

  if (res.ok) {
    return res.json();
  }

  const detail = await res.text();
  throw new Error(`SFU API error ${res.status}: ${detail}`);
}

/**
 * Create a new SFU session by POSTing (with no body) to `/sessions/new`.
 *
 * @param config - App id and API token for the request.
 * @returns The parsed JSON response, typed as containing the new `sessionId`.
 *   The response shape is asserted, not validated.
 * @throws Error containing the HTTP status and response text when the
 *   response status is not OK.
 */
export async function createSFUSession(
  config: SFUConfig
): Promise<{ sessionId: string }> {
  const endpoint = `${SFU_API_BASE}/apps/${config.appId}/sessions/new`;
  const res = await fetch(endpoint, {
    method: "POST",
    headers: { Authorization: `Bearer ${config.apiToken}` }
  });

  if (res.ok) {
    return res.json() as Promise<{ sessionId: string }>;
  }

  const detail = await res.text();
  throw new Error(`SFU API error ${res.status}: ${detail}`);
}

/**
 * Add tracks to an existing session by POSTing `body` to
 * `/sessions/{sessionId}/tracks/new`.
 *
 * @param config - App id and API token for the request.
 * @param sessionId - Session to add the tracks to.
 * @param body - Request payload, forwarded unchanged to `sfuFetch`.
 * @returns The parsed JSON response from `sfuFetch`.
 */
export async function addSFUTracks(
  config: SFUConfig,
  sessionId: string,
  body: unknown
): Promise<unknown> {
  const tracksPath = `/sessions/${sessionId}/tracks/new`;
  return sfuFetch(config, tracksPath, body);
}

/**
 * Send an SDP answer for a session via PUT to
 * `/sessions/{sessionId}/renegotiate`.
 *
 * @param config - App id and API token for the request.
 * @param sessionId - Session being renegotiated.
 * @param sdp - SDP text, sent as a session description of type `"answer"`.
 * @returns The parsed JSON response body.
 * @throws Error containing the HTTP status and response text when the
 *   response status is not OK.
 */
export async function renegotiateSFUSession(
  config: SFUConfig,
  sessionId: string,
  sdp: string
): Promise<unknown> {
  const endpoint = `${SFU_API_BASE}/apps/${config.appId}/sessions/${sessionId}/renegotiate`;
  const payload = { sessionDescription: { type: "answer", sdp } };

  const res = await fetch(endpoint, {
    method: "PUT",
    headers: {
      Authorization: `Bearer ${config.apiToken}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify(payload)
  });

  if (res.ok) {
    return res.json();
  }

  const detail = await res.text();
  throw new Error(`SFU renegotiate error ${res.status}: ${detail}`);
}

/**
 * Create a WebSocket adapter by POSTing `{ tracks }` to
 * `/adapters/websocket/new`.
 *
 * @param config - App id and API token for the request.
 * @param tracks - Track descriptors, sent unchanged under the `tracks` key.
 * @returns The parsed JSON response from `sfuFetch`.
 */
export async function createSFUWebSocketAdapter(
  config: SFUConfig,
  tracks: unknown[]
): Promise<unknown> {
  const requestBody = { tracks };
  return sfuFetch(config, "/adapters/websocket/new", requestBody);
}