// server.ts
import {
Agent,
routeAgentRequest,
type Connection,
type WSMessage
} from "agents";
import {
withVoice,
WorkersAIFluxSTT,
WorkersAITTS,
type VoiceTurnContext
} from "@cloudflare/voice";
import { streamText, tool, stepCountIs } from "ai";
import { createWorkersAI } from "workers-ai-provider";
import { z } from "zod";
// Compose voice capabilities (STT/TTS pipeline + call lifecycle hooks) onto
// the base Agent class via the withVoice mixin; MyVoiceAgent extends this.
const VoiceAgent = withVoice(Agent);

// System prompt for the LLM. Tuned for spoken output (short, conversational)
// and enumerates the three tools wired up in onTurn below.
const SYSTEM_PROMPT = `You are a helpful voice assistant running on Cloudflare Workers. Keep your responses concise and conversational — you're being spoken aloud, not read. Aim for 1-3 sentences unless the user asks for more detail. Be warm and natural.
You have tools available:
- get_current_time: Tell the user the current date and time
- set_reminder: Set a spoken reminder after a delay (e.g. "remind me in 5 minutes to check the oven")
- get_weather: Check the weather for a location
Use tools when the user's request matches. After calling a tool, incorporate the result naturally into your spoken response.`;
/**
 * Voice assistant agent backed by Workers AI.
 *
 * Responsibilities:
 * - Streaming STT (Flux, with built-in end-of-turn detection) and TTS via
 *   the AI binding — no external API keys needed.
 * - Single-active-speaker enforcement: only one connection may stream audio
 *   at a time; others may observe or take over via "kick_speaker".
 * - LLM turn handling with tool calling (current time, scheduled reminders,
 *   mock weather).
 * - Greeting on call start and spoken reminders via the scheduler.
 */
export class MyVoiceAgent extends VoiceAgent<Env> {
  // --- Providers ---
  // Flux: streaming STT with built-in end-of-turn detection via Workers AI.
  // No API key needed — uses the AI binding directly.
  streamingStt = new WorkersAIFluxSTT(this.env.AI);
  tts = new WorkersAITTS(this.env.AI);

  // No VAD needed — Flux has built-in turn detection.
  // Client-side silence detection still triggers the pipeline.

  // --- Single-speaker enforcement ---
  //
  // Only one connection can be the active speaker at a time. This prevents
  // two browser tabs from capturing audio simultaneously. Other connections
  // can still observe transcripts and send text messages.
  #activeSpeakerId: string | null = null;

  /**
   * Gate a call start. Rejects (returns false) when a different connection
   * already holds the speaker slot, notifying the caller how to take over;
   * otherwise claims the slot for this connection.
   */
  beforeCallStart(connection: Connection): boolean {
    if (this.#activeSpeakerId && this.#activeSpeakerId !== connection.id) {
      connection.send(
        JSON.stringify({
          type: "speaker_conflict",
          message:
            "Another session is currently the active speaker. You can kick them to take over."
        })
      );
      return false;
    }
    this.#activeSpeakerId = connection.id;
    return true;
  }

  /** Release the speaker slot when the active speaker's call ends. */
  onCallEnd(connection: Connection) {
    if (this.#activeSpeakerId === connection.id) {
      this.#activeSpeakerId = null;
    }
  }

  /** Release the speaker slot if the active speaker disconnects outright. */
  onClose(connection: Connection) {
    if (this.#activeSpeakerId === connection.id) {
      this.#activeSpeakerId = null;
    }
  }

  onMessage(connection: Connection, message: WSMessage) {
    // Voice protocol messages are intercepted automatically by the mixin.
    // This handler only receives non-voice messages.
    if (typeof message === "string") {
      try {
        // JSON.parse returns `any`; treat it as unknown and narrow before use.
        const parsed: unknown = JSON.parse(message);
        if (
          typeof parsed === "object" &&
          parsed !== null &&
          (parsed as { type?: unknown }).type === "kick_speaker"
        ) {
          this.#handleKick(connection);
          return;
        }
      } catch {
        // not JSON — nothing else to handle here
      }
    }
  }

