// branch:
// resumable-stream.ts
// 16047 bytesRaw
/**
 * ResumableStream: Standalone class for buffering, persisting, and replaying
 * stream chunks in SQLite. Extracted from AIChatAgent to separate concerns.
 *
 * Handles:
 * - Chunk buffering (batched writes to SQLite for performance)
 * - Stream lifecycle (start, complete, error)
 * - Chunk replay for reconnecting clients
 * - Stale stream cleanup
 * - Active stream restoration after agent restart
 */

import { nanoid } from "nanoid";
import type { Connection } from "agents";
import { MessageType } from "./types";

/** Flush threshold: once this many chunks are buffered they are written to SQLite. */
const CHUNK_BUFFER_SIZE = 10;
/** Hard cap on buffered chunks; reaching it forces a flush before buffering more. */
const CHUNK_BUFFER_MAX_SIZE = 100;
/** A stream still marked "streaming" after this long (5 minutes, in ms) is treated as stale. */
const STREAM_STALE_THRESHOLD_MS = 5 * 60_000;
/** Minimum spacing between cleanup passes over old streams (10 minutes, in ms). */
const CLEANUP_INTERVAL_MS = 10 * 60_000;
/** Finished streams older than this (24 hours, in ms) are removed by cleanup. */
const CLEANUP_AGE_THRESHOLD_MS = 24 * 3_600_000;
/** Single reusable encoder used to measure UTF-8 byte lengths. */
const textEncoder = new TextEncoder();

/**
 * Row shape of the `cf_ai_chat_stream_chunks` table: one persisted chunk of
 * a stream, kept so it can be replayed to a reconnecting client.
 */
type StreamChunk = {
  /** Primary key of the chunk row (generated with `nanoid()` when the chunk is buffered). */
  id: string;
  /** ID of the stream this chunk belongs to. */
  stream_id: string;
  /** Serialized chunk body, stored exactly as it was handed to `storeChunk`. */
  body: string;
  /** Position of the chunk within its stream; replay orders by this ascending. */
  chunk_index: number;
  /** `Date.now()` value (ms) captured when the batch containing this chunk was flushed. */
  created_at: number;
};

/**
 * Row shape of the `cf_ai_chat_stream_metadata` table: one row per stream,
 * recording which request it serves and where it is in its lifecycle.
 */
type StreamMetadata = {
  /** Primary key: the stream ID (generated with `nanoid()` in `start`). */
  id: string;
  /** ID of the chat request this stream answers. */
  request_id: string;
  /** Lifecycle state; rows are inserted as "streaming" and later updated to "completed" or "error". */
  status: "streaming" | "completed" | "error";
  /** Creation timestamp in epoch milliseconds; used to judge staleness on restore. */
  created_at: number;
  /** Epoch-millisecond timestamp set when the stream is completed or errored; null until then. */
  completed_at: number | null;
};

/**
 * The smallest slice of the Agent's `this.sql` tagged template that
 * ResumableStream needs: a synchronous tagged-template function that binds
 * the interpolated values and returns the resulting rows as an array.
 * Accepting this function type (instead of the Agent itself) keeps
 * ResumableStream independent of the full Agent class.
 */
export type SqlTaggedTemplate = <T = Record<string, unknown>>(
  strings: TemplateStringsArray,
  ...values: (string | number | boolean | null)[]
) => T[];

/**
 * Tracks a single "active" chat stream and persists its chunks in SQLite so
 * that a reconnecting client can have them replayed.
 *
 * Lifecycle: `start()` registers a stream and marks it live, `storeChunk()`
 * buffers chunks (written in batches by `flushBuffer()`), and `complete()` /
 * `markError()` finalize the stream and clear the active state. On
 * construction, `restore()` re-attaches to a recent stream that was left in
 * the "streaming" state; such a stream is not live (`isLive` is false).
 */
