import { test, expect } from "@playwright/test";

// ── Helpers ──────────────────────────────────────────────────────────

/** Message type constants (mirrors src/types.ts MessageType enum values) */
const MessageType = {
  CF_AGENT_CHAT_MESSAGES: "cf_agent_chat_messages",
  CF_AGENT_USE_CHAT_REQUEST: "cf_agent_use_chat_request",
  CF_AGENT_USE_CHAT_RESPONSE: "cf_agent_use_chat_response",
  CF_AGENT_CHAT_CLEAR: "cf_agent_chat_clear",
  CF_AGENT_STREAM_RESUMING: "cf_agent_stream_resuming",
  CF_AGENT_STREAM_RESUME_ACK: "cf_agent_stream_resume_ack"
} as const;

/**
 * Shape of a JSON frame exchanged over the agent WebSocket: a `type`
 * discriminator plus arbitrary additional fields.
 */
type WSMessage = {
  type: string;
  [key: string]: unknown;
};

/**
 * Builds WebSocket URL helpers for the chat agent from the HTTP base URL.
 *
 * This helper does not open a connection itself; it only derives URLs.
 *
 * @param baseURL HTTP(S) origin of the server under test, e.g.
 *   `http://localhost:8787`. A trailing slash is tolerated.
 * @returns An object whose `agentPath(room)` returns the `ws://` / `wss://`
 *   URL of the `chat-agent` instance for the given room.
 */
function wsHelpers(baseURL: string) {
  // Rewrite only the leading scheme (http -> ws, https -> wss) rather than
  // the first "http" found anywhere in the string, and drop trailing slashes
  // so the joined path never contains "//agents".
  const wsBase = baseURL.replace(/^http/, "ws").replace(/\/+$/, "");

  const agentPath = (room: string) => `${wsBase}/agents/chat-agent/${room}`;

  return { agentPath };
}

