import { Agent, routeAgentRequest, type Connection, type WSMessage } from "agents"; import { withVoice, WorkersAIFluxSTT, WorkersAITTS, type VoiceTurnContext } from "@cloudflare/voice"; import { streamText, tool, stepCountIs } from "ai"; import { createWorkersAI } from "workers-ai-provider"; import { z } from "zod"; const VoiceAgent = withVoice(Agent); const SYSTEM_PROMPT = `You are a helpful voice assistant running on Cloudflare Workers. Keep your responses concise and conversational — you're being spoken aloud, not read. Aim for 1-3 sentences unless the user asks for more detail. Be warm and natural. You have tools available: - get_current_time: Tell the user the current date and time - set_reminder: Set a spoken reminder after a delay (e.g. "remind me in 5 minutes to check the oven") - get_weather: Check the weather for a location Use tools when the user's request matches. After calling a tool, incorporate the result naturally into your spoken response.`; export class MyVoiceAgent extends VoiceAgent { // --- Providers --- // Flux: streaming STT with built-in end-of-turn detection via Workers AI. // No API key needed — uses the AI binding directly. streamingStt = new WorkersAIFluxSTT(this.env.AI); tts = new WorkersAITTS(this.env.AI); // No VAD needed — Flux has built-in turn detection. // Client-side silence detection still triggers the pipeline. // --- Single-speaker enforcement --- // // Only one connection can be the active speaker at a time. This prevents // two browser tabs from capturing audio simultaneously. Other connections // can still observe transcripts and send text messages. #activeSpeakerId: string | null = null; beforeCallStart(connection: Connection): boolean { if (this.#activeSpeakerId && this.#activeSpeakerId !== connection.id) { connection.send( JSON.stringify({ type: "speaker_conflict", message: "Another session is currently the active speaker. You can kick them to take over." }) ); return false; } this.#activeSpeakerId = connection.id; return true; } onCallEnd(connection: Connection) { if (this.#activeSpeakerId === connection.id) { this.#activeSpeakerId = null; } } onClose(connection: Connection) { if (this.#activeSpeakerId === connection.id) { this.#activeSpeakerId = null; } } onMessage(connection: Connection, message: WSMessage) { // Voice protocol messages are intercepted automatically by the mixin. // This handler only receives non-voice messages. if (typeof message === "string") { try { const parsed = JSON.parse(message); if (parsed.type === "kick_speaker") { this.#handleKick(connection); return; } } catch { // not JSON } } } #handleKick(requester: Connection) { if (!this.#activeSpeakerId) { // No active speaker — nothing to kick return; } const activeConn = [...this.getConnections()].find( (c) => c.id === this.#activeSpeakerId ); if (activeConn) { // Notify the kicked connection activeConn.send( JSON.stringify({ type: "kicked", message: "Another session has taken over as the active speaker." }) ); // Force end their call — cleans up server-side state and sends idle this.forceEndCall(activeConn); } this.#activeSpeakerId = null; // Notify the requester they can now start requester.send( JSON.stringify({ type: "speaker_available", message: "Previous speaker has been disconnected. You can start a call." }) ); } // --- Voice agent logic --- async onTurn(transcript: string, context: VoiceTurnContext) { const workersAi = createWorkersAI({ binding: this.env.AI }); const result = streamText({ model: workersAi( "@cf/moonshotai/kimi-k2.5" as Parameters[0], { sessionAffinity: this.sessionAffinity } ), system: SYSTEM_PROMPT, messages: [ ...context.messages.map((m) => ({ role: m.role as "user" | "assistant", content: m.content })), { role: "user" as const, content: transcript } ], tools: { get_current_time: tool({ description: "Get the current date and time. Use when the user asks what time it is.", inputSchema: z.object({}), execute: async () => { const now = new Date(); return { time: now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", timeZoneName: "short" }), date: now.toLocaleDateString("en-US", { weekday: "long", year: "numeric", month: "long", day: "numeric" }) }; } }), set_reminder: tool({ description: "Set a reminder that will be spoken aloud after a delay.", inputSchema: z.object({ message: z .string() .describe("The reminder message to speak to the user"), delay_seconds: z .number() .describe("How many seconds from now to trigger the reminder") }), execute: async ({ message, delay_seconds }: { message: string; delay_seconds: number; }) => { await this.schedule(delay_seconds, "speakReminder", { message }); const minutes = Math.round(delay_seconds / 60); const timeLabel = minutes >= 1 ? `${minutes} minute${minutes > 1 ? "s" : ""}` : `${delay_seconds} seconds`; return { confirmed: true, message, delay: timeLabel }; } }), get_weather: tool({ description: "Get the current weather for a location. Use when the user asks about the weather.", inputSchema: z.object({ location: z .string() .describe("The city or location to check weather for") }), execute: async ({ location }: { location: string }) => { const conditions = [ "sunny", "partly cloudy", "overcast", "light rain" ]; const condition = conditions[Math.floor(Math.random() * conditions.length)]; const temp = Math.floor(55 + Math.random() * 35); return { location, temperature: `${temp}°F`, condition, note: "Mock data — connect a weather MCP server for real forecasts." }; } }) }, stopWhen: stepCountIs(3), abortSignal: context.signal }); return result.textStream; } async onCallStart(connection: Connection) { const messageCount = this.sql<{ count: number }>` SELECT COUNT(*) as count FROM cf_voice_messages `[0]?.count ?? 0; const greeting = messageCount > 0 ? "Welcome back! How can I help you today?" : "Hi there! I'm your voice assistant. I can answer questions, set reminders, or check the weather. What can I do for you?"; await this.speak(connection, greeting); } async speakReminder(payload: { message: string }) { await this.speakAll(`Reminder: ${payload.message}`); } } // --- SFU integration --- import { handleSFURequest } from "./sfu"; export default { async fetch(request: Request, env: Env) { const url = new URL(request.url); // SFU routes (proxied API calls + WebSocket endpoints) if (url.pathname.startsWith("/sfu/")) { const appId = (env as unknown as Record) .CLOUDFLARE_REALTIME_SFU_APP_ID; const apiToken = (env as unknown as Record) .CLOUDFLARE_REALTIME_SFU_API_TOKEN; if (!appId || !apiToken) { return Response.json( { error: "SFU credentials not configured" }, { status: 500 } ); } const sfuResponse = await handleSFURequest(request, { appId, apiToken, agentNamespace: env.MyVoiceAgent as unknown as DurableObjectNamespace }); if (sfuResponse) return sfuResponse; } return ( (await routeAgentRequest(request, env)) ?? new Response("Not found", { status: 404 }) ); } } satisfies ExportedHandler;