branch:
PipelineDemo.tsx
8436 bytesRaw
import { useAgent } from "agents/react";
import { useState } from "react";
import { Button, Input, Surface, Text } from "@cloudflare/kumo";
import { DemoWrapper } from "../../layout";
import {
  LogPanel,
  ConnectionStatus,
  CodeExplanation,
  HighlightedJson,
  type CodeSection
} from "../../components";
import { useLogs, useUserId } from "../../hooks";
import type {
  PipelineOrchestratorAgent,
  PipelineState,
  PipelineResult
} from "./pipeline-agent";
import type { StageResult } from "./stage-agents";

const codeSections: CodeSection[] = [
  {
    title: "Chain agents into a pipeline",
    description:
      "Each stage processes data and passes it to the next agent via getAgentByName(). An orchestrator agent drives the pipeline and collects results from each stage.",
    code: `import { Agent, callable, getAgentByName } from "agents";

class PipelineOrchestratorAgent extends Agent<Env> {
  @callable()
  async runPipeline(input: string) {
    const stages = [];

    const validator = await getAgentByName(
      this.env.ValidatorStageAgent, "validator"
    );
    const validated = await validator.process(input);
    stages.push(validated);

    const transformer = await getAgentByName(
      this.env.TransformStageAgent, "transformer"
    );
    const transformed = await transformer.process(validated.output);
    stages.push(transformed);

    const enricher = await getAgentByName(
      this.env.EnrichStageAgent, "enricher"
    );
    const enriched = await enricher.process(transformed.output);
    stages.push(enriched);

    return { input, stages };
  }
}`
  },
  {
    title: "Stage agents are plain Durable Objects",
    description:
      "Each stage is a separate agent with a process() method. No @callable needed — the orchestrator calls them directly via Durable Object RPC. Each stage is isolated and independently scalable.",
    code: `class ValidatorStageAgent extends Agent<Env> {
  async process(input: string) {
    const trimmed = input.trim();
    if (!trimmed) throw new Error("Input cannot be empty");
    return {
      stage: "validate",
      input,
      output: { trimmed, lowercased: trimmed.toLowerCase() },
    };
  }
}

class TransformStageAgent extends Agent<Env> {
  async process(validated: { trimmed: string }) {
    return {
      stage: "transform",
      output: {
        uppercase: validated.trimmed.toUpperCase(),
        reversed: validated.trimmed.split("").reverse().join(""),
      },
    };
  }
}`
  }
];

const PRESETS = [
  "The quick brown fox jumps over the lazy dog",
  "Cloudflare Workers run at the edge",
  "Hello World from the Agents SDK"
];

const STAGE_LABELS: Record<string, string> = {
  validate: "Validate",
  transform: "Transform",
  enrich: "Enrich"
};

function StageCard({ stage }: { stage: StageResult }) {
  return (
    <div className="border border-kumo-line rounded p-3">
      <div className="flex items-center justify-between mb-2">
        <span className="text-sm font-semibold text-kumo-default">
          {STAGE_LABELS[stage.stage] || stage.stage}
        </span>
        <span className="text-xs text-kumo-inactive">{stage.duration}ms</span>
      </div>
      <HighlightedJson data={stage.output} />
    </div>
  );
}

