// maestro/src/engine/llm-stream.ts
// 2026-06-03 05:08:00 +00:00
//
// 206 lines
// 6.8 KiB
// TypeScript

import type {
Message,
ToolDef,
ToolCall,
OpenAICompatClient,
LLMEvent,
} from '../llm/openai-compat.js';
import { logger } from '../logger.js';
import { stripThinkingTokens } from './strip-thinking.js';
/**
 * Formats the message for the error raised when an isolated (tool-less)
 * LLM call nevertheless asks to invoke a tool.
 *
 * @param name - Name of the tool the model tried to call.
 * @returns The human-readable error message.
 */
const ISOLATED_TOOL_USE_ERROR = (name: string): string => {
  return 'Isolated LLM call unexpectedly requested tool "' + name + '"';
};
/**
 * Perform a standalone, text-only LLM request: the client is called with no
 * tool definitions, and nothing is reported through callbacks or shared state.
 * Used by the prompt-guard summarization stage and by
 * buildContextOverflowResult to produce a last-resort handoff summary.
 *
 * @param client - Client whose `chat` stream is consumed.
 * @param messages - Conversation sent to the model.
 * @param cancelSignal - Optional abort signal forwarded to `client.chat`.
 * @returns All streamed text concatenated, after `stripThinkingTokens`.
 * @throws Error if the stream yields a `tool_use` event or an `error` event.
 */
export async function runIsolatedLlm(
  client: OpenAICompatClient,
  messages: Message[],
  cancelSignal?: AbortSignal,
): Promise<string> {
  let collected = '';
  // `undefined` in the tools slot: this call advertises no tools to the model.
  const events = client.chat(messages, undefined, cancelSignal);
  for await (const evt of events) {
    switch (evt.type) {
      case 'text':
        collected += evt.text;
        break;
      case 'tool_use':
        throw new Error(ISOLATED_TOOL_USE_ERROR(evt.name));
      case 'error':
        throw new Error(evt.error);
      default:
        // Every other event kind is irrelevant to an isolated call.
        break;
    }
  }
  return stripThinkingTokens(collected);
}
/**
 * Progress snapshot delivered to `ConsumeStreamCallbacks.onPromptProgress`.
 * The values are copied field-for-field from a `prompt_progress` stream event.
 */
export interface PromptProgress {
  /** Total amount of prompt work expected (presumably tokens — confirm against the client). */
  total: number;
  /** Portion of `total` handled so far (presumably tokens — confirm against the client). */
  processed: number;
  /** Cache-related count reported by the backend (exact meaning not visible here — confirm against the client). */
  cache: number;
  /** Time reported by the backend, in milliseconds judging by the name. */
  timeMs: number;
}
/**
 * Optional observers invoked by `consumeLlmStream` while a response streams
 * in. Every member may be omitted; absent callbacks are simply not called.
 */
export interface ConsumeStreamCallbacks {
  /** Called with each text chunk as it arrives (before thinking tokens are stripped). */
  onText?: (text: string) => void;

  /**
   * Called for every streamed tool-call argument fragment, ahead of the
   * aggregated `onToolUse`. Intended for rendering live tool content. It has
   * no effect on `pendingToolCalls`; those are built from the final
   * `tool_use` event only.
   */
  onToolCallDelta?: (index: number, callId: string, name: string, chunk: string) => void;

  /** Called once a complete tool call is available, with its parsed input and call id. */
  onToolUse?: (name: string, input: Record<string, unknown>, callId: string) => void;

  /** Called with the progress figures of each `prompt_progress` event. */
  onPromptProgress?: (progress: PromptProgress) => void;

  /**
   * Called at most once per LLM call when the OpenAICompatClient runs in
   * proxy mode and the response exposed a backend identity header
   * (e.g. `x-litellm-model-id`). The runner uses it to attribute the call to
   * a specific physical backend behind the proxy, so the UI can render the
   * matching Pet / NodeStatus.
   *
   * `cacheKey` is non-null only on LiteLLM cache hits (`x-litellm-cache-key`).
   */
  onBackend?: (backendId: string, cacheKey: string | null) => void;
}
/** Aggregated outcome of one fully consumed LLM response stream. */
export interface ConsumedLLMResponse {
  /**
   * Concatenation of every `text` chunk received. `consumeLlmStream` runs it
   * through `stripThinkingTokens` before returning.
   */
  accumulatedText: string;

  /**
   * One entry per `tool_use` event, in arrival order, with the tool input
   * serialized to a JSON string in `function.arguments`.
   */
  pendingToolCalls: ToolCall[];

  /** True when an `error` event was seen or consuming the stream failed / timed out. */
  hadError: boolean;

  /** Message of the most recent failure; the empty string when `hadError` is false. */
  errorMessage: string;

  /** Token usage taken from the `done` event; absent when that event carried no usage. */
  lastUsage?: {
    prompt_tokens: number;
    completion_tokens: number;
  };

  /**
   * Physical backend id that handled this call, set when the client is in
   * proxy mode and the proxy reported one. Left undefined (not null) for
   * direct (non-proxy) workers and for proxy responses missing the header.
   */
  backendId?: string;