// Connects a WebSocket inside page.evaluate and returns collected messages.
// We run everything inside the browser because Playwright's native WS
// support is for intercepting, not initiating raw connections.
*/ async function connectAndRun( page: import("@playwright/test").Page, wsUrl: string, actions: string // JS function body that receives (ws, resolve, reject, MessageType) ): Promise { return page.evaluate( ({ url, actions, MT }) => { return new Promise((resolve, reject) => { const ws = new WebSocket(url); const received: WSMessage[] = []; ws.onmessage = (e) => { try { received.push(JSON.parse(e.data as string)); } catch { // ignore non-JSON } }; ws.onerror = () => reject(new Error("WebSocket error")); ws.onopen = () => { try { // Execute the caller-provided action script const fn = new Function( "ws", "resolve", "reject", "received", "MT", actions ); fn(ws, resolve, reject, received, MT); } catch (err) { reject(err); } }; }); }, { url: wsUrl, actions, MT: MessageType } ); } // ── Tests ──────────────────────────────────────────────────────────── test.describe("AIChatAgent e2e", () => { test.beforeEach(async ({ page }) => { // Navigate to a blank page so we have a browser context for WebSockets await page.goto("about:blank"); }); test("chat message round-trip: send request, receive streamed response", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const messages = await connectAndRun( page, agentPath(room), ` const userMsg = { id: "user-1", role: "user", parts: [{ type: "text", text: "Hello" }] }; ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-1", init: { method: "POST", body: JSON.stringify({ messages: [userMsg] }) } })); // Wait for the done signal const check = setInterval(() => { const doneMsg = received.find( m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done === true ); if (doneMsg) { clearInterval(check); ws.close(); resolve(received); } }, 50); // Timeout safety setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Note: The sender is excluded from CF_AGENT_CHAT_MESSAGES broadcasts // (by design — only other connections receive it). 
So we only check // for the stream response here. Multi-connection sync is tested separately. // Should have received stream response chunks const streamResponses = messages.filter( (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE ); expect(streamResponses.length).toBeGreaterThanOrEqual(2); // at least one chunk + done // Should have text-start, text-delta, text-end events const nonDoneResponses = streamResponses.filter( (m) => !m.done && typeof m.body === "string" && (m.body as string).trim() ); const bodies = nonDoneResponses.map((m) => JSON.parse(m.body as string)); const types = bodies.map((b: { type: string }) => b.type); expect(types).toContain("text-start"); expect(types).toContain("text-delta"); expect(types).toContain("text-end"); // The text-delta should contain the response text const deltas = bodies.filter( (b: { type: string }) => b.type === "text-delta" ); const fullText = deltas.map((d: { delta: string }) => d.delta).join(""); expect(fullText).toBe("Hello from e2e agent!"); // The final message should be done=true const lastMsg = streamResponses[streamResponses.length - 1]; expect(lastMsg.done).toBe(true); }); test("message persistence: messages survive reconnection", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); // First connection: send a message await connectAndRun( page, agentPath(room), ` const userMsg = { id: "persist-1", role: "user", parts: [{ type: "text", text: "Remember me" }] }; ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-persist", init: { method: "POST", body: JSON.stringify({ messages: [userMsg] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Verify via HTTP GET /get-messages const res = await page.request.get( 
`${baseURL}/agents/chat-agent/${room}/get-messages` ); expect(res.ok()).toBe(true); const persisted = await res.json(); expect(Array.isArray(persisted)).toBe(true); expect(persisted.length).toBeGreaterThanOrEqual(2); // user + assistant // User message should be persisted const userMsg = persisted.find((m: { id: string }) => m.id === "persist-1"); expect(userMsg).toBeTruthy(); expect(userMsg.role).toBe("user"); // Assistant response should be persisted const assistantMsgs = persisted.filter( (m: { role: string }) => m.role === "assistant" ); expect(assistantMsgs.length).toBeGreaterThanOrEqual(1); }); test("clear history: clears all messages and notifies other connections", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const wsUrl = agentPath(room); // Send a message first await connectAndRun( page, wsUrl, ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-clear", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "clear-msg-1", role: "user", parts: [{ type: "text", text: "About to be cleared" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Verify messages exist const before = await page.request.get( `${baseURL}/agents/chat-agent/${room}/get-messages` ); const beforeData = await before.json(); expect(beforeData.length).toBeGreaterThanOrEqual(1); // Send clear from a new connection await connectAndRun( page, wsUrl, ` ws.send(JSON.stringify({ type: MT.CF_AGENT_CHAT_CLEAR })); // Wait a bit for the clear to process setTimeout(() => { ws.close(); resolve(received); }, 500); ` ); // Verify messages are cleared const after = await page.request.get( `${baseURL}/agents/chat-agent/${room}/get-messages` ); const afterData = await after.json(); 
expect(afterData.length).toBe(0); }); test("multi-connection sync: second connection receives broadcast", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const wsUrl = agentPath(room); // Open two WebSocket connections and send from the first. // The second should receive the CF_AGENT_CHAT_MESSAGES broadcast // AND the streamed response. const result = await page.evaluate( ({ url, MT }) => { return new Promise<{ ws1Messages: Array<{ type: string; [k: string]: unknown }>; ws2Messages: Array<{ type: string; [k: string]: unknown }>; }>((resolve) => { const ws1Messages: Array<{ type: string; [k: string]: unknown }> = []; const ws2Messages: Array<{ type: string; [k: string]: unknown }> = []; const ws1 = new WebSocket(url); const ws2 = new WebSocket(url); let ws1Open = false; let ws2Open = false; function maybeSend() { if (!ws1Open || !ws2Open) return; // Give connections a moment to fully register setTimeout(() => { ws1.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-sync", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "sync-1", role: "user", parts: [{ type: "text", text: "Sync test" }] } ] }) } }) ); }, 100); } ws1.onopen = () => { ws1Open = true; maybeSend(); }; ws2.onopen = () => { ws2Open = true; maybeSend(); }; ws1.onmessage = (e) => { try { ws1Messages.push(JSON.parse(e.data)); } catch {} }; ws2.onmessage = (e) => { try { ws2Messages.push(JSON.parse(e.data)); } catch {} }; // Wait for stream to complete on ws1 const check = setInterval(() => { const done = ws1Messages.find( (m) => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done === true ); if (done) { clearInterval(check); // Give ws2 a moment to receive broadcasts setTimeout(() => { ws1.close(); ws2.close(); resolve({ ws1Messages, ws2Messages }); }, 200); } }, 50); setTimeout(() => { clearInterval(check); ws1.close(); ws2.close(); resolve({ ws1Messages, ws2Messages }); }, 8000); }); }, { url: wsUrl, MT: 
MessageType } ); // ws2 should have received the chat messages broadcast const ws2ChatMsgs = result.ws2Messages.filter( (m) => m.type === MessageType.CF_AGENT_CHAT_MESSAGES ); expect(ws2ChatMsgs.length).toBeGreaterThanOrEqual(1); // ws2 should have received the stream response (it's broadcast to all) const ws2StreamResponses = result.ws2Messages.filter( (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE ); expect(ws2StreamResponses.length).toBeGreaterThanOrEqual(1); }); test("separate rooms are isolated", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const roomA = crypto.randomUUID(); const roomB = crypto.randomUUID(); // Send a message to room A await connectAndRun( page, agentPath(roomA), ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-iso-a", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "iso-a", role: "user", parts: [{ type: "text", text: "Room A" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Room A should have messages const resA = await page.request.get( `${baseURL}/agents/chat-agent/${roomA}/get-messages` ); const dataA = await resA.json(); expect(dataA.length).toBeGreaterThanOrEqual(2); // Room B should be empty const resB = await page.request.get( `${baseURL}/agents/chat-agent/${roomB}/get-messages` ); const dataB = await resB.json(); expect(dataB.length).toBe(0); }); test("multiple messages accumulate in conversation", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const wsUrl = agentPath(room); // Send first message await connectAndRun( page, wsUrl, ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-multi-1", init: { method: "POST", body: JSON.stringify({ messages: [{ 
id: "multi-1", role: "user", parts: [{ type: "text", text: "First" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Send second message (include first message in the array, as the client would) await connectAndRun( page, wsUrl, ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-multi-2", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "multi-1", role: "user", parts: [{ type: "text", text: "First" }] }, { id: "multi-2", role: "user", parts: [{ type: "text", text: "Second" }] } ] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Verify all messages persisted const res = await page.request.get( `${baseURL}/agents/chat-agent/${room}/get-messages` ); const data = await res.json(); // Should have: multi-1 (user), assistant-1, multi-2 (user), assistant-2 const userMsgs = data.filter((m: { role: string }) => m.role === "user"); const assistantMsgs = data.filter( (m: { role: string }) => m.role === "assistant" ); expect(userMsgs.length).toBe(2); expect(assistantMsgs.length).toBe(2); }); test("request cancellation", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); // Send a request and immediately cancel it const messages = await connectAndRun( page, agentPath(room), ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-cancel", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "cancel-1", role: "user", parts: [{ type: "text", text: "Cancel me" }] }] }) } })); // Cancel immediately 
ws.send(JSON.stringify({ type: "cf_agent_chat_request_cancel", id: "req-cancel" })); // Wait a bit then close setTimeout(() => { ws.close(); resolve(received); }, 1000); ` ); // The test passes if no errors are thrown — cancellation is graceful // We might or might not get a response depending on timing expect(messages).toBeDefined(); }); }); // ── Stream Resumption ────────────────────────────────────────────── test.describe("Stream resumption e2e", () => { test.beforeEach(async ({ page }) => { await page.goto("about:blank"); }); test("resume mid-stream: reconnecting client receives CF_AGENT_STREAM_RESUMING", async ({ page, baseURL }) => { const room = crypto.randomUUID(); const slowUrl = `${baseURL!.replace("http", "ws")}/agents/slow-agent/${room}`; // Connect, send a message to start a slow stream, then disconnect mid-stream await page.evaluate( ({ url, MT }) => { return new Promise((resolve) => { const ws = new WebSocket(url); ws.onopen = () => { ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-slow", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "slow-1", role: "user", parts: [{ type: "text", text: "Go slow" }] } ] }) } }) ); // Disconnect after 600ms (mid-stream — SlowAgent sends chunks every 400ms) setTimeout(() => { ws.close(); resolve(); }, 600); }; }); }, { url: slowUrl, MT: MessageType } ); // Wait a beat for the server to register the disconnect await new Promise((r) => setTimeout(r, 200)); // Reconnect — should receive CF_AGENT_STREAM_RESUMING const resumeResult = await page.evaluate( ({ url, MT }) => { return new Promise<{ messages: Array<{ type: string; [k: string]: unknown }>; gotResuming: boolean; gotResumeChunks: boolean; }>((resolve) => { const ws = new WebSocket(url); const messages: Array<{ type: string; [k: string]: unknown }> = []; let gotResuming = false; let gotResumeChunks = false; ws.onmessage = (e) => { try { const data = JSON.parse(e.data); messages.push(data); if (data.type === 
MT.CF_AGENT_STREAM_RESUMING) { gotResuming = true; // Send ACK ws.send( JSON.stringify({ type: MT.CF_AGENT_STREAM_RESUME_ACK, id: data.id }) ); } if ( data.type === MT.CF_AGENT_USE_CHAT_RESPONSE && !data.done && data.body ) { gotResumeChunks = true; } if (data.type === MT.CF_AGENT_USE_CHAT_RESPONSE && data.done) { setTimeout(() => { ws.close(); resolve({ messages, gotResuming, gotResumeChunks }); }, 200); } } catch { // ignore } }; setTimeout(() => { ws.close(); resolve({ messages, gotResuming, gotResumeChunks }); }, 10000); }); }, { url: slowUrl, MT: MessageType } ); // Should have received the resuming notification expect(resumeResult.gotResuming).toBe(true); // Should have received replayed chunks after ACK expect(resumeResult.gotResumeChunks).toBe(true); }); test("no resume after completion: reconnecting gets no CF_AGENT_STREAM_RESUMING", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); // Send a message and wait for completion await connectAndRun( page, agentPath(room), ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-done-first", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "done-1", role: "user", parts: [{ type: "text", text: "Quick" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Reconnect — should NOT get CF_AGENT_STREAM_RESUMING const reconnectMsgs = await connectAndRun( page, agentPath(room), ` // Wait and see what messages arrive setTimeout(() => { ws.close(); resolve(received); }, 1000); ` ); const resumeMsg = reconnectMsgs.find( (m) => m.type === MessageType.CF_AGENT_STREAM_RESUMING ); expect(resumeMsg).toBeUndefined(); }); test("client-initiated resume: CF_AGENT_STREAM_RESUME_REQUEST triggers resume after handler is 
ready", async ({ page, baseURL }) => { const room = crypto.randomUUID(); const slowUrl = `${baseURL!.replace("http", "ws")}/agents/slow-agent/${room}`; // Start a slow stream and disconnect mid-stream await page.evaluate( ({ url, MT }) => { return new Promise((resolve) => { const ws = new WebSocket(url); ws.onopen = () => { ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-client-resume", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "cr-1", role: "user", parts: [{ type: "text", text: "Go slow" }] } ] }) } }) ); setTimeout(() => { ws.close(); resolve(); }, 600); }; }); }, { url: slowUrl, MT: MessageType } ); await new Promise((r) => setTimeout(r, 200)); // Reconnect and explicitly send CF_AGENT_STREAM_RESUME_REQUEST // (simulating what the client does after useEffect registers the handler) const result = await page.evaluate( ({ url, MT }) => { return new Promise<{ gotResuming: boolean; gotChunks: boolean; replayFlags: boolean[]; }>((resolve) => { const ws = new WebSocket(url); let gotResuming = false; let gotChunks = false; const replayFlags: boolean[] = []; ws.onmessage = (e) => { try { const data = JSON.parse(e.data); if (data.type === MT.CF_AGENT_STREAM_RESUMING) { gotResuming = true; ws.send( JSON.stringify({ type: MT.CF_AGENT_STREAM_RESUME_ACK, id: data.id }) ); } if (data.type === "cf_agent_use_chat_response") { if (!data.done && data.body) { gotChunks = true; replayFlags.push(data.replay === true); } if (data.done) { replayFlags.push(data.replay === true); setTimeout(() => { ws.close(); resolve({ gotResuming, gotChunks, replayFlags }); }, 200); } } } catch { // ignore } }; ws.onopen = () => { // Send resume request after handler is ready // (this is what useAgentChat does in its useEffect) ws.send( JSON.stringify({ type: "cf_agent_stream_resume_request" }) ); }; setTimeout(() => { ws.close(); resolve({ gotResuming, gotChunks, replayFlags }); }, 10000); }); }, { url: slowUrl, MT: MessageType } ); // Should have received 
resume notification via the request expect(result.gotResuming).toBe(true); // Should have received chunks (mix of replayed from DB + live from ongoing stream) expect(result.gotChunks).toBe(true); // At least some chunks should have replay=true (the ones replayed from DB) expect(result.replayFlags.length).toBeGreaterThan(0); expect(result.replayFlags.some((f) => f === true)).toBe(true); }); test("replayed chunks have replay=true flag", async ({ page, baseURL }) => { const room = crypto.randomUUID(); const slowUrl = `${baseURL!.replace("http", "ws")}/agents/slow-agent/${room}`; // Start stream, disconnect mid-stream, reconnect await page.evaluate( ({ url, MT }) => { return new Promise((resolve) => { const ws = new WebSocket(url); ws.onopen = () => { ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-replay-flag", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "rf-1", role: "user", parts: [{ type: "text", text: "Go slow" }] } ] }) } }) ); setTimeout(() => { ws.close(); resolve(); }, 600); }; }); }, { url: slowUrl, MT: MessageType } ); await new Promise((r) => setTimeout(r, 200)); // Reconnect and check replay flags const result = await page.evaluate( ({ url, MT }) => { return new Promise<{ chunks: Array<{ body?: string; done: boolean; replay?: boolean; }>; }>((resolve) => { const ws = new WebSocket(url); const chunks: Array<{ body?: string; done: boolean; replay?: boolean; }> = []; ws.onmessage = (e) => { try { const data = JSON.parse(e.data); if (data.type === MT.CF_AGENT_STREAM_RESUMING) { ws.send( JSON.stringify({ type: MT.CF_AGENT_STREAM_RESUME_ACK, id: data.id }) ); } if (data.type === "cf_agent_use_chat_response") { chunks.push({ body: data.body, done: data.done, replay: data.replay }); if (data.done) { setTimeout(() => { ws.close(); resolve({ chunks }); }, 200); } } } catch { // ignore } }; setTimeout(() => { ws.close(); resolve({ chunks }); }, 10000); }); }, { url: slowUrl, MT: MessageType } ); // Should have received 
chunks expect(result.chunks.length).toBeGreaterThan(0); // Replayed chunks (from DB) have replay=true, // live chunks (from ongoing stream) don't. // At least some should be replayed. const replayedChunks = result.chunks.filter((c) => c.replay === true); expect(replayedChunks.length).toBeGreaterThan(0); }); }); // ── Custom Body ──────────────────────────────────────────────────── test.describe("Custom body forwarding e2e", () => { test.beforeEach(async ({ page }) => { await page.goto("about:blank"); }); test("custom fields in request body are forwarded to onChatMessage", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); // Send a request with custom body fields await connectAndRun( page, agentPath(room), ` ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-custom-body", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "cb-1", role: "user", parts: [{ type: "text", text: "With custom body" }] }], customField: "test-value", nestedData: { key: 123 } }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // If the agent didn't crash and responded, the custom body was received. // The ChatAgent doesn't use options.body, but the fact that it responded // means the parsing didn't fail — custom fields are silently passed through. 
const res = await page.request.get( `${baseURL}/agents/chat-agent/${room}/get-messages` ); const data = await res.json(); expect(data.length).toBeGreaterThanOrEqual(2); }); }); // ── Error Handling ───────────────────────────────────────────────── test.describe("Error handling e2e", () => { test.beforeEach(async ({ page }) => { await page.goto("about:blank"); }); test("malformed JSON on WebSocket does not crash connection", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const messages = await connectAndRun( page, agentPath(room), ` // Send garbage text ws.send("this is not json at all!!!"); ws.send("{broken json"); ws.send(""); // Then send a valid request ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-after-garbage", init: { method: "POST", body: JSON.stringify({ messages: [{ id: "garbage-1", role: "user", parts: [{ type: "text", text: "Still works?" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); // Should have received a valid response despite the garbage const doneMsg = messages.find( (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.done ); expect(doneMsg).toBeTruthy(); }); test("invalid message type does not crash connection", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const messages = await connectAndRun( page, agentPath(room), ` // Send valid JSON with unknown type ws.send(JSON.stringify({ type: "totally_invalid_type", data: "whatever" })); ws.send(JSON.stringify({ type: "another_fake", id: "fake-id" })); // Then send a valid request ws.send(JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-after-invalid", init: { method: "POST", body: JSON.stringify({ messages: 
[{ id: "invalid-type-1", role: "user", parts: [{ type: "text", text: "Still alive?" }] }] }) } })); const check = setInterval(() => { if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) { clearInterval(check); ws.close(); resolve(received); } }, 50); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000); ` ); const doneMsg = messages.find( (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.done ); expect(doneMsg).toBeTruthy(); }); test("LLM API error returns error response to client", async ({ page, baseURL }) => { const room = crypto.randomUUID(); const badKeyUrl = `${baseURL!.replace("http", "ws")}/agents/bad-key-agent/${room}`; const messages = await page.evaluate( ({ url, MT }) => { return new Promise>( (resolve) => { const ws = new WebSocket(url); const received: Array<{ type: string; [k: string]: unknown }> = []; ws.onmessage = (e) => { try { received.push(JSON.parse(e.data)); } catch { // ignore } }; ws.onopen = () => { ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-bad-key", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "bad-key-1", role: "user", parts: [{ type: "text", text: "This should fail" }] } ] }) } }) ); }; // Wait for error or done const check = setInterval(() => { const errorOrDone = received.find( (m) => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && (m.done || m.error) ); if (errorOrDone) { clearInterval(check); setTimeout(() => { ws.close(); resolve(received); }, 200); } }, 100); setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 15000); } ); }, { url: badKeyUrl, MT: MessageType } ); // Should have received an error response const errorMsg = messages.find( (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.error === true ); expect(errorMsg).toBeTruthy(); }); }); // ── Concurrency ──────────────────────────────────────────────────── test.describe("Concurrency e2e", () => { test.beforeEach(async ({ page }) => { 
await page.goto("about:blank"); }); test("concurrent requests: two requests complete without interference", async ({ page, baseURL }) => { const { agentPath } = wsHelpers(baseURL!); const room = crypto.randomUUID(); const result = await page.evaluate( ({ url, MT }) => { return new Promise<{ req1Done: boolean; req2Done: boolean; messages: Array<{ type: string; [k: string]: unknown }>; }>((resolve) => { const ws = new WebSocket(url); const messages: Array<{ type: string; [k: string]: unknown }> = []; let req1Done = false; let req2Done = false; ws.onmessage = (e) => { try { const data = JSON.parse(e.data); messages.push(data); if (data.type === MT.CF_AGENT_USE_CHAT_RESPONSE && data.done) { if (data.id === "req-conc-1") req1Done = true; if (data.id === "req-conc-2") req2Done = true; if (req1Done && req2Done) { setTimeout(() => { ws.close(); resolve({ req1Done, req2Done, messages }); }, 200); } } } catch { // ignore } }; ws.onopen = () => { // Send two requests simultaneously ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-conc-1", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "conc-1", role: "user", parts: [{ type: "text", text: "First concurrent" }] } ] }) } }) ); ws.send( JSON.stringify({ type: MT.CF_AGENT_USE_CHAT_REQUEST, id: "req-conc-2", init: { method: "POST", body: JSON.stringify({ messages: [ { id: "conc-1", role: "user", parts: [{ type: "text", text: "First concurrent" }] }, { id: "conc-2", role: "user", parts: [{ type: "text", text: "Second concurrent" }] } ] }) } }) ); }; setTimeout(() => { ws.close(); resolve({ req1Done, req2Done, messages }); }, 10000); }); }, { url: agentPath(room), MT: MessageType } ); expect(result.req1Done).toBe(true); expect(result.req2Done).toBe(true); }); }); // ── Large Messages ───────────────────────────────────────────────── test.describe("Large messages e2e", () => { test.beforeEach(async ({ page }) => { await page.goto("about:blank"); }); test("large message persistence: 10KB message 
persists correctly", async ({ page, baseURL }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Generate a 10KB string
    const largeText = "A".repeat(10 * 1024);

    await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-large",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [{
              id: "large-1",
              role: "user",
              parts: [{ type: "text", text: "${largeText}" }]
            }]
          })
        }
      }));

      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
      `
    );

    // Verify persistence
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    const data = await res.json();
    const userMsg = data.find((m: { id: string }) => m.id === "large-1");
    expect(userMsg).toBeTruthy();
    const textPart = userMsg.parts.find(
      (p: { type: string }) => p.type === "text"
    );
    expect(textPart.text.length).toBe(10 * 1024);
  });
});

