branch:
chat.spec.ts
51810 bytesRaw
import { test, expect } from "@playwright/test";
// ── Helpers ──────────────────────────────────────────────────────────
/**
 * Wire-protocol message type strings used by these tests.
 *
 * Mirrors the values of the MessageType enum in src/types.ts — keep the two
 * in sync. The object is passed into the browser page (as `MT`) so in-page
 * scripts can refer to the same constants instead of raw strings.
 *
 * CF_AGENT_CHAT_REQUEST_CANCEL and CF_AGENT_STREAM_RESUME_REQUEST are the
 * cancel / client-initiated-resume frames the tests send on the wire.
 */
const MessageType = {
  CF_AGENT_CHAT_MESSAGES: "cf_agent_chat_messages",
  CF_AGENT_USE_CHAT_REQUEST: "cf_agent_use_chat_request",
  CF_AGENT_USE_CHAT_RESPONSE: "cf_agent_use_chat_response",
  CF_AGENT_CHAT_CLEAR: "cf_agent_chat_clear",
  CF_AGENT_CHAT_REQUEST_CANCEL: "cf_agent_chat_request_cancel",
  CF_AGENT_STREAM_RESUMING: "cf_agent_stream_resuming",
  CF_AGENT_STREAM_RESUME_ACK: "cf_agent_stream_resume_ack",
  CF_AGENT_STREAM_RESUME_REQUEST: "cf_agent_stream_resume_request"
} as const;
/**
 * One decoded JSON frame received from the agent. Every frame carries a
 * string `type` tag; all other fields depend on that type and are left
 * as `unknown` for the caller to narrow.
 */
interface WSMessage {
  type: string;
  [key: string]: unknown;
}
/**
 * Builds WebSocket URLs for agent instances served under `baseURL`.
 *
 * `agentPath(room, agent)` turns the HTTP(S) base URL into its WS(S)
 * counterpart and appends `/agents/<agent>/<room>`. `agent` defaults to
 * "chat-agent", so existing one-argument callers behave as before, while
 * other agents (e.g. "slow-agent") can be addressed without hand-building
 * the URL.
 *
 * @param baseURL Playwright's configured base URL, e.g. "http://localhost:8787".
 */
function wsHelpers(baseURL: string) {
  // Rewrite only the scheme prefix (http -> ws, https -> wss) and drop any
  // trailing slash so the joined path never contains "//agents".
  const wsBase = baseURL.replace(/^http/, "ws").replace(/\/+$/, "");
  const agentPath = (room: string, agent = "chat-agent") =>
    `${wsBase}/agents/${agent}/${room}`;
  return { agentPath };
}
/**
 * Opens a WebSocket to `wsUrl` from inside the browser page and, once it is
 * open, runs a caller-supplied script.
 *
 * Everything executes in the browser because Playwright's WebSocket support
 * is built for intercepting traffic, not for opening raw client connections.
 *
 * The script is compiled with `new Function` and called with five arguments,
 * in this order:
 *   - `ws`       the open WebSocket
 *   - `resolve`  settles the returned promise (scripts pass `received`)
 *   - `reject`   fails the returned promise
 *   - `received` every JSON frame parsed so far (non-JSON frames are dropped)
 *   - `MT`       the MessageType constant table
 *
 * The promise rejects on a socket `error` event or if the script throws
 * synchronously.
 *
 * @param page    Playwright page whose browser context hosts the socket.
 * @param wsUrl   Full ws:// or wss:// URL of the agent.
 * @param actions JavaScript function body run once the socket opens.
 * @returns Whatever the script passes to `resolve`.
 */