  /**
   * LiteLLM cache key when this response was a cache hit, null when a
   * backend was reported without one, and undefined when no backend was
   * reported at all.
   */
  backendCacheKey?: string | null;
}
/**
 * Consume one LLM response stream end-to-end with an idle-timeout safety net.
 *
 * - Resets the per-event timeout on every chunk so a long-running but actively
 *   streaming response is allowed; only true silence past `idleTimeoutMs`
 *   trips the abort.
 * - On timeout or stream error, asks the underlying generator to finish via
 *   `return()`, waiting at most 5s for it (some generators hang). The cap
 *   timer is cancelled as soon as the wait is over so it never lingers.
 * - Strips thinking-token blocks (DeepSeek/Qwen/Gemma flavors) from the
 *   accumulated text before returning.
 *
 * Pure I/O — no movement state. Caller is responsible for translating the
 * returned tool calls into actions and feeding `onToolUse`/`onText` events
 * to its callback bridge.
 *
 * @param client - Client whose `chat` stream is consumed.
 * @param messages - Conversation sent to the model.
 * @param tools - Tool definitions offered to the model.
 * @param cancelSignal - Optional abort signal forwarded to `client.chat`.
 * @param idleTimeoutMs - Maximum silence, in milliseconds, tolerated between
 *   two consecutive stream events.
 * @param callbacks - Optional observers notified as events arrive.
 * @param contextLabel - Prefix placed directly before the message text in log
 *   lines; include any trailing separator yourself.
 * @returns The aggregated response. Failures while iterating (timeout, stream
 *   rejection, or a throwing callback) are not rethrown; they are reported
 *   through `hadError` / `errorMessage`.
 */
export async function consumeLlmStream(
  client: OpenAICompatClient,
  messages: Message[],
  tools: ToolDef[],
  cancelSignal: AbortSignal | undefined,
  idleTimeoutMs: number,
  callbacks: ConsumeStreamCallbacks = {},
  contextLabel: string = '',
): Promise<ConsumedLLMResponse> {
  const stream = client.chat(messages, tools, cancelSignal);
  const accumulator: ConsumedLLMResponse = {
    accumulatedText: '',
    pendingToolCalls: [],
    hadError: false,
    errorMessage: '',
  };
  let streamExhausted = false;
  try {
    while (!streamExhausted) {
      const nextPromise = stream.next();
      // Race the next chunk against a fresh idle timer; whichever settles first wins.
      const result = await Promise.race([
        nextPromise,
        new Promise<never>((_, reject) => {
          const id = setTimeout(() => reject(new Error('LLM stream idle safety timeout')), idleTimeoutMs);
          // Clear the timer when the underlying chunk resolves so we don't leak it.
          void nextPromise.then(() => clearTimeout(id), () => clearTimeout(id));
        }),
      ]);
      if (result.done) {
        streamExhausted = true;
        break;
      }
      handleEvent(result.value, accumulator, callbacks, contextLabel);
    }
  } catch (safetyErr) {
    const msg = safetyErr instanceof Error ? safetyErr.message : String(safetyErr);
    logger.error(`[llm-stream] ${contextLabel}stream safety timeout or error: ${msg}`);
    accumulator.hadError = true;
    accumulator.errorMessage = msg;
    // Handle of the 5s cap timer, kept so it can be cancelled once the wait ends.
    let returnCapTimer: ReturnType<typeof setTimeout> | undefined;
    try {
      await Promise.race([
        stream.return(undefined as never),
        new Promise<void>((resolve) => {
          returnCapTimer = setTimeout(resolve, 5_000);
        }),
      ]);
    } catch {
      /* swallow — best-effort cleanup */
    } finally {
      // Without this, a pending 5s timer outlives every failed call even when
      // return() settles immediately, keeping the event loop busy for no reason.
      clearTimeout(returnCapTimer);
    }
  }
  accumulator.accumulatedText = stripThinkingTokens(accumulator.accumulatedText);
  return accumulator;
}
/**
 * Apply a single stream event to the running accumulator and notify the
 * matching callback, if one was supplied.
 *
 * Event kinds not listed in the switch are ignored.
 *
 * @param event - The event just pulled from the LLM stream.
 * @param acc - Accumulator mutated in place.
 * @param callbacks - Optional observers; missing members are skipped.
 * @param contextLabel - Prefix placed directly before the message text in log lines.
 */
function handleEvent(
  event: LLMEvent,
  acc: ConsumedLLMResponse,
  callbacks: ConsumeStreamCallbacks,
  contextLabel: string,
): void {
  switch (event.type) {
    case 'text':
      acc.accumulatedText += event.text;
      callbacks.onText?.(event.text);
      return;
    case 'tool_use': {
      // Serialize once: the same string is stored as the call arguments and
      // previewed in the log, so the log always reflects what was recorded
      // and a large input is not stringified twice.
      const serializedInput = JSON.stringify(event.input);
      acc.pendingToolCalls.push({
        id: event.id,
        type: 'function',
        function: {
          name: event.name,
          arguments: serializedInput,
        },
      });
      callbacks.onToolUse?.(event.name, event.input, event.id);
      // Log only the first 300 characters of the arguments.
      logger.info(`[llm-stream] ${contextLabel}tool_use: ${event.name} args=${serializedInput.substring(0, 300)}`);
      return;
    }
    case 'tool_use_delta':
      // Live rendering only; pendingToolCalls is built from 'tool_use' events.
      callbacks.onToolCallDelta?.(event.index, event.callId, event.name, event.chunk);
      return;
    case 'done':
      if (event.usage) acc.lastUsage = event.usage;
      return;
    case 'error':
      acc.hadError = true;
      acc.errorMessage = event.error;
      logger.error(`[llm-stream] ${contextLabel}LLM error: ${event.error}`);
      return;
    case 'backend':
      acc.backendId = event.backendId;
      acc.backendCacheKey = event.cacheKey;
      callbacks.onBackend?.(event.backendId, event.cacheKey);
      logger.info(`[llm-stream] ${contextLabel}proxy backend resolved: id=${event.backendId} cache=${event.cacheKey ?? 'miss'}`);
      return;
    case 'prompt_progress':
      callbacks.onPromptProgress?.({
        processed: event.processed,
        total: event.total,
        timeMs: event.timeMs,
        cache: event.cache,
      });
      return;
  }
}