maestro/src/engine/reflection/llm-client.ts
oss-sync c5be399fdd
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (0dd1a8e)
2026-06-11 11:50:39 +00:00

191 lines
7.1 KiB
TypeScript

import { logger } from '../../logger.js';
import { getDefaultProviderRetryConfig } from '../../config.js';
import { OpenAICompatClient, type LLMEvent, type Message, type ToolDef } from '../../llm/openai-compat.js';
import type { ReflectionResult } from './types.js';
import { REFLECTION_TOOL_SCHEMA } from './reflection-schema.js';
export interface ReflectionLlmConfig {
endpoint: string;
model: string | undefined;
apiKey?: string;
/** True when the reflection worker routes through the AAO Gateway (proxy). */
proxy?: boolean;
/** Reflection target user — recorded as the usage owner. */
userId?: string;
/**
* Model context window in tokens. Passed to the shared client's
* prompt-size preflight guard. Reflection prompts can be large (uncapped
* memory snapshot), so use the worker's real limit rather than the
* client's conservative 32k default, which would block valid prompts.
*/
contextLimitTokens?: number;
}
export interface ReflectionLlmResult {
parsed: ReflectionResult;
tokensIn: number;
tokensOut: number;
durationMs: number;
raw: unknown;
}
/** Total attempts (1 initial + retries) for resample-worthy failures. */
const MAX_ATTEMPTS = 3;
/** Backoff before attempt 2 and 3. Injectable for tests. */
const RETRY_DELAYS_MS = [500, 1500];
let sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
/** Test hook: replace the backoff sleeper (avoids real timers in vitest). */
export function setReflectionRetrySleep(fn: (ms: number) => Promise<void>): void {
sleep = fn;
}
/**
* Errors worth a resample: small reflection models occasionally emit
* malformed tool-call markup, which strict backends (e.g. llama-server's
* tool parser) reject with a 5xx like
* "Failed to parse input at pos 41: <tool_call>...". The sampling is
* stochastic (temperature 0.2), so simply asking again usually succeeds.
* 4xx (bad key, bad request shape) is deterministic config error — fail fast.
*/
class RetryableLlmError extends Error {}
export async function callReflectionLlm(
cfg: ReflectionLlmConfig,
systemPrompt: string,
userPrompt: string
): Promise<ReflectionLlmResult> {
const start = Date.now();
let lastErr: Error | null = null;
for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
try {
return await callOnce(cfg, systemPrompt, userPrompt, start);
} catch (e) {
if (!(e instanceof RetryableLlmError)) throw e;
lastErr = e;
if (attempt < MAX_ATTEMPTS) {
const delay = RETRY_DELAYS_MS[attempt - 1] ?? 1500;
logger.warn(
`[reflection-llm] attempt ${attempt}/${MAX_ATTEMPTS} failed (${e.message.slice(0, 200)}); retrying in ${delay}ms`,
);
await sleep(delay);
}
}
}
throw lastErr ?? new Error('reflection LLM failed');
}
/**
* Classify an OpenAICompatClient error for the reflection resample loop.
* - HTTP 5xx (incl. tool-call parse errors on malformed model output) and
* gateway_shutdown / gateway_timeout: transient → resample.
* - HTTP 4xx (bad key / request shape), budget_exhausted / rate_limited
* (won't pass until the period resets), and the client-side
* "blocked before send" prompt-size guard: deterministic → fail fast.
* - Everything else (transport / parse / idle timeout): stochastic → resample.
*/
function classifyClientError(message: string, gatewayErrorType?: string): Error {
if (gatewayErrorType === 'budget_exhausted' || gatewayErrorType === 'rate_limited') {
return new Error(message);
}
if (gatewayErrorType === 'gateway_shutdown' || gatewayErrorType === 'gateway_timeout') {
return new RetryableLlmError(message);
}
// Client-side preflight rejection — the prompt is too large; resampling the
// identical prompt cannot help.
if (message.includes('blocked before send')) {
return new Error(message);
}
const m = /HTTP (\d{3})/.exec(message);
if (m) {
const status = Number(m[1]);
if (status >= 500) return new RetryableLlmError(message);
return new Error(message);
}
return new RetryableLlmError(message);
}
async function callOnce(
cfg: ReflectionLlmConfig,
systemPrompt: string,
userPrompt: string,
start: number,
): Promise<ReflectionLlmResult> {
// Route through the shared client so usage lands in the single
// per-user ledger (gateway + direct) like every other LLM call.
// maxAttempts=1: the outer callReflectionLlm loop owns resampling.
const client = new OpenAICompatClient(
cfg.endpoint,
cfg.model,
cfg.apiKey,
{ ...getDefaultProviderRetryConfig(), maxAttempts: 1 },
undefined,
cfg.contextLimitTokens, // real model window; avoid the 32k default blocking large reflection prompts
undefined,
undefined,
{ proxy: cfg.proxy === true },
);
const messages: Message[] = [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: userPrompt },
];
let parsed: ReflectionResult | null = null;
let usage: { prompt_tokens: number; completion_tokens: number } | undefined;
let errorMsg: string | null = null;
let errorGatewayType: string | undefined;
let backendId: string | undefined;
for await (const event of client.chat(
messages,
[REFLECTION_TOOL_SCHEMA as unknown as ToolDef],
undefined,
{ userId: cfg.userId },
{ temperature: 0.2, toolChoice: { type: 'function', function: { name: 'submit_reflection' } } },
) as AsyncGenerator<LLMEvent>) {
if (event.type === 'tool_use') {
if (event.name === 'submit_reflection' && parsed === null) {
parsed = event.input as unknown as ReflectionResult;
}
} else if (event.type === 'done') {
usage = event.usage;
} else if (event.type === 'backend') {
backendId = event.backendId;
} else if (event.type === 'error') {
errorMsg = event.error;
errorGatewayType = event.gatewayErrorType;
}
}
if (errorMsg !== null) {
throw classifyClientError(`reflection LLM ${errorMsg}`, errorGatewayType);
}
if (parsed === null) {
throw new RetryableLlmError('reflection LLM returned no submit_reflection tool_call');
}
// The shared client swallows tool-argument JSON parse errors and yields an
// empty `{}` input. Preserve the old resample-on-malformed behaviour with a
// shallow structural check against the tool schema's required fields — a
// genuinely-empty object means the model emitted broken tool markup.
const p = parsed as unknown as Record<string, unknown>;
if (p['piece_changes'] === undefined || p['reasoning'] === undefined) {
throw new RetryableLlmError('reflection LLM tool_call arguments were malformed or incomplete');
}
return {
parsed,
tokensIn: usage?.prompt_tokens ?? 0,
tokensOut: usage?.completion_tokens ?? 0,
durationMs: Date.now() - start,
// Reconstruct the OpenAI response shape so `raw` keeps debugging fidelity
// after the move to the streaming client (issue #500): the resolved
// tool_call, usage, and (proxy) backend id rather than just `{ usage }`.
raw: {
usage,
backendId,
choices: [
{ message: { tool_calls: [{ function: { name: 'submit_reflection', arguments: JSON.stringify(parsed) } }] } },
],
},
};
}