branch:
server.ts
7224 bytesRaw
import type {
  AgentCard,
  JSONRPCResponse,
  Message,
  Task,
  TaskStatusUpdateEvent
} from "@a2a-js/sdk";
import {
  DefaultRequestHandler,
  JsonRpcTransportHandler,
  type AgentExecutor,
  type ExecutionEventBus,
  type RequestContext,
  type TaskStore
} from "@a2a-js/sdk/server";
import { Agent, getAgentByName } from "agents";
import { generateText } from "ai";
import { createWorkersAI } from "workers-ai-provider";

type Env = {
  AI: Ai;
  MyA2A: DurableObjectNamespace<MyA2A>;
};

const agentCard: AgentCard = {
  capabilities: {
    pushNotifications: false,
    stateTransitionHistory: true,
    streaming: true
  },
  defaultInputModes: ["text"],
  defaultOutputModes: ["text"],
  description:
    "An AI assistant powered by Cloudflare Workers AI, exposed via the A2A protocol.",
  name: "Cloudflare A2A Agent",
  protocolVersion: "0.3.0",
  provider: {
    organization: "Cloudflare",
    url: "https://developers.cloudflare.com/agents"
  },
  skills: [
    {
      description:
        "Chat with an AI assistant powered by Workers AI (GLM 4.7 Flash).",
      examples: [
        "Hello, how are you?",
        "Explain the A2A protocol in simple terms.",
        "Write a haiku about cloud computing."
      ],
      id: "chat",
      name: "Chat",
      tags: ["chat", "ai"]
    }
  ],
  url: "http://localhost:5173/a2a",
  version: "0.1.0"
};

// Task store backed by Durable Object SQLite storage
class DurableObjectTaskStore implements TaskStore {
  constructor(private sql: SqlStorage) {
    this.sql.exec(`
      CREATE TABLE IF NOT EXISTS a2a_tasks (
        id TEXT PRIMARY KEY,
        data TEXT NOT NULL
      )
    `);
  }

  async save(task: Task): Promise<void> {
    this.sql.exec(
      "INSERT OR REPLACE INTO a2a_tasks (id, data) VALUES (?, ?)",
      task.id,
      JSON.stringify(task)
    );
  }

  async load(taskId: string): Promise<Task | undefined> {
    const rows = [
      ...this.sql.exec("SELECT data FROM a2a_tasks WHERE id = ?", taskId)
    ];
    if (rows.length === 0) return undefined;
    return JSON.parse(rows[0].data as string) as Task;
  }
}

// Agent executor that calls Workers AI
class AIAgentExecutor implements AgentExecutor {
  constructor(private getEnv: () => Env) {}

  async execute(
    requestContext: RequestContext,
    eventBus: ExecutionEventBus
  ): Promise<void> {
    const { taskId, contextId, userMessage, task } = requestContext;

    // Publish initial task if new
    if (!task) {
      const initialTask: Task = {
        contextId,
        history: [userMessage],
        id: taskId,
        kind: "task",
        status: {
          state: "submitted",
          timestamp: new Date().toISOString()
        }
      };
      eventBus.publish(initialTask);
    }

    // Working status
    eventBus.publish({
      contextId,
      final: false,
      kind: "status-update",
      status: {
        state: "working",
        timestamp: new Date().toISOString()
      },
      taskId
    } as TaskStatusUpdateEvent);

    // Extract user text
    const userText = userMessage.parts
      .filter((p) => p.kind === "text")
      .map((p) => (p as { kind: "text"; text: string }).text)
      .join("");

    // Call Workers AI
    const workersai = createWorkersAI({ binding: this.getEnv().AI });
    const result = await generateText({
      model: workersai("@cf/moonshotai/kimi-k2.5"),
      system:
        "You are a helpful AI assistant. Keep responses concise and clear.",
      messages: [{ role: "user", content: userText }]
    });

    // Publish agent response
    const responseMessage: Message = {
      contextId,
      kind: "message",
      messageId: crypto.randomUUID(),
      parts: [{ kind: "text", text: result.text }],
      role: "agent",
      taskId
    };
    eventBus.publish(responseMessage);

    // Completed
    eventBus.publish({
      contextId,
      final: true,
      kind: "status-update",
      status: {
        message: responseMessage,
        state: "completed",
        timestamp: new Date().toISOString()
      },
      taskId
    } as TaskStatusUpdateEvent);

    eventBus.finished();
  }

  cancelTask = async (): Promise<void> => {};
}

function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
  return (
    value != null &&
    typeof (value as Record<symbol, unknown>)[Symbol.asyncIterator] ===
      "function"
  );
}

export class MyA2A extends Agent<Env> {
  private handler: DefaultRequestHandler;
  private transport: JsonRpcTransportHandler;

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);

    const taskStore = new DurableObjectTaskStore(ctx.storage.sql);
    const executor = new AIAgentExecutor(() => this.env);

    this.handler = new DefaultRequestHandler(agentCard, taskStore, executor);
    this.transport = new JsonRpcTransportHandler(this.handler);
  }

  async onRequest(request: Request): Promise<Response> {
    const url = new URL(request.url);

    // Agent card discovery
    if (
      url.pathname === "/.well-known/agent-card.json" ||
      url.pathname === "/.well-known/agent.json"
    ) {
      const card = await this.handler.getAgentCard();
      return Response.json(card, {
        headers: { "Access-Control-Allow-Origin": "*" }
      });
    }

    // CORS preflight
    if (request.method === "OPTIONS") {
      return new Response(null, {
        headers: {
          "Access-Control-Allow-Origin": "*",
          "Access-Control-Allow-Methods": "POST, OPTIONS",
          "Access-Control-Allow-Headers": "Content-Type"
        }
      });
    }

    // A2A JSON-RPC endpoint
    if (request.method === "POST") {
      const body = await request.json();
      const result = await this.transport.handle(body);

      if (isAsyncIterable(result)) {
        const stream = result as AsyncGenerator<
          JSONRPCResponse,
          void,
          undefined
        >;
        const { readable, writable } = new TransformStream();
        const writer = writable.getWriter();
        const encoder = new TextEncoder();

        (async () => {
          try {
            for await (const event of stream) {
              await writer.write(
                encoder.encode(
                  `id: ${Date.now()}\ndata: ${JSON.stringify(event)}\n\n`
                )
              );
            }
          } finally {
            await writer.close();
          }
        })();

        return new Response(readable, {
          headers: {
            "Access-Control-Allow-Origin": "*",
            "Cache-Control": "no-cache",
            "Content-Type": "text/event-stream"
          }
        });
      }

      return Response.json(result, {
        headers: { "Access-Control-Allow-Origin": "*" }
      });
    }

    return new Response("Not Found", { status: 404 });
  }
}

export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url);

    // Route A2A endpoints to the Durable Object
    if (url.pathname.startsWith("/.well-known/") || url.pathname === "/a2a") {
      const agent = await getAgentByName(env.MyA2A, "default");
      return agent.fetch(request);
    }

    return new Response("Not found", { status: 404 });
  }
} satisfies ExportedHandler<Env>;