export class ResumableStream {
  private _activeStreamId: string | null = null;
  private _activeRequestId: string | null = null;
  /** Index the next stored chunk of the active stream will receive. */
  private _streamChunkIndex = 0;

  /**
   * Whether the active stream was started in this instance (true) or
   * restored from SQLite after hibernation/restart (false). An orphaned
   * stream has no live LLM reader — the ReadableStream was lost when the
   * DO was evicted.
   */
  private _isLive = false;

  /** Chunks accepted by `storeChunk()` that have not been written to SQLite yet. */
  private _chunkBuffer: Array<{
    id: string;
    streamId: string;
    body: string;
    index: number;
  }> = [];
  /** Re-entrancy guard for `flushBuffer()`. */
  private _isFlushingChunks = false;
  /** `Date.now()` of the last old-stream cleanup pass; 0 means it has never run. */
  private _lastCleanupTime = 0;

  /**
   * Creates the chunk/metadata tables and index if they do not exist, then
   * restores any recent stream left in the "streaming" state.
   * @param sql - Tagged-template SQL function used for all storage access
   */
  constructor(private sql: SqlTaggedTemplate) {
    // Create tables for stream chunks and metadata
    this.sql`create table if not exists cf_ai_chat_stream_chunks (
      id text primary key,
      stream_id text not null,
      body text not null,
      chunk_index integer not null,
      created_at integer not null
    )`;

    this.sql`create table if not exists cf_ai_chat_stream_metadata (
      id text primary key,
      request_id text not null,
      status text not null,
      created_at integer not null,
      completed_at integer
    )`;

    this.sql`create index if not exists idx_stream_chunks_stream_id 
      on cf_ai_chat_stream_chunks(stream_id, chunk_index)`;

    // Restore any active stream from a previous session
    this.restore();
  }

  // ── State accessors ────────────────────────────────────────────────

  /** ID of the stream currently being tracked, or null when there is none. */
  get activeStreamId(): string | null {
    return this._activeStreamId;
  }

  /** Request ID associated with the tracked stream, or null when there is none. */
  get activeRequestId(): string | null {
    return this._activeRequestId;
  }

  /** True while a stream is being tracked (live or restored). */
  hasActiveStream(): boolean {
    return this._activeStreamId !== null;
  }

  /**
   * Whether the active stream has a live LLM reader (started in this
   * instance) vs being restored from SQLite after hibernation (orphaned).
   */
  get isLive(): boolean {
    return this._isLive;
  }

  // ── Stream lifecycle ───────────────────────────────────────────────

  /**
   * Start tracking a new stream for resumable streaming.
   * Creates metadata entry in SQLite and sets up tracking state.
   * @param requestId - The unique ID of the chat request
   * @returns The generated stream ID
   */
  start(requestId: string): string {
    // Flush any pending chunks from previous streams to prevent mixing
    this.flushBuffer();

    const streamId = nanoid();
    this._activeStreamId = streamId;
    this._activeRequestId = requestId;
    this._streamChunkIndex = 0;
    this._isLive = true;

    this.sql`
      insert into cf_ai_chat_stream_metadata (id, request_id, status, created_at)
      values (${streamId}, ${requestId}, 'streaming', ${Date.now()})
    `;

    return streamId;
  }

  /**
   * Mark a stream as completed and flush any pending chunks.
   * Clears the active-stream state and may trigger a cleanup pass over
   * old finished streams.
   * @param streamId - The stream to mark as completed
   */
  complete(streamId: string): void {
    this.flushBuffer();

    this.sql`
      update cf_ai_chat_stream_metadata 
      set status = 'completed', completed_at = ${Date.now()} 
      where id = ${streamId}
    `;
    this._resetActiveState();

    // Periodically clean up old streams
    this._maybeCleanupOldStreams();
  }