export function PipelineDemo() {
  const userId = useUserId();
  const { logs, addLog, clearLogs } = useLogs();
  const [input, setInput] = useState(PRESETS[0]);
  const [isRunning, setIsRunning] = useState(false);
  const [lastRun, setLastRun] = useState<PipelineResult | null>(null);

  const agent = useAgent<PipelineOrchestratorAgent, PipelineState>({
    agent: "pipeline-orchestrator-agent",
    name: `pipeline-demo-${userId}`,
    onOpen: () => addLog("info", "connected"),
    onClose: () => addLog("info", "disconnected"),
    onError: () => addLog("error", "error", "Connection error"),
    onStateUpdate: (newState) => {
      if (newState?.lastRun) setLastRun(newState.lastRun);
    }
  });

  const handleRun = async () => {
    if (!input.trim()) return;
    setIsRunning(true);
    addLog("out", "runPipeline", { input });

    try {
      const result = await agent.call("runPipeline", [input]);
      const typed = result as PipelineResult;
      addLog("in", "result", {
        stages: typed.stages.length,
        totalMs: typed.totalDuration
      });
      setLastRun(typed);
    } catch (e) {
      addLog("error", "error", e instanceof Error ? e.message : String(e));
    } finally {
      setIsRunning(false);
    }
  };

  return (
    <DemoWrapper
      title="Pipeline Pattern"
      description={
        <>
          Data flows through a chain of agents, each performing a specific
          transformation and passing the result to the next stage via{" "}
          <code className="text-xs bg-kumo-fill px-1 py-0.5 rounded">
            getAgentByName()
          </code>
          . Each stage is a separate Durable Object, so stages are isolated and
          independently scalable. Enter some text and run it through the
          validate, transform, enrich pipeline.
        </>
      }
      statusIndicator={
        <ConnectionStatus
          status={
            agent.readyState === WebSocket.OPEN ? "connected" : "connecting"
          }
        />
      }
    >
      <div className="grid grid-cols-1 lg:grid-cols-2 gap-6">
        {/* Controls */}
        <div className="space-y-6">
          {/* Input */}
          <Surface className="p-4 rounded-lg ring ring-kumo-line">
            <div className="mb-4">
              <Text variant="heading3">Pipeline Input</Text>
            </div>
            <Input
              aria-label="Text to process"
              type="text"
              value={input}
              onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
                setInput(e.target.value)
              }
              onKeyDown={(e: React.KeyboardEvent) =>
                e.key === "Enter" && handleRun()
              }
              className="w-full mb-3"
              placeholder="Enter text to process..."
            />
            <div className="flex flex-wrap gap-2 mb-4">
              {PRESETS.map((preset, i) => (
                <Button
                  key={i}
                  variant="ghost"
                  size="xs"
                  onClick={() => setInput(preset)}
                >
                  Preset {i + 1}
                </Button>
              ))}
            </div>
            <Button
              variant="primary"
              onClick={handleRun}
              disabled={isRunning || !input.trim()}
              className="w-full"
            >
              {isRunning ? "Running Pipeline..." : "Run Pipeline"}
            </Button>
          </Surface>

          {/* Pipeline visualization */}
          <Surface className="p-4 rounded-lg ring ring-kumo-line">
            <div className="flex items-center justify-between mb-4">
              <Text variant="heading3">Pipeline Stages</Text>
              {lastRun && (
                <span className="text-xs text-kumo-subtle">
                  {lastRun.totalDuration}ms total
                </span>
              )}
            </div>

            {/* Stage flow diagram */}
            <div className="flex items-center justify-center gap-2 mb-4 text-xs text-kumo-subtle">
              <span className="bg-kumo-control px-2 py-1 rounded text-kumo-default">
                Validate
              </span>
              <span>→</span>
              <span className="bg-kumo-control px-2 py-1 rounded text-kumo-default">
                Transform
              </span>
              <span>→</span>
              <span className="bg-kumo-control px-2 py-1 rounded text-kumo-default">
                Enrich
              </span>
            </div>

            {lastRun ? (
              <div className="space-y-3">
                {lastRun.stages.map((stage) => (
                  <StageCard key={stage.stage} stage={stage} />
                ))}
              </div>
            ) : (
              <p className="text-sm text-kumo-inactive text-center py-4">
                Run the pipeline to see stage results
              </p>
            )}
          </Surface>
        </div>

        {/* Logs */}
        <div className="space-y-6">
          <LogPanel logs={logs} onClear={clearLogs} maxHeight="600px" />
        </div>
      </div>

      <CodeExplanation sections={codeSections} />
    </DemoWrapper>
  );
}