// client-tools-broadcast.test.ts
import { describe, it, expect } from "vitest";
import { MessageType } from "../types";
import type { UIMessage as ChatMessage } from "ai";
import { connectChatWS } from "./test-utils";

describe("Client Tools Broadcast", () => {
  /**
   * Sends a chat request over a single connection and verifies that this same
   * connection never receives a CF_AGENT_CHAT_MESSAGES frame; it is expected
   * to see only CF_AGENT_USE_CHAT_RESPONSE frames for its own request.
   */
  it("should not broadcast CF_AGENT_CHAT_MESSAGES back to the originating connection after chat request", async () => {
    const room = crypto.randomUUID();
    const { ws } = await connectChatWS(`/agents/test-chat-agent/${room}`);

    // Every frame received on the originating socket, in arrival order.
    const receivedMessages: Array<{ type: string; [key: string]: unknown }> =
      [];
    let resolvePromise: (value: boolean) => void;
    const donePromise = new Promise<boolean>((res) => {
      resolvePromise = res;
    });

    // Resolve `false` if no completed response shows up within 2 seconds.
    const timeout = setTimeout(() => resolvePromise(false), 2000);

    try {
      ws.addEventListener("message", (e: MessageEvent) => {
        const data = JSON.parse(e.data as string);
        receivedMessages.push(data);

        // Wait for the response to complete
        if (data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
          // Completion was observed, so the overall timeout must no longer be
          // able to fire while we wait out the settle delay below.
          clearTimeout(timeout);
          // Give a small delay to catch any broadcast that might follow
          setTimeout(() => resolvePromise(true), 100);
        }
      });

      const userMessage: ChatMessage = {
        id: "msg1",
        role: "user",
        parts: [{ type: "text", text: "Hello" }]
      };

      // Send chat request from the client
      ws.send(
        JSON.stringify({
          type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
          id: "req1",
          init: {
            method: "POST",
            body: JSON.stringify({ messages: [userMessage] })
          }
        })
      );

      const done = await donePromise;
      expect(done).toBe(true);

      // The originating connection should NOT receive CF_AGENT_CHAT_MESSAGES
      // It should only receive CF_AGENT_USE_CHAT_RESPONSE messages
      const chatMessagesReceived = receivedMessages.filter(
        (m) => m.type === MessageType.CF_AGENT_CHAT_MESSAGES
      );

      // Regression guard: if the originator also received
      // CF_AGENT_CHAT_MESSAGES, it would end up with duplicate messages when
      // combined with the stream response.
      expect(chatMessagesReceived.length).toBe(0);
    } finally {
      // Always release the timer and the socket, even when an assertion fails.
      clearTimeout(timeout);
      ws.close(1000);
    }
  });

  /**
   * Connects two clients to the same room, sends a chat request from the
   * first, and verifies that only the second (non-originating) connection
   * receives CF_AGENT_CHAT_MESSAGES frames.
   */
  it("should broadcast CF_AGENT_CHAT_MESSAGES to other connections but not the originator", async () => {
    const room = crypto.randomUUID();

    // Connect two clients to the same room
    const { ws: ws1 } = await connectChatWS(`/agents/test-chat-agent/${room}`);
    const { ws: ws2 } = await connectChatWS(`/agents/test-chat-agent/${room}`);

    // Frames received per connection, in arrival order.
    const ws1Messages: Array<{ type: string; [key: string]: unknown }> = [];
    const ws2Messages: Array<{ type: string; [key: string]: unknown }> = [];

    let resolvePromise: (value: boolean) => void;
    const donePromise = new Promise<boolean>((res) => {
      resolvePromise = res;
    });

    // Resolve `false` if the originator sees no completed response within 2 seconds.
    const timeout = setTimeout(() => resolvePromise(false), 2000);

    try {
      ws1.addEventListener("message", (e: MessageEvent) => {
        const data = JSON.parse(e.data as string);
        ws1Messages.push(data);

        if (data.type === MessageType.CF_AGENT_USE_CHAT_RESPONSE && data.done) {
          // Completion was observed, so the overall timeout must no longer be
          // able to fire while we wait out the settle delay below.
          clearTimeout(timeout);
          // Short settle delay so frames that follow the completed response
          // are still recorded on both sockets before asserting.
          setTimeout(() => resolvePromise(true), 100);
        }
      });

      ws2.addEventListener("message", (e: MessageEvent) => {
        const data = JSON.parse(e.data as string);
        ws2Messages.push(data);
      });

      const userMessage: ChatMessage = {
        id: "msg1",
        role: "user",
        parts: [{ type: "text", text: "Hello" }]
      };

      // WS1 sends the chat request
      ws1.send(
        JSON.stringify({
          type: MessageType.CF_AGENT_USE_CHAT_REQUEST,
          id: "req1",
          init: {
            method: "POST",
            body: JSON.stringify({ messages: [userMessage] })
          }
        })
      );

      const done = await donePromise;
      expect(done).toBe(true);

      // WS1 (originator) should NOT receive CF_AGENT_CHAT_MESSAGES
      const ws1ChatMessages = ws1Messages.filter(
        (m) => m.type === MessageType.CF_AGENT_CHAT_MESSAGES
      );
      expect(ws1ChatMessages.length).toBe(0);

      // WS2 (other connection) SHOULD receive CF_AGENT_CHAT_MESSAGES
      const ws2ChatMessages = ws2Messages.filter(
        (m) => m.type === MessageType.CF_AGENT_CHAT_MESSAGES
      );
      expect(ws2ChatMessages.length).toBeGreaterThan(0);
    } finally {
      // Always release the timer and both sockets, even when an assertion
      // fails; both use the same normal-closure code for consistency.
      clearTimeout(timeout);
      ws1.close(1000);
      ws2.close(1000);
    }
  });
});