async function connectAndRun(
  page: import("@playwright/test").Page,
  wsUrl: string,
  actions: string
): Promise<WSMessage[]> {
  return page.evaluate(
    ({ url, actions: script, MT }) =>
      new Promise<WSMessage[]>((resolve, reject) => {
        const received: WSMessage[] = [];
        const ws = new WebSocket(url);

        ws.onerror = () => reject(new Error("WebSocket error"));

        ws.onmessage = (event) => {
          let parsed: WSMessage;
          try {
            parsed = JSON.parse(event.data as string);
          } catch {
            return; // frames that are not JSON are not interesting here
          }
          received.push(parsed);
        };

        ws.onopen = () => {
          try {
            // These parameter names are the contract the action scripts use.
            const runActions = new Function(
              "ws",
              "resolve",
              "reject",
              "received",
              "MT",
              script
            );
            runActions(ws, resolve, reject, received, MT);
          } catch (err) {
            reject(err);
          }
        };
      }),
    { url: wsUrl, actions, MT: MessageType }
  );
}
// ── Tests ────────────────────────────────────────────────────────────
test.describe("AIChatAgent e2e", () => {
  // WebSockets need a live browser context; a blank page is the cheapest one.
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("chat message round-trip: send request, receive streamed response", async ({
    page,
    baseURL
  }) => {
    const url = wsHelpers(baseURL!).agentPath(crypto.randomUUID());

    const frames = await connectAndRun(
      page,
      url,
      `
const userMsg = {
id: "user-1",
role: "user",
parts: [{ type: "text", text: "Hello" }]
};
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-1",
init: {
method: "POST",
body: JSON.stringify({ messages: [userMsg] })
}
}));
// Wait for the done signal
const check = setInterval(() => {
const doneMsg = received.find(
m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done === true
);
if (doneMsg) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
// Timeout safety
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // The sending connection is excluded from CF_AGENT_CHAT_MESSAGES
    // broadcasts by design, so only the streamed response is checked here;
    // cross-connection sync is covered by its own test below.
    const responses = frames.filter(
      (frame) => frame.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE
    );
    // At minimum: one content chunk plus the terminating done frame.
    expect(responses.length).toBeGreaterThanOrEqual(2);

    // Decode every non-final chunk with a non-blank string body.
    const events: Array<{ type: string; delta?: string }> = [];
    for (const response of responses) {
      const { done, body } = response;
      if (done || typeof body !== "string" || !body.trim()) continue;
      events.push(JSON.parse(body));
    }

    const eventTypes = events.map((event) => event.type);
    for (const expected of ["text-start", "text-delta", "text-end"]) {
      expect(eventTypes).toContain(expected);
    }

    // Concatenated deltas must reproduce the agent's full reply.
    const fullText = events
      .filter((event) => event.type === "text-delta")
      .map((event) => event.delta)
      .join("");
    expect(fullText).toBe("Hello from e2e agent!");

    // The stream must end with a done frame.
    const finalResponse = responses[responses.length - 1];
    expect(finalResponse?.done).toBe(true);
  });

  test("message persistence: messages survive reconnection", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Send one user message and wait for the reply to finish streaming.
    await connectAndRun(
      page,
      agentPath(room),
      `
const userMsg = {
id: "persist-1",
role: "user",
parts: [{ type: "text", text: "Remember me" }]
};
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-persist",
init: {
method: "POST",
body: JSON.stringify({ messages: [userMsg] })
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // Read the stored transcript back over HTTP.
    const response = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    expect(response.ok()).toBe(true);
    const stored = await response.json();
    expect(Array.isArray(stored)).toBe(true);
    expect(stored.length).toBeGreaterThanOrEqual(2); // user + assistant

    const storedUser = stored.find((m: { id: string }) => m.id === "persist-1");
    expect(storedUser).toBeTruthy();
    expect(storedUser.role).toBe("user");

    const assistantCount = stored.filter(
      (m: { role: string }) => m.role === "assistant"
    ).length;
    expect(assistantCount).toBeGreaterThanOrEqual(1);
  });

  test("clear history: clears all messages and notifies other connections", async ({
    page,
    baseURL
  }) => {
    const room = crypto.randomUUID();
    const wsUrl = wsHelpers(baseURL!).agentPath(room);
    const messagesUrl = `${baseURL}/agents/chat-agent/${room}/get-messages`;

    // Seed the conversation so there is something to clear.
    await connectAndRun(
      page,
      wsUrl,
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-clear",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "clear-msg-1",
role: "user",
parts: [{ type: "text", text: "About to be cleared" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    const seeded = await (await page.request.get(messagesUrl)).json();
    expect(seeded.length).toBeGreaterThanOrEqual(1);

    // Issue the clear from a fresh connection, then give it 500ms to apply.
    await connectAndRun(
      page,
      wsUrl,
      `
ws.send(JSON.stringify({ type: MT.CF_AGENT_CHAT_CLEAR }));
// Wait a bit for the clear to process
setTimeout(() => { ws.close(); resolve(received); }, 500);
`
    );

    // NOTE(review): only the server-side clear is asserted; the "notifies
    // other connections" part of the title is not checked by this test.
    const remaining = await (await page.request.get(messagesUrl)).json();
    expect(remaining.length).toBe(0);
  });

  test("multi-connection sync: second connection receives broadcast", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const wsUrl = agentPath(crypto.randomUUID());

    // Two sockets share one room and only the first one sends. The second
    // must see both the CF_AGENT_CHAT_MESSAGES broadcast and the streamed reply.
    const { ws2Messages } = await page.evaluate(
      ({ url, MT }) =>
        new Promise<{
          ws1Messages: WSMessage[];
          ws2Messages: WSMessage[];
        }>((resolve) => {
          const senderFrames: WSMessage[] = [];
          const observerFrames: WSMessage[] = [];
          const sender = new WebSocket(url);
          const observer = new WebSocket(url);
          let openCount = 0;

          const finish = () => {
            sender.close();
            observer.close();
            resolve({ ws1Messages: senderFrames, ws2Messages: observerFrames });
          };

          const record = (sink: WSMessage[], raw: string) => {
            try {
              sink.push(JSON.parse(raw));
            } catch {
              // ignore non-JSON frames
            }
          };

          // Send only once both sockets are open, after a short grace period
          // so the server has registered both connections.
          const onOpen = () => {
            openCount += 1;
            if (openCount < 2) return;
            setTimeout(() => {
              sender.send(
                JSON.stringify({
                  type: MT.CF_AGENT_USE_CHAT_REQUEST,
                  id: "req-sync",
                  init: {
                    method: "POST",
                    body: JSON.stringify({
                      messages: [
                        {
                          id: "sync-1",
                          role: "user",
                          parts: [{ type: "text", text: "Sync test" }]
                        }
                      ]
                    })
                  }
                })
              );
            }, 100);
          };
          sender.onopen = onOpen;
          observer.onopen = onOpen;
          sender.onmessage = (e) => record(senderFrames, e.data);
          observer.onmessage = (e) => record(observerFrames, e.data);

          // When the sender's stream is done, allow 200ms for broadcasts to
          // reach the observer before wrapping up.
          const poll = setInterval(() => {
            const finished = senderFrames.some(
              (f) => f.type === MT.CF_AGENT_USE_CHAT_RESPONSE && f.done === true
            );
            if (!finished) return;
            clearInterval(poll);
            setTimeout(finish, 200);
          }, 50);

          // Hard stop so a stuck stream fails the assertions instead of hanging.
          setTimeout(() => {
            clearInterval(poll);
            finish();
          }, 8000);
        }),
      { url: wsUrl, MT: MessageType }
    );

    const observerChatBroadcasts = ws2Messages.filter(
      (m) => m.type === MessageType.CF_AGENT_CHAT_MESSAGES
    );
    expect(observerChatBroadcasts.length).toBeGreaterThanOrEqual(1);

    // The streamed reply is broadcast to every connection, not just the sender.
    const observerStream = ws2Messages.filter(
      (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE
    );
    expect(observerStream.length).toBeGreaterThanOrEqual(1);
  });

  test("separate rooms are isolated", async ({ page, baseURL }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const [roomA, roomB] = [crypto.randomUUID(), crypto.randomUUID()];

    /** Number of messages the agent has stored for `room`. */
    const countStored = async (room: string): Promise<number> => {
      const res = await page.request.get(
        `${baseURL}/agents/chat-agent/${room}/get-messages`
      );
      return (await res.json()).length;
    };

    // Only room A receives traffic.
    await connectAndRun(
      page,
      agentPath(roomA),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-iso-a",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "iso-a",
role: "user",
parts: [{ type: "text", text: "Room A" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    expect(await countStored(roomA)).toBeGreaterThanOrEqual(2);
    expect(await countStored(roomB)).toBe(0);
  });

  test("multiple messages accumulate in conversation", async ({
    page,
    baseURL
  }) => {
    const room = crypto.randomUUID();
    const wsUrl = wsHelpers(baseURL!).agentPath(room);

    // Turn 1.
    await connectAndRun(
      page,
      wsUrl,
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-multi-1",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "multi-1",
role: "user",
parts: [{ type: "text", text: "First" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // Turn 2 resends the earlier user message, as a real client would.
    await connectAndRun(
      page,
      wsUrl,
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-multi-2",
init: {
method: "POST",
body: JSON.stringify({
messages: [
{ id: "multi-1", role: "user", parts: [{ type: "text", text: "First" }] },
{ id: "multi-2", role: "user", parts: [{ type: "text", text: "Second" }] }
]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    const transcript = await (
      await page.request.get(`${baseURL}/agents/chat-agent/${room}/get-messages`)
    ).json();
    // Expected: multi-1 (user), reply, multi-2 (user), reply.
    const countRole = (role: string) =>
      transcript.filter((m: { role: string }) => m.role === role).length;
    expect(countRole("user")).toBe(2);
    expect(countRole("assistant")).toBe(2);
  });

  test("request cancellation", async ({ page, baseURL }) => {
    const url = wsHelpers(baseURL!).agentPath(crypto.randomUUID());

    // Fire a request and cancel it in the same tick. Whether any response
    // frames arrive first is timing-dependent, so the only requirement is
    // that the exchange completes without a socket error.
    const frames = await connectAndRun(
      page,
      url,
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-cancel",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "cancel-1",
role: "user",
parts: [{ type: "text", text: "Cancel me" }]
}]
})
}
}));
// Cancel immediately
ws.send(JSON.stringify({
type: "cf_agent_chat_request_cancel",
id: "req-cancel"
}));
// Wait a bit then close
setTimeout(() => { ws.close(); resolve(received); }, 1000);
`
    );

    expect(frames).toBeDefined();
  });
});
// ── Stream Resumption ──────────────────────────────────────────────
test.describe("Stream resumption e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  /** WebSocket URL of the SlowAgent instance for `room`. */
  const slowAgentUrl = (baseURL: string, room: string) =>
    `${baseURL.replace("http", "ws")}/agents/slow-agent/${room}`;

  /**
   * Starts a slow stream and abandons it mid-flight.
   *
   * Opens a socket to `wsUrl` and sends a single "Go slow" user message
   * (request id `requestId`, message id `messageId`). It closes the socket
   * 600ms later; SlowAgent sends a chunk every 400ms, so this lands
   * mid-stream. It then waits a further 200ms so the server has registered
   * the disconnect before the caller reconnects.
   */
  async function startSlowStreamThenDisconnect(
    page: import("@playwright/test").Page,
    wsUrl: string,
    requestId: string,
    messageId: string
  ): Promise<void> {
    await page.evaluate(
      ({ url, reqId, msgId, MT }) =>
        new Promise<void>((resolve) => {
          const ws = new WebSocket(url);
          ws.onopen = () => {
            ws.send(
              JSON.stringify({
                type: MT.CF_AGENT_USE_CHAT_REQUEST,
                id: reqId,
                init: {
                  method: "POST",
                  body: JSON.stringify({
                    messages: [
                      {
                        id: msgId,
                        role: "user",
                        parts: [{ type: "text", text: "Go slow" }]
                      }
                    ]
                  })
                }
              })
            );
            setTimeout(() => {
              ws.close();
              resolve();
            }, 600);
          };
        }),
      { url: wsUrl, reqId: requestId, msgId: messageId, MT: MessageType }
    );
    // Give the server a beat to register the disconnect.
    await new Promise((r) => setTimeout(r, 200));
  }

  test("resume mid-stream: reconnecting client receives CF_AGENT_STREAM_RESUMING", async ({
    page,
    baseURL
  }) => {
    const slowUrl = slowAgentUrl(baseURL!, crypto.randomUUID());
    await startSlowStreamThenDisconnect(page, slowUrl, "req-slow", "slow-1");

    // Reconnect: the server should announce the in-flight stream with
    // CF_AGENT_STREAM_RESUMING, and chunks should follow once we ACK it.
    const resumeResult = await page.evaluate(
      ({ url, MT }) => {
        return new Promise<{
          messages: Array<{ type: string; [k: string]: unknown }>;
          gotResuming: boolean;
          gotResumeChunks: boolean;
        }>((resolve) => {
          const ws = new WebSocket(url);
          const messages: Array<{ type: string; [k: string]: unknown }> = [];
          let gotResuming = false;
          let gotResumeChunks = false;
          ws.onmessage = (e) => {
            try {
              const data = JSON.parse(e.data);
              messages.push(data);
              if (data.type === MT.CF_AGENT_STREAM_RESUMING) {
                gotResuming = true;
                // ACK so the server proceeds with the resumed stream.
                ws.send(
                  JSON.stringify({
                    type: MT.CF_AGENT_STREAM_RESUME_ACK,
                    id: data.id
                  })
                );
              }
              if (
                data.type === MT.CF_AGENT_USE_CHAT_RESPONSE &&
                !data.done &&
                data.body
              ) {
                gotResumeChunks = true;
              }
              if (data.type === MT.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
                setTimeout(() => {
                  ws.close();
                  resolve({ messages, gotResuming, gotResumeChunks });
                }, 200);
              }
            } catch {
              // ignore non-JSON frames
            }
          };
          setTimeout(() => {
            ws.close();
            resolve({ messages, gotResuming, gotResumeChunks });
          }, 10000);
        });
      },
      { url: slowUrl, MT: MessageType }
    );

    expect(resumeResult.gotResuming).toBe(true);
    // Chunks must arrive after the ACK.
    expect(resumeResult.gotResumeChunks).toBe(true);
  });

  test("no resume after completion: reconnecting gets no CF_AGENT_STREAM_RESUMING", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Run a request to completion first.
    await connectAndRun(
      page,
      agentPath(room),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-done-first",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "done-1",
role: "user",
parts: [{ type: "text", text: "Quick" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // Reconnect and just listen for a second: nothing is left to resume.
    const reconnectMsgs = await connectAndRun(
      page,
      agentPath(room),
      `
// Wait and see what messages arrive
setTimeout(() => { ws.close(); resolve(received); }, 1000);
`
    );

    const resumeMsg = reconnectMsgs.find(
      (m) => m.type === MessageType.CF_AGENT_STREAM_RESUMING
    );
    expect(resumeMsg).toBeUndefined();
  });

  test("client-initiated resume: CF_AGENT_STREAM_RESUME_REQUEST triggers resume after handler is ready", async ({
    page,
    baseURL
  }) => {
    const slowUrl = slowAgentUrl(baseURL!, crypto.randomUUID());
    await startSlowStreamThenDisconnect(
      page,
      slowUrl,
      "req-client-resume",
      "cr-1"
    );

    // Reconnect and explicitly send a resume request, mirroring what
    // useAgentChat does once its message handler is registered.
    const result = await page.evaluate(
      ({ url, MT }) => {
        return new Promise<{
          gotResuming: boolean;
          gotChunks: boolean;
          replayFlags: boolean[];
        }>((resolve) => {
          const ws = new WebSocket(url);
          let gotResuming = false;
          let gotChunks = false;
          const replayFlags: boolean[] = [];
          ws.onmessage = (e) => {
            try {
              const data = JSON.parse(e.data);
              if (data.type === MT.CF_AGENT_STREAM_RESUMING) {
                gotResuming = true;
                ws.send(
                  JSON.stringify({
                    type: MT.CF_AGENT_STREAM_RESUME_ACK,
                    id: data.id
                  })
                );
              }
              if (data.type === MT.CF_AGENT_USE_CHAT_RESPONSE) {
                if (!data.done && data.body) {
                  gotChunks = true;
                  replayFlags.push(data.replay === true);
                }
                if (data.done) {
                  replayFlags.push(data.replay === true);
                  setTimeout(() => {
                    ws.close();
                    resolve({ gotResuming, gotChunks, replayFlags });
                  }, 200);
                }
              }
            } catch {
              // ignore non-JSON frames
            }
          };
          ws.onopen = () => {
            ws.send(
              JSON.stringify({
                type: "cf_agent_stream_resume_request"
              })
            );
          };
          setTimeout(() => {
            ws.close();
            resolve({ gotResuming, gotChunks, replayFlags });
          }, 10000);
        });
      },
      { url: slowUrl, MT: MessageType }
    );

    // The resume request must trigger the resume notification.
    expect(result.gotResuming).toBe(true);
    // Chunks arrive: replayed ones from storage plus live ones from the stream.
    expect(result.gotChunks).toBe(true);
    // At least one chunk must be flagged as a replay.
    expect(result.replayFlags.length).toBeGreaterThan(0);
    expect(result.replayFlags.some((f) => f === true)).toBe(true);
  });

  test("replayed chunks have replay=true flag", async ({ page, baseURL }) => {
    const slowUrl = slowAgentUrl(baseURL!, crypto.randomUUID());
    await startSlowStreamThenDisconnect(
      page,
      slowUrl,
      "req-replay-flag",
      "rf-1"
    );

    // Reconnect and record every chat-response frame with its replay flag.
    const result = await page.evaluate(
      ({ url, MT }) => {
        return new Promise<{
          chunks: Array<{
            body?: string;
            done: boolean;
            replay?: boolean;
          }>;
        }>((resolve) => {
          const ws = new WebSocket(url);
          const chunks: Array<{
            body?: string;
            done: boolean;
            replay?: boolean;
          }> = [];
          ws.onmessage = (e) => {
            try {
              const data = JSON.parse(e.data);
              if (data.type === MT.CF_AGENT_STREAM_RESUMING) {
                ws.send(
                  JSON.stringify({
                    type: MT.CF_AGENT_STREAM_RESUME_ACK,
                    id: data.id
                  })
                );
              }
              if (data.type === MT.CF_AGENT_USE_CHAT_RESPONSE) {
                chunks.push({
                  body: data.body,
                  done: data.done,
                  replay: data.replay
                });
                if (data.done) {
                  setTimeout(() => {
                    ws.close();
                    resolve({ chunks });
                  }, 200);
                }
              }
            } catch {
              // ignore non-JSON frames
            }
          };
          setTimeout(() => {
            ws.close();
            resolve({ chunks });
          }, 10000);
        });
      },
      { url: slowUrl, MT: MessageType }
    );

    expect(result.chunks.length).toBeGreaterThan(0);
    // Chunks replayed from storage carry replay=true; live ones do not.
    // At least some must be replays.
    const replayedChunks = result.chunks.filter((c) => c.replay === true);
    expect(replayedChunks.length).toBeGreaterThan(0);
  });
});
// ── Custom Body ────────────────────────────────────────────────────
test.describe("Custom body forwarding e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("custom fields in request body are forwarded to onChatMessage", async ({
    page,
    baseURL
  }) => {
    const room = crypto.randomUUID();
    const { agentPath } = wsHelpers(baseURL!);

    // Extra top-level fields ride along next to `messages` in the body.
    await connectAndRun(
      page,
      agentPath(room),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-custom-body",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "cb-1",
role: "user",
parts: [{ type: "text", text: "With custom body" }]
}],
customField: "test-value",
nestedData: { key: 123 }
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // ChatAgent does not read options.body, so the observable signal is that
    // the request was parsed and answered: the user turn and the assistant
    // reply must both be stored.
    const stored = await (
      await page.request.get(`${baseURL}/agents/chat-agent/${room}/get-messages`)
    ).json();
    expect(stored.length).toBeGreaterThanOrEqual(2);
  });
});
// ── Error Handling ─────────────────────────────────────────────────
test.describe("Error handling e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  /** True when `frames` contains a chat response whose `done` is truthy. */
  const hasDoneResponse = (frames: WSMessage[]) =>
    frames.some(
      (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && Boolean(m.done)
    );

  test("malformed JSON on WebSocket does not crash connection", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Garbage frames first, then a well-formed request on the same socket.
    const frames = await connectAndRun(
      page,
      agentPath(room),
      `
// Send garbage text
ws.send("this is not json at all!!!");
ws.send("{broken json");
ws.send("");
// Then send a valid request
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-after-garbage",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "garbage-1",
role: "user",
parts: [{ type: "text", text: "Still works?" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // A completed reply proves the garbage did not take the socket down.
    expect(hasDoneResponse(frames)).toBe(true);
  });

  test("invalid message type does not crash connection", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Valid JSON with unknown `type` values, then a real request.
    const frames = await connectAndRun(
      page,
      agentPath(room),
      `
// Send valid JSON with unknown type
ws.send(JSON.stringify({ type: "totally_invalid_type", data: "whatever" }));
ws.send(JSON.stringify({ type: "another_fake", id: "fake-id" }));
// Then send a valid request
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-after-invalid",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "invalid-type-1",
role: "user",
parts: [{ type: "text", text: "Still alive?" }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    expect(hasDoneResponse(frames)).toBe(true);
  });

  test("LLM API error returns error response to client", async ({
    page,
    baseURL
  }) => {
    const room = crypto.randomUUID();
    const badKeyUrl = `${baseURL!.replace("http", "ws")}/agents/bad-key-agent/${room}`;

    const frames = await page.evaluate(
      ({ url, MT }) =>
        new Promise<WSMessage[]>((resolve) => {
          const received: WSMessage[] = [];
          const ws = new WebSocket(url);
          const settle = () => {
            ws.close();
            resolve(received);
          };

          ws.onmessage = (e) => {
            try {
              received.push(JSON.parse(e.data));
            } catch {
              // ignore non-JSON frames
            }
          };

          ws.onopen = () => {
            ws.send(
              JSON.stringify({
                type: MT.CF_AGENT_USE_CHAT_REQUEST,
                id: "req-bad-key",
                init: {
                  method: "POST",
                  body: JSON.stringify({
                    messages: [
                      {
                        id: "bad-key-1",
                        role: "user",
                        parts: [{ type: "text", text: "This should fail" }]
                      }
                    ]
                  })
                }
              })
            );
          };

          // Stop 200ms after the first terminal frame (done or error).
          const poll = setInterval(() => {
            const terminal = received.some(
              (m) =>
                m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && (m.done || m.error)
            );
            if (!terminal) return;
            clearInterval(poll);
            setTimeout(settle, 200);
          }, 100);

          // Hard stop so a silent agent fails the assertion instead of hanging.
          setTimeout(() => {
            clearInterval(poll);
            settle();
          }, 15000);
        }),
      { url: badKeyUrl, MT: MessageType }
    );

    const sawError = frames.some(
      (m) =>
        m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.error === true
    );
    expect(sawError).toBe(true);
  });
});
// ── Concurrency ────────────────────────────────────────────────────
test.describe("Concurrency e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("concurrent requests: two requests complete without interference", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Two requests go out back-to-back on one socket; each must reach its
    // own done frame.
    const result = await page.evaluate(
      ({ url, MT }) =>
        new Promise<{
          req1Done: boolean;
          req2Done: boolean;
          messages: WSMessage[];
        }>((resolve) => {
          const ws = new WebSocket(url);
          const messages: WSMessage[] = [];
          // Request ids for which a done frame has been seen.
          const completed = new Set<unknown>();
          const snapshot = () => ({
            req1Done: completed.has("req-conc-1"),
            req2Done: completed.has("req-conc-2"),
            messages
          });

          /** Serialises a chat request carrying the given user-message history. */
          const chatRequest = (
            id: string,
            history: Array<{ id: string; text: string }>
          ) =>
            JSON.stringify({
              type: MT.CF_AGENT_USE_CHAT_REQUEST,
              id,
              init: {
                method: "POST",
                body: JSON.stringify({
                  messages: history.map((entry) => ({
                    id: entry.id,
                    role: "user",
                    parts: [{ type: "text", text: entry.text }]
                  }))
                })
              }
            });

          ws.onmessage = (e) => {
            try {
              const data = JSON.parse(e.data);
              messages.push(data);
              if (data.type !== MT.CF_AGENT_USE_CHAT_RESPONSE || !data.done) {
                return;
              }
              completed.add(data.id);
              const { req1Done, req2Done } = snapshot();
              if (req1Done && req2Done) {
                setTimeout(() => {
                  ws.close();
                  resolve(snapshot());
                }, 200);
              }
            } catch {
              // ignore non-JSON frames
            }
          };

          ws.onopen = () => {
            const first = { id: "conc-1", text: "First concurrent" };
            const second = { id: "conc-2", text: "Second concurrent" };
            ws.send(chatRequest("req-conc-1", [first]));
            ws.send(chatRequest("req-conc-2", [first, second]));
          };

          setTimeout(() => {
            ws.close();
            resolve(snapshot());
          }, 10000);
        }),
      { url: agentPath(room), MT: MessageType }
    );

    expect(result.req1Done).toBe(true);
    expect(result.req2Done).toBe(true);
  });
});
// ── Large Messages ─────────────────────────────────────────────────
test.describe("Large messages e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("large message persistence: 10KB message persists correctly", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();
    // 10 KiB of text in a single user message part.
    const largeText = "A".repeat(10 * 1024);

    // The payload is spliced into generated JavaScript, so embed it with
    // JSON.stringify: that yields a valid string literal whatever characters
    // the text contains (quotes, backslashes, newlines).
    await connectAndRun(
      page,
      agentPath(room),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-large",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "large-1",
role: "user",
parts: [{ type: "text", text: ${JSON.stringify(largeText)} }]
}]
})
}
}));
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // The stored user message must keep its full text part.
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    const data = await res.json();
    const userMsg = data.find((m: { id: string }) => m.id === "large-1");
    expect(userMsg).toBeTruthy();
    const textPart = userMsg.parts.find(
      (p: { type: string }) => p.type === "text"
    );
    // Fail with an assertion, not a TypeError, if the text part is missing.
    expect(textPart).toBeTruthy();
    expect(textPart.text.length).toBe(10 * 1024);
  });
});
// ── Malformed Input ────────────────────────────────────────────────
test.describe("Malformed input e2e", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("empty messages array: agent handles gracefully", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // An empty history: the agent may answer or stay silent.
    const messages = await connectAndRun(
      page,
      agentPath(room),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-empty",
init: {
method: "POST",
body: JSON.stringify({ messages: [] })
}
}));
// Wait for a response or timeout
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 3000);
`
    );

    // Either outcome is acceptable; the only requirement is that the exchange
    // finishes without a socket error.
    expect(messages).toBeDefined();
  });

  test("missing body in init: agent handles gracefully", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // 1. Send a request whose `init` has no body.
    // 2. 500ms later, send a valid request on the same socket.
    // The poll waits specifically for the *valid* request's done frame, so a
    // response to the body-less request cannot end the test early.
    const messages = await connectAndRun(
      page,
      agentPath(room),
      `
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-no-body",
init: {
method: "POST"
}
}));
// Give the agent a moment to process (and not crash)
setTimeout(() => {
// Then send a valid request to prove the connection is still alive
ws.send(JSON.stringify({
type: MT.CF_AGENT_USE_CHAT_REQUEST,
id: "req-valid-after",
init: {
method: "POST",
body: JSON.stringify({
messages: [{
id: "after-no-body",
role: "user",
parts: [{ type: "text", text: "Recovery" }]
}]
})
}
}));
}, 500);
const check = setInterval(() => {
if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.id === "req-valid-after" && m.done)) {
clearInterval(check);
ws.close();
resolve(received);
}
}, 50);
setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
`
    );

    // The connection must still be alive: the follow-up request completed.
    const recovered = messages.find(
      (m) =>
        m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE &&
        m.id === "req-valid-after" &&
        m.done
    );
    expect(recovered).toBeTruthy();
  });
});
// ── Large Output ───────────────────────────────────────────────────
test.describe("Large tool output e2e", () => {
  /**
   * Builds an in-page script fragment for `connectAndRun`. The fragment resolves with
   * every received message, and closes the socket, on whichever comes first:
   * - a CF_AGENT_USE_CHAT_RESPONSE with `done` set arrives; when `requestId`
   *   is given, only a response carrying that id counts;
   * - `timeoutMs` elapses.
   *
   * Both timers are cleared on completion so nothing fires after resolve.
   */
  const waitForDoneScript = (timeoutMs: number, requestId?: string) => `
    const isDone = (m) =>
      m.type === MT.CF_AGENT_USE_CHAT_RESPONSE &&
      !!m.done &&
      ${requestId === undefined ? "true" : `m.id === ${JSON.stringify(requestId)}`};
    let check;
    let deadline;
    const finish = () => {
      clearInterval(check);
      clearTimeout(deadline);
      ws.close();
      resolve(received);
    };
    check = setInterval(() => {
      if (received.some(isDone)) finish();
    }, 50);
    deadline = setTimeout(finish, ${timeoutMs});
  `;

  test.beforeEach(async ({ page }) => {
    await page.goto("about:blank");
  });

  test("large message persists without crash", async ({ page, baseURL }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();
    // Send a message with a 100KB body to test persistence of large data.
    // The payload is all "A"s, so splicing it into the script source is safe.
    const largeText = "A".repeat(100_000);
    await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-large-output",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [{
              id: "large-output-1",
              role: "user",
              parts: [{ type: "text", text: "${largeText}" }]
            }]
          })
        }
      }));
      ${waitForDoneScript(5000)}
      `
    );
    // Verify the message persisted and is retrievable
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    expect(res.ok()).toBe(true);
    const data = await res.json();
    expect(data.length).toBeGreaterThanOrEqual(2); // user + assistant
    // The user message must come back with the full large text intact
    const userMsg = data.find((m: { id: string }) => m.id === "large-output-1");
    expect(userMsg).toBeTruthy();
    const userText = userMsg.parts.find(
      (p: { type: string }) => p.type === "text"
    );
    // Check the length first for a readable failure message. Then check exact
    // equality as a boolean, so a mismatch doesn't dump a 100KB diff.
    expect(userText?.text).toHaveLength(largeText.length);
    expect(userText?.text === largeText).toBe(true);
  });

  test("3MB message: does not crash, degrades gracefully with compaction", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();
    // First, send a normal message to establish the conversation
    await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-3mb-setup",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [{
              id: "3mb-user-1",
              role: "user",
              parts: [{ type: "text", text: "Hello" }]
            }]
          })
        }
      }));
      ${waitForDoneScript(5000)}
      `
    );
    // Now send a message set that includes a 3MB assistant message.
    // This simulates a tool that returned a huge result. It goes through
    // CF_AGENT_CHAT_MESSAGES, which is persisted via persistMessages.
    const threeMBText = "X".repeat(3_000_000);
    await connectAndRun(
      page,
      agentPath(room),
      `
      // Send the 3MB message via CF_AGENT_CHAT_MESSAGES (setMessages path)
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_CHAT_MESSAGES,
        messages: [
          {
            id: "3mb-user-1",
            role: "user",
            parts: [{ type: "text", text: "Hello" }]
          },
          {
            id: "3mb-assistant-huge",
            role: "assistant",
            parts: [
              { type: "text", text: "Here are the results:" },
              {
                type: "tool-bigQuery",
                toolCallId: "call_3mb",
                state: "output-available",
                input: { query: "SELECT * FROM everything" },
                output: "${threeMBText}"
              }
            ]
          }
        ]
      }));
      // Give it time to persist
      setTimeout(() => { ws.close(); resolve(received); }, 1000);
      `
    );
    // Verify the agent didn't crash: messages are still retrievable
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    expect(res.ok()).toBe(true);
    const data = await res.json();
    // Should have messages (didn't crash)
    expect(data.length).toBeGreaterThanOrEqual(2);
    // Find the huge assistant message
    const hugeMsg = data.find(
      (m: { id: string }) => m.id === "3mb-assistant-huge"
    );
    expect(hugeMsg).toBeTruthy();
    // The tool output should be compacted (not the original 3MB)
    const toolPart = hugeMsg.parts.find(
      (p: { toolCallId?: string }) => p.toolCallId === "call_3mb"
    );
    expect(toolPart).toBeTruthy();
    const output = toolPart.output as string;
    // Should contain the compaction notice
    expect(output).toContain("too large to persist");
    expect(output).toContain("suggest re-running the tool");
    expect(output).toContain("Preview:");
    // Should be much smaller than 3MB
    expect(output.length).toBeLessThan(10_000);
    // The text part should be preserved (it was small)
    const textPart = hugeMsg.parts.find(
      (p: { type: string }) => p.type === "text"
    );
    expect(textPart).toBeTruthy();
    expect(textPart.text).toBe("Here are the results:");
    // Metadata, when present, should record which tool output was compacted
    if (hugeMsg.metadata) {
      expect(hugeMsg.metadata.compactedToolOutputs).toContain("call_3mb");
    }
    // The conversation should still work, so send another message
    const followUp = await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-3mb-followup",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [
              {
                id: "3mb-user-1",
                role: "user",
                parts: [{ type: "text", text: "Hello" }]
              },
              {
                id: "3mb-assistant-huge",
                role: "assistant",
                parts: [
                  { type: "text", text: "Here are the results:" },
                  {
                    type: "tool-bigQuery",
                    toolCallId: "call_3mb",
                    state: "output-available",
                    input: { query: "SELECT * FROM everything" },
                    output: "compacted"
                  }
                ]
              },
              {
                id: "3mb-user-2",
                role: "user",
                parts: [{ type: "text", text: "Thanks, what next?" }]
              }
            ]
          })
        }
      }));
      ${waitForDoneScript(5000)}
      `
    );
    // Follow-up should succeed (agent still alive)
    const doneMsg = followUp.find(
      (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.done
    );
    expect(doneMsg).toBeTruthy();
  });
});