// server.ts
import { Agent as OpenAIAgent, run, tool } from "@openai/agents";
import type { AgentInputItem } from "@openai/agents";
import { Agent as CFAgent, callable, routeAgentRequest } from "agents";
import type { StreamingResponse } from "agents";
import { z } from "zod";
export type { AgentInputItem };
/** An incremental piece of assistant text. */
type TextDeltaChunk = { type: "text-delta"; delta: string };

/** Announces a tool invocation together with the input it was given. */
type ToolCallChunk = {
  type: "tool-call";
  toolCallId: string;
  toolName: string;
  input: unknown;
};

/** Carries the output of the tool call identified by `toolCallId`. */
type ToolResultChunk = {
  type: "tool-result";
  toolCallId: string;
  output: unknown;
};

/**
 * A streaming chunk sent to the client via StreamingResponse.send().
 *
 * Discriminated on `type`, so consumers can narrow with a `switch`.
 */
export type StreamChunk = TextDeltaChunk | ToolCallChunk | ToolResultChunk;
/**
 * State type parameter of the agent class in this file: the conversation so far.
 */
export type AgentState = {
// Items from earlier turns; chat() prepends them to each new user message
// before calling the model, and clearHistory() resets them to an empty list.
messages: AgentInputItem[];
};
/**
 * Demo weather tool offered to the model.
 *
 * Given a city name it returns a JSON string holding that city, a randomly
 * generated temperature between 5°C and 34°C, and one of four fixed
 * conditions. No real weather service is queried.
 */
const getWeather = tool({
  name: "get_weather",
  description: "Get the current weather for a city",
  parameters: z.object({
    city: z.string().describe("The city name")
  }),
  execute: async ({ city }) => {
    const possibleConditions = ["sunny", "cloudy", "rainy", "windy"];

    // Temperature is drawn before the condition, one random draw each.
    const degrees = 5 + Math.floor(Math.random() * 30);
    const conditionIndex = Math.floor(
      Math.random() * possibleConditions.length
    );

    const report = {
      city,
      temperature: `${degrees}°C`,
      condition: possibleConditions[conditionIndex]
    };
    return JSON.stringify(report);
  }
});
/**
* Streaming chat agent using @openai/agents with @callable({ streaming: true }).
*
* The pattern:
* 1. Client calls agent.call("chat", [message], { stream: { onChunk, onDone } })
* 2. Server receives a StreamingResponse as the first argument
* 3. run(agent, history, { stream: true }) produces a StreamedRunResult
* 4. For each event, stream.send() pushes typed chunks to the client
* 5. stream.end() signals completion (no final chunk is sent); the full
*    assistant message is saved to the agent state just before
*/
export class MyAgent extends CFAgent<Env, AgentState> {
  /** Initial state: an empty conversation history. */
  initialState: AgentState = { messages: [] };

  /**
   * Runs one chat turn and streams its progress to the caller.
   *
   * The stored history plus `userMessage` is passed to the model. While the
   * run is in flight, text deltas, tool calls and tool results are forwarded
   * through `stream` as StreamChunk values. After the run finishes, the user
   * message, every tool call/result and the concatenated assistant text are
   * appended to the agent state, and the stream is ended.
   *
   * State is written only after the event loop completes, so an exception
   * raised during the run leaves the stored history unchanged.
   *
   * @param stream - Streaming response used to push chunks to the caller.
   * @param userMessage - Text of the user's new message.
   * @throws TypeError if `userMessage` is not a string at runtime.
   */
  @callable({ streaming: true })
  async chat(stream: StreamingResponse, userMessage: string): Promise<void> {
    // The argument is supplied by the calling client at runtime, so the
    // declared `string` type is not enforced. Reject anything else before it
    // can be embedded in a history item.
    if (typeof userMessage !== "string") {
      throw new TypeError("chat() expects the user message to be a string");
    }

    const agent = new OpenAIAgent({
      name: "Assistant",
      instructions:
        "You are a helpful assistant. You can check the weather for any city.",
      tools: [getWeather]
    });

    // Model input for this turn: everything said so far plus the new message.
    const messages: AgentInputItem[] = [
      ...this.state.messages,
      { role: "user" as const, content: userMessage }
    ];

    const result = await run(agent, messages, { stream: true });

    let assistantText = "";
    const newItems: AgentInputItem[] = [];
    // Track tool names by callId so we can pair them with results
    const toolNames = new Map<string, string>();

    for await (const event of result) {
      if (event.type === "raw_model_stream_event") {
        if (event.data.type === "output_text_delta") {
          // Accumulate the text for the history and forward the delta as-is.
          assistantText += event.data.delta;
          stream.send({
            type: "text-delta",
            delta: event.data.delta
          } satisfies StreamChunk);
        }
      }

      if (event.type === "run_item_stream_event") {
        if (event.name === "tool_called") {
          const rawItem = event.item.rawItem as {
            callId: string;
            name: string;
            arguments: string;
          };
          toolNames.set(rawItem.callId, rawItem.name);
          // History keeps the argument string exactly as the model wrote it.
          newItems.push({
            type: "function_call" as const,
            callId: rawItem.callId,
            name: rawItem.name,
            arguments: rawItem.arguments
          });
          // The chunk carries parsed arguments; parsing is lenient so a
          // malformed argument string cannot abort the whole turn.
          stream.send({
            type: "tool-call",
            toolCallId: rawItem.callId,
            toolName: rawItem.name,
            input: parseToolArguments(rawItem.arguments)
          } satisfies StreamChunk);
        } else if (event.name === "tool_output") {
          const rawItem = event.item.rawItem as {
            callId: string;
            output: unknown;
          };
          newItems.push({
            type: "function_call_result" as const,
            callId: rawItem.callId,
            name: toolNames.get(rawItem.callId) ?? "unknown",
            status: "completed" as const,
            // History stores a string; non-string outputs are serialized.
            output:
              typeof rawItem.output === "string"
                ? rawItem.output
                : JSON.stringify(rawItem.output)
          });
          stream.send({
            type: "tool-result",
            toolCallId: rawItem.callId,
            output: rawItem.output
          } satisfies StreamChunk);
        }
      }
    }

    // All streamed text is recorded as one assistant message placed after the
    // tool items collected during the run.
    newItems.push({
      role: "assistant" as const,
      status: "completed" as const,
      content: [{ type: "output_text" as const, text: assistantText }]
    });

    this.setState({ messages: [...messages, ...newItems] });
    stream.end();
  }

  /** Resets the stored conversation history to an empty list. */
  @callable()
  clearHistory(): void {
    this.setState({ messages: [] });
  }
}

/**
 * Parses the JSON-encoded argument string of a tool call.
 *
 * The string is generated by the model, so it is not guaranteed to be valid
 * JSON (it may be empty or truncated).
 *
 * @param raw - Argument string taken from the tool-call item.
 * @returns The parsed value, or `raw` unchanged when it is not valid JSON.
 */
function parseToolArguments(raw: string): unknown {
  try {
    return JSON.parse(raw);
  } catch {
    return raw;
  }
}
/**
 * Worker entrypoint.
 *
 * Hands every incoming request to routeAgentRequest() and answers with a
 * plain-text 404 when that call yields no response.
 */
export default {
  async fetch(request: Request, env: Env, _ctx: ExecutionContext) {
    const routed = await routeAgentRequest(request, env);
    if (routed) {
      return routed;
    }
    return new Response("Not found", { status: 404 });
  }
};