// ── Malformed Input ────────────────────────────────────────────────

/**
 * Sends structurally incomplete chat requests over a raw WebSocket and checks
 * that the exchange still finishes (the action script always resolves, either
 * on a done response or on its safety timeout).
 */
test.describe("Malformed input e2e", () => {
  test.beforeEach(async ({ page }) => {
    // A loaded page is required because the WebSocket is opened from inside
    // page.evaluate.
    await page.goto("about:blank");
  });

  /**
   * Sends a chat request whose `messages` array is empty, then waits up to
   * 3000 ms for a done response.
   */
  test("empty messages array: agent handles gracefully", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    const messages = await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-empty",
        init: {
          method: "POST",
          body: JSON.stringify({ messages: [] })
        }
      }));

      // Wait for a response or timeout
      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 3000);
      `
    );

    // Agent should handle empty messages without crashing
    // It may send a response or just not crash — either is fine
    // NOTE(review): the action script always resolves with the `received`
    // array, so this assertion cannot fail; it only proves the socket opened
    // and the script ran without rejecting.
    expect(messages).toBeDefined();
  });

  /**
   * Sends a chat request with no `body` in `init`, then (500 ms later) a
   * well-formed request on the same socket, and waits up to 5000 ms for a
   * done response.
   */
  test("missing body in init: agent handles gracefully", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    const messages = await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-no-body",
        init: { method: "POST" }
      }));

      // Give the agent a moment to process (and not crash)
      setTimeout(() => {
        // Then send a valid request to prove the connection is still alive
        ws.send(JSON.stringify({
          type: MT.CF_AGENT_USE_CHAT_REQUEST,
          id: "req-valid-after",
          init: {
            method: "POST",
            body: JSON.stringify({
              messages: [{
                id: "after-no-body",
                role: "user",
                parts: [{ type: "text", text: "Recovery" }]
              }]
            })
          }
        }));
      }, 500);

      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
      `
    );

    // Connection should still be alive — the valid request should have completed
    // NOTE(review): this assertion is always true (the script resolves with an
    // array even on timeout), so it does not actually verify that the
    // "req-valid-after" request completed. Consider asserting on a done
    // response once the agent's behaviour for a missing body is confirmed.
    expect(messages).toBeDefined();
  });
});

