branch:
observability.md
10683 bytesRaw
# Observability
Agents emit structured events for every significant operation — RPC calls, state changes, schedule execution, workflow transitions, MCP connections, and more. These events are published to [diagnostics channels](https://developers.cloudflare.com/workers/runtime-apis/nodejs/diagnostics-channel/) and are silent by default (zero overhead when nobody is listening).
## Event structure
Every event has these fields:
```ts
{
type: "rpc", // what happened
agent: "MyAgent", // which agent class emitted it
name: "user-123", // which agent instance (Durable Object name)
payload: { method: "getWeather" }, // details
timestamp: 1758005142787 // when (ms since epoch)
}
```
`agent` and `name` identify the source agent — `agent` is the class name and `name` is the Durable Object instance name.
## Channels
Events are routed to eight named channels based on their type:
| Channel | Event types | Description |
| ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------- |
| `agents:state` | `state:update` | State sync events |
| `agents:rpc` | `rpc`, `rpc:error` | RPC method calls and failures |
| `agents:message` | `message:request`, `message:response`, `message:clear`, `message:cancel`, `message:error`, `tool:result`, `tool:approval` | Chat message and tool lifecycle |
| `agents:schedule` | `schedule:create`, `schedule:execute`, `schedule:cancel`, `schedule:retry`, `schedule:error`, `queue:create`, `queue:retry`, `queue:error` | Scheduled and queued task lifecycle |
| `agents:lifecycle` | `connect`, `disconnect`, `destroy` | Agent connection and teardown |
| `agents:workflow` | `workflow:start`, `workflow:event`, `workflow:approved`, `workflow:rejected`, `workflow:terminated`, `workflow:paused`, `workflow:resumed`, `workflow:restarted` | Workflow state transitions |
| `agents:mcp` | `mcp:client:preconnect`, `mcp:client:connect`, `mcp:client:authorize`, `mcp:client:discover` | MCP client operations |
| `agents:email` | `email:receive`, `email:reply` | Email processing |
## Subscribing to events
### Typed subscribe helper
The `subscribe()` function from `agents/observability` provides type-safe access to events on a specific channel:
```ts
import { subscribe } from "agents/observability";
const unsub = subscribe("rpc", (event) => {
if (event.type === "rpc") {
console.log(`RPC call: ${event.payload.method}`);
}
if (event.type === "rpc:error") {
console.error(
`RPC failed: ${event.payload.method} — ${event.payload.error}`
);
}
});
// Clean up when done
unsub();
```
The callback is fully typed — `event` is narrowed to only the event types that flow through that channel.
### Raw diagnostics_channel
You can also subscribe directly using the Node.js API:
```ts
import { subscribe } from "node:diagnostics_channel";
subscribe("agents:schedule", (event) => {
console.log(event);
});
```
## Tail Workers (production)
In production, all diagnostics channel messages are automatically forwarded to [Tail Workers](https://developers.cloudflare.com/workers/observability/tail-workers/). No subscription code is needed in the agent itself — attach a Tail Worker and access events via `event.diagnosticsChannelEvents`:
```ts
export default {
async tail(events) {
for (const event of events) {
for (const msg of event.diagnosticsChannelEvents) {
// msg.channel is "agents:rpc", "agents:workflow", etc.
// msg.message is the typed event payload
console.log(msg.timestamp, msg.channel, msg.message);
}
}
}
};
```
This gives you structured, filterable observability in production with zero overhead in the agent hot path.
## Custom observability
You can override the default implementation by providing your own `Observability` interface:
```ts
import { Agent } from "agents";
import type { Observability } from "agents/observability";
const myObservability: Observability = {
emit(event) {
// Send to your logging service, filter events, etc.
if (event.type === "rpc:error") {
myLogger.error(event.payload.method, event.payload.error);
}
}
};
class MyAgent extends Agent {
override observability = myObservability;
}
```
Set `observability` to `undefined` to disable all event emission:
```ts
class MyAgent extends Agent {
override observability = undefined;
}
```
## Event reference
### RPC events
| Type | Payload | When |
| ----------- | ------------------------ | ------------------------------- |
| `rpc` | `{ method, streaming? }` | A `@callable` method is invoked |
| `rpc:error` | `{ method, error }` | A `@callable` method throws |
### State events
| Type | Payload | When |
| -------------- | ------- | ---------------------- |
| `state:update` | `{}` | `setState()` is called |
### Message and tool events (`AIChatAgent`)
These events are emitted by `AIChatAgent` from `@cloudflare/ai-chat`. They track the chat message lifecycle, including client-side tool interactions.
| Type | Payload | When |
| ------------------ | -------------------------- | ----------------------------------- |
| `message:request` | `{}` | A chat message is received |
| `message:response` | `{}` | A chat response stream completes |
| `message:clear` | `{}` | Chat history is cleared |
| `message:cancel` | `{ requestId }` | A streaming request is cancelled |
| `message:error` | `{ error }` | A chat stream fails |
| `tool:result` | `{ toolCallId, toolName }` | A client tool result is received |
| `tool:approval` | `{ toolCallId, approved }` | A tool call is approved or rejected |
### Schedule and queue events
| Type | Payload | When |
| ------------------ | ---------------------------------------- | -------------------------------------------- |
| `schedule:create` | `{ callback, id }` | A schedule is created |
| `schedule:execute` | `{ callback, id }` | A scheduled callback starts |
| `schedule:cancel` | `{ callback, id }` | A schedule is cancelled |
| `schedule:retry` | `{ callback, id, attempt, maxAttempts }` | A scheduled callback is retried |
| `schedule:error` | `{ callback, id, error, attempts }` | A scheduled callback fails after all retries |
| `queue:create` | `{ callback, id }` | A task is enqueued |
| `queue:retry` | `{ callback, id, attempt, maxAttempts }` | A queued callback is retried |
| `queue:error` | `{ callback, id, error, attempts }` | A queued callback fails after all retries |
### Lifecycle events
| Type | Payload | When |
| ------------ | -------------------------------- | ------------------------------------- |
| `connect` | `{ connectionId }` | A WebSocket connection is established |
| `disconnect` | `{ connectionId, code, reason }` | A WebSocket connection is closed |
| `destroy` | `{}` | The agent is destroyed |
### Workflow events
| Type | Payload | When |
| --------------------- | ------------------------------- | ------------------------------ |
| `workflow:start` | `{ workflowId, workflowName? }` | A workflow instance is started |
| `workflow:event` | `{ workflowId, eventType? }` | An event is sent to a workflow |
| `workflow:approved` | `{ workflowId, reason? }` | A workflow is approved |
| `workflow:rejected` | `{ workflowId, reason? }` | A workflow is rejected |
| `workflow:terminated` | `{ workflowId, workflowName? }` | A workflow is terminated |
| `workflow:paused` | `{ workflowId, workflowName? }` | A workflow is paused |
| `workflow:resumed` | `{ workflowId, workflowName? }` | A workflow is resumed |
| `workflow:restarted` | `{ workflowId, workflowName? }` | A workflow is restarted |
### MCP events
| Type | Payload | When |
| ----------------------- | --------------------------------------- | -------------------------------------------- |
| `mcp:client:preconnect` | `{ serverId }` | Before connecting to an MCP server |
| `mcp:client:connect` | `{ url, transport, state, error? }` | An MCP connection attempt completes or fails |
| `mcp:client:authorize` | `{ serverId, authUrl, clientId? }` | An MCP OAuth flow begins |
| `mcp:client:discover` | `{ url?, state?, error?, capability? }` | MCP capability discovery succeeds or fails |
### Email events
| Type | Payload | When |
| --------------- | ------------------------ | --------------------- |
| `email:receive` | `{ from, to, subject? }` | An email is received |
| `email:reply` | `{ from, to, subject? }` | A reply email is sent |