  /**
   * Mark a stream as errored, flush any pending chunks, and clear the
   * active-stream state.
   * @param streamId - The stream to mark as errored
   */
  markError(streamId: string): void {
    this.flushBuffer();

    this.sql`
      update cf_ai_chat_stream_metadata 
      set status = 'error', completed_at = ${Date.now()} 
      where id = ${streamId}
    `;
    this._resetActiveState();
  }

  // ── Chunk storage ──────────────────────────────────────────────────

  /** Maximum chunk body size before skipping storage (bytes). Prevents SQLite row limit crash. */
  private static readonly CHUNK_MAX_BYTES = 1_800_000;

  /**
   * Buffer a stream chunk for batch write to SQLite.
   * Chunks exceeding the row size limit are skipped to prevent crashes.
   * The chunk is still broadcast to live clients (caller handles that),
   * but will be missing from replay on reconnection.
   * @param streamId - The stream this chunk belongs to
   * @param body - The serialized chunk body
   */
  storeChunk(streamId: string, body: string): void {
    const maxBytes = ResumableStream.CHUNK_MAX_BYTES;

    // UTF-8 needs at most 3 bytes per UTF-16 code unit, so a body whose
    // length * 3 fits under the limit cannot be oversized. Only bodies
    // that might exceed the limit pay for an actual encode to measure them.
    if (body.length * 3 > maxBytes) {
      const bodyBytes = textEncoder.encode(body).byteLength;
      if (bodyBytes > maxBytes) {
        // Guard against chunks that would exceed SQLite row limit.
        // The chunk is still broadcast to live clients; only replay storage is skipped.
        console.warn(
          `[ResumableStream] Skipping oversized chunk (${bodyBytes} bytes) ` +
            `to prevent SQLite row limit crash. Live clients still receive it.`
        );
        return;
      }
    }

    // Force flush if buffer is at max to prevent memory issues
    if (this._chunkBuffer.length >= CHUNK_BUFFER_MAX_SIZE) {
      this.flushBuffer();
    }

    this._chunkBuffer.push({
      id: nanoid(),
      streamId,
      body,
      index: this._streamChunkIndex
    });
    this._streamChunkIndex++;

    // Flush when buffer reaches threshold
    if (this._chunkBuffer.length >= CHUNK_BUFFER_SIZE) {
      this.flushBuffer();
    }
  }

  /**
   * Flush buffered chunks to SQLite in a single batch.
   * Uses a lock to prevent concurrent flush operations.
   * All chunks of one batch share the same `created_at` timestamp.
   */
  flushBuffer(): void {
    if (this._isFlushingChunks || this._chunkBuffer.length === 0) {
      return;
    }

    this._isFlushingChunks = true;
    try {
      // Swap the buffer out first so chunks stored during the flush land
      // in a fresh array instead of the one being iterated.
      const chunks = this._chunkBuffer;
      this._chunkBuffer = [];

      const now = Date.now();
      for (const chunk of chunks) {
        this.sql`
          insert into cf_ai_chat_stream_chunks (id, stream_id, body, chunk_index, created_at)
          values (${chunk.id}, ${chunk.streamId}, ${chunk.body}, ${chunk.index}, ${now})
        `;
      }
    } finally {
      this._isFlushingChunks = false;
    }
  }

  // ── Chunk replay ───────────────────────────────────────────────────

