// sfu.ts (raw, 7960 bytes)
/**
* Cloudflare Realtime SFU integration for VoiceAgent.
*
* Bridges the SFU WebSocket adapter protocol (48kHz stereo protobuf PCM)
* to the VoiceAgent protocol (16kHz mono 16-bit PCM binary frames + JSON).
*
* Architecture:
* Browser → WebRTC → SFU → WebSocket Adapter → this module → VoiceAgent DO
*
* The SFU handles WebRTC negotiation, codec transcoding, and network
* resilience. This module handles audio format conversion and routing.
*/
import {
extractPayloadFromProtobuf,
encodePayloadToProtobuf,
downsample48kStereoTo16kMono,
upsample16kMonoTo48kStereo,
createSFUSession,
addSFUTracks,
renegotiateSFUSession,
createSFUWebSocketAdapter
} from "@cloudflare/voice";
import type { SFUConfig } from "@cloudflare/voice";
// --- Main SFU handler ---
/**
 * Options accepted by {@link handleSFURequest}.
 */
export interface SFUHandlerOptions {
  /** App ID; copied into the `SFUConfig` used for every SFU API call. */
  appId: string;

  /** API token; copied into the `SFUConfig` alongside `appId`. */
  apiToken: string;

  /**
   * Durable Object namespace of the VoiceAgent. The `/sfu/audio-in` route
   * resolves a stub from it to open the agent WebSocket.
   */
  agentNamespace: DurableObjectNamespace;

  /**
   * Name of the agent instance to connect to. When omitted, the handler
   * falls back to `"sfu-session"`.
   */
  agentInstance?: string;
}
/**
 * Handle SFU-related HTTP requests.
 *
 * Routes:
 *   POST /sfu/session     — Create an SFU session
 *   POST /sfu/tracks      — Add tracks to a session (WebRTC offer/answer)
 *   PUT  /sfu/renegotiate — Renegotiate a session
 *   POST /sfu/adapter     — Create an SFU WebSocket adapter for the given tracks
 *   GET  /sfu/audio-in    — WebSocket endpoint for SFU → Worker (user audio)
 *   GET  /sfu/audio-out   — WebSocket endpoint for Worker → SFU (agent audio)
 *
 * @param request - Incoming HTTP request.
 * @param options - SFU credentials and the VoiceAgent Durable Object binding.
 * @returns The response for a matched route, or `null` when the path/method
 *   pair is not an SFU route so the caller can continue its own routing.
 *   Failures of the REST routes are reported as `500` JSON bodies of the form
 *   `{ error: string }`.
 */
export async function handleSFURequest(
  request: Request,
  options: SFUHandlerOptions
): Promise<Response | null> {
  const url = new URL(request.url);
  const path = url.pathname;
  const config: SFUConfig = {
    appId: options.appId,
    apiToken: options.apiToken
  };

  // Create a new SFU session
  if (path === "/sfu/session" && request.method === "POST") {
    return sfuJsonResponse(() => createSFUSession(config));
  }

  // Add tracks to an existing session
  if (path === "/sfu/tracks" && request.method === "POST") {
    return sfuJsonResponse(async () => {
      const body = (await request.json()) as {
        sessionId: string;
        tracks: unknown;
      };
      return addSFUTracks(config, body.sessionId, body.tracks);
    });
  }

  // Renegotiate a session
  if (path === "/sfu/renegotiate" && request.method === "PUT") {
    return sfuJsonResponse(async () => {
      const body = (await request.json()) as {
        sessionId: string;
        sdp: string;
      };
      return renegotiateSFUSession(config, body.sessionId, body.sdp);
    });
  }

  // Create WebSocket adapter
  if (path === "/sfu/adapter" && request.method === "POST") {
    return sfuJsonResponse(async () => {
      const body = (await request.json()) as { tracks: unknown[] };
      return createSFUWebSocketAdapter(config, body.tracks);
    });
  }

  // WebSocket: SFU streams user audio TO us (48kHz stereo PCM)
  if (path === "/sfu/audio-in") {
    return handleAudioIn(request, options);
  }

  // WebSocket: Worker sends agent audio TO SFU (for ingest adapter)
  if (path === "/sfu/audio-out") {
    return handleAudioOut(request);
  }

  return null;
}

/**
 * Run an SFU API call and serialize its result as JSON. Any failure (including
 * a malformed request body read inside `call`) becomes a `500` JSON error.
 */
async function sfuJsonResponse(
  call: () => Promise<unknown>
): Promise<Response> {
  try {
    return Response.json(await call());
  } catch (error) {
    return Response.json(
      { error: error instanceof Error ? error.message : "SFU error" },
      { status: 500 }
    );
  }
}

/** True when the request carries an `Upgrade: websocket` header (case-insensitive). */
function isWebSocketUpgrade(request: Request): boolean {
  return request.headers.get("Upgrade")?.toLowerCase() === "websocket";
}

/**
 * Open a WebSocket to the VoiceAgent Durable Object.
 * Returns `null` when the agent cannot be reached or does not upgrade.
 */
