// row-size-guard.test.ts
// Tests for the chat agent's row-size guard and incremental message persistence.
import { env } from "cloudflare:workers";
import { describe, it, expect } from "vitest";
import type { UIMessage as ChatMessage } from "ai";
import { connectChatWS } from "./test-utils";
import { getAgentByName } from "agents";
import { MessageType } from "../types";

describe("Row Size Guard and Incremental Persistence", () => {
  describe("Incremental persistence", () => {
    it("persists new messages and skips unchanged ones on second call", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      // Close the socket in `finally` so a failing assertion cannot leave an
      // open connection to the agent behind for later tests.
      try {
        // NOTE(review): presumably gives the agent time to finish handling
        // the new connection before the stub is used.
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // Persist two messages
        const messages: ChatMessage[] = [
          {
            id: "inc-1",
            role: "user",
            parts: [{ type: "text", text: "Hello" }]
          },
          {
            id: "inc-2",
            role: "assistant",
            parts: [{ type: "text", text: "Hi there!" }]
          }
        ];

        await agentStub.persistMessages(messages);

        let persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(2);

        // Persist the same messages again. The SQL write count is not
        // observable from here, so this only verifies that re-persisting
        // unchanged messages leaves the stored history intact and ordered.
        await agentStub.persistMessages(messages);

        persisted = (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(2);
        expect(persisted[0].id).toBe("inc-1");
        expect(persisted[1].id).toBe("inc-2");
      } finally {
        ws.close(1000);
      }
    });

    it("persists modified messages when content changes", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // Persist initial message
        await agentStub.persistMessages([
          {
            id: "mod-1",
            role: "assistant",
            parts: [{ type: "text", text: "Original" }]
          }
        ]);

        // Same id, different content: must overwrite the stored row.
        await agentStub.persistMessages([
          {
            id: "mod-1",
            role: "assistant",
            parts: [{ type: "text", text: "Updated content" }]
          }
        ]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(1);
        const textPart = persisted[0].parts[0] as { text: string };
        expect(textPart.text).toBe("Updated content");
      } finally {
        ws.close(1000);
      }
    });

    it("cache is cleared on chat clear", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // Persist a message
        await agentStub.persistMessages([
          {
            id: "clear-cache-1",
            role: "user",
            parts: [{ type: "text", text: "Hello" }]
          }
        ]);

        // Clear via WebSocket. `ws.send` does not wait for the agent to
        // process the message, so poll (with a bound) rather than relying
        // on a single fixed sleep.
        ws.send(JSON.stringify({ type: MessageType.CF_AGENT_CHAT_CLEAR }));
        let persisted: ChatMessage[] = [];
        for (let attempt = 0; attempt < 20; attempt++) {
          await new Promise((r) => setTimeout(r, 25));
          persisted =
            (await agentStub.getPersistedMessages()) as ChatMessage[];
          if (persisted.length === 0) break;
        }
        expect(persisted.length).toBe(0);

        // Persist a new message with the same ID -- should succeed
        // (cache was cleared, so it won't skip)
        await agentStub.persistMessages([
          {
            id: "clear-cache-1",
            role: "user",
            parts: [{ type: "text", text: "New message same ID" }]
          }
        ]);

        const afterPersist =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(afterPersist.length).toBe(1);
        expect((afterPersist[0].parts[0] as { text: string }).text).toBe(
          "New message same ID"
        );
      } finally {
        ws.close(1000);
      }
    });
  });

  describe("Row size enforcement", () => {
    it("messages under 1.8MB pass through unchanged", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      // Close the socket in `finally` so a failing assertion cannot leave an
      // open connection to the agent behind for later tests.
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // Create a message with a moderately large tool output (50KB)
        const toolOutput = "A".repeat(50_000);
        const message: ChatMessage = {
          id: "size-ok",
          role: "assistant",
          parts: [
            {
              type: "tool-bigTool",
              toolCallId: "call_ok",
              state: "output-available",
              input: {},
              output: toolOutput
            }
          ] as ChatMessage["parts"]
        };

        await agentStub.persistMessages([message]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(1);

        // Output should be preserved at full fidelity (under 1.8MB)
        const part = persisted[0].parts[0] as { output: unknown };
        expect(part.output).toBe(toolOutput);
      } finally {
        ws.close(1000);
      }
    });

    it("messages over 1.8MB have tool outputs compacted", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // Create a message with a huge tool output that pushes over 1.8MB
        const hugeOutput = "X".repeat(1_900_000);
        const message: ChatMessage = {
          id: "size-big",
          role: "assistant",
          parts: [
            {
              type: "tool-hugeTool",
              toolCallId: "call_huge",
              state: "output-available",
              input: { query: "big data" },
              output: hugeOutput
            }
          ] as ChatMessage["parts"]
        };

        // Should NOT throw -- the guard compacts the output
        await agentStub.persistMessages([message]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(1);

        // Output should be compacted (not the original huge string)
        const part = persisted[0].parts[0] as { output: unknown };
        const outputStr = part.output as string;
        expect(outputStr).toContain("too large to persist");
        expect(outputStr).toContain("bytes");
        expect(outputStr.length).toBeLessThan(hugeOutput.length);
      } finally {
        ws.close(1000);
      }
    });

    it("compacted messages have metadata with compactedToolOutputs", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        const message: ChatMessage = {
          id: "meta-compact",
          role: "assistant",
          parts: [
            {
              type: "tool-bigTool",
              toolCallId: "call_meta",
              state: "output-available",
              input: {},
              output: "Y".repeat(1_900_000)
            }
          ] as ChatMessage["parts"]
        };

        await agentStub.persistMessages([message]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        // Assert the row exists first, so a missing message fails with a
        // clear assertion instead of a TypeError on `persisted[0]`.
        expect(persisted.length).toBe(1);
        const metadata = persisted[0].metadata as Record<string, unknown>;
        expect(metadata).toBeDefined();
        expect(metadata.compactedToolOutputs).toEqual(["call_meta"]);
      } finally {
        ws.close(1000);
      }
    });

    it("large non-assistant messages are truncated instead of crashing", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // A large user message has no tool outputs to compact, so this
        // exercises the text-truncation fallback for non-assistant messages.
        const largeText = "Z".repeat(1_900_000);
        const message: ChatMessage = {
          id: "user-big",
          role: "user",
          parts: [{ type: "text", text: largeText }]
        };

        // Should not crash
        await agentStub.persistMessages([message]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(1);

        // Text should be truncated
        const textPart = persisted[0].parts[0] as { text: string };
        expect(textPart.text).toContain("truncated for storage");
        expect(textPart.text.length).toBeLessThan(largeText.length);
      } finally {
        ws.close(1000);
      }
    });
  });

  describe("Unicode byte-length measurement", () => {
    it("compacts messages with multi-byte Unicode that exceed byte limit", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      // Close the socket in `finally` so a failing assertion cannot leave an
      // open connection to the agent behind for later tests.
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        // CJK character \u4e00 is 1 JS char but 3 bytes in UTF-8.
        // 700,000 of them is 700,000 JS chars, which a naive `.length`
        // check against 1.8M would let through, yet 2,100,000 UTF-8 bytes,
        // which exceeds the 1.8MB byte limit. The guard must measure bytes.
        const cjkOutput = "\u4e00".repeat(700_000);

        const message: ChatMessage = {
          id: "unicode-test",
          role: "assistant",
          parts: [
            {
              type: "tool-cjkTool",
              toolCallId: "call_unicode",
              state: "output-available",
              input: {},
              output: cjkOutput
            }
          ] as ChatMessage["parts"]
        };

        await agentStub.persistMessages([message]);

        const persisted =
          (await agentStub.getPersistedMessages()) as ChatMessage[];
        expect(persisted.length).toBe(1);

        // The tool output should be compacted (byte size exceeds limit)
        const part = persisted[0].parts[0] as { output: unknown };
        expect(typeof part.output).toBe("string");
        expect((part.output as string).length).toBeLessThan(cjkOutput.length);
        expect(part.output as string).toContain("too large to persist");
      } finally {
        ws.close(1000);
      }
    });
  });

  describe("Stream chunk size guard", () => {
    it("normal chunks are stored", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      // Close the socket in `finally` so a failing assertion cannot leave an
      // open connection to the agent behind for later tests.
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        const streamId = await agentStub.testStartStream("req-chunk-ok");
        await agentStub.testStoreStreamChunk(
          streamId,
          '{"type":"text-delta","delta":"hello"}'
        );
        // Chunks are buffered; flush so they are visible to getStreamChunks.
        await agentStub.testFlushChunkBuffer();

        const chunks = await agentStub.getStreamChunks(streamId);
        expect(chunks.length).toBe(1);
        // Also check the stored body, not just that some row exists.
        expect(chunks[0].body).toContain("text-delta");
      } finally {
        ws.close(1000);
      }
    });

    it("oversized chunks are skipped without crash", async () => {
      const room = crypto.randomUUID();
      const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);
      try {
        await new Promise((r) => setTimeout(r, 50));

        const agentStub = await getAgentByName(env.TestChatAgent, room);

        const streamId = await agentStub.testStartStream("req-chunk-big");

        // Store a normal chunk first
        await agentStub.testStoreStreamChunk(
          streamId,
          '{"type":"text-start","id":"t1"}'
        );

        // Store an oversized chunk (>1.8MB) -- should be skipped
        const hugeChunk =
          '{"type":"tool-output-available","output":"' +
          "X".repeat(1_900_000) +
          '"}';
        await agentStub.testStoreStreamChunk(streamId, hugeChunk);

        // Store another normal chunk after
        await agentStub.testStoreStreamChunk(
          streamId,
          '{"type":"text-end","id":"t1"}'
        );

        await agentStub.testFlushChunkBuffer();

        const chunks = await agentStub.getStreamChunks(streamId);
        // Should have 2 chunks (the oversized one was skipped), in order
        expect(chunks.length).toBe(2);
        expect(chunks[0].body).toContain("text-start");
        expect(chunks[1].body).toContain("text-end");
      } finally {
        ws.close(1000);
      }
    });
  });
});