343 lines
No EOL
12 KiB
JavaScript
343 lines
No EOL
12 KiB
JavaScript
import { ReadableStream } from "./_shims/index.mjs";
|
|
import { OpenAIError } from "./error.mjs";
|
|
import { APIError } from 'openai/error';
|
|
/**
 * An async-iterable wrapper around a streaming response.
 *
 * Iteration is single-use (a second iteration throws; use `.tee()` to split),
 * and breaking out of iteration aborts the underlying request via the
 * associated AbortController.
 */
export class Stream {
    /**
     * @param iterator - zero-argument factory returning the async iterator of items
     * @param controller - AbortController used to cancel the in-flight request
     */
    constructor(iterator, controller) {
        this.iterator = iterator;
        this.controller = controller;
    }
    /**
     * Builds a Stream from a server-sent-events (SSE) HTTP response.
     * Each unnamed SSE message's `data` payload is JSON-parsed and yielded;
     * the literal `[DONE]` sentinel ends the logical stream.
     */
    static fromSSEResponse(response, controller) {
        let consumed = false;
        const decoder = new SSEDecoder();
        // Yields decoded SSE messages from the raw response body.
        async function* iterMessages() {
            if (!response.body) {
                controller.abort();
                throw new OpenAIError(`Attempted to iterate over a response with no body`);
            }
            const lineDecoder = new LineDecoder();
            const iter = readableStreamAsyncIterable(response.body);
            for await (const chunk of iter) {
                for (const line of lineDecoder.decode(chunk)) {
                    const sse = decoder.decode(line);
                    if (sse)
                        yield sse;
                }
            }
            // Flush any partial line still buffered when the body ends.
            for (const line of lineDecoder.flush()) {
                const sse = decoder.decode(line);
                if (sse)
                    yield sse;
            }
        }
        async function* iterator() {
            // The response body can only be read once; guard double iteration.
            if (consumed) {
                throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const sse of iterMessages()) {
                    // After [DONE], keep draining messages without yielding them.
                    if (done)
                        continue;
                    if (sse.data.startsWith('[DONE]')) {
                        done = true;
                        continue;
                    }
                    // Only unnamed events carry payload items here; named events
                    // (sse.event !== null) are skipped.
                    if (sse.event === null) {
                        let data;
                        try {
                            data = JSON.parse(sse.data);
                        }
                        catch (e) {
                            console.error(`Could not parse message into JSON:`, sse.data);
                            console.error(`From chunk:`, sse.raw);
                            throw e;
                        }
                        // In-band API errors arrive as `{ error: ... }` payloads.
                        if (data && data.error) {
                            throw new APIError(undefined, data.error, undefined, undefined);
                        }
                        yield data;
                    }
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (e instanceof Error && e.name === 'AbortError')
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller);
    }
    /**
     * Generates a Stream from a newline-separated ReadableStream
     * where each item is a JSON value.
     */
    static fromReadableStream(readableStream, controller) {
        let consumed = false;
        // Yields complete text lines from the underlying stream.
        async function* iterLines() {
            const lineDecoder = new LineDecoder();
            const iter = readableStreamAsyncIterable(readableStream);
            for await (const chunk of iter) {
                for (const line of lineDecoder.decode(chunk)) {
                    yield line;
                }
            }
            // Emit any unterminated final line.
            for (const line of lineDecoder.flush()) {
                yield line;
            }
        }
        async function* iterator() {
            // Single-use guard, mirroring fromSSEResponse.
            if (consumed) {
                throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const line of iterLines()) {
                    if (done)
                        continue;
                    // Blank lines are skipped; each non-empty line is one JSON value.
                    if (line)
                        yield JSON.parse(line);
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (e instanceof Error && e.name === 'AbortError')
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller);
    }
    [Symbol.asyncIterator]() {
        return this.iterator();
    }
    /**
     * Splits the stream into two streams which can be
     * independently read from at different speeds.
     */
    tee() {
        const left = [];
        const right = [];
        const iterator = this.iterator();
        // Each branch pulls from its own queue of pending results; pulling on
        // an empty queue advances the shared iterator once and pushes the
        // resulting promise onto BOTH queues, so neither branch misses items.
        const teeIterator = (queue) => {
            return {
                next: () => {
                    if (queue.length === 0) {
                        const result = iterator.next();
                        left.push(result);
                        right.push(result);
                    }
                    return queue.shift();
                },
            };
        };
        return [
            new Stream(() => teeIterator(left), this.controller),
            new Stream(() => teeIterator(right), this.controller),
        ];
    }
    /**
     * Converts this stream to a newline-separated ReadableStream of
     * JSON stringified values in the stream
     * which can be turned back into a Stream with `Stream.fromReadableStream()`.
     */
    toReadableStream() {
        const self = this;
        let iter;
        const encoder = new TextEncoder();
        return new ReadableStream({
            async start() {
                iter = self[Symbol.asyncIterator]();
            },
            async pull(ctrl) {
                try {
                    const { value, done } = await iter.next();
                    if (done)
                        return ctrl.close();
                    // One JSON document per line (the framing fromReadableStream expects).
                    const bytes = encoder.encode(JSON.stringify(value) + '\n');
                    ctrl.enqueue(bytes);
                }
                catch (err) {
                    ctrl.error(err);
                }
            },
            async cancel() {
                // Propagate consumer cancellation to the source iterator, if supported.
                await iter.return?.();
            },
        });
    }
}
|
|
/**
 * Incremental server-sent-events parser: feed it one line at a time and it
 * returns a completed message whenever a blank line terminates an event.
 */
class SSEDecoder {
    constructor() {
        this.event = null;  // value of the pending `event:` field, if any
        this.data = [];     // accumulated `data:` field values for the pending event
        this.chunks = [];   // raw lines belonging to the pending event
    }
    /**
     * Consume one line. Returns `{ event, data, raw }` when the line is the
     * blank terminator of a non-empty event, otherwise `null`.
     */
    decode(line) {
        // Tolerate CRLF endings by dropping a trailing carriage return.
        if (line.endsWith('\r')) {
            line = line.slice(0, -1);
        }
        if (!line) {
            // empty line and we didn't previously encounter any messages
            if (!this.event && !this.data.length) {
                return null;
            }
            // Blank line: emit the accumulated event and reset state.
            const message = {
                event: this.event,
                data: this.data.join('\n'),
                raw: this.chunks,
            };
            this.event = null;
            this.data = [];
            this.chunks = [];
            return message;
        }
        this.chunks.push(line);
        // Lines starting with ':' are SSE comments and carry no fields.
        if (line.startsWith(':')) {
            return null;
        }
        // Split "field: value" at the first colon; a line with no colon is a
        // bare field name with an empty value.
        const colon = line.indexOf(':');
        let fieldname = line;
        let value = '';
        if (colon !== -1) {
            fieldname = line.slice(0, colon);
            value = line.slice(colon + 1);
        }
        // Per the SSE spec, one leading space in the value is stripped.
        if (value.startsWith(' ')) {
            value = value.slice(1);
        }
        if (fieldname === 'event') {
            this.event = value;
        }
        else if (fieldname === 'data') {
            this.data.push(value);
        }
        return null;
    }
}
|
|
/**
 * A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
 * reading lines from text.
 *
 * https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
 */
class LineDecoder {
    constructor() {
        this.buffer = [];        // fragments of a not-yet-terminated line
        this.trailingCR = false; // previous chunk ended in '\r' (maybe half a CRLF)
    }
    /**
     * Feed one chunk (string, Buffer, Uint8Array, or ArrayBuffer); returns every
     * complete line it terminates. Partial trailing lines stay buffered.
     */
    decode(chunk) {
        let text = this.decodeText(chunk);
        // Re-attach a '\r' held back from the previous chunk so a CRLF split
        // across chunk boundaries still counts as a single newline.
        if (this.trailingCR) {
            text = '\r' + text;
            this.trailingCR = false;
        }
        // Hold back a trailing '\r' — it may be the first half of a CRLF.
        if (text.endsWith('\r')) {
            this.trailingCR = true;
            text = text.slice(0, -1);
        }
        if (!text) {
            return [];
        }
        const endsOnNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
        let pieces = text.split(LineDecoder.NEWLINE_REGEXP);
        // No newline at all: the whole chunk is a partial line.
        if (pieces.length === 1 && !endsOnNewline) {
            this.buffer.push(pieces[0]);
            return [];
        }
        // Prepend any buffered partial line onto the first piece.
        if (this.buffer.length > 0) {
            pieces = [this.buffer.join('') + pieces[0], ...pieces.slice(1)];
            this.buffer = [];
        }
        // When the text did not end on a newline, keep the final piece buffered.
        if (!endsOnNewline) {
            this.buffer = [pieces.pop() || ''];
        }
        return pieces;
    }
    /**
     * Normalize a chunk to a string: strings pass through; bytes are decoded
     * as UTF-8 via Buffer (Node) or TextDecoder (web).
     */
    decodeText(bytes) {
        if (bytes == null)
            return '';
        if (typeof bytes === 'string')
            return bytes;
        // Node: a global Buffer is assumed to mean a Node environment.
        if (typeof Buffer !== 'undefined') {
            if (bytes instanceof Buffer) {
                return bytes.toString();
            }
            if (bytes instanceof Uint8Array) {
                return Buffer.from(bytes).toString();
            }
            throw new OpenAIError(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`);
        }
        // Browser
        if (typeof TextDecoder !== 'undefined') {
            if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
                // Lazily create one shared decoder instance.
                this.textDecoder ??= new TextDecoder('utf8');
                return this.textDecoder.decode(bytes);
            }
            throw new OpenAIError(`Unexpected: received non-Uint8Array/ArrayBuffer (${bytes.constructor.name}) in a web platform. Please report this error.`);
        }
        throw new OpenAIError(`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`);
    }
    /**
     * Return any still-buffered partial line (including after a bare trailing
     * '\r') and reset internal state.
     */
    flush() {
        if (!this.buffer.length && !this.trailingCR) {
            return [];
        }
        const pending = [this.buffer.join('')];
        this.buffer = [];
        this.trailingCR = false;
        return pending;
    }
}
// prettier-ignore
LineDecoder.NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']);
LineDecoder.NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g;
|
|
/**
 * Split `str` around the first occurrence of `delimiter`, mirroring Python's
 * `str.partition`: returns `[before, delimiter, after]`, or `[str, '', '']`
 * when the delimiter does not occur.
 */
function partition(str, delimiter) {
    const at = str.indexOf(delimiter);
    if (at === -1) {
        return [str, '', ''];
    }
    return [str.slice(0, at), delimiter, str.slice(at + delimiter.length)];
}
|
|
/**
 * Most browsers don't yet have async iterable support for ReadableStream,
 * and Node has a very different way of reading bytes from its "ReadableStream".
 *
 * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
 */
export function readableStreamAsyncIterable(stream) {
    // Already async-iterable (e.g. Node streams): hand it back untouched.
    if (stream[Symbol.asyncIterator])
        return stream;
    const reader = stream.getReader();
    const next = async () => {
        try {
            const step = await reader.read();
            // release lock when stream becomes closed
            if (step?.done)
                reader.releaseLock();
            return step;
        }
        catch (err) {
            // release lock when stream becomes errored
            reader.releaseLock();
            throw err;
        }
    };
    const finish = async () => {
        // Start cancellation, release the lock, then wait for cancel to settle.
        const cancelPromise = reader.cancel();
        reader.releaseLock();
        await cancelPromise;
        return { done: true, value: undefined };
    };
    return {
        next,
        return: finish,
        [Symbol.asyncIterator]() {
            return this;
        },
    };
}
|
|
//# sourceMappingURL=streaming.mjs.map
|