branch:
text-stream.ts
4985 bytesRaw
/**
* Utilities for normalising various text-producing sources into a uniform
* `AsyncGenerator<string>`. This lets `onTurn()` return any of:
*
* - A plain `string`
* - An `AsyncIterable<string>` (e.g. AI SDK `textStream`)
* - A `ReadableStream<Uint8Array>` (e.g. a raw `fetch` response body
* containing newline-delimited JSON / SSE)
* - A `ReadableStream<string>`
*
* The generator yields individual text chunks as they become available.
*/
/** Union of every source type that {@link iterateText} accepts. */
export type TextSource =
| string
| ReadableStream<Uint8Array>
| ReadableStream<string>
| AsyncIterable<string>;
/** Shape of a parsed NDJSON/SSE chunk from common AI APIs. */
interface AIStreamChunk {
response?: string;
choices?: {
delta?: { content?: string; role?: string };
}[];
}
/**
* Turn any {@link TextSource} into a lazy async generator of string chunks.
*
* - `string` → yields the string once (if non-empty).
* - `ReadableStream<string>` → yields each chunk directly.
* - `ReadableStream<Uint8Array>` → decodes and parses as newline-delimited
* JSON (NDJSON) / SSE (`data: …` lines), extracting text from common AI
* response formats.
* - `AsyncIterable<string>` → re-yields each chunk.
*/
export async function* iterateText(source: TextSource): AsyncGenerator<string> {
// --- plain string ---
if (typeof source === "string") {
if (source) yield source;
return;
}
// --- ReadableStream ---
if (source instanceof ReadableStream) {
const reader = (source as ReadableStream<string | Uint8Array>).getReader();
const first = await reader.read();
if (first.done || first.value === undefined) return;
if (typeof first.value === "string") {
// ReadableStream<string> — yield chunks as-is
if (first.value) yield first.value;
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (typeof value === "string" && value) yield value;
}
} else {
// ReadableStream<Uint8Array> — re-assemble into an NDJSON stream
// by pushing the already-read first chunk back into a new stream.
const peeked = first.value as Uint8Array;
const combined = new ReadableStream<Uint8Array>({
async start(controller) {
controller.enqueue(peeked);
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value as Uint8Array);
}
controller.close();
}
});
for await (const chunk of parseNDJSON(combined.getReader())) {
const ai = chunk as AIStreamChunk;
if (ai.response) {
yield ai.response;
} else if (ai.choices && ai.choices.length > 0) {
const choice = ai.choices[0];
if (choice.delta?.content && choice.delta?.role === "assistant") {
yield choice.delta.content;
}
}
}
}
return;
}
// --- AsyncIterable<string> ---
if (Symbol.asyncIterator in source) {
for await (const chunk of source as AsyncIterable<string>) {
if (typeof chunk === "string" && chunk) yield chunk;
}
}
}
// ---------------------------------------------------------------------------
// Internal: NDJSON / SSE stream parser
// ---------------------------------------------------------------------------
/**
* Parse a `ReadableStream<Uint8Array>` that contains newline-delimited JSON
* or Server-Sent Events (`data: {…}` lines). Yields each parsed JSON object.
*
* Handles the `data: [DONE]` sentinel used by OpenAI-compatible APIs.
*/
async function* parseNDJSON(
reader: ReadableStreamDefaultReader<Uint8Array>,
leftOverBuffer = ""
): AsyncGenerator<Record<string, unknown>> {
const decoder = new TextDecoder();
let buffer = leftOverBuffer;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
const parsed = parseLine(line);
if (parsed === "DONE") return;
if (parsed) yield parsed;
}
}
// Flush remaining buffer
if (buffer.trim()) {
const remaining = buffer.split("\n").filter((l) => l.trim());
for (const line of remaining) {
const parsed = parseLine(line);
if (parsed === "DONE") return;
if (parsed) yield parsed;
}
}
}
function parseLine(line: string): Record<string, unknown> | "DONE" | null {
const trimmed = line.trim();
if (!trimmed) return null;
if (trimmed.startsWith("data: ")) {
const json = trimmed.slice(6).trim();
if (json === "[DONE]") return "DONE";
try {
return JSON.parse(json) as Record<string, unknown>;
} catch {
console.warn("[voice] Skipping malformed SSE data:", json);
return null;
}
}
return null;
}