  /**
   * Forcibly end the current active speaker's call (if any) on behalf of
   * `requester`, then tell the requester the slot is free.
   */
  #handleKick(requester: Connection) {
    if (!this.#activeSpeakerId) {
      // No active speaker — nothing to kick
      return;
    }
    const activeConn = [...this.getConnections()].find(
      (c) => c.id === this.#activeSpeakerId
    );
    if (activeConn) {
      // Notify the kicked connection
      activeConn.send(
        JSON.stringify({
          type: "kicked",
          message: "Another session has taken over as the active speaker."
        })
      );
      // Force end their call — cleans up server-side state and sends idle
      this.forceEndCall(activeConn);
    }
    this.#activeSpeakerId = null;
    // Notify the requester they can now start
    requester.send(
      JSON.stringify({
        type: "speaker_available",
        message: "Previous speaker has been disconnected. You can start a call."
      })
    );
  }

  // --- Voice agent logic ---

  /**
   * Handle one completed user turn: run the LLM over the conversation
   * history plus the new transcript, with tools enabled, and return the
   * text stream for TTS. `context.signal` aborts generation if the user
   * interrupts.
   */
  async onTurn(transcript: string, context: VoiceTurnContext) {
    const workersAi = createWorkersAI({ binding: this.env.AI });
    const result = streamText({
      model: workersAi(
        // Model id not in the provider's typed catalog yet; cast to its
        // declared model-name parameter type.
        "@cf/moonshotai/kimi-k2.5" as Parameters<typeof workersAi>[0],
        { sessionAffinity: this.sessionAffinity }
      ),
      system: SYSTEM_PROMPT,
      messages: [
        ...context.messages.map((m) => ({
          role: m.role as "user" | "assistant",
          content: m.content
        })),
        { role: "user" as const, content: transcript }
      ],
      tools: {
        get_current_time: tool({
          description:
            "Get the current date and time. Use when the user asks what time it is.",
          inputSchema: z.object({}),
          execute: async () => {
            const now = new Date();
            return {
              time: now.toLocaleTimeString("en-US", {
                hour: "2-digit",
                minute: "2-digit",
                timeZoneName: "short"
              }),
              date: now.toLocaleDateString("en-US", {
                weekday: "long",
                year: "numeric",
                month: "long",
                day: "numeric"
              })
            };
          }
        }),
        set_reminder: tool({
          description:
            "Set a reminder that will be spoken aloud after a delay.",
          inputSchema: z.object({
            message: z
              .string()
              .describe("The reminder message to speak to the user"),
            delay_seconds: z
              .number()
              .describe("How many seconds from now to trigger the reminder")
          }),
          execute: async ({
            message,
            delay_seconds
          }: {
            message: string;
            delay_seconds: number;
          }) => {
            await this.schedule(delay_seconds, "speakReminder", { message });
            // Human-friendly delay label. Only use minutes when the delay is
            // at least a full minute: the previous Math.round-first approach
            // mislabeled 30-59s delays as "1 minute". Also handle singular
            // units ("1 second", not "1 seconds").
            let timeLabel: string;
            if (delay_seconds >= 60) {
              const minutes = Math.round(delay_seconds / 60);
              timeLabel = `${minutes} minute${minutes === 1 ? "" : "s"}`;
            } else {
              timeLabel = `${delay_seconds} second${
                delay_seconds === 1 ? "" : "s"
              }`;
            }
            return { confirmed: true, message, delay: timeLabel };
          }
        }),
        get_weather: tool({
          description:
            "Get the current weather for a location. Use when the user asks about the weather.",
          inputSchema: z.object({
            location: z
              .string()
              .describe("The city or location to check weather for")
          }),
          execute: async ({ location }: { location: string }) => {
            // Mock implementation — random condition and temperature.
            const conditions = [
              "sunny",
              "partly cloudy",
              "overcast",
              "light rain"
            ];
            const condition =
              conditions[Math.floor(Math.random() * conditions.length)];
            const temp = Math.floor(55 + Math.random() * 35);
            return {
              location,
              temperature: `${temp}°F`,
              condition,
              note: "Mock data — connect a weather MCP server for real forecasts."
            };
          }
        })
      },
      // Allow at most 3 LLM steps (tool call + follow-up generation).
      stopWhen: stepCountIs(3),
      abortSignal: context.signal
    });
    return result.textStream;
  }

  /**
   * Greet the caller when a call starts. Uses the persisted message count to
   * distinguish a returning session from a first-time one.
   */
  async onCallStart(connection: Connection) {
    const messageCount =
      this.sql<{ count: number }>`
        SELECT COUNT(*) as count FROM cf_voice_messages
      `[0]?.count ?? 0;
    const greeting =
      messageCount > 0
        ? "Welcome back! How can I help you today?"
        : "Hi there! I'm your voice assistant. I can answer questions, set reminders, or check the weather. What can I do for you?";
    await this.speak(connection, greeting);
  }

  /** Scheduler callback (see set_reminder): speak the reminder to everyone. */
  async speakReminder(payload: { message: string }) {
    await this.speakAll(`Reminder: ${payload.message}`);
  }
}
// --- SFU integration ---
import { handleSFURequest } from "./sfu";
export default {
  /**
   * Worker entry point. SFU routes are handled first (when credentials are
   * configured); everything else falls through to the agent router, then 404.
   */
  async fetch(request: Request, env: Env) {
    const { pathname } = new URL(request.url);

    // SFU routes (proxied API calls + WebSocket endpoints)
    if (pathname.startsWith("/sfu/")) {
      // The SFU secrets are not declared on Env, so read them via a loose view.
      const secrets = env as unknown as Record<string, string>;
      const appId = secrets.CLOUDFLARE_REALTIME_SFU_APP_ID;
      const apiToken = secrets.CLOUDFLARE_REALTIME_SFU_API_TOKEN;
      if (!appId || !apiToken) {
        return Response.json(
          { error: "SFU credentials not configured" },
          { status: 500 }
        );
      }
      const sfuResponse = await handleSFURequest(request, {
        appId,
        apiToken,
        agentNamespace: env.MyVoiceAgent as unknown as DurableObjectNamespace
      });
      // Unmatched /sfu/ paths fall through to the agent router below.
      if (sfuResponse) return sfuResponse;
    }

    const routed = await routeAgentRequest(request, env);
    return routed ?? new Response("Not found", { status: 404 });
  }
} satisfies ExportedHandler<Env>;