/** * Gatekeeper Example — with Durable Object Facets * * The CustomerDatabase is a FACET — a child Durable Object created by the * GatekeeperAgent via ctx.facets.get(). It has its own isolated SQLite * that the parent cannot access directly. The parent can only call the * facet's RPC methods: query(), execute(), getAllCustomers(). * * This makes the approval queue structurally enforceable: the agent * literally cannot bypass it because it has no path to the customer data * except through the facet stub. * * Requires the "experimental" compatibility flag for ctx.facets and * ctx.exports. */ import { createWorkersAI } from "workers-ai-provider"; import { Agent, routeAgentRequest, callable } from "agents"; import { AIChatAgent } from "@cloudflare/ai-chat"; import { streamText, convertToModelMessages, tool, stepCountIs } from "ai"; import { z } from "zod"; // ───────────────────────────────────────────────────────────────────────────── // Types shared between server and client // ───────────────────────────────────────────────────────────────────────────── export type ActionEntry = { id: number; type: "action" | "observation"; title: string; description: string; sql: string; state: "pending" | "approved" | "rejected" | "reverted"; canRevert: boolean; revertSql: string | null; createdAt: string; resolvedAt: string | null; }; export type GatekeeperState = { actions: ActionEntry[]; customers: CustomerRecord[]; }; export type CustomerRecord = { id: number; name: string; email: string; tier: string; region: string; }; // ───────────────────────────────────────────────────────────────────────────── // CustomerDatabase — SubAgent (isolated storage) // // NOT listed in wrangler.jsonc bindings or migrations. Instantiated by the // parent GatekeeperAgent via this.subAgent(CustomerDatabase, "database"). // ───────────────────────────────────────────────────────────────────────────── export class CustomerDatabase extends Agent { onStart() { this.sql` CREATE TABLE IF NOT EXISTS customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT NOT NULL, tier TEXT NOT NULL DEFAULT 'Bronze', region TEXT NOT NULL DEFAULT 'Unknown' ) `; const rows = this.sql<{ cnt: number; }>`SELECT COUNT(*) as cnt FROM customers`; if (rows[0].cnt === 0) { this.sql`INSERT INTO customers (name, email, tier, region) VALUES ('Alice Chen', 'alice@example.com', 'Gold', 'West'), ('Bob Martinez', 'bob@example.com', 'Silver', 'East'), ('Carol Johnson', 'carol@example.com', 'Bronze', 'West'), ('Dave Kim', 'dave@example.com', 'Gold', 'Central'), ('Eve Williams', 'eve@example.com', 'Silver', 'East'), ('Frank Brown', 'frank@example.com', 'Bronze', 'West'), ('Grace Lee', 'grace@example.com', 'Gold', 'Central'), ('Hank Davis', 'hank@example.com', 'Silver', 'East') `; } } query(sqlText: string): Record[] { return [...this.ctx.storage.sql.exec(sqlText).toArray()] as Record< string, unknown >[]; } execute(sqlText: string): { success: boolean } { this.ctx.storage.sql.exec(sqlText); return { success: true }; } getAllCustomers(): CustomerRecord[] { return this.sql` SELECT id, name, email, tier, region FROM customers ORDER BY id `; } } // ───────────────────────────────────────────────────────────────────────────── // The Gatekeeper Agent // ───────────────────────────────────────────────────────────────────────────── export class GatekeeperAgent extends AIChatAgent { initialState: GatekeeperState = { actions: [], customers: [] }; async onStart() { this._initQueue(); await this._syncState(); } // ─── Sub-agent access ─────────────────────────────────────────────── private _getDb() { return this.subAgent(CustomerDatabase, "database"); } // ─── Queue table ───────────────────────────────────────────────────── private _initQueue() { this.sql` CREATE TABLE IF NOT EXISTS action_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT NOT NULL DEFAULT 'action', title TEXT NOT NULL, description TEXT NOT NULL, sql_statement TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'pending', can_revert INTEGER NOT NULL DEFAULT 0, revert_sql TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), resolved_at TEXT ) `; } // ─── State sync ────────────────────────────────────────────────────── private async _syncState() { const actions = this.sql` SELECT id, type, title, description, sql_statement as sql, state, can_revert as canRevert, revert_sql as revertSql, created_at as createdAt, resolved_at as resolvedAt FROM action_queue ORDER BY id DESC `; const db = await this._getDb(); const customers = await db.getAllCustomers(); this.setState({ actions: actions.map((a) => ({ ...a, canRevert: Boolean(a.canRevert) })), customers }); } // ─── Gatekeeper API ────────────────────────────────────────────────── private async _logObservation(title: string, desc: string, sql: string) { this.sql` INSERT INTO action_queue (type, title, description, sql_statement, state, can_revert) VALUES ('observation', ${title}, ${desc}, ${sql}, 'approved', 0) `; await this._syncState(); } private async _submitAction( title: string, desc: string, sql: string, revertSql: string | null ): Promise { this.sql` INSERT INTO action_queue (type, title, description, sql_statement, state, can_revert, revert_sql) VALUES ('action', ${title}, ${desc}, ${sql}, 'pending', ${revertSql ? 1 : 0}, ${revertSql}) `; const result = this.sql<{ id: number }>`SELECT last_insert_rowid() as id`; await this._syncState(); return result[0].id; } @callable() async approveAction(actionId: number) { const rows = this.sql<{ sql_statement: string; state: string }>` SELECT sql_statement, state FROM action_queue WHERE id = ${actionId} `; if (rows.length === 0) throw new Error(`Action ${actionId} not found`); if (rows[0].state !== "pending") throw new Error(`Not pending`); const db = await this._getDb(); await db.execute(rows[0].sql_statement); this.sql` UPDATE action_queue SET state = 'approved', resolved_at = datetime('now') WHERE id = ${actionId} `; await this._syncState(); } @callable() async rejectAction(actionId: number) { const rows = this.sql<{ state: string }>` SELECT state FROM action_queue WHERE id = ${actionId} `; if (rows.length === 0) throw new Error(`Action ${actionId} not found`); if (rows[0].state !== "pending") throw new Error(`Not pending`); this.sql` UPDATE action_queue SET state = 'rejected', resolved_at = datetime('now') WHERE id = ${actionId} `; await this._syncState(); } @callable() async revertAction(actionId: number) { const rows = this.sql<{ state: string; revert_sql: string | null; can_revert: number; }>` SELECT state, revert_sql, can_revert FROM action_queue WHERE id = ${actionId} `; if (rows.length === 0) throw new Error(`Action ${actionId} not found`); if (rows[0].state !== "approved") throw new Error(`Not approved`); if (!rows[0].can_revert || !rows[0].revert_sql) throw new Error(`Not revertable`); const db = await this._getDb(); await db.execute(rows[0].revert_sql); this.sql` UPDATE action_queue SET state = 'reverted', resolved_at = datetime('now') WHERE id = ${actionId} `; await this._syncState(); } // ─── Chat ──────────────────────────────────────────────────────────── async onChatMessage() { const workersai = createWorkersAI({ binding: this.env.AI }); const agent = this; const result = streamText({ model: workersai("@cf/moonshotai/kimi-k2.5", { sessionAffinity: this.sessionAffinity }), system: `You are a helpful database administrator assistant. You manage a customer database. You can query the database freely — reads are always allowed. But any changes to the data (INSERT, UPDATE, DELETE) will be submitted for human approval before they execute. This is the "Gatekeeper" pattern: you propose changes, a human reviews them. The database has a "customers" table with columns: id, name, email, tier (Bronze/Silver/Gold), region (West/East/Central). When proposing changes: - Be specific about what will change and why - Generate correct SQL — the human will see the exact query - For UPDATEs, mention how many rows will be affected - For DELETEs, warn about data loss Important: After submitting an action for approval, tell the user it's been queued and they can approve or reject it in the action panel. Don't say it's been done — it hasn't happened yet.`, messages: await convertToModelMessages(this.messages), tools: { queryDatabase: tool({ description: "Query the customer database with a SELECT statement. " + "Reads are always allowed and auto-logged.", inputSchema: z.object({ sql: z.string().describe("A SELECT query"), description: z.string().describe("What this query looks for") }), execute: async ({ sql, description }) => { const trimmed = sql.trim().toUpperCase(); if (!trimmed.startsWith("SELECT")) { return { error: "Only SELECT allowed. Use mutateDatabase for writes." }; } await agent._logObservation( `Query: ${description}`, description, sql ); try { const db = await agent._getDb(); const results = await db.query(sql); return { sql, rowCount: results.length, rows: results }; } catch (err) { return { error: `SQL error: ${err}` }; } } }), mutateDatabase: tool({ description: "Propose a change (INSERT, UPDATE, DELETE). " + "Queued for human approval — will NOT execute immediately.", inputSchema: z.object({ title: z.string().describe("Short title for the action"), description: z.string().describe("What this change does"), sql: z.string().describe("The SQL mutation"), revertSql: z .string() .nullable() .describe("SQL to undo this action, or null if not revertable") }), execute: async ({ title, description, sql, revertSql }) => { const trimmed = sql.trim().toUpperCase(); if (trimmed.startsWith("SELECT")) { return { error: "Use queryDatabase for SELECTs." }; } const actionId = await agent._submitAction( title, description, sql, revertSql ); return { actionId, status: "pending", message: `Action #${actionId} queued for approval.` }; } }) }, stopWhen: stepCountIs(5) }); return result.toUIMessageStreamResponse(); } } // ───────────────────────────────────────────────────────────────────────────── export default { async fetch(request: Request, env: Env) { return ( (await routeAgentRequest(request, env)) || new Response("Not found", { status: 404 }) ); } } satisfies ExportedHandler;