async function connectToAgent(
  request: Request,
  options: SFUHandlerOptions
): Promise<WebSocket | null> {
  const instanceName = options.agentInstance ?? "sfu-session";
  const id = options.agentNamespace.idFromName(instanceName);
  const stub = options.agentNamespace.get(id);

  // The agent route prefix is hard-coded to `my-voice-agent`.
  const agentUrl = new URL(request.url);
  agentUrl.pathname = `/agents/my-voice-agent/${instanceName}`;
  // NOTE(review): the scheme is rewritten to ws(s):// before stub.fetch;
  // confirm the runtime accepts that scheme for upgrade requests.
  agentUrl.protocol = agentUrl.protocol.replace("http", "ws");

  try {
    const agentResp = await stub.fetch(
      new Request(agentUrl.toString(), {
        headers: { Upgrade: "websocket" }
      })
    );
    return agentResp.webSocket ?? null;
  } catch (error) {
    // A throwing stub is reported the same way as a refused upgrade.
    console.error("SFU audio-in: agent connection failed", error);
    return null;
  }
}

/**
 * `/sfu/audio-in`: bridge the SFU's audio WebSocket to the VoiceAgent.
 * Incoming protobuf frames are decoded, downsampled from 48kHz stereo to
 * 16kHz mono and forwarded to the agent; agent messages are relayed back.
 */
async function handleAudioIn(
  request: Request,
  options: SFUHandlerOptions
): Promise<Response> {
  if (!isWebSocketUpgrade(request)) {
    return new Response("Expected WebSocket", { status: 426 });
  }

  // Connect to the agent BEFORE creating/accepting the socket pair, so a
  // failed connection does not leave an accepted server socket dangling.
  const agentWs = await connectToAgent(request, options);
  if (!agentWs) {
    return new Response("Failed to connect to agent", { status: 500 });
  }

  const { 0: clientSocket, 1: serverSocket } = new WebSocketPair();
  serverSocket.accept();
  agentWs.accept();

  // Auto-start a call
  agentWs.send(JSON.stringify({ type: "start_call" }));

  // Agent → SFU socket. JSON messages let the client listen for transcripts.
  // Binary audio from the agent (MP3) would need conversion to 48kHz stereo
  // PCM protobuf for the SFU; for now it is forwarded as-is and the client
  // handles playback separately.
  agentWs.addEventListener("message", (event) => {
    const forwardable =
      typeof event.data === "string" || event.data instanceof ArrayBuffer;
    if (forwardable && serverSocket.readyState === WebSocket.OPEN) {
      serverSocket.send(event.data);
    }
  });

  // SFU socket → agent.
  serverSocket.addEventListener("message", (event) => {
    if (event.data instanceof ArrayBuffer) {
      // Decode protobuf to extract the PCM payload; skip empty frames.
      const payload = extractPayloadFromProtobuf(event.data);
      if (!payload || payload.length === 0) return;

      // Downsample 48kHz stereo → 16kHz mono and forward as binary PCM.
      const pcm16k = downsample48kStereoTo16kMono(payload);
      if (agentWs.readyState === WebSocket.OPEN) {
        agentWs.send(pcm16k);
      }
      return;
    }

    // Forward text messages (e.g., end_of_speech from client)
    if (typeof event.data === "string" && agentWs.readyState === WebSocket.OPEN) {
      agentWs.send(event.data);
    }
  });

  // Tear down the peer socket when either side closes.
  serverSocket.addEventListener("close", () => {
    if (agentWs.readyState === WebSocket.OPEN) {
      agentWs.send(JSON.stringify({ type: "end_call" }));
      agentWs.close();
    }
  });
  agentWs.addEventListener("close", () => {
    if (serverSocket.readyState === WebSocket.OPEN) {
      serverSocket.close();
    }
  });

  return new Response(null, { status: 101, webSocket: clientSocket });
}

/**
 * `/sfu/audio-out`: placeholder endpoint for agent audio destined for the SFU
 * ingest adapter. Binary input is assumed to be 16kHz mono PCM (not MP3); it
 * is upsampled to 48kHz stereo, protobuf-encoded and sent back on the same
 * socket.
 */
function handleAudioOut(request: Request): Response {
  if (!isWebSocketUpgrade(request)) {
    return new Response("Expected WebSocket", { status: 426 });
  }

  const { 0: clientSocket, 1: serverSocket } = new WebSocketPair();
  serverSocket.accept();

  serverSocket.addEventListener("message", (event) => {
    if (!(event.data instanceof ArrayBuffer)) return;

    // Assume input is 16kHz mono PCM → upsample to 48kHz stereo
    const stereo48k = upsample16kMonoTo48kStereo(event.data);
    const protobuf = encodePayloadToProtobuf(stereo48k);
    if (serverSocket.readyState === WebSocket.OPEN) {
      serverSocket.send(protobuf);
    }
  });

  return new Response(null, { status: 101, webSocket: clientSocket });
}