  /**
   * Send stored stream chunks to a connection for replay.
   * Chunks are marked with replay: true so the client can batch-apply them.
   *
   * Three outcomes:
   * - **Live stream**: sends chunks + `replayComplete` — client flushes and
   *   continues receiving live chunks from the LLM reader.
   * - **Orphaned stream** (restored from SQLite after hibernation, no reader):
   *   sends chunks + `done` and completes the stream. The caller should
   *   reconstruct and persist the partial message from the stored chunks.
   * - **Completed during replay** (defensive): sends chunks + `done`.
   *
   * @param connection - The WebSocket connection
   * @param requestId - The original request ID
   * @returns The stream ID if the stream was orphaned and finalized, null otherwise.
   *          When non-null the caller should reconstruct the message from chunks.
   */
  replayChunks(connection: Connection, requestId: string): string | null {
    const streamId = this._activeStreamId;
    if (!streamId) return null;

    // Make sure buffered chunks are in SQLite before reading them back.
    this.flushBuffer();

    const chunks = this.sql<StreamChunk>`
      select * from cf_ai_chat_stream_chunks 
      where stream_id = ${streamId} 
      order by chunk_index asc
    `;

    for (const chunk of chunks || []) {
      this._sendReplayFrame(connection, requestId, chunk.body, false);
    }

    if (this._activeStreamId !== streamId) {
      // Stream completed between our check above and now — send done.
      // In practice this cannot happen (DO is single-threaded and replay is
      // synchronous), but we guard defensively in case the flow changes.
      this._sendReplayFrame(connection, requestId, "", true);
      return null;
    }

    if (!this._isLive) {
      // Orphaned stream — restored from SQLite after hibernation but the
      // LLM ReadableStream reader was lost. No more live chunks will ever
      // arrive, so finalize it: send done and mark completed in SQLite.
      this._sendReplayFrame(connection, requestId, "", true);
      this.complete(streamId);
      return streamId;
    }

    // Stream is still active with a live reader — signal that replay is
    // complete so the client can flush accumulated parts to React state.
    // Without this, replayed chunks sit in activeStreamRef unflushed
    // until the next live chunk arrives.
    this._sendReplayFrame(connection, requestId, "", false, true);
    return null;
  }

  // ── Restore / cleanup ──────────────────────────────────────────────

  /**
   * Restore active stream state if the agent was restarted during streaming.
   * Validates stream freshness to avoid sending stale resume notifications.
   *
   * Every stream still marked "streaming" that is older than the stale
   * threshold is deleted (metadata and chunks). The newest remaining
   * "streaming" stream, if any, becomes the active stream and the chunk
   * index continues after its last stored chunk.
   */
  restore(): void {
    // Newest first: the first non-stale row is the one to restore.
    const streamingRows = this.sql<StreamMetadata>`
      select * from cf_ai_chat_stream_metadata 
      where status = 'streaming' 
      order by created_at desc
    `;

    const now = Date.now();
    let newestFresh: StreamMetadata | null = null;

    for (const stream of streamingRows || []) {
      const streamAge = now - stream.created_at;

      // Check if stream is stale; delete to free storage. Older abandoned
      // streams are removed here too — the periodic cleanup only targets
      // completed/errored streams, so nothing else would ever delete them.
      if (streamAge > STREAM_STALE_THRESHOLD_MS) {
        this
          .sql`delete from cf_ai_chat_stream_chunks where stream_id = ${stream.id}`;
        this
          .sql`delete from cf_ai_chat_stream_metadata where id = ${stream.id}`;
        console.warn(
          `[ResumableStream] Deleted stale stream ${stream.id} (age: ${Math.round(streamAge / 1000)}s)`
        );
        continue;
      }

      if (newestFresh === null) {
        newestFresh = stream;
      }
    }

    if (newestFresh === null) {
      return;
    }

    this._activeStreamId = newestFresh.id;
    this._activeRequestId = newestFresh.request_id;

    // Get the last chunk index
    const lastChunk = this.sql<{ max_index: number }>`
      select max(chunk_index) as max_index 
      from cf_ai_chat_stream_chunks 
      where stream_id = ${this._activeStreamId}
    `;
    // max() yields null when the stream has no stored chunks yet.
    this._streamChunkIndex =
      lastChunk && lastChunk[0]?.max_index != null
        ? lastChunk[0].max_index + 1
        : 0;
  }

