import { logger } from '../../logger.js';
import type { Job, Repository } from '../../db/repository.js';
import type { AppConfig } from '../../config.js';
import type { ReflectionOutcome } from './types.js';
import { loadReflectionInputs } from './load-inputs.js';
import { buildSystemPrompt, buildUserPrompt } from './reflection-prompt.js';
import { callReflectionLlm } from './llm-client.js';
import { applyReflectionUnlocked, type ApplierDeps } from './applier.js';
import { writeSnapshot, type SnapshotDeps } from './snapshot.js';
import { withUserLock } from './user-lock.js';
import { PieceCatalog } from '../piece-catalog.js';
import { existsSync, readFileSync } from 'fs';
import { join } from 'path';
import { userPiecesDir } from '../../user-folder/paths.js';

/** Dependencies the reflection runner needs from its host worker. */
export interface RunReflectionDeps {
  repo: Repository;
  config: AppConfig;
  llmEndpoint: string;
  llmModel: string | undefined;
  /**
   * Worker's API key for the LLM endpoint. Without it, a gateway that
   * enforces virtual keys rejects every reflection call with 401
   * (normal task calls always send the worker's key — reflection must too).
   */
  llmApiKey?: string;
  /** True when the reflection worker routes through the AAO Gateway (proxy). */
  llmProxy?: boolean;
  /** Reflection worker's model context window (tokens) for the prompt guard. */
  llmContextLimitTokens?: number;
}

/** Shape of the JSON document stored in a reflection job's `payload`. */
interface ReflectionJobPayload {
  originalJobId: string;
  userId: string;
  pieceName: string;
  outcome: 'succeeded' | 'failed' | 'aborted';
}

/**
 * Parses a reflection job payload.
 *
 * Only checks that the text is valid JSON and decodes to a non-null object;
 * individual fields are not validated.
 *
 * @returns The decoded payload, or `undefined` when the text is not a JSON object.
 */
function parseReflectionPayload(raw: string): ReflectionJobPayload | undefined {
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== 'object' || parsed === null) {
      return undefined;
    }
    return parsed as ReflectionJobPayload;
  } catch {
    return undefined;
  }
}

/**
 * Runs one reflection job end to end: loads the inputs for the original job,
 * asks the reflection LLM for changes, applies them and writes a snapshot
 * under the per-user lock, then records a `reflection_metrics` row.
 *
 * Failures in loading, the LLM call, or the apply step are logged, recorded
 * as a `'failed'` metric and reported through the return value rather than
 * thrown. A missing or malformed payload returns `'failed'` without a metric
 * row, because the identifying fields live in the payload itself.
 *
 * @param deps - Repository, configuration and LLM connection settings.
 * @param job - The reflection job; `job.payload` must hold the JSON payload.
 * @returns The applier's outcome, or `'failed'` when any stage fails.
 */
