|
|
|
|
@ -199,20 +199,32 @@ export const readStreamData = async (
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Process the remainder of the buffer
|
|
|
|
|
const processBuffer = (buffer: string, callback: (data: any) => void) => {
|
|
|
|
|
const processBuffer = async (buffer: string, callback: (data: any) => void) => {
|
|
|
|
|
if (!buffer) return;
|
|
|
|
|
|
|
|
|
|
const lines = buffer.split('\n');
|
|
|
|
|
for (const line of lines) {
|
|
|
|
|
if (line.startsWith('data: ')) {
|
|
|
|
|
const jsonStr = line.slice(6).trim();
|
|
|
|
|
const trimmedLine = line.trim();
|
|
|
|
|
|
|
|
|
|
if (trimmedLine.startsWith('data: ')) {
|
|
|
|
|
const jsonStr = trimmedLine.slice(6).trim();
|
|
|
|
|
try {
|
|
|
|
|
const jsonData = JSON.parse(jsonStr);
|
|
|
|
|
callback(jsonData);
|
|
|
|
|
if (jsonStr !== '[DONE]') {
|
|
|
|
|
console.log('jsonStr>>>>>>>>>>>>>done:', jsonStr);
|
|
|
|
|
const jsonData = JSON.parse(jsonStr);
|
|
|
|
|
callback(jsonData);
|
|
|
|
|
}
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.error(
|
|
|
|
|
'Failed to parse JSON from remaining buffer:',
|
|
|
|
|
jsonStr,
|
|
|
|
|
e
|
|
|
|
|
);
|
|
|
|
|
console.error('Failed to parse JSON from line:', jsonStr, e);
|
|
|
|
|
}
|
|
|
|
|
} else if (trimmedLine.startsWith('error:')) {
|
|
|
|
|
const errorStr = trimmedLine.slice(7).trim();
|
|
|
|
|
console.log('jsonStr>>>>>>>>>>>>>error:', errorStr);
|
|
|
|
|
try {
|
|
|
|
|
const jsonData = JSON.parse(errorStr);
|
|
|
|
|
callback({ error: jsonData });
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.error('Failed to parse error JSON from line:', errorStr, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -221,47 +233,89 @@ const processBuffer = (buffer: string, callback: (data: any) => void) => {
|
|
|
|
|
export const readLargeStreamData = async (
|
|
|
|
|
reader: any,
|
|
|
|
|
decoder: TextDecoder,
|
|
|
|
|
callback: (data: any) => void
|
|
|
|
|
callback: (data: any) => void,
|
|
|
|
|
throttleDelay = 200
|
|
|
|
|
) => {
|
|
|
|
|
let buffer = ''; // cache incomplete line
|
|
|
|
|
|
|
|
|
|
class BufferManager {
|
|
|
|
|
private buffer: any[] = [];
|
|
|
|
|
private failed: boolean = false;
|
|
|
|
|
private isFlushing: boolean = false;
|
|
|
|
|
private callback: (data: any) => void;
|
|
|
|
|
|
|
|
|
|
constructor(callback: (data: any) => void) {
|
|
|
|
|
this.callback = callback;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public add(data: any) {
|
|
|
|
|
this.buffer.push(data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async flush() {
|
|
|
|
|
if (this.buffer.length === 0 || this.isFlushing) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.failed = false;
|
|
|
|
|
this.isFlushing = true;
|
|
|
|
|
|
|
|
|
|
while (this.buffer.length > 0) {
|
|
|
|
|
const data = this.buffer.shift();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
processBuffer(data, this.callback);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error('Error processing buffer:', error);
|
|
|
|
|
this.failed = true;
|
|
|
|
|
this.buffer.unshift(data);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
this.isFlushing = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public getBuffer() {
|
|
|
|
|
return this.buffer;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const bufferManager = new BufferManager(callback);
|
|
|
|
|
|
|
|
|
|
const throttledCallback = throttle(async () => {
|
|
|
|
|
bufferManager.flush();
|
|
|
|
|
}, throttleDelay);
|
|
|
|
|
|
|
|
|
|
let isReading = true;
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
const { done, value } = await reader?.read?.();
|
|
|
|
|
|
|
|
|
|
if (done) {
|
|
|
|
|
isReading = false;
|
|
|
|
|
// Process remaining buffered data
|
|
|
|
|
if (buffer.trim()) {
|
|
|
|
|
processBuffer(buffer, callback);
|
|
|
|
|
if (buffer) {
|
|
|
|
|
bufferManager.add(buffer);
|
|
|
|
|
}
|
|
|
|
|
bufferManager.flush();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decode new chunk of data and append to buffer
|
|
|
|
|
buffer += decoder.decode(value, { stream: true });
|
|
|
|
|
|
|
|
|
|
// Try to process the complete line in the buffer
|
|
|
|
|
const lines = buffer.split('\n');
|
|
|
|
|
buffer = lines.pop() || ''; // Keep last line (may be incomplete)
|
|
|
|
|
try {
|
|
|
|
|
// Decode new chunk of data and append to buffer
|
|
|
|
|
buffer += decoder.decode(value, { stream: true });
|
|
|
|
|
|
|
|
|
|
for (const line of lines) {
|
|
|
|
|
if (line.startsWith('data: ')) {
|
|
|
|
|
const jsonStr = line.slice(6).trim();
|
|
|
|
|
// Try to process the complete line in the buffer
|
|
|
|
|
const lines = buffer.split('\n');
|
|
|
|
|
buffer = lines.pop() || ''; // Keep last line (may be incomplete)
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
if (jsonStr !== '[DONE]') {
|
|
|
|
|
const jsonData = JSON.parse(jsonStr);
|
|
|
|
|
callback(jsonData);
|
|
|
|
|
}
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.error('Failed to parse JSON:', jsonStr, e);
|
|
|
|
|
}
|
|
|
|
|
for (const line of lines) {
|
|
|
|
|
bufferManager.add(line);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (line.startsWith('error:')) {
|
|
|
|
|
const errorStr = line.slice(7).trim();
|
|
|
|
|
const jsonData = JSON.parse(errorStr);
|
|
|
|
|
callback({ error: jsonData });
|
|
|
|
|
}
|
|
|
|
|
throttledCallback();
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.log('Error reading stream data:', error);
|
|
|
|
|
// do nothing
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|