import type { AgentCard, JSONRPCResponse, Message, Task, TaskStatusUpdateEvent } from "@a2a-js/sdk"; import { DefaultRequestHandler, JsonRpcTransportHandler, type AgentExecutor, type ExecutionEventBus, type RequestContext, type TaskStore } from "@a2a-js/sdk/server"; import { Agent, getAgentByName } from "agents"; import { generateText } from "ai"; import { createWorkersAI } from "workers-ai-provider"; type Env = { AI: Ai; MyA2A: DurableObjectNamespace; }; const agentCard: AgentCard = { capabilities: { pushNotifications: false, stateTransitionHistory: true, streaming: true }, defaultInputModes: ["text"], defaultOutputModes: ["text"], description: "An AI assistant powered by Cloudflare Workers AI, exposed via the A2A protocol.", name: "Cloudflare A2A Agent", protocolVersion: "0.3.0", provider: { organization: "Cloudflare", url: "https://developers.cloudflare.com/agents" }, skills: [ { description: "Chat with an AI assistant powered by Workers AI (GLM 4.7 Flash).", examples: [ "Hello, how are you?", "Explain the A2A protocol in simple terms.", "Write a haiku about cloud computing." ], id: "chat", name: "Chat", tags: ["chat", "ai"] } ], url: "http://localhost:5173/a2a", version: "0.1.0" }; // Task store backed by Durable Object SQLite storage class DurableObjectTaskStore implements TaskStore { constructor(private sql: SqlStorage) { this.sql.exec(` CREATE TABLE IF NOT EXISTS a2a_tasks ( id TEXT PRIMARY KEY, data TEXT NOT NULL ) `); } async save(task: Task): Promise { this.sql.exec( "INSERT OR REPLACE INTO a2a_tasks (id, data) VALUES (?, ?)", task.id, JSON.stringify(task) ); } async load(taskId: string): Promise { const rows = [ ...this.sql.exec("SELECT data FROM a2a_tasks WHERE id = ?", taskId) ]; if (rows.length === 0) return undefined; return JSON.parse(rows[0].data as string) as Task; } } // Agent executor that calls Workers AI class AIAgentExecutor implements AgentExecutor { constructor(private getEnv: () => Env) {} async execute( requestContext: RequestContext, eventBus: ExecutionEventBus ): Promise { const { taskId, contextId, userMessage, task } = requestContext; // Publish initial task if new if (!task) { const initialTask: Task = { contextId, history: [userMessage], id: taskId, kind: "task", status: { state: "submitted", timestamp: new Date().toISOString() } }; eventBus.publish(initialTask); } // Working status eventBus.publish({ contextId, final: false, kind: "status-update", status: { state: "working", timestamp: new Date().toISOString() }, taskId } as TaskStatusUpdateEvent); // Extract user text const userText = userMessage.parts .filter((p) => p.kind === "text") .map((p) => (p as { kind: "text"; text: string }).text) .join(""); // Call Workers AI const workersai = createWorkersAI({ binding: this.getEnv().AI }); const result = await generateText({ model: workersai("@cf/moonshotai/kimi-k2.5"), system: "You are a helpful AI assistant. Keep responses concise and clear.", messages: [{ role: "user", content: userText }] }); // Publish agent response const responseMessage: Message = { contextId, kind: "message", messageId: crypto.randomUUID(), parts: [{ kind: "text", text: result.text }], role: "agent", taskId }; eventBus.publish(responseMessage); // Completed eventBus.publish({ contextId, final: true, kind: "status-update", status: { message: responseMessage, state: "completed", timestamp: new Date().toISOString() }, taskId } as TaskStatusUpdateEvent); eventBus.finished(); } cancelTask = async (): Promise => {}; } function isAsyncIterable(value: unknown): value is AsyncIterable { return ( value != null && typeof (value as Record)[Symbol.asyncIterator] === "function" ); } export class MyA2A extends Agent { private handler: DefaultRequestHandler; private transport: JsonRpcTransportHandler; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); const taskStore = new DurableObjectTaskStore(ctx.storage.sql); const executor = new AIAgentExecutor(() => this.env); this.handler = new DefaultRequestHandler(agentCard, taskStore, executor); this.transport = new JsonRpcTransportHandler(this.handler); } async onRequest(request: Request): Promise { const url = new URL(request.url); // Agent card discovery if ( url.pathname === "/.well-known/agent-card.json" || url.pathname === "/.well-known/agent.json" ) { const card = await this.handler.getAgentCard(); return Response.json(card, { headers: { "Access-Control-Allow-Origin": "*" } }); } // CORS preflight if (request.method === "OPTIONS") { return new Response(null, { headers: { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type" } }); } // A2A JSON-RPC endpoint if (request.method === "POST") { const body = await request.json(); const result = await this.transport.handle(body); if (isAsyncIterable(result)) { const stream = result as AsyncGenerator< JSONRPCResponse, void, undefined >; const { readable, writable } = new TransformStream(); const writer = writable.getWriter(); const encoder = new TextEncoder(); (async () => { try { for await (const event of stream) { await writer.write( encoder.encode( `id: ${Date.now()}\ndata: ${JSON.stringify(event)}\n\n` ) ); } } finally { await writer.close(); } })(); return new Response(readable, { headers: { "Access-Control-Allow-Origin": "*", "Cache-Control": "no-cache", "Content-Type": "text/event-stream" } }); } return Response.json(result, { headers: { "Access-Control-Allow-Origin": "*" } }); } return new Response("Not Found", { status: 404 }); } } export default { async fetch(request: Request, env: Env) { const url = new URL(request.url); // Route A2A endpoints to the Durable Object if (url.pathname.startsWith("/.well-known/") || url.pathname === "/a2a") { const agent = await getAgentByName(env.MyA2A, "default"); return agent.fetch(request); } return new Response("Not found", { status: 404 }); } } satisfies ExportedHandler;