branch:
resumable-streaming.test.ts
37831 bytesRaw
import { env } from "cloudflare:workers";
import { describe, it, expect } from "vitest";
import { MessageType, type OutgoingMessage } from "../types";
import { connectChatWS, isUseChatResponseMessage } from "./test-utils";
import { getAgentByName } from "agents";
function isStreamResumingMessage(
m: unknown
): m is Extract<
OutgoingMessage,
{ type: MessageType.CF_AGENT_STREAM_RESUMING }
> {
return (
typeof m === "object" &&
m !== null &&
"type" in m &&
m.type === MessageType.CF_AGENT_STREAM_RESUMING
);
}
function collectMessages(ws: WebSocket): unknown[] {
const messages: unknown[] = [];
ws.addEventListener("message", (e: MessageEvent) => {
try {
messages.push(JSON.parse(e.data as string));
} catch {
messages.push(e.data);
}
});
return messages;
}
async function waitFor(
condition: () => boolean | Promise<boolean>,
timeoutMs = 2000,
intervalMs = 50
): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (!(await condition())) {
if (Date.now() >= deadline) {
throw new Error(`waitFor: condition not met within ${timeoutMs}ms`);
}
await new Promise((r) => setTimeout(r, intervalMs));
}
}
describe("Resumable Streaming", () => {
describe("Stream lifecycle", () => {
it("stores stream metadata when starting a stream", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-123");
expect(streamId).toBeDefined();
expect(typeof streamId).toBe("string");
const metadata = await agentStub.getStreamMetadata(streamId);
expect(metadata).toBeDefined();
expect(metadata?.status).toBe("streaming");
expect(metadata?.request_id).toBe("req-123");
ws.close(1000);
});
it("stores stream chunks in batches", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-456");
// Store several chunks
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"Hello"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":" world"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"!"}'
);
// Flush the buffer
await agentStub.testFlushChunkBuffer();
const chunks = await agentStub.getStreamChunks(streamId);
expect(chunks.length).toBe(3);
expect(chunks[0].chunk_index).toBe(0);
expect(chunks[1].chunk_index).toBe(1);
expect(chunks[2].chunk_index).toBe(2);
expect(chunks[0].body).toBe('{"type":"text","text":"Hello"}');
ws.close(1000);
});
it("marks stream as completed and clears active state", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-789");
// Verify active state
expect(await agentStub.getActiveStreamId()).toBe(streamId);
expect(await agentStub.getActiveRequestId()).toBe("req-789");
// Complete the stream
await agentStub.testCompleteStream(streamId);
// Verify cleared state
expect(await agentStub.getActiveStreamId()).toBeNull();
expect(await agentStub.getActiveRequestId()).toBeNull();
const metadata = await agentStub.getStreamMetadata(streamId);
expect(metadata?.status).toBe("completed");
ws.close(1000);
});
it("marks stream as error on failure", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-error");
// Mark as error
await agentStub.testMarkStreamError(streamId);
// Verify cleared state
expect(await agentStub.getActiveStreamId()).toBeNull();
const metadata = await agentStub.getStreamMetadata(streamId);
expect(metadata?.status).toBe("error");
ws.close(1000);
});
});
describe("Stream resumption", () => {
it("notifies new connections about active streams", async () => {
const room = crypto.randomUUID();
// First connection - start a stream
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-resume");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"Hello"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Second connection - should receive resume notification
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 100));
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
expect(resumeMsg?.id).toBe("req-resume");
ws2.close(1000);
});
it("sends stream chunks after client ACK", async () => {
const room = crypto.randomUUID();
// Setup - create a stream with chunks
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-ack");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"chunk1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"chunk2"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// New connection
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 100));
// Send ACK
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: "req-ack"
})
);
await waitFor(
() => messages2.filter(isUseChatResponseMessage).length >= 2
);
// Should receive the chunks
const chunkMsgs = messages2.filter(isUseChatResponseMessage);
expect(chunkMsgs.length).toBeGreaterThanOrEqual(2);
expect(chunkMsgs[0].body).toBe('{"type":"text","text":"chunk1"}');
expect(chunkMsgs[1].body).toBe('{"type":"text","text":"chunk2"}');
ws2.close(1000);
});
it("does not deliver live chunks before ACK to resuming connections", async () => {
const room = crypto.randomUUID();
// First connection - start a stream
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages1 = collectMessages(ws1);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-live");
// Second connection - will be notified to resume
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 100));
// Broadcast a live chunk while ws2 is pending resume (no ACK yet)
await agentStub.testBroadcastLiveChunk(
"req-live",
streamId,
'{"type":"text-delta","id":"0","delta":"A"}'
);
await new Promise((r) => setTimeout(r, 100));
// ws2 should NOT receive live chunks before ACK
const preAckChunks = messages2.filter(isUseChatResponseMessage);
expect(preAckChunks.length).toBe(0);
// ws1 should receive the live chunk
const ws1Chunks = messages1.filter(isUseChatResponseMessage);
expect(ws1Chunks.length).toBe(1);
expect(ws1Chunks[0].body).toBe(
'{"type":"text-delta","id":"0","delta":"A"}'
);
// Send ACK to resume
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: "req-live"
})
);
// Wait for the full round-trip: ACK delivery → server flushes chunk
// buffer to SQLite → reads chunks → sends replay back to ws2.
await waitFor(
() => messages2.filter(isUseChatResponseMessage).length >= 1
);
// After ACK, ws2 should receive the replayed chunk
const postAckChunks = messages2.filter(isUseChatResponseMessage);
expect(postAckChunks.length).toBeGreaterThanOrEqual(1);
expect(postAckChunks[0].body).toBe(
'{"type":"text-delta","id":"0","delta":"A"}'
);
// Live chunks after ACK should be delivered
await agentStub.testBroadcastLiveChunk(
"req-live",
streamId,
'{"type":"text-delta","id":"0","delta":"B"}'
);
await waitFor(() =>
messages2
.filter(isUseChatResponseMessage)
.some((m) => m.body?.includes('"delta":"B"'))
);
const finalChunks = messages2.filter(isUseChatResponseMessage);
expect(finalChunks.some((m) => m.body?.includes('"delta":"B"'))).toBe(
true
);
ws1.close();
ws2.close(1000);
});
it("ignores ACK with wrong request ID", async () => {
const room = crypto.randomUUID();
// Setup
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-correct");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"secret"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// New connection
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 100));
// Send ACK with wrong ID
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: "req-wrong-id"
})
);
await new Promise((r) => setTimeout(r, 100));
// Should NOT receive chunks (only state/mcp messages)
const chunkMsgs = messages2.filter(isUseChatResponseMessage);
expect(chunkMsgs.length).toBe(0);
ws2.close(1000);
});
});
describe("Stale stream handling", () => {
it("deletes stale streams on restore (older than 5 minutes)", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Insert a stale stream (6 minutes old)
const staleStreamId = "stale-stream-123";
await agentStub.testInsertStaleStream(
staleStreamId,
"req-stale",
6 * 60 * 1000
);
// Verify it exists
const beforeRestore = await agentStub.getStreamMetadata(staleStreamId);
expect(beforeRestore).toBeDefined();
// Trigger restore
await agentStub.testRestoreActiveStream();
// Should be deleted
const afterRestore = await agentStub.getStreamMetadata(staleStreamId);
expect(afterRestore).toBeNull();
// Active stream should NOT be set
expect(await agentStub.getActiveStreamId()).toBeNull();
ws.close(1000);
});
it("restores fresh streams (under 5 minutes old)", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Insert a fresh stream (1 minute old)
const freshStreamId = "fresh-stream-456";
await agentStub.testInsertStaleStream(
freshStreamId,
"req-fresh",
1 * 60 * 1000
);
// Clear any active state first
const currentActive = await agentStub.getActiveStreamId();
if (currentActive) {
await agentStub.testCompleteStream(currentActive);
}
// Trigger restore
await agentStub.testRestoreActiveStream();
// Should be restored
expect(await agentStub.getActiveStreamId()).toBe(freshStreamId);
expect(await agentStub.getActiveRequestId()).toBe("req-fresh");
ws.close(1000);
});
});
describe("Clear history", () => {
it("clears stream data when chat history is cleared", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Create a stream with chunks
const streamId = await agentStub.testStartStream("req-clear");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"data"}'
);
await agentStub.testFlushChunkBuffer();
// Verify data exists
const chunksBefore = await agentStub.getStreamChunks(streamId);
expect(chunksBefore.length).toBe(1);
// Clear history via WebSocket message
ws.send(JSON.stringify({ type: MessageType.CF_AGENT_CHAT_CLEAR }));
await new Promise((r) => setTimeout(r, 100));
// Stream data should be cleared
const chunksAfter = await agentStub.getStreamChunks(streamId);
expect(chunksAfter.length).toBe(0);
const metadataAfter = await agentStub.getStreamMetadata(streamId);
expect(metadataAfter).toBeNull();
// Active state should be cleared
expect(await agentStub.getActiveStreamId()).toBeNull();
ws.close(1000);
});
});
describe("Chunk buffer", () => {
it("flushes chunks before starting a new stream", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start first stream and add chunks without explicit flush
const stream1 = await agentStub.testStartStream("req-1");
await agentStub.testStoreStreamChunk(
stream1,
'{"type":"text","text":"s1c1"}'
);
await agentStub.testStoreStreamChunk(
stream1,
'{"type":"text","text":"s1c2"}'
);
// Start second stream - should flush first stream's chunks
const stream2 = await agentStub.testStartStream("req-2");
// First stream's chunks should be persisted
const chunks1 = await agentStub.getStreamChunks(stream1);
expect(chunks1.length).toBe(2);
// Second stream is active
expect(await agentStub.getActiveStreamId()).toBe(stream2);
ws.close(1000);
});
it("flushes on complete", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-flush");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"final"}'
);
// Complete - should flush
await agentStub.testCompleteStream(streamId);
const chunks = await agentStub.getStreamChunks(streamId);
expect(chunks.length).toBe(1);
expect(chunks[0].body).toBe('{"type":"text","text":"final"}');
ws.close(1000);
});
});
describe("Completed stream handling", () => {
it("sends done signal for completed streams on resume", async () => {
const room = crypto.randomUUID();
// Setup - create and complete a stream
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-done");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text","text":"done"}'
);
await agentStub.testCompleteStream(streamId);
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// New connection - no resume notification since stream is completed
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 100));
// Should NOT get resume notification for completed stream
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeUndefined();
ws2.close(1000);
});
});
describe("Client-initiated resume (issue #896)", () => {
it("CF_AGENT_STREAM_RESUME_REQUEST triggers resume notification", async () => {
const room = crypto.randomUUID();
// First connection: start a stream
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-client-resume");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"hello"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Second connection: send CF_AGENT_STREAM_RESUME_REQUEST
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
// Wait briefly for any onConnect push (which we'll also get)
await new Promise((r) => setTimeout(r, 50));
// Send the client-initiated resume request
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_REQUEST
})
);
await new Promise((r) => setTimeout(r, 100));
// Should have received CF_AGENT_STREAM_RESUMING (from request, not just onConnect)
const resumeMsgs = messages2.filter(isStreamResumingMessage);
// May get 2 (one from onConnect, one from request) or 1 if timing collapses them
expect(resumeMsgs.length).toBeGreaterThanOrEqual(1);
ws2.close(1000);
});
it("CF_AGENT_STREAM_RESUME_REQUEST with no active stream sends RESUME_NONE", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
const messages = collectMessages(ws);
await new Promise((r) => setTimeout(r, 50));
// Send resume request when there's no active stream
ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_REQUEST
})
);
await new Promise((r) => setTimeout(r, 500));
// Should NOT get CF_AGENT_STREAM_RESUMING
const resumeMsg = messages.find(isStreamResumingMessage);
expect(resumeMsg).toBeUndefined();
// Should get CF_AGENT_STREAM_RESUME_NONE
const noneMsg = messages.find(
(m) =>
typeof m === "object" &&
m !== null &&
"type" in m &&
m.type === MessageType.CF_AGENT_STREAM_RESUME_NONE
);
expect(noneMsg).toBeDefined();
ws.close(1000);
});
it("replayed chunks have replay=true flag", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start a stream and add chunks but do NOT complete it
// (stream must be active for resume to work)
const streamId = await agentStub.testStartStream("req-replay-flag");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"test"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Reconnect — active stream triggers resume
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
// Send resume request
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_REQUEST
})
);
await new Promise((r) => setTimeout(r, 50));
// ACK the resuming notification
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(
() => messages2.filter(isUseChatResponseMessage).length > 0
);
// All CF_AGENT_USE_CHAT_RESPONSE messages should have replay=true
const responseMessages = messages2.filter(isUseChatResponseMessage);
expect(responseMessages.length).toBeGreaterThan(0);
for (const msg of responseMessages) {
expect((msg as { replay?: boolean }).replay).toBe(true);
}
ws2.close(1000);
});
});
describe("Replay complete signal for active streams (issue #896 follow-up)", () => {
it("sends replayComplete=true after replaying chunks for a live stream", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start a stream and add chunks but do NOT complete it
const streamId = await agentStub.testStartStream("req-replay-complete");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"thinking..."}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Reconnect — active stream triggers resume
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
// ACK the resuming notification
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(
() => messages2.filter(isUseChatResponseMessage).length > 0
);
const responseMessages = messages2.filter(isUseChatResponseMessage);
expect(responseMessages.length).toBeGreaterThan(0);
// The last response message should be the replayComplete signal
const lastMsg = responseMessages[responseMessages.length - 1] as {
replay?: boolean;
replayComplete?: boolean;
done?: boolean;
body?: string;
};
expect(lastMsg.replay).toBe(true);
expect(lastMsg.replayComplete).toBe(true);
expect(lastMsg.done).toBe(false);
expect(lastMsg.body).toBe("");
ws2.close(1000);
});
it("sends done=true for orphaned streams after hibernation wake", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start a stream and add chunks
const streamId = await agentStub.testStartStream("req-orphaned");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"partial response"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Simulate hibernation: reinitialize ResumableStream (isLive=false)
await agentStub.testSimulateHibernationWake();
// Verify stream was restored from SQLite but is not live
expect(await agentStub.getActiveStreamId()).toBe(streamId);
// Reconnect
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
// ACK the resuming notification
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(
() => messages2.filter(isUseChatResponseMessage).length > 0
);
const responseMessages = messages2.filter(isUseChatResponseMessage);
expect(responseMessages.length).toBeGreaterThan(0);
// The last message should be done=true (NOT replayComplete)
const lastMsg = responseMessages[responseMessages.length - 1] as {
replay?: boolean;
replayComplete?: boolean;
done?: boolean;
body?: string;
};
expect(lastMsg.replay).toBe(true);
expect(lastMsg.done).toBe(true);
expect(lastMsg.replayComplete).toBeUndefined();
// Stream should be marked completed in SQLite
const metadata = await agentStub.getStreamMetadata(streamId);
expect(metadata?.status).toBe("completed");
expect(await agentStub.getActiveStreamId()).toBeNull();
// Partial assistant message should be persisted
const persisted =
(await agentStub.getPersistedMessages()) as unknown as Array<{
role: string;
parts: Array<{ type: string; text?: string }>;
}>;
const assistantMsg = persisted.find((m) => m.role === "assistant");
expect(assistantMsg).toBeDefined();
expect(assistantMsg!.parts.length).toBeGreaterThan(0);
// Should contain the text from the replayed chunks
const textPart = assistantMsg!.parts.find((p) => p.type === "text");
expect(textPart).toBeDefined();
expect(textPart!.text).toContain("partial response");
ws2.close(1000);
});
});
describe("Orphaned stream edge cases", () => {
it("orphaned stream with zero chunks completes cleanly without persisting empty message", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start a stream but add NO chunks
const streamId = await agentStub.testStartStream("req-empty-orphan");
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Simulate hibernation
await agentStub.testSimulateHibernationWake();
expect(await agentStub.getActiveStreamId()).toBe(streamId);
// Reconnect
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(async () => (await agentStub.getActiveStreamId()) === null);
// Stream should be completed
expect(await agentStub.getActiveStreamId()).toBeNull();
const metadata = await agentStub.getStreamMetadata(streamId);
expect(metadata?.status).toBe("completed");
// No assistant message should be persisted (zero chunks = no content)
const persisted =
(await agentStub.getPersistedMessages()) as unknown as Array<{
role: string;
}>;
const assistantMsg = persisted.find((m) => m.role === "assistant");
expect(assistantMsg).toBeUndefined();
ws2.close(1000);
});
it("orphaned stream with tool call parts reconstructs correctly", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-tool-orphan");
// Simulate a stream that contained text + tool call
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"Let me check the weather."}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-end","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"tool-input-start","toolCallId":"tc-1","toolName":"getWeather"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"tool-input-available","toolCallId":"tc-1","toolName":"getWeather","input":{"city":"London"}}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Simulate hibernation
await agentStub.testSimulateHibernationWake();
// Reconnect + ACK
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(async () => (await agentStub.getActiveStreamId()) === null);
// Verify message was reconstructed with both text and tool parts
const persisted =
(await agentStub.getPersistedMessages()) as unknown as Array<{
role: string;
parts: Array<{ type: string; text?: string; toolCallId?: string }>;
}>;
const assistantMsg = persisted.find((m) => m.role === "assistant");
expect(assistantMsg).toBeDefined();
// Should have a text part
const textPart = assistantMsg!.parts.find((p) => p.type === "text");
expect(textPart).toBeDefined();
expect(textPart!.text).toContain("Let me check the weather.");
// Should have a tool call part
const toolPart = assistantMsg!.parts.find((p) => p.toolCallId === "tc-1");
expect(toolPart).toBeDefined();
ws2.close(1000);
});
it("second ACK after orphaned stream is finalized is a no-op", async () => {
const room = crypto.randomUUID();
const { ws: ws1 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
const streamId = await agentStub.testStartStream("req-double-ack");
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-start","id":"t1"}'
);
await agentStub.testStoreStreamChunk(
streamId,
'{"type":"text-delta","id":"t1","delta":"hello"}'
);
await agentStub.testFlushChunkBuffer();
ws1.close();
await new Promise((r) => setTimeout(r, 50));
// Simulate hibernation
await agentStub.testSimulateHibernationWake();
// First client connects and ACKs — orphaned stream gets finalized
const { ws: ws2 } = await connectChatWS(
`/agents/test-chat-agent/${room}`
);
const messages2 = collectMessages(ws2);
await new Promise((r) => setTimeout(r, 50));
const resumeMsg = messages2.find(isStreamResumingMessage);
expect(resumeMsg).toBeDefined();
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: (resumeMsg as { id: string }).id
})
);
await waitFor(async () => (await agentStub.getActiveStreamId()) === null);
// Stream is now finalized
expect(await agentStub.getActiveStreamId()).toBeNull();
// Second ACK with the same request ID — should be a no-op
ws2.send(
JSON.stringify({
type: MessageType.CF_AGENT_STREAM_RESUME_ACK,
id: "req-double-ack"
})
);
await new Promise((r) => setTimeout(r, 100));
// Should still have exactly one assistant message (no duplicate)
const persisted =
(await agentStub.getPersistedMessages()) as unknown as Array<{
role: string;
}>;
const assistantMsgs = persisted.filter((m) => m.role === "assistant");
expect(assistantMsgs.length).toBe(1);
ws2.close(1000);
});
});
describe("clearAll clears chunk buffer", () => {
it("buffered chunks are not flushed to SQLite after clearAll", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Start a stream and buffer some chunks (do NOT flush)
const streamId = await agentStub.testStartStream("req-buffer-clear");
await agentStub.testStoreStreamChunk(streamId, "chunk-1");
await agentStub.testStoreStreamChunk(streamId, "chunk-2");
// Chunks should be in buffer but not yet in SQLite (buffer size < 10)
let chunks = await agentStub.getStreamChunks(streamId);
expect(chunks.length).toBe(0); // Still in memory buffer
// Clear all — should discard the buffer
ws.send(JSON.stringify({ type: "cf_agent_chat_clear" }));
await new Promise((r) => setTimeout(r, 100));
// Flush should be a no-op since buffer was cleared
await agentStub.testFlushChunkBuffer();
chunks = await agentStub.getStreamChunks(streamId);
expect(chunks.length).toBe(0);
// Wait before close to let the agent settle
await new Promise((r) => setTimeout(r, 50));
ws.close(1000);
});
});
describe("errored stream cleanup", () => {
it("errored streams are cleaned up alongside completed streams", async () => {
const room = crypto.randomUUID();
const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
await new Promise((r) => setTimeout(r, 50));
const agentStub = await getAgentByName(env.TestChatAgent, room);
// Insert an old errored stream (25 hours old, past the 24h cleanup threshold)
await agentStub.testInsertOldErroredStream(
"old-errored",
"req-errored",
25 * 60 * 60 * 1000
);
// Verify the errored stream exists
const metadata = await agentStub.getStreamMetadata("old-errored");
expect(metadata?.status).toBe("error");
// Trigger cleanup by completing a dummy stream
// (cleanup runs periodically inside completeStream)
await agentStub.testTriggerStreamCleanup();
// The old errored stream should be cleaned up
const afterMetadata = await agentStub.getStreamMetadata("old-errored");
expect(afterMetadata).toBeNull();
// Wait before close to let the agent settle
await new Promise((r) => setTimeout(r, 50));
ws.close(1000);
});
});
});