/** * Utilities for normalising various text-producing sources into a uniform * `AsyncGenerator`. This lets `onTurn()` return any of: * * - A plain `string` * - An `AsyncIterable` (e.g. AI SDK `textStream`) * - A `ReadableStream` (e.g. a raw `fetch` response body * containing newline-delimited JSON / SSE) * - A `ReadableStream` * * The generator yields individual text chunks as they become available. */ /** Union of every source type that {@link iterateText} accepts. */ export type TextSource = | string | ReadableStream | ReadableStream | AsyncIterable; /** Shape of a parsed NDJSON/SSE chunk from common AI APIs. */ interface AIStreamChunk { response?: string; choices?: { delta?: { content?: string; role?: string }; }[]; } /** * Turn any {@link TextSource} into a lazy async generator of string chunks. * * - `string` → yields the string once (if non-empty). * - `ReadableStream` → yields each chunk directly. * - `ReadableStream` → decodes and parses as newline-delimited * JSON (NDJSON) / SSE (`data: …` lines), extracting text from common AI * response formats. * - `AsyncIterable` → re-yields each chunk. */ export async function* iterateText(source: TextSource): AsyncGenerator { // --- plain string --- if (typeof source === "string") { if (source) yield source; return; } // --- ReadableStream --- if (source instanceof ReadableStream) { const reader = (source as ReadableStream).getReader(); const first = await reader.read(); if (first.done || first.value === undefined) return; if (typeof first.value === "string") { // ReadableStream — yield chunks as-is if (first.value) yield first.value; while (true) { const { done, value } = await reader.read(); if (done) break; if (typeof value === "string" && value) yield value; } } else { // ReadableStream — re-assemble into an NDJSON stream // by pushing the already-read first chunk back into a new stream. const peeked = first.value as Uint8Array; const combined = new ReadableStream({ async start(controller) { controller.enqueue(peeked); while (true) { const { done, value } = await reader.read(); if (done) break; controller.enqueue(value as Uint8Array); } controller.close(); } }); for await (const chunk of parseNDJSON(combined.getReader())) { const ai = chunk as AIStreamChunk; if (ai.response) { yield ai.response; } else if (ai.choices && ai.choices.length > 0) { const choice = ai.choices[0]; if (choice.delta?.content && choice.delta?.role === "assistant") { yield choice.delta.content; } } } } return; } // --- AsyncIterable --- if (Symbol.asyncIterator in source) { for await (const chunk of source as AsyncIterable) { if (typeof chunk === "string" && chunk) yield chunk; } } } // --------------------------------------------------------------------------- // Internal: NDJSON / SSE stream parser // --------------------------------------------------------------------------- /** * Parse a `ReadableStream` that contains newline-delimited JSON * or Server-Sent Events (`data: {…}` lines). Yields each parsed JSON object. * * Handles the `data: [DONE]` sentinel used by OpenAI-compatible APIs. */ async function* parseNDJSON( reader: ReadableStreamDefaultReader, leftOverBuffer = "" ): AsyncGenerator> { const decoder = new TextDecoder(); let buffer = leftOverBuffer; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split("\n"); buffer = lines.pop() ?? ""; for (const line of lines) { const parsed = parseLine(line); if (parsed === "DONE") return; if (parsed) yield parsed; } } // Flush remaining buffer if (buffer.trim()) { const remaining = buffer.split("\n").filter((l) => l.trim()); for (const line of remaining) { const parsed = parseLine(line); if (parsed === "DONE") return; if (parsed) yield parsed; } } } function parseLine(line: string): Record | "DONE" | null { const trimmed = line.trim(); if (!trimmed) return null; if (trimmed.startsWith("data: ")) { const json = trimmed.slice(6).trim(); if (json === "[DONE]") return "DONE"; try { return JSON.parse(json) as Record; } catch { console.warn("[voice] Skipping malformed SSE data:", json); return null; } } return null; }