branch:
server.ts
13555 bytesRaw
/**
 * Gatekeeper Example — with Durable Object Facets
 *
 * The CustomerDatabase is a FACET — a child Durable Object created by the
 * GatekeeperAgent via ctx.facets.get(). It has its own isolated SQLite
 * that the parent cannot access directly. The parent can only call the
 * facet's RPC methods: query(), execute(), getAllCustomers().
 *
 * This makes the approval queue structurally enforceable: the agent
 * literally cannot bypass it because it has no path to the customer data
 * except through the facet stub.
 *
 * Requires the "experimental" compatibility flag for ctx.facets and
 * ctx.exports.
 */

import { createWorkersAI } from "workers-ai-provider";
import { Agent, routeAgentRequest, callable } from "agents";
import { AIChatAgent } from "@cloudflare/ai-chat";
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
import { z } from "zod";

// ─────────────────────────────────────────────────────────────────────────────
// Types shared between server and client
// ─────────────────────────────────────────────────────────────────────────────

export type ActionEntry = {
  id: number;
  type: "action" | "observation";
  title: string;
  description: string;
  sql: string;
  state: "pending" | "approved" | "rejected" | "reverted";
  canRevert: boolean;
  revertSql: string | null;
  createdAt: string;
  resolvedAt: string | null;
};

export type GatekeeperState = {
  actions: ActionEntry[];
  customers: CustomerRecord[];
};

export type CustomerRecord = {
  id: number;
  name: string;
  email: string;
  tier: string;
  region: string;
};

// ─────────────────────────────────────────────────────────────────────────────
// CustomerDatabase — SubAgent (isolated storage)
//
// NOT listed in wrangler.jsonc bindings or migrations. Instantiated by the
// parent GatekeeperAgent via this.subAgent(CustomerDatabase, "database").
// ─────────────────────────────────────────────────────────────────────────────

export class CustomerDatabase extends Agent<Env> {
  onStart() {
    this.sql`
      CREATE TABLE IF NOT EXISTS customers (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT NOT NULL,
        tier TEXT NOT NULL DEFAULT 'Bronze',
        region TEXT NOT NULL DEFAULT 'Unknown'
      )
    `;

    const rows = this.sql<{
      cnt: number;
    }>`SELECT COUNT(*) as cnt FROM customers`;
    if (rows[0].cnt === 0) {
      this.sql`INSERT INTO customers (name, email, tier, region) VALUES
        ('Alice Chen', 'alice@example.com', 'Gold', 'West'),
        ('Bob Martinez', 'bob@example.com', 'Silver', 'East'),
        ('Carol Johnson', 'carol@example.com', 'Bronze', 'West'),
        ('Dave Kim', 'dave@example.com', 'Gold', 'Central'),
        ('Eve Williams', 'eve@example.com', 'Silver', 'East'),
        ('Frank Brown', 'frank@example.com', 'Bronze', 'West'),
        ('Grace Lee', 'grace@example.com', 'Gold', 'Central'),
        ('Hank Davis', 'hank@example.com', 'Silver', 'East')
      `;
    }
  }

  query(sqlText: string): Record<string, unknown>[] {
    return [...this.ctx.storage.sql.exec(sqlText).toArray()] as Record<
      string,
      unknown
    >[];
  }

  execute(sqlText: string): { success: boolean } {
    this.ctx.storage.sql.exec(sqlText);
    return { success: true };
  }

  getAllCustomers(): CustomerRecord[] {
    return this.sql<CustomerRecord>`
      SELECT id, name, email, tier, region FROM customers ORDER BY id
    `;
  }
}

// ─────────────────────────────────────────────────────────────────────────────
// The Gatekeeper Agent
// ─────────────────────────────────────────────────────────────────────────────

export class GatekeeperAgent extends AIChatAgent<Env, GatekeeperState> {
  initialState: GatekeeperState = {
    actions: [],
    customers: []
  };

  async onStart() {
    this._initQueue();
    await this._syncState();
  }

  // ─── Sub-agent access ───────────────────────────────────────────────

  private _getDb() {
    return this.subAgent(CustomerDatabase, "database");
  }

  // ─── Queue table ─────────────────────────────────────────────────────

  private _initQueue() {
    this.sql`
      CREATE TABLE IF NOT EXISTS action_queue (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        type TEXT NOT NULL DEFAULT 'action',
        title TEXT NOT NULL,
        description TEXT NOT NULL,
        sql_statement TEXT NOT NULL,
        state TEXT NOT NULL DEFAULT 'pending',
        can_revert INTEGER NOT NULL DEFAULT 0,
        revert_sql TEXT,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        resolved_at TEXT
      )
    `;
  }

  // ─── State sync ──────────────────────────────────────────────────────

  private async _syncState() {
    const actions = this.sql<ActionEntry>`
      SELECT id, type, title, description,
             sql_statement as sql, state,
             can_revert as canRevert, revert_sql as revertSql,
             created_at as createdAt, resolved_at as resolvedAt
      FROM action_queue ORDER BY id DESC
    `;

    const db = await this._getDb();
    const customers = await db.getAllCustomers();

    this.setState({
      actions: actions.map((a) => ({ ...a, canRevert: Boolean(a.canRevert) })),
      customers
    });
  }

  // ─── Gatekeeper API ──────────────────────────────────────────────────

  private async _logObservation(title: string, desc: string, sql: string) {
    this.sql`
      INSERT INTO action_queue (type, title, description, sql_statement, state, can_revert)
      VALUES ('observation', ${title}, ${desc}, ${sql}, 'approved', 0)
    `;
    await this._syncState();
  }