  /**
   * Clear all stream data (called on chat history clear).
   * Discards buffered chunks, deletes every chunk and metadata row, and
   * resets the active-stream state (including the live flag).
   */
  clearAll(): void {
    this._chunkBuffer = [];
    this.sql`delete from cf_ai_chat_stream_chunks`;
    this.sql`delete from cf_ai_chat_stream_metadata`;
    this._resetActiveState();
  }

  /**
   * Drop all stream tables (called on destroy).
   * Buffered chunks are discarded rather than flushed: the tables they
   * would be written to are dropped immediately afterwards.
   */
  destroy(): void {
    this._chunkBuffer = [];
    this.sql`drop table if exists cf_ai_chat_stream_chunks`;
    this.sql`drop table if exists cf_ai_chat_stream_metadata`;
    this._resetActiveState();
  }

  // ── Internal ───────────────────────────────────────────────────────

  /** Forget the active stream: clears IDs, chunk index, and the live flag. */
  private _resetActiveState(): void {
    this._activeStreamId = null;
    this._activeRequestId = null;
    this._streamChunkIndex = 0;
    this._isLive = false;
  }

  /**
   * Send one replay-flagged response frame over the connection.
   * `replayComplete` is only included in the payload when requested.
   */
  private _sendReplayFrame(
    connection: Connection,
    requestId: string,
    body: string,
    done: boolean,
    replayComplete = false
  ): void {
    connection.send(
      JSON.stringify({
        body,
        done,
        id: requestId,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        replay: true,
        ...(replayComplete ? { replayComplete: true } : {})
      })
    );
  }

  /**
   * Delete completed/errored streams (and their chunks) that finished more
   * than the cleanup age threshold ago. Runs at most once per cleanup
   * interval; calls in between return immediately.
   */
  private _maybeCleanupOldStreams(): void {
    const now = Date.now();
    if (now - this._lastCleanupTime < CLEANUP_INTERVAL_MS) {
      return;
    }
    this._lastCleanupTime = now;

    const cutoff = now - CLEANUP_AGE_THRESHOLD_MS;
    // Chunks first: the subquery needs the metadata rows to still exist.
    this.sql`
      delete from cf_ai_chat_stream_chunks 
      where stream_id in (
        select id from cf_ai_chat_stream_metadata 
        where status in ('completed', 'error') and completed_at < ${cutoff}
      )
    `;
    this.sql`
      delete from cf_ai_chat_stream_metadata 
      where status in ('completed', 'error') and completed_at < ${cutoff}
    `;
  }

  // ── Test helpers (matching old AIChatAgent test API) ────────────────

  /** @internal For testing only */
  getStreamChunks(
    streamId: string
  ): Array<{ body: string; chunk_index: number }> {
    return (
      this.sql<{ body: string; chunk_index: number }>`
        select body, chunk_index from cf_ai_chat_stream_chunks 
        where stream_id = ${streamId} 
        order by chunk_index asc
      ` || []
    );
  }

  /** @internal For testing only */
  getStreamMetadata(
    streamId: string
  ): { status: string; request_id: string } | null {
    const result = this.sql<{ status: string; request_id: string }>`
      select status, request_id from cf_ai_chat_stream_metadata 
      where id = ${streamId}
    `;
    return result && result.length > 0 ? result[0] : null;
  }

  /** @internal For testing only */
  getAllStreamMetadata(): Array<{
    id: string;
    status: string;
    request_id: string;
    created_at: number;
  }> {
    return (
      this.sql<{
        id: string;
        status: string;
        request_id: string;
        created_at: number;
      }>`select id, status, request_id, created_at from cf_ai_chat_stream_metadata` ||
      []
    );
  }

  /** @internal For testing only */
  insertStaleStream(streamId: string, requestId: string, ageMs: number): void {
    const createdAt = Date.now() - ageMs;
    this.sql`
      insert into cf_ai_chat_stream_metadata (id, request_id, status, created_at)
      values (${streamId}, ${requestId}, 'streaming', ${createdAt})
    `;
  }
}