// ── Large Output ───────────────────────────────────────────────────

/**
 * Exercises persistence of very large message payloads and reads the stored
 * conversation back through the `get-messages` HTTP endpoint.
 */
test.describe("Large tool output e2e", () => {
  test.beforeEach(async ({ page }) => {
    // A loaded page is required because the WebSocket is opened from inside
    // page.evaluate.
    await page.goto("about:blank");
  });

  /**
   * Sends a user message carrying 100,000 characters of text and checks that
   * the stored conversation contains it alongside at least one other message.
   */
  test("large message persists without crash", async ({ page, baseURL }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // Send a message with a 100KB body to test persistence of large data
    const largeText = "A".repeat(100_000);

    await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-large-output",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [{
              id: "large-output-1",
              role: "user",
              parts: [{ type: "text", text: "${largeText}" }]
            }]
          })
        }
      }));

      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
      `
    );

    // Verify the message persisted and is retrievable
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    expect(res.ok()).toBe(true);

    const data = await res.json();
    expect(data.length).toBeGreaterThanOrEqual(2); // user + assistant

    // The user message should have the large text
    const userMsg = data.find((m: { id: string }) => m.id === "large-output-1");
    expect(userMsg).toBeTruthy();
  });

  /**
   * Three phases on one room:
   *   1. a normal request/response to establish the conversation;
   *   2. a CF_AGENT_CHAT_MESSAGES frame whose assistant message contains a
   *      3,000,000-character tool output, followed by a fixed 1000 ms wait;
   *   3. a read-back asserting the stored tool output was replaced by a short
   *      compaction notice, then a follow-up request that must complete.
   */
  test("3MB message: does not crash, degrades gracefully with compaction", async ({
    page,
    baseURL
  }) => {
    const { agentPath } = wsHelpers(baseURL!);
    const room = crypto.randomUUID();

    // First, send a normal message to establish the conversation
    await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-3mb-setup",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [{
              id: "3mb-user-1",
              role: "user",
              parts: [{ type: "text", text: "Hello" }]
            }]
          })
        }
      }));

      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
      `
    );

    // Now send a message set that includes a 3MB assistant message
    // (simulating what would happen if a tool returned a huge result)
    // We send it via CF_AGENT_CHAT_MESSAGES which goes through persistMessages
    const threeMBText = "X".repeat(3_000_000);

    await connectAndRun(
      page,
      agentPath(room),
      `
      // Send the 3MB message via CF_AGENT_CHAT_MESSAGES (setMessages path)
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_CHAT_MESSAGES,
        messages: [
          {
            id: "3mb-user-1",
            role: "user",
            parts: [{ type: "text", text: "Hello" }]
          },
          {
            id: "3mb-assistant-huge",
            role: "assistant",
            parts: [
              { type: "text", text: "Here are the results:" },
              {
                type: "tool-bigQuery",
                toolCallId: "call_3mb",
                state: "output-available",
                input: { query: "SELECT * FROM everything" },
                output: "${threeMBText}"
              }
            ]
          }
        ]
      }));

      // Give it time to persist
      setTimeout(() => { ws.close(); resolve(received); }, 1000);
      `
    );

    // Verify the agent didn't crash -- messages are retrievable
    const res = await page.request.get(
      `${baseURL}/agents/chat-agent/${room}/get-messages`
    );
    expect(res.ok()).toBe(true);

    const data = await res.json();
    // Should have messages (didn't crash)
    expect(data.length).toBeGreaterThanOrEqual(2);

    // Find the huge assistant message
    const hugeMsg = data.find(
      (m: { id: string }) => m.id === "3mb-assistant-huge"
    );
    expect(hugeMsg).toBeTruthy();

    // The tool output should be compacted (not the original 3MB)
    const toolPart = hugeMsg.parts.find(
      (p: { toolCallId?: string }) => p.toolCallId === "call_3mb"
    );
    expect(toolPart).toBeTruthy();

    const output = toolPart.output as string;
    // Should contain the compaction notice
    expect(output).toContain("too large to persist");
    expect(output).toContain("suggest re-running the tool");
    expect(output).toContain("Preview:");
    // Should be much smaller than 3MB
    expect(output.length).toBeLessThan(10_000);

    // The text part should be preserved (it was small)
    const textPart = hugeMsg.parts.find(
      (p: { type: string }) => p.type === "text"
    );
    expect(textPart).toBeTruthy();
    expect(textPart.text).toBe("Here are the results:");

    // Metadata should indicate compaction happened
    // (only checked when the stored message carries a metadata object)
    if (hugeMsg.metadata) {
      expect(hugeMsg.metadata.compactedToolOutputs).toContain("call_3mb");
    }

    // The conversation should still work -- send another message
    const followUp = await connectAndRun(
      page,
      agentPath(room),
      `
      ws.send(JSON.stringify({
        type: MT.CF_AGENT_USE_CHAT_REQUEST,
        id: "req-3mb-followup",
        init: {
          method: "POST",
          body: JSON.stringify({
            messages: [
              {
                id: "3mb-user-1",
                role: "user",
                parts: [{ type: "text", text: "Hello" }]
              },
              {
                id: "3mb-assistant-huge",
                role: "assistant",
                parts: [
                  { type: "text", text: "Here are the results:" },
                  {
                    type: "tool-bigQuery",
                    toolCallId: "call_3mb",
                    state: "output-available",
                    input: { query: "SELECT * FROM everything" },
                    output: "compacted"
                  }
                ]
              },
              {
                id: "3mb-user-2",
                role: "user",
                parts: [{ type: "text", text: "Thanks, what next?" }]
              }
            ]
          })
        }
      }));

      const check = setInterval(() => {
        if (received.find(m => m.type === MT.CF_AGENT_USE_CHAT_RESPONSE && m.done)) {
          clearInterval(check);
          ws.close();
          resolve(received);
        }
      }, 50);
      setTimeout(() => { clearInterval(check); ws.close(); resolve(received); }, 5000);
      `
    );

    // Follow-up should succeed (agent still alive)
    const doneMsg = followUp.find(
      (m) => m.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && m.done
    );
    expect(doneMsg).toBeTruthy();
  });
});