  private async _submitAction(
    title: string,
    desc: string,
    sql: string,
    revertSql: string | null
  ): Promise<number> {
    this.sql`
      INSERT INTO action_queue (type, title, description, sql_statement, state, can_revert, revert_sql)
      VALUES ('action', ${title}, ${desc}, ${sql}, 'pending', ${revertSql ? 1 : 0}, ${revertSql})
    `;
    const result = this.sql<{ id: number }>`SELECT last_insert_rowid() as id`;
    await this._syncState();
    return result[0].id;
  }

  @callable()
  async approveAction(actionId: number) {
    const rows = this.sql<{ sql_statement: string; state: string }>`
      SELECT sql_statement, state FROM action_queue WHERE id = ${actionId}
    `;
    if (rows.length === 0) throw new Error(`Action ${actionId} not found`);
    if (rows[0].state !== "pending") throw new Error(`Not pending`);

    const db = await this._getDb();
    await db.execute(rows[0].sql_statement);

    this.sql`
      UPDATE action_queue SET state = 'approved', resolved_at = datetime('now')
      WHERE id = ${actionId}
    `;
    await this._syncState();
  }

  @callable()
  async rejectAction(actionId: number) {
    const rows = this.sql<{ state: string }>`
      SELECT state FROM action_queue WHERE id = ${actionId}
    `;
    if (rows.length === 0) throw new Error(`Action ${actionId} not found`);
    if (rows[0].state !== "pending") throw new Error(`Not pending`);

    this.sql`
      UPDATE action_queue SET state = 'rejected', resolved_at = datetime('now')
      WHERE id = ${actionId}
    `;
    await this._syncState();
  }

  @callable()
  async revertAction(actionId: number) {
    const rows = this.sql<{
      state: string;
      revert_sql: string | null;
      can_revert: number;
    }>`
      SELECT state, revert_sql, can_revert FROM action_queue WHERE id = ${actionId}
    `;
    if (rows.length === 0) throw new Error(`Action ${actionId} not found`);
    if (rows[0].state !== "approved") throw new Error(`Not approved`);
    if (!rows[0].can_revert || !rows[0].revert_sql)
      throw new Error(`Not revertable`);

    const db = await this._getDb();
    await db.execute(rows[0].revert_sql);

    this.sql`
      UPDATE action_queue SET state = 'reverted', resolved_at = datetime('now')
      WHERE id = ${actionId}
    `;
    await this._syncState();
  }

  // ─── Chat ────────────────────────────────────────────────────────────

  async onChatMessage() {
    const workersai = createWorkersAI({ binding: this.env.AI });
    const agent = this;

    const result = streamText({
      model: workersai("@cf/moonshotai/kimi-k2.5", {
        sessionAffinity: this.sessionAffinity
      }),
      system: `You are a helpful database administrator assistant. You manage a customer database.

You can query the database freely — reads are always allowed. But any changes to the data
(INSERT, UPDATE, DELETE) will be submitted for human approval before they execute. This is
the "Gatekeeper" pattern: you propose changes, a human reviews them.

The database has a "customers" table with columns: id, name, email, tier (Bronze/Silver/Gold), region (West/East/Central).

When proposing changes:
- Be specific about what will change and why
- Generate correct SQL — the human will see the exact query
- For UPDATEs, mention how many rows will be affected
- For DELETEs, warn about data loss

Important: After submitting an action for approval, tell the user it's been queued and they
can approve or reject it in the action panel. Don't say it's been done — it hasn't happened yet.`,
      messages: await convertToModelMessages(this.messages),
      tools: {
        queryDatabase: tool({
          description:
            "Query the customer database with a SELECT statement. " +
            "Reads are always allowed and auto-logged.",
          inputSchema: z.object({
            sql: z.string().describe("A SELECT query"),
            description: z.string().describe("What this query looks for")
          }),
          execute: async ({ sql, description }) => {
            const trimmed = sql.trim().toUpperCase();
            if (!trimmed.startsWith("SELECT")) {
              return {
                error: "Only SELECT allowed. Use mutateDatabase for writes."
              };
            }
            await agent._logObservation(
              `Query: ${description}`,
              description,
              sql
            );
            try {
              const db = await agent._getDb();
              const results = await db.query(sql);
              return { sql, rowCount: results.length, rows: results };
            } catch (err) {
              return { error: `SQL error: ${err}` };
            }
          }
        }),

        mutateDatabase: tool({
          description:
            "Propose a change (INSERT, UPDATE, DELETE). " +
            "Queued for human approval — will NOT execute immediately.",
          inputSchema: z.object({
            title: z.string().describe("Short title for the action"),
            description: z.string().describe("What this change does"),
            sql: z.string().describe("The SQL mutation"),
            revertSql: z
              .string()
              .nullable()
              .describe("SQL to undo this action, or null if not revertable")
          }),
          execute: async ({ title, description, sql, revertSql }) => {
            const trimmed = sql.trim().toUpperCase();
            if (trimmed.startsWith("SELECT")) {
              return { error: "Use queryDatabase for SELECTs." };
            }
            const actionId = await agent._submitAction(
              title,
              description,
              sql,
              revertSql
            );
            return {
              actionId,
              status: "pending",
              message: `Action #${actionId} queued for approval.`
            };
          }
        })
      },
      stopWhen: stepCountIs(5)
    });

    return result.toUIMessageStreamResponse();
  }
}

// ─────────────────────────────────────────────────────────────────────────────

export default {
  async fetch(request: Request, env: Env) {
    return (
      (await routeAgentRequest(request, env)) ||
      new Response("Not found", { status: 404 })
    );
  }
} satisfies ExportedHandler<Env>;