diff options
Diffstat (limited to 'packages/cli/src/core/gemini-stream.ts')
| -rw-r--r-- | packages/cli/src/core/gemini-stream.ts | 260 |
1 files changed, 134 insertions, 126 deletions
diff --git a/packages/cli/src/core/gemini-stream.ts b/packages/cli/src/core/gemini-stream.ts index b47eb1c6..065d261a 100644 --- a/packages/cli/src/core/gemini-stream.ts +++ b/packages/cli/src/core/gemini-stream.ts @@ -1,167 +1,175 @@ -import { ToolCallEvent } from "../ui/types.js"; +import { ToolCallEvent } from '../ui/types.js'; import { Part } from '@google/genai'; import { HistoryItem } from '../ui/types.js'; -import { handleToolCallChunk, addErrorMessageToHistory } from './history-updater.js'; +import { + handleToolCallChunk, + addErrorMessageToHistory, +} from './history-updater.js'; export enum GeminiEventType { - Content, - ToolCallInfo, + Content, + ToolCallInfo, } export interface GeminiContentEvent { - type: GeminiEventType.Content; - value: string; + type: GeminiEventType.Content; + value: string; } export interface GeminiToolCallInfoEvent { - type: GeminiEventType.ToolCallInfo; - value: ToolCallEvent; + type: GeminiEventType.ToolCallInfo; + value: ToolCallEvent; } -export type GeminiEvent = - | GeminiContentEvent - | GeminiToolCallInfoEvent; +export type GeminiEvent = GeminiContentEvent | GeminiToolCallInfoEvent; export type GeminiStream = AsyncIterable<GeminiEvent>; export enum StreamingState { - Idle, - Responding, + Idle, + Responding, } interface StreamProcessorParams { - stream: GeminiStream; - signal: AbortSignal; - setHistory: React.Dispatch<React.SetStateAction<HistoryItem[]>>; - submitQuery: (query: Part) => Promise<void>, - getNextMessageId: () => number; - addHistoryItem: (itemData: Omit<HistoryItem, 'id'>, id: number) => void; - currentToolGroupIdRef: React.MutableRefObject<number | null>; + stream: GeminiStream; + signal: AbortSignal; + setHistory: React.Dispatch<React.SetStateAction<HistoryItem[]>>; + submitQuery: (query: Part) => Promise<void>; + getNextMessageId: () => number; + addHistoryItem: (itemData: Omit<HistoryItem, 'id'>, id: number) => void; + currentToolGroupIdRef: React.MutableRefObject<number | null>; } /** * Processes the Gemini stream, managing text buffering, adaptive rendering, * and delegating history updates for tool calls and errors. */ -export const processGeminiStream = async ({ // Renamed function for clarity - stream, - signal, - setHistory, - submitQuery, - getNextMessageId, - addHistoryItem, - currentToolGroupIdRef, +export const processGeminiStream = async ({ + // Renamed function for clarity + stream, + signal, + setHistory, + submitQuery, + getNextMessageId, + addHistoryItem, + currentToolGroupIdRef, }: StreamProcessorParams): Promise<void> => { - // --- State specific to this stream processing invocation --- - let textBuffer = ''; - let renderTimeoutId: NodeJS.Timeout | null = null; - let isStreamComplete = false; - let currentGeminiMessageId: number | null = null; + // --- State specific to this stream processing invocation --- + let textBuffer = ''; + let renderTimeoutId: NodeJS.Timeout | null = null; + let isStreamComplete = false; + let currentGeminiMessageId: number | null = null; - const render = (content: string) => { - if (currentGeminiMessageId === null) { - return; - } - setHistory(prev => prev.map(item => - item.id === currentGeminiMessageId && item.type === 'gemini' - ? { ...item, text: (item.text ?? '') + content } - : item - )); + const render = (content: string) => { + if (currentGeminiMessageId === null) { + return; + } + setHistory((prev) => + prev.map((item) => + item.id === currentGeminiMessageId && item.type === 'gemini' + ? { ...item, text: (item.text ?? '') + content } + : item, + ), + ); + }; + // --- Adaptive Rendering Logic (nested) --- + const renderBufferedText = () => { + if (signal.aborted) { + if (renderTimeoutId) clearTimeout(renderTimeoutId); + renderTimeoutId = null; + return; } - // --- Adaptive Rendering Logic (nested) --- - const renderBufferedText = () => { - if (signal.aborted) { - if (renderTimeoutId) clearTimeout(renderTimeoutId); - renderTimeoutId = null; - return; - } - - const bufferLength = textBuffer.length; - let chunkSize = 0; - let delay = 50; - - if (bufferLength > 150) { - chunkSize = Math.min(bufferLength, 30); delay = 5; - } else if (bufferLength > 30) { - chunkSize = Math.min(bufferLength, 10); delay = 10; - } else if (bufferLength > 0) { - chunkSize = 2; delay = 20; - } - if (chunkSize > 0) { - const chunkToRender = textBuffer.substring(0, chunkSize); - textBuffer = textBuffer.substring(chunkSize); - render(chunkToRender); + const bufferLength = textBuffer.length; + let chunkSize = 0; + let delay = 50; - renderTimeoutId = setTimeout(renderBufferedText, delay); - } else { - renderTimeoutId = null; // Clear timeout ID if nothing to render - if (!isStreamComplete) { - // Buffer empty, but stream might still send data, check again later - renderTimeoutId = setTimeout(renderBufferedText, 50); - } - } - }; + if (bufferLength > 150) { + chunkSize = Math.min(bufferLength, 30); + delay = 5; + } else if (bufferLength > 30) { + chunkSize = Math.min(bufferLength, 10); + delay = 10; + } else if (bufferLength > 0) { + chunkSize = 2; + delay = 20; + } - const scheduleRender = () => { - if (renderTimeoutId === null) { - renderTimeoutId = setTimeout(renderBufferedText, 0); - } - }; + if (chunkSize > 0) { + const chunkToRender = textBuffer.substring(0, chunkSize); + textBuffer = textBuffer.substring(chunkSize); + render(chunkToRender); - // --- Stream Processing Loop --- - try { - for await (const chunk of stream) { - if (signal.aborted) break; + renderTimeoutId = setTimeout(renderBufferedText, delay); + } else { + renderTimeoutId = null; // Clear timeout ID if nothing to render + if (!isStreamComplete) { + // Buffer empty, but stream might still send data, check again later + renderTimeoutId = setTimeout(renderBufferedText, 50); + } + } + }; - if (chunk.type === GeminiEventType.Content) { - currentToolGroupIdRef.current = null; // Reset tool group on text + const scheduleRender = () => { + if (renderTimeoutId === null) { + renderTimeoutId = setTimeout(renderBufferedText, 0); + } + }; - if (currentGeminiMessageId === null) { - currentGeminiMessageId = getNextMessageId(); - addHistoryItem({ type: 'gemini', text: '' }, currentGeminiMessageId); - textBuffer = ''; - } - textBuffer += chunk.value; - scheduleRender(); + // --- Stream Processing Loop --- + try { + for await (const chunk of stream) { + if (signal.aborted) break; - } else if (chunk.type === GeminiEventType.ToolCallInfo) { - if (renderTimeoutId) { // Stop rendering loop - clearTimeout(renderTimeoutId); - renderTimeoutId = null; - } - // Flush any text buffer content. - render(textBuffer); - currentGeminiMessageId = null; // End text message context - textBuffer = ''; // Clear buffer + if (chunk.type === GeminiEventType.Content) { + currentToolGroupIdRef.current = null; // Reset tool group on text - // Delegate history update for tool call - handleToolCallChunk( - chunk.value, - setHistory, - submitQuery, - getNextMessageId, - currentToolGroupIdRef - ); - } - } - if (signal.aborted) { - throw new Error("Request cancelled by user"); - } - } catch (error: any) { - if (renderTimeoutId) { // Ensure render loop stops on error - clearTimeout(renderTimeoutId); - renderTimeoutId = null; + if (currentGeminiMessageId === null) { + currentGeminiMessageId = getNextMessageId(); + addHistoryItem({ type: 'gemini', text: '' }, currentGeminiMessageId); + textBuffer = ''; } - // Delegate history update for error message - addErrorMessageToHistory(error, setHistory, getNextMessageId); - } finally { - isStreamComplete = true; // Signal stream end for render loop completion + textBuffer += chunk.value; + scheduleRender(); + } else if (chunk.type === GeminiEventType.ToolCallInfo) { if (renderTimeoutId) { - clearTimeout(renderTimeoutId); - renderTimeoutId = null; + // Stop rendering loop + clearTimeout(renderTimeoutId); + renderTimeoutId = null; } + // Flush any text buffer content. + render(textBuffer); + currentGeminiMessageId = null; // End text message context + textBuffer = ''; // Clear buffer - renderBufferedText(); // Force final render + // Delegate history update for tool call + handleToolCallChunk( + chunk.value, + setHistory, + submitQuery, + getNextMessageId, + currentToolGroupIdRef, + ); + } } -};
\ No newline at end of file + if (signal.aborted) { + throw new Error('Request cancelled by user'); + } + } catch (error: any) { + if (renderTimeoutId) { + // Ensure render loop stops on error + clearTimeout(renderTimeoutId); + renderTimeoutId = null; + } + // Delegate history update for error message + addErrorMessageToHistory(error, setHistory, getNextMessageId); + } finally { + isStreamComplete = true; // Signal stream end for render loop completion + if (renderTimeoutId) { + clearTimeout(renderTimeoutId); + renderTimeoutId = null; + } + + renderBufferedText(); // Force final render + } +}; |