export async function runReflectionJob(
  deps: RunReflectionDeps,
  job: Job
): Promise<ReflectionOutcome> {
  const jobStart = Date.now();

  if (!job.payload) {
    logger.warn(`[reflection-runner] missing payload job=${job.id}`);
    return 'failed';
  }

  // A payload that is not a JSON object is treated like a missing one instead
  // of letting JSON.parse (or a later property read) throw out of the runner.
  const meta = parseReflectionPayload(job.payload);
  if (meta === undefined) {
    logger.warn(`[reflection-runner] malformed payload job=${job.id}`);
    return 'failed';
  }

  logger.info(`[reflection-runner] start job=${job.id} originalJob=${meta.originalJobId} piece=${meta.pieceName} userId=${meta.userId} outcome=${meta.outcome}`);

  const cfg = deps.config;
  const reflection = cfg.reflection;
  const dataDir = cfg.userFolderRoot ?? 'data/users';
  const builtinDir = 'pieces';
  const storeLlmRaw = reflection?.storeLlmRaw ?? false;

  // Single place that writes the "failed" metric row; only the token counts
  // differ between failure sites (zero until the LLM call has returned).
  const recordFailure = (tokensIn: number, tokensOut: number): void => {
    deps.repo.recordReflectionMetric({
      reflection_job_id: job.id,
      original_job_id: meta.originalJobId,
      user_id: meta.userId,
      piece_name: meta.pieceName,
      outcome: 'failed',
      memory_changes: 0,
      piece_edited: 0,
      tokens_in: tokensIn,
      tokens_out: tokensOut,
      duration_ms: Date.now() - jobStart,
    });
  };

  // Load inputs from DB + filesystem
  let input;
  try {
    input = await loadReflectionInputs(
      deps.repo,
      {
        originalJobId: meta.originalJobId,
        userId: meta.userId,
        pieceName: meta.pieceName,
        outcome: meta.outcome,
      },
      {
        dataDir,
        builtinPiecesDir: builtinDir,
        activityLogMaxBytes: reflection?.activityLogMaxBytes ?? 4096,
      }
    );
  } catch (e) {
    logger.error(`[reflection-runner] loadReflectionInputs failed job=${job.id} err=${String(e)}`);
    recordFailure(0, 0);
    return 'failed';
  }

  // Build prompts
  const system = buildSystemPrompt();
  const user = buildUserPrompt(input);

  // Call LLM
  const llmCfg = {
    endpoint: deps.llmEndpoint,
    model: deps.llmModel,
    apiKey: deps.llmApiKey,
    proxy: deps.llmProxy === true,
    userId: meta.userId,
    contextLimitTokens: deps.llmContextLimitTokens,
  };
  let llmResult;
  try {
    llmResult = await callReflectionLlm(llmCfg, system, user);
  } catch (e) {
    logger.error(`[reflection-runner] LLM call failed job=${job.id} err=${String(e)}`);
    recordFailure(0, 0);
    return 'failed';
  }
  logger.info(
    `[reflection-runner] llm tokens_in=${llmResult.tokensIn} tokens_out=${llmResult.tokensOut} duration_ms=${llmResult.durationMs}`
  );
  logger.info(`[reflection-runner] reasoning job=${job.id} reasoning=${JSON.stringify(llmResult.parsed.reasoning)}`);

  // Apply reflection (memory + piece changes under per-user lock)
  const catalog = new PieceCatalog(builtinDir, dataDir);
  const applierDeps: ApplierDeps = {
    dataDir,
    maxBodyBytes: reflection?.maxEntryBodyBytes ?? 8192,
    repo: deps.repo,
    catalog,
    builtinDir,
    cooldownHours: reflection?.pieceEditCooldownHours ?? 24,
  };

  // Acquire the per-user lock for the FULL apply+snapshot critical section.
  // writeSnapshot must run inside this lock so index.jsonl append is atomic
  // with the memory mutations the applier produced. Without this, two
  // concurrent reflection workers for the same user can produce a torn
  // index.jsonl (Codex final-review MAJOR-1).
  let applierResult: Awaited<ReturnType<typeof applyReflectionUnlocked>> | undefined;
  try {
    await withUserLock(dataDir, meta.userId, async () => {
      const applied = await applyReflectionUnlocked(applierDeps, input, llmResult.parsed);
      applierResult = applied;

      const outcomeLocal = applied.outcome;
      const memoryChangesAppliedLocal = applied.memoryDecisions.filter(d => d.accepted).length;
      const abstainedLocal = outcomeLocal === 'abstained';
      logger.info(
        `[reflection-runner] applied memory_changes=${memoryChangesAppliedLocal}` +
          ` piece_edited=${applied.pieceApplied ? 'true' : 'false'}` +
          ` abstained=${abstainedLocal ? 'true' : 'false'}`
      );

      // Capture before/after files using the just-applied state. Still inside
      // the lock so index.jsonl append is serialized with the memory writes.
      const beforeFiles: Record<string, string> = {};
      const afterFiles: Record<string, string> = {};
      for (const decision of applied.memoryDecisions) {
        if (!decision.accepted) {
          continue;
        }
        // For a removal the snapshot is keyed by the merge target when one is
        // given, otherwise by the entry's own name.
        const name =
          decision.change.op === 'remove'
            ? (decision.change.merge_target ?? decision.change.name)
            : decision.change.name;
        const prev = input.memoryEntries.find(e => e.name === name);
        if (prev) {
          beforeFiles[`${name}.md`] = `---\nname: ${prev.name}\ndescription: ${prev.description}\ntype: ${prev.type}\n---\n${prev.body}`;
        }
        if (decision.change.op !== 'remove') {
          afterFiles[`${name}.md`] = `---\nname: ${decision.change.name}\ndescription: ${decision.change.description}\ntype: ${decision.change.type}\n---\n${decision.change.body}`;
        }
      }

      let pieceBeforeYaml: string | undefined;
      let pieceAfterYaml: string | undefined;
      if (applied.pieceApplied) {
        pieceBeforeYaml = input.pieceYaml;
        const customPath = join(userPiecesDir(dataDir, meta.userId), `${meta.pieceName}.yaml`);
        if (existsSync(customPath)) {
          pieceAfterYaml = readFileSync(customPath, 'utf-8');
        }
      }

      const snapshotDeps: SnapshotDeps = {
        dataDir,
        storeLlmRaw,
      };
      try {
        const snapResult = await writeSnapshot(
          snapshotDeps,
          beforeFiles,
          afterFiles,
          {
            originalJobId: meta.originalJobId,
            userId: meta.userId,
            pieceName: meta.pieceName,
            outcome: outcomeLocal,
            reasoning: llmResult.parsed.reasoning ?? '',
            modelUsed: deps.llmModel,
            tokensIn: llmResult.tokensIn,
            tokensOut: llmResult.tokensOut,
            memoryChanges: memoryChangesAppliedLocal,
            pieceEdited: applied.pieceApplied,
            rejections: applied.memoryDecisions
              .filter(d => !d.accepted && d.code)
              .map(d => ({ code: d.code!, name: d.change.name })),
            llmRaw: storeLlmRaw ? llmResult.parsed : undefined,
          },
          pieceBeforeYaml,
          pieceAfterYaml,
        );
        logger.info(`[reflection-runner] snapshot path=${snapResult.dir}`);
      } catch (e) {
        // Non-fatal inside the lock — log and continue so the metric still records.
        logger.error(`[reflection-runner] writeSnapshot failed job=${job.id} err=${String(e)}`);
      }
    });
  } catch (e) {
    logger.error(`[reflection-runner] apply+snapshot failed job=${job.id} err=${String(e)}`);
    recordFailure(llmResult.tokensIn, llmResult.tokensOut);
    return 'failed';
  }

  // The callback assigns applierResult before doing anything else, so it is
  // only undefined here if withUserLock resolved without running the callback.
  // Guard explicitly rather than asserting non-null.
  if (applierResult === undefined) {
    logger.error(`[reflection-runner] apply produced no result job=${job.id}`);
    recordFailure(llmResult.tokensIn, llmResult.tokensOut);
    return 'failed';
  }

  const outcome = applierResult.outcome;
  const memoryChangesApplied = applierResult.memoryDecisions.filter(d => d.accepted).length;

  // Record metrics — recordPieceEdit is called inside applier via writePiece,
  // so here we only insert the reflection_metrics row (no bundled pieceEdit).
  deps.repo.recordReflectionMetric({
    reflection_job_id: job.id,
    original_job_id: meta.originalJobId,
    user_id: meta.userId,
    piece_name: meta.pieceName,
    outcome,
    memory_changes: memoryChangesApplied,
    piece_edited: applierResult.pieceApplied ? 1 : 0,
    tokens_in: llmResult.tokensIn,
    tokens_out: llmResult.tokensOut,
    duration_ms: Date.now() - jobStart,
  });

  logger.info(`[reflection-runner] done job=${job.id} outcome=${outcome}`);
  return outcome;
}