branch:
onfinish-cleanup.test.ts
10162 bytesRaw
import { env } from "cloudflare:workers";
import { describe, it, expect } from "vitest";
import { MessageType } from "../types";
import { connectChatWS } from "./test-utils";
import { getAgentByName } from "agents";
describe("onFinish cleanup (framework-managed)", () => {
it("cleans up abort controller after stream completes even without user passing onFinish", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
let resolvePromise: (value: boolean) => void;
const donePromise = new Promise<boolean>((res) => {
resolvePromise = res;
});
const timeout = setTimeout(() => resolvePromise(false), 3000);
ws.addEventListener("message", (e: MessageEvent) => {
const data = JSON.parse(e.data as string);
if (data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
clearTimeout(timeout);
resolvePromise(true);
}
});
// Send first request
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-cleanup-1",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "msg-cleanup-1",
role: "user",
parts: [{ type: "text", text: "Hello" }]
}
]
})
}
})
);
const done = await donePromise;
expect(done).toBe(true);
// Send a second request to prove the agent is still healthy
// (no leaked abort controllers causing issues)
let resolveSecond: (value: boolean) => void;
const secondDone = new Promise<boolean>((res) => {
resolveSecond = res;
});
const timeout2 = setTimeout(() => resolveSecond(false), 3000);
ws.addEventListener("message", (e: MessageEvent) => {
const data = JSON.parse(e.data as string);
if (
data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE &&
data.done &&
data.id === "req-cleanup-2"
) {
clearTimeout(timeout2);
resolveSecond(true);
}
});
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-cleanup-2",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "msg-cleanup-1",
role: "user",
parts: [{ type: "text", text: "Hello" }]
},
{
id: "msg-cleanup-2",
role: "user",
parts: [{ type: "text", text: "Second message" }]
}
]
})
}
})
);
const second = await secondDone;
expect(second).toBe(true);
ws.close(1000);
});
it("cancellation still works after cleanup is moved to _reply", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
// Send a request and immediately cancel it
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-cancel-test",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "cancel-1",
role: "user",
parts: [{ type: "text", text: "Cancel me" }]
}
]
})
}
})
);
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_CHAT_REQUEST_CANCEL,
id: "req-cancel-test"
})
);
// Wait a moment for processing
await new Promise((r) => setTimeout(r, 500));
// Agent should still be alive -- send another request
let resolvePromise: (value: boolean) => void;
const donePromise = new Promise<boolean>((res) => {
resolvePromise = res;
});
const timeout = setTimeout(() => resolvePromise(false), 3000);
ws.addEventListener("message", (e: MessageEvent) => {
const data = JSON.parse(e.data as string);
if (
data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE &&
data.done &&
data.id === "req-after-cancel"
) {
clearTimeout(timeout);
resolvePromise(true);
}
});
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-after-cancel",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "after-cancel-1",
role: "user",
parts: [{ type: "text", text: "Still alive?" }]
}
]
})
}
})
);
const done = await donePromise;
expect(done).toBe(true);
ws.close(1000);
});
it("multiple concurrent requests each get cleaned up", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
const received: string[] = [];
ws.addEventListener("message", (e: MessageEvent) => {
try {
const data = JSON.parse(e.data as string);
if (data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
received.push(data.id);
}
} catch {
// ignore
}
});
// Send two requests simultaneously
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-concurrent-1",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "conc-1",
role: "user",
parts: [{ type: "text", text: "First" }]
}
]
})
}
})
);
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-concurrent-2",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "conc-1",
role: "user",
parts: [{ type: "text", text: "First" }]
},
{
id: "conc-2",
role: "user",
parts: [{ type: "text", text: "Second" }]
}
]
})
}
})
);
// Wait for both to complete
await new Promise((r) => setTimeout(r, 2000));
// Both requests should have completed
expect(received).toContain("req-concurrent-1");
expect(received).toContain("req-concurrent-2");
ws.close(1000);
});
it("abort controllers are cleaned up after stream completion (count returns to 0)", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Initially no abort controllers
expect(await agentStub.getAbortControllerCount()).toBe(0);
let resolvePromise: (value: boolean) => void;
const donePromise = new Promise<boolean>((res) => {
resolvePromise = res;
});
const timeout = setTimeout(() => resolvePromise(false), 3000);
ws.addEventListener("message", (e: MessageEvent) => {
const data = JSON.parse(e.data as string);
if (data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
clearTimeout(timeout);
resolvePromise(true);
}
});
// Send a request
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: "req-ac-check",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: "ac-1",
role: "user",
parts: [{ type: "text", text: "Hello" }]
}
]
})
}
})
);
const done = await donePromise;
expect(done).toBe(true);
// Give _reply a moment to finish its cleanup after the stream completes
await new Promise((r) => setTimeout(r, 100));
// Abort controller should have been cleaned up by _reply
// even though our test worker's onChatMessage does NOT pass onFinish to streamText
expect(await agentStub.getAbortControllerCount()).toBe(0);
ws.close(1000);
});
it("abort controllers are cleaned up after multiple sequential requests", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Send 3 sequential requests
for (let i = 0; i < 3; i++) {
let resolvePromise: (value: boolean) => void;
const donePromise = new Promise<boolean>((res) => {
resolvePromise = res;
});
const timeout = setTimeout(() => resolvePromise(false), 3000);
const listener = (e: MessageEvent) => {
const data = JSON.parse(e.data as string);
if (
data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE &&
data.done &&
data.id === `req-seq-${i}`
) {
clearTimeout(timeout);
resolvePromise(true);
}
};
ws.addEventListener("message", listener);
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
id: `req-seq-${i}`,
init: {
method: "POST",
body: JSON.stringify({
messages: [
{
id: `seq-${i}`,
role: "user",
parts: [{ type: "text", text: `Message ${i}` }]
}
]
})
}
})
);
await donePromise;
ws.removeEventListener("message", listener);
}
// Wait for cleanup
await new Promise((r) => setTimeout(r, 100));
// All 3 abort controllers should be cleaned up
expect(await agentStub.getAbortControllerCount()).toBe(0);
ws.close(1000);
});
});