// File: manager-agent.ts
import { callable, getAgentByName } from "agents";
import { PlaygroundAgent as Agent } from "../../shared/playground-agent";
import type { FanoutWorkerAgent, WorkerResult } from "./fanout-worker-agent";

/**
 * Persisted state of a ManagerAgent.
 */
export interface ManagerState {
  /**
   * Summary of the most recent `processItems` call, or `null` when no run has
   * been recorded (the agent's initial state).
   */
  lastRun: {
    /** The full item list that was passed to `processItems`. */
    items: string[];
    /** Number of chunks (and therefore worker calls) actually dispatched. */
    workerCount: number;
    /** One result per dispatched chunk, in chunk order. */
    results: WorkerResult[];
    /** Elapsed time for the whole run, measured as a `Date.now()` difference (milliseconds). */
    totalDuration: number;
  } | null;
}

/**
 * Agent that splits a list of items into contiguous chunks, hands each chunk
 * to a separate FanoutWorkerAgent instance, and stores a summary of the run
 * in its state.
 */
export class ManagerAgent extends Agent<Env, ManagerState> {
  initialState: ManagerState = { lastRun: null };

  /**
   * Splits `items` into contiguous chunks, processes every chunk on its own
   * worker agent concurrently, and records the outcome as `lastRun`.
   *
   * @param items - Items to distribute; an empty list dispatches no workers.
   * @param workerCount - Requested number of workers. Fractions are truncated
   *   and the value is clamped to the range 1–4; a value that is not a number
   *   falls back to a single worker.
   * @returns The run summary that was also written to state. Its
   *   `workerCount` is the number of chunks actually dispatched, which can be
   *   lower than the requested count.
   *
   * If any worker call rejects, the returned promise rejects and the state is
   * left untouched, because `setState` is only reached after all workers
   * have resolved.
   */
  @callable({ description: "Fan out items to N workers in parallel" })
  async processItems(
    items: string[],
    workerCount: number
  ): Promise<ManagerState["lastRun"]> {
    const start = Date.now();

    // This method is callable remotely, so `workerCount` may arrive missing or
    // non-numeric at runtime. An unchecked NaN would poison the chunk size and
    // cause a single empty chunk to be dispatched, silently dropping every
    // item. Normalise to an integer before clamping.
    const maxWorkers = 4;
    const requested = Math.trunc(workerCount);
    const clamped = Number.isNaN(requested)
      ? 1
      : Math.max(1, Math.min(requested, maxWorkers));

    // Contiguous chunks of equal size (the last one may be shorter). When the
    // items do not divide evenly, fewer than `clamped` chunks can result.
    const chunkSize = Math.ceil(items.length / clamped);
    const chunks: string[][] = [];
    for (let i = 0; i < items.length; i += chunkSize) {
      chunks.push(items.slice(i, i + chunkSize));
    }

    // Promise.all keeps results in chunk order regardless of completion order.
    const results = await Promise.all(
      chunks.map(async (chunk, i) => {
        // Worker instance names include this manager's name and the chunk index.
        const worker = await getAgentByName<Env, FanoutWorkerAgent>(
          this.env.FanoutWorkerAgent,
          `worker-${this.name}-${i}`
        );
        return worker.processChunk(`worker-${i}`, chunk);
      })
    );

    const run = {
      items,
      workerCount: chunks.length,
      results,
      totalDuration: Date.now() - start
    };

    this.setState({ lastRun: run });
    return run;
  }
}