import { Repository, Job, localTaskRepoName, type JobRole } from './db/repository.js'; import { userPiecesDir } from './user-folder/paths.js'; import { BrowserSessionRepo } from './db/browser-session-repo.js'; import { assertProfileOwner } from './engine/browser-session-auth.js'; import { initMasterKey, decryptUserDek, decryptStateBlob } from './crypto/sessions.js'; import { OpenAICompatClient } from './llm/openai-compat.js'; import { loadPiece, runPiece, PieceRunCallbacks, PieceDef, type PieceRunResult } from './engine/piece-runner.js'; import { LocalProgressReporter } from './progress/local-reporter.js'; import { buildLocalConversationContext } from './engine/local-context.js'; import { AppConfig, isExecutionWorker, type WorkerDef, type ReflectionConfig, DEFAULT_NOTES_INJECT, type NotesInjectConfig } from './config.js'; import { existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from 'fs'; import { join } from 'path'; import { logger } from './logger.js'; import { commitWorkspaceChanges, ensureWorkspaceGitRepo } from './git/workspace-manager.js'; import { ContextManager, fetchOllamaContextLimit } from './engine/context-manager.js'; import { summarizeToolInput, type ActivityLogMetadata } from './progress/log-format.js'; import { ensureKeepaGraphs } from './engine/tools/amazon.js'; import type { McpTokenManager } from './mcp/token-manager.js'; import { mergeMcpConfig } from './mcp/config.js'; import { NotesService } from './notes/notes-service.js'; import { NotesRepository } from './notes/notes-repository.js'; import { createStickyBackendResolver } from './worker/sticky-backend.js'; import { jobEventBus } from './bridge/job-events.js'; import { normalizeToolNameForMetric } from './metrics/tool-name-allowlist.js'; const RETRY_HANDOFF_MAX_LENGTH = 8_000; const RETRY_DIAGNOSTICS_PREVIEW_LENGTH = 1_200; const RETRY_LESSONS_MAX_LINES = 12; function buildTimeContextBlock(): string { const now = new Date(); const utc = now.toISOString(); const jst = new Intl.DateTimeFormat('ja-JP', { timeZone: 'Asia/Tokyo', year: 'numeric', month: '2-digit', day: '2-digit', hour: '2-digit', minute: '2-digit', second: '2-digit', hour12: false, weekday: 'short', }).format(now); return [ '## 実行時刻コンテキスト', `- Current time (UTC): ${utc}`, `- Current time (JST): ${jst}`, '- 時刻依存の判断(今日/昨日/直近◯時間/最新ニュース等)は必ずこの時刻を基準に行うこと。', '', ].join('\n'); } function getLocalTaskId(repoName: string): number | null { const match = /^local\/task-(\d+)$/.exec(repoName); if (!match) return null; return Number(match[1]); } function getSubTaskParentJobId(repoName: string): string | null { const match = /^subtask\/([0-9a-f-]{36})$/.exec(repoName); if (!match) return null; return match[1]!; } /** * Browser session を keying するための「論理タスク ID」を解決する。 * * - local task の直接実行 → そのタスクの ID (string) * - subtask 実行 → 親方向に最大 5 段まで walk up し、最初に見つかる * local task の ID。subtaskDepth は config 上 2 が上限なので余裕を見て 5 * - 親が gitea issue 等 local task でないジョブの場合 → null (BrowseWeb は * noVNC モードを使えない / 旧来の頭出しに fallback する) */ async function resolveSessionTaskId( repo: Repository, job: Job, ): Promise<{ taskId: string | undefined; userId: string | undefined }> { const directLocalTaskId = getLocalTaskId(job.repo); if (directLocalTaskId !== null) { return { taskId: String(directLocalTaskId), userId: job.ownerId ?? undefined }; } let cursor: string | null = getSubTaskParentJobId(job.repo); let hops = 0; while (cursor && hops < 5) { const parent = await repo.getJob(cursor); if (!parent) return { taskId: undefined, userId: job.ownerId ?? undefined }; const parentLocalTaskId = getLocalTaskId(parent.repo); if (parentLocalTaskId !== null) { return { taskId: String(parentLocalTaskId), // owner は親と同じはずだが、念のため fallback も用意 userId: parent.ownerId ?? job.ownerId ?? undefined, }; } cursor = parent.parentJobId ?? getSubTaskParentJobId(parent.repo); hops++; } return { taskId: undefined, userId: job.ownerId ?? undefined }; } function buildUiMetadataBlock(job: Job): string { return [ '---', `ui_profile: ${job.requiredRole}`, `ui_output_format: ${/ui_output_format:\s*(text|markdown|json)/i.exec(job.instruction)?.[1]?.toLowerCase() ?? 'markdown'}`, `ui_ask_policy: ${/ui_ask_policy:\s*(low|high)/i.exec(job.instruction)?.[1]?.toLowerCase() ?? 'low'}`, `ui_priority: ${/ui_priority:\s*(low|medium|high)/i.exec(job.instruction)?.[1]?.toLowerCase() ?? 'medium'}`, '---', ].join('\n'); } function truncateRetryText(text: string, maxLength: number): string { const trimmed = text.trim(); if (trimmed.length <= maxLength) return trimmed; return `${trimmed.slice(0, maxLength)}...`; } function readRetryLessons(workspacePath: string): string[] { const logPath = join(workspacePath, 'logs', 'lessons.jsonl'); if (!existsSync(logPath)) return []; try { return readFileSync(logPath, 'utf-8') .split('\n') .filter(Boolean) .slice(-RETRY_LESSONS_MAX_LINES) .map((line) => { try { const data = JSON.parse(line) as { movement?: string; lessons?: string }; const movement = data.movement ? `[${data.movement}] ` : ''; return `- ${movement}${truncateRetryText(String(data.lessons ?? ''), 500)}`; } catch { return `- ${truncateRetryText(line, 500)}`; } }) .filter((line) => line.trim() !== '-'); } catch { return []; } } function readLastRunDiagnostics(workspacePath: string): string[] { const diagnosticsPath = join(workspacePath, 'logs', 'last-run-diagnostics.json'); if (!existsSync(diagnosticsPath)) return []; try { const data = JSON.parse(readFileSync(diagnosticsPath, 'utf-8')) as { status?: string; abortReason?: string | null; finalOutput?: string; movementHistory?: Array<{ name?: string; next?: string | null; toolsUsed?: string[]; outputPreview?: string; }>; contextActions?: unknown[]; }; const lines: string[] = []; if (data.status || data.abortReason) { lines.push(`- 前回ステータス: ${data.status ?? 'unknown'}${data.abortReason ? ` (${data.abortReason})` : ''}`); } for (const movement of data.movementHistory ?? []) { const tools = movement.toolsUsed && movement.toolsUsed.length > 0 ? ` tools=${movement.toolsUsed.join(',')}` : ''; lines.push(`- movement ${movement.name ?? 'unknown'} -> ${movement.next ?? 'unknown'}${tools}`); if (movement.outputPreview) { lines.push(` - output: ${truncateRetryText(movement.outputPreview, 300)}`); } } if (data.finalOutput) { lines.push(`- 最終出力プレビュー: ${truncateRetryText(data.finalOutput, RETRY_DIAGNOSTICS_PREVIEW_LENGTH)}`); } if (data.contextActions && data.contextActions.length > 0) { lines.push(`- context action: ${JSON.stringify(data.contextActions.slice(-3))}`); } return lines; } catch { return []; } } export function buildRetryHandoffSummary(params: { workspacePath: string; job: Job; errorMsg: string; nextRetryAt?: string | null; disposition: 'requeued_unhealthy' | 'retry' | 'failed'; }): string { const lines: string[] = [ '# Retry Handoff', '', `Generated: ${new Date().toISOString()}`, `Job: ${params.job.id}`, `Disposition: ${params.disposition}`, `Attempt: ${params.job.attempt}/${params.job.maxAttempts}`, ]; if (params.nextRetryAt) lines.push(`Next retry at: ${params.nextRetryAt}`); lines.push('', '## 失敗理由', truncateRetryText(params.errorMsg, 2_000)); const diagnostics = readLastRunDiagnostics(params.workspacePath); if (diagnostics.length > 0) { lines.push('', '## 前回実行の要約', ...diagnostics); } const lessons = readRetryLessons(params.workspacePath); if (lessons.length > 0) { lines.push('', '## これまでの lessons', ...lessons); } lines.push( '', '## 次のエージェントへの指示', '- 前回の失敗理由と movement の進捗を踏まえ、同じ探索や同じ失敗を繰り返さないこと。', '- 既に完了している作業・生成済みファイル・確認済み事項は再実行前に workspace とログで確認すること。', '- 必要な情報が不足している場合は、全体を読み直すのではなく targeted Read / Grep / Bash で範囲を絞ること。', ); return `${truncateRetryText(lines.join('\n'), RETRY_HANDOFF_MAX_LENGTH)}\n`; } function writeRetryHandoffSummary(params: { workspacePath: string | null | undefined; job: Job; errorMsg: string; nextRetryAt?: string | null; disposition: 'requeued_unhealthy' | 'retry' | 'failed'; }): void { if (!params.workspacePath) return; try { const logsDir = join(params.workspacePath, 'logs'); mkdirSync(logsDir, { recursive: true }); const summary = buildRetryHandoffSummary({ workspacePath: params.workspacePath, job: params.job, errorMsg: params.errorMsg, nextRetryAt: params.nextRetryAt, disposition: params.disposition, }); writeFileSync(join(logsDir, 'retry-summary.md'), summary, 'utf-8'); } catch (err) { logger.warn(`[worker] failed to write retry handoff summary: ${err}`); } } function buildRetryHandoffContext(workspacePath: string, job: Job): string { if (job.attempt <= 1 && !job.errorSummary) return ''; const summaryPath = join(workspacePath, 'logs', 'retry-summary.md'); if (!existsSync(summaryPath)) return ''; try { const summary = truncateRetryText(readFileSync(summaryPath, 'utf-8'), RETRY_HANDOFF_MAX_LENGTH); if (!summary) return ''; return [ '## Retry 復帰用引き継ぎ', 'このジョブは前回実行からの retry / 再キューです。以下を前提に、重複作業を避けて継続してください。', '', summary, ].join('\n'); } catch { return ''; } } export async function maybeEnqueueReflection( repo: Repository, job: Job, outcome: 'succeeded' | 'failed' | 'aborted', cfg: Pick, workers: WorkerDef[] = [], ): Promise { if (!cfg.enabled) return; if (job.taskKind === 'reflection') return; // No-auth mode runs every job with ownerId=null. Reflection is per-user // (memory/pieces live under data/users/{userId}/), so fall back to the same // 'local' namespace the rest of the no-auth path uses (ToolContext, pieces, // user-folder). Without this the enqueue gate skipped forever and reflection // silently never ran in no-auth deployments. const reflectionOwner = job.ownerId ?? 'local'; // worker_required enforcement: when true, at least one worker must have 'reflection' in its roles if (cfg.workerRequired) { const hasReflectionWorker = workers.some( (w) => Array.isArray(w.roles) && w.roles.includes('reflection'), ); if (!hasReflectionWorker) { logger.warn(`[reflection] enqueue skipped reason=no_reflection_worker user=${reflectionOwner}`); return; } } // Per-user daily token budget check. // Cap=0 means "no limit" — useful for fresh installs that haven't tuned the budget yet. const cap = cfg.perUserDailyBudgetTokens ?? 0; if (cap > 0) { // Compute today's start in UTC (00:00:00.000 UTC). const now = new Date(); const todayStartMs = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()); const metrics = repo.aggregateReflectionMetrics(reflectionOwner, todayStartMs); const spent = metrics.tokensIn + metrics.tokensOut; if (spent >= cap) { const spentM = (spent / 1_000_000).toFixed(1); const capM = (cap / 1_000_000).toFixed(1); logger.info(`[reflection] enqueue skipped reason=budget user=${reflectionOwner} spent=${spentM}M cap=${capM}M`); return; } } const payload = JSON.stringify({ originalJobId: job.id, userId: reflectionOwner, pieceName: job.pieceName, outcome, }); await repo.createJob({ repo: `local/reflection-${job.id}`, issueNumber: 0, instruction: '', pieceName: 'reflection', role: 'reflection', ownerId: reflectionOwner, visibility: 'private', taskKind: 'reflection', payload, } as any); logger.info(`[reflection] enqueued original=${job.id} owner=${reflectionOwner} piece=${job.pieceName} outcome=${outcome}`); } export class Worker { private running = false; private inflight = 0; private polling = false; private stopped = false; private pollInterval: ReturnType | null = null; private healthInterval: ReturnType | null = null; private workerId: string; private endpoint: string; private model: string | undefined; private availableModels: Set = new Set(); private healthy = false; private lastHealthError: string | null = null; private contextLimitTokens: number = 128_000; private mcpTokenManager: McpTokenManager | null = null; /** * Phase 3b: optional Prometheus metrics handle. When set, the worker * emits jobs_total / active_jobs / job_duration_seconds in * executeJob's start/finally, llm_calls_total via the AgentLoop * onLLMCall callback, and tool_calls_total via the new onToolMetric * callback in agent-loop.ts. Wired by WorkerManager after the metric * registry exists. */ private workerMetrics: import('./metrics/worker-metrics.js').WorkerMetrics | null = null; private skillCatalog: import('./engine/skills.js').SkillCatalog | null = null; /** * V2 Web Push notification service. Null when push is disabled via * config or when the worker is built without one (tests). Hooks fire * via enqueue (fire-and-forget) so a slow push service can't block job * execution. * * Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md. */ private pushService: import('./push-service.js').PushService | null = null; constructor( workerId: string, endpoint: string, model: string | undefined, private repo: Repository, private config: AppConfig, ) { this.workerId = workerId; this.endpoint = endpoint; this.model = model; } public setMcpTokenManager(tm: McpTokenManager | null): void { this.mcpTokenManager = tm; } public setSkillCatalog(catalog: import('./engine/skills.js').SkillCatalog): void { this.skillCatalog = catalog; } public setPushService(svc: import('./push-service.js').PushService | null): void { this.pushService = svc; } /** * Hot-swap the global config on a still-running worker. Used by * WorkerManager's differential rebuild: when a config change does NOT * touch this worker's own def (e.g. a tool size limit changed), we keep * the worker — and any in-flight job — alive and just refresh the config * it reads for future jobs. Def-derived values (roles, endpoint, * maxConcurrency) are read live via getWorkerDef(), so they stay correct. * In-flight jobs keep the settings they captured at start, by design. */ public updateConfig(config: AppConfig): void { this.config = config; } /** Jobs currently executing in this worker's detached loops. */ public get inflightCount(): number { return this.inflight; } /** * Fire a V2 push for a job status transition. Fire-and-forget — never * throws and never awaits the underlying queue. Skips silently when * - push is disabled (pushService === null) * - the job has no owner (legacy / system-issued) * - the job is not a local task (sub-task pushes go to the parent owner; * we still send for direct local tasks). * Reflection jobs are also skipped — they're an internal mechanism, not * user-facing work. */ private enqueuePush( job: Job, event: 'running' | 'succeeded' | 'failed' | 'waiting_human', ): void { if (!this.pushService) return; if (job.taskKind === 'reflection') return; if (!job.ownerId) return; const localTaskId = getLocalTaskId(job.repo); if (localTaskId === null) return; // Title lookup is cheap (single-row SELECT) and synchronous via the // raw db handle. Falling back to a generic label is fine — the // push-service uses privacy-default payloads unless the user opted // in via include_details. let taskTitle = `Task #${localTaskId}`; try { const row = this.repo.getDb() .prepare('SELECT title FROM local_tasks WHERE id = ?') .get(localTaskId) as { title: string | null } | undefined; if (row?.title) taskTitle = row.title; } catch { // best-effort; fall through with default title } this.pushService.enqueue({ event, taskId: localTaskId, taskTitle, pieceName: job.pieceName, ownerId: job.ownerId, }); } /** * Phase 3b: install (or remove) the Prometheus metrics handle. * Idempotent — calling with the same handle twice is fine. Null * clears the handle, useful when reconfiguring at runtime. */ public setWorkerMetrics( metrics: import('./metrics/worker-metrics.js').WorkerMetrics | null, ): void { this.workerMetrics = metrics; } private getWorkerDef(): WorkerDef { const workerDef = this.config.provider.workers.find((worker) => worker.id === this.workerId); if (!workerDef) { throw new Error(`Worker config not found: ${this.workerId}`); } return workerDef; } private getSupportedRoles(): string[] { return this.getWorkerDef().roles ?? ['auto', 'fast', 'quality']; } private getMaxConcurrency(): number { return Math.max(1, this.getWorkerDef().maxConcurrency ?? 1); } async initialize(): Promise { const workerDef = this.getWorkerDef(); const enabled = workerDef.enabled !== false; if (!enabled) { await this.repo.upsertWorkerNode({ workerId: this.workerId, endpoint: this.endpoint, enabled: false, healthy: false, roles: this.getSupportedRoles(), availableModels: [], inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: 'disabled by config', }); this.healthy = false; this.lastHealthError = 'disabled by config'; logger.info(`[worker:${this.workerId}] disabled by config; skipping polling`); return false; } try { const ollamaBase = this.endpoint.replace(/\/v1\/?$/, ''); // Try Ollama /api/tags first, then fall back to OpenAI-compatible /v1/models. // // Forward `Authorization: Bearer ` when the worker has one // configured. The discovery probes (/api/tags / /v1/models) were // previously sent un-authenticated, which caused 30s-interval 401 // floods against AAO Gateway endpoints (gateway requires Bearer auth // on /v1/models) — the worker then logged "failed to fetch model // list" indefinitely and `availableModels` stayed empty. // Discovered during 2026-05-20 dogfooding on production aao. const apiKey = this.getWorkerDef().apiKey; const init: RequestInit = apiKey ? { headers: { Authorization: `Bearer ${apiKey}` } } : {}; let models: string[] = []; const ollamaRes = await fetch(`${ollamaBase}/api/tags`, init).catch(() => null); if (ollamaRes?.ok) { const data = await ollamaRes.json() as { models?: Array<{ name: string }> }; models = (data.models ?? []).map((m: { name: string }) => m.name); } else { const openaiBase = this.endpoint.replace(/\/?$/, ''); const openaiRes = await fetch(`${openaiBase}/models`, init).catch(() => null); if (openaiRes?.ok) { const data = await openaiRes.json() as { data?: Array<{ id: string }> }; models = (data.data ?? []).map((m: { id: string }) => m.id); } else if (this.model) { throw new Error(`failed to fetch model list from both /api/tags and /v1/models`); } // llama-server compat: model 未設定なら model 一覧 API は必須ではないので空配列で続行。 } this.availableModels = new Set(models); await this.repo.upsertWorkerNode({ workerId: this.workerId, endpoint: this.endpoint, enabled: true, healthy: true, roles: this.getSupportedRoles(), availableModels: [...this.availableModels], inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: null, }); if (!this.healthy || this.lastHealthError !== null) { logger.info(`[worker:${this.workerId}] available models: ${[...this.availableModels].join(', ')}`); } this.healthy = true; this.lastHealthError = null; // Auto-detect context limit from Ollama if not configured if (!this.config.context?.limitTokens) { if (this.model) { const contextLimit = await fetchOllamaContextLimit(this.endpoint, this.model); if (contextLimit !== this.contextLimitTokens) { logger.info(`[worker:${this.workerId}] context limit updated: ${contextLimit} tokens`); this.contextLimitTokens = contextLimit; } } else { // No model configured — try llama.cpp /props endpoint for context limit const contextLimit = await fetchOllamaContextLimit(this.endpoint, ''); if (contextLimit !== this.contextLimitTokens) { logger.info(`[worker:${this.workerId}] context limit updated: ${contextLimit} tokens`); this.contextLimitTokens = contextLimit; } } } else { this.contextLimitTokens = this.config.context.limitTokens; } return true; } catch (e) { const errorMessage = e instanceof Error ? e.message : String(e); this.availableModels.clear(); await this.repo.upsertWorkerNode({ workerId: this.workerId, endpoint: this.endpoint, enabled: true, healthy: false, roles: this.getSupportedRoles(), availableModels: [], inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: errorMessage, }); if (this.healthy || this.lastHealthError !== errorMessage) { logger.warn(`[worker:${this.workerId}] failed to fetch model list: ${e}`); } this.healthy = false; this.lastHealthError = errorMessage; return false; } } start(): void { if (this.running) return; this.running = true; logger.info(`[worker:${this.workerId}] started`); const tick = () => void this.processNext(); tick(); const baseInterval = 5000; const jitter = Math.floor(Math.random() * 2000); this.pollInterval = setInterval(tick, baseInterval + jitter); const healthIntervalSeconds = Math.max(10, this.getWorkerDef().healthcheckIntervalSeconds ?? 30); this.healthInterval = setInterval(() => void this.initialize(), healthIntervalSeconds * 1000); } stop(): void { this.running = false; this.stopped = true; if (this.pollInterval) { clearInterval(this.pollInterval); this.pollInterval = null; } if (this.healthInterval) { clearInterval(this.healthInterval); this.healthInterval = null; } logger.info(`[worker:${this.workerId}] stopped`); } async waitForCompletion(timeoutMs = 30000): Promise { if (this.inflight === 0) return true; const start = Date.now(); while (this.inflight > 0 && (Date.now() - start) < timeoutMs) { await new Promise(resolve => setTimeout(resolve, 500)); } return this.inflight === 0; } get id(): string { return this.workerId; } private async processNext(): Promise { if (!isExecutionWorker(this.getWorkerDef()) || !this.running || this.stopped) return; if (this.polling) return; // claim loop is single-flight (prevents over-claim) this.polling = true; try { // スタックジョブ watchdog: LLM タイムアウトの2倍を閾値にする try { const staleMinutes = Math.max(20, (this.config.provider.timeoutMinutes ?? 10) * 2); this.repo.recoverStuckRunningJobs(staleMinutes); } catch (err) { logger.warn(`[worker:${this.workerId}] recoverStuckRunningJobs error: ${err}`); } const available = await this.initialize(); if (!available) return; const max = this.getMaxConcurrency(); while (this.inflight < max && this.running && !this.stopped) { // リトライジョブを優先 const job = await this.repo.claimNextRetryJob(this.workerId) ?? await this.repo.claimNextJob(this.workerId); if (!job) break; this.inflight++; void this.runJobTracked(job); // 並行実行: await しない } } catch (err) { logger.error(`[worker:${this.workerId}] processNext error: ${err}`); } finally { this.polling = false; } } /** Run one job to completion, always restoring the inflight counter. */ private async runJobTracked(job: Job): Promise { try { await this.executeJob(job); } catch (err) { logger.error(`[worker:${this.workerId}] runJobTracked error job=${job.id}: ${err}`); } finally { this.inflight--; await this.reportInflight(); } } /** Push the live inflight count (and current health) to the worker_nodes row. */ private async reportInflight(): Promise { try { await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, inflightJobs: this.inflight, availableModels: [...this.availableModels], }); } catch (err) { logger.warn(`[worker:${this.workerId}] reportInflight failed: ${err}`); } } private supportsRole(role: string): boolean { return this.getSupportedRoles().includes(role); } private buildLogMetadata(role: JobRole): ActivityLogMetadata { return { workerId: this.workerId, mode: role }; } /** * サブタスクの ASK に対して、親ジョブの文脈を使って LLM に回答を生成させる */ private async answerSubtaskAsk(subtaskJob: Job, parentJobId: string, question: string): Promise { const parentJob = await this.repo.getJob(parentJobId); const parentInstruction = parentJob?.instruction ?? '(不明)'; const resolvedModel = this.model; const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000; const workerDefForAnswer = this.getWorkerDef(); const llmClient = new OpenAICompatClient( this.endpoint, resolvedModel, workerDefForAnswer.apiKey, this.config.provider.retry, timeoutMs, this.contextLimitTokens, this.config.safety?.promptGuardRatio, undefined, { proxy: workerDefForAnswer.proxy === true }, ); const messages: import('./llm/openai-compat.js').Message[] = [ { role: 'system', content: [ 'あなたはタスクを管理する親エージェントです。', 'サブタスクがユーザーに確認を求めていますが、あなたが代わりに回答してください。', '元の依頼の意図を汲み取り、サブタスクが作業を継続できるよう具体的に回答してください。', '回答のみを簡潔に返してください。', ].join('\n'), }, { role: 'user', content: [ '## 元の依頼', parentInstruction, '', '## サブタスクの指示', subtaskJob.instruction, '', '## サブタスクからの質問', question, ].join('\n'), }, ]; let answer = ''; for await (const event of llmClient.chat(messages)) { if (event.type === 'text') { answer += event.text; } else if (event.type === 'error') { throw new Error(`LLM error: ${event.error}`); } } return answer.trim() || '特に制約はありません。あなたの判断で進めてください。'; } private writeRunDiagnostics(workspacePath: string, result: PieceRunResult): void { try { const logsDir = join(workspacePath, 'logs'); mkdirSync(logsDir, { recursive: true }); const diagnostics = { generatedAt: new Date().toISOString(), status: result.status, abortReason: result.abortReason ?? null, finalOutput: result.finalOutput, movementHistory: result.movementHistory.map((entry) => ({ name: entry.name, next: entry.result.next, toolsUsed: entry.result.toolsUsed, outputPreview: entry.result.output.slice(0, 600), })), contextActions: result.contextActions, }; writeFileSync(join(logsDir, 'last-run-diagnostics.json'), `${JSON.stringify(diagnostics, null, 2)}\n`, 'utf-8'); } catch (err) { logger.warn(`[worker:${this.workerId}] failed to write run diagnostics: ${err}`); } } /** * Resolve the workspace path for a job and ensure the directory tree * (input/output/logs + a git repo) exists. Persists the resolved path * back to the job + local-task records so downstream consumers can find * the workspace. * * Throws on jobs that are neither local tasks nor sub-tasks (the orchestrator * doesn't currently spin its own workspaces for raw repo/issue jobs). */ private async prepareJobWorkspace( job: Job, isLocalTask: boolean, isSubTask: boolean, localTaskId: number | null, ): Promise { const { repo: repoName, issueNumber, id: jobId } = job; const workspacePath = isLocalTask ? join(this.config.worktreeDir, 'local', String(localTaskId)) : isSubTask ? (job.worktreePath ?? join(this.config.worktreeDir, 'subtasks', jobId)) : join(this.config.worktreeDir, repoName, String(issueNumber)); if (isLocalTask) { mkdirSync(workspacePath, { recursive: true }); mkdirSync(join(workspacePath, 'input'), { recursive: true }); mkdirSync(join(workspacePath, 'output'), { recursive: true }); mkdirSync(join(workspacePath, 'logs'), { recursive: true }); await ensureWorkspaceGitRepo(workspacePath); if (localTaskId !== null) { await this.repo.updateLocalTask(localTaskId, { workspacePath }); } } else if (isSubTask) { // SpawnSubTask 経由で worktreePath が設定されている前提 if (!job.worktreePath) { throw new Error(`Sub-task job ${jobId} has no worktreePath set`); } mkdirSync(job.worktreePath, { recursive: true }); mkdirSync(join(job.worktreePath, 'output'), { recursive: true }); mkdirSync(join(job.worktreePath, 'logs'), { recursive: true }); await ensureWorkspaceGitRepo(job.worktreePath); } else { throw new Error(`Unsupported job type: repo="${repoName}" is neither a local task nor a sub-task`); } await this.repo.updateJob(jobId, { worktreePath: workspacePath }); return workspacePath; } /** * Run the two pre-execution gates: role capability check, issue lock. * On failure, requeue the job (so another worker can pick it up) and * return false so executeJob returns early. */ private async acquireJobOrRequeue(job: Job): Promise { const { repo: repoName, issueNumber, id: jobId } = job; if (!this.supportsRole(job.requiredRole)) { await this.repo.updateJob(jobId, { status: 'queued', workerId: null }); await this.repo.addAuditLog(jobId, 'job_requeued_capability_mismatch', 'worker', { workerId: this.workerId, requiredRole: job.requiredRole, }); logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to role mismatch (role=${job.requiredRole})`); return false; } const locked = await this.repo.lockIssue(repoName, issueNumber, jobId); if (!locked) { await this.repo.updateJob(jobId, { status: 'queued', workerId: null }); await this.repo.addAuditLog(jobId, 'job_requeued_issue_locked', 'worker', { workerId: this.workerId, }); logger.info(`[worker:${this.workerId}] job ${jobId}: issue ${repoName}#${issueNumber} already locked, skipping`); return false; } return true; } private async executeJob(job: Job): Promise { const { repo: repoName, issueNumber, id: jobId } = job; const localTaskId = getLocalTaskId(repoName); const isLocalTask = localTaskId !== null; const parentJobId = getSubTaskParentJobId(repoName); const isSubTask = parentJobId !== null; const logMetadata = this.buildLogMetadata(job.requiredRole); if (!(await this.acquireJobOrRequeue(job))) return; // Phase 3b: job-lifecycle metrics. Inc active_jobs at start; capture // a terminal status + duration in the finally block. `profile` maps // to the assigned required role (the multi-profile / multi-piece // operator-facing dimension). const metricPiece = job.pieceName ?? 'unknown'; const metricProfile = job.requiredRole ?? 'unknown'; const jobStartedAtMs = Date.now(); let metricFinalStatus: 'succeeded' | 'failed' | 'aborted' | 'cancelled' | 'waiting_human' | 'error' = 'error'; if (this.workerMetrics) { try { this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).inc(); } catch { /* metrics never affect business logic */ } } await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, inflightJobs: this.inflight, availableModels: [...this.availableModels], }); // claimNextJob がすでに status = 'running' にセット済み await this.repo.addAuditLog(jobId, 'job_started', 'worker', {}); // V2 push: notify on the first time a job transitions queued→running. // Retry runs are intentionally silent — V1's 4s debounce relied on this // and we keep the same UX (one running notification, not one per retry). if (job.attempt === 1) { this.enqueuePush(job, 'running'); } // Reflection jobs bypass workspace preparation and the agent / LLM loop. // task_kind='agent' (default) keeps the pre-existing piece-runner path. if (job.taskKind === 'reflection') { try { await this.handleReflectionJob(job); } finally { await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, inflightJobs: this.inflight, availableModels: [...this.availableModels], }); await this.repo.unlockIssue(repoName, issueNumber); } return; } const workspacePath = await this.prepareJobWorkspace(job, isLocalTask, isSubTask, localTaskId); // 進捗レポーター // ローカルタスク・サブタスクともに activity.log を書き出す // サブタスクでは isSubTask=true を渡し、DB コメント書き込みをスキップする const reporter = new LocalProgressReporter(this.repo, localTaskId ?? issueNumber, workspacePath, logMetadata, isSubTask); // 会話コンテキストの組み立て let enrichedInstruction = `${buildTimeContextBlock()}${job.instruction}`; if (isLocalTask) { try { const comments = await this.repo.listLocalTaskComments(localTaskId); const outputFiles = this.listDir(join(workspacePath, 'output')); const inputFiles = this.listDir(join(workspacePath, 'input')); const contextBody = buildLocalConversationContext({ comments, jobInstruction: job.instruction, inputFiles, outputFiles, }); enrichedInstruction = `${buildTimeContextBlock()}${contextBody}`; } catch (err) { logger.warn(`[worker:${this.workerId}] failed to build local context: ${err}`); } } const retryHandoffContext = buildRetryHandoffContext(workspacePath, job); if (retryHandoffContext) { enrichedInstruction = `${enrichedInstruction}\n\n${retryHandoffContext}`; } // watchdog 誤検知防止: runPiece 実行中に updated_at を定期更新 let heartbeatTimer: ReturnType | undefined; try { // Piece 読み込み: per-user カスタムディレクトリ → global カスタムディレクトリ → builtin の順に探索 // No-auth jobs (ownerId null) resolve pieces from data/users/local/pieces, matching // where no-auth POST now writes (LOCAL_OWNER='local' in pieces-api.ts). const userFolderRoot = this.config.userFolderRoot ?? './data/users'; const ownerForPieces = job.ownerId ?? 'local'; const customPieceDirs = [ userPiecesDir(userFolderRoot, ownerForPieces), this.config.customPiecesDir, ].filter((d): d is string => !!d); logger.info(`[worker:${this.workerId}] job ${jobId} loadPiece piece=${job.pieceName} customDirs=[${customPieceDirs.join(', ') || 'none'}] piecesDir=pieces`); const piece = loadPiece(job.pieceName, 'pieces', customPieceDirs); if ( piece.model && this.availableModels.size > 0 && !this.availableModels.has(piece.model) && this.model !== piece.model ) { await this.repo.updateJob(jobId, { status: 'queued', workerId: null, errorSummary: `Required model ${piece.model} is not available on ${this.workerId}`, }); await this.repo.addAuditLog(jobId, 'job_requeued_model_mismatch', 'worker', { workerId: this.workerId, requiredModel: piece.model, availableModels: [...this.availableModels], }); logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to model mismatch (${piece.model})`); return; } // MCP 認証ゲート: piece.required_mcp に記載されたサーバーのトークンがなければ park const missingMcp = (piece.required_mcp ?? []).filter( (serverId) => !this.mcpTokenManager || !this.mcpTokenManager.hasToken(job.ownerId ?? '', serverId), ); if (missingMcp.length > 0) { await this.repo.updateJob(jobId, { status: 'waiting_human', waitReason: 'mcp_auth_required', resumeMovement: piece.initial_movement ?? null, }); if (localTaskId !== null) { await this.repo.addLocalTaskComment( localTaskId, 'system', `この piece は MCP サーバー「${missingMcp.join(', ')}」との連携が必要です。Settings → MCP 接続から連携してください。`, 'event', ); } logger.info(`[worker:${this.workerId}] mcp gate parked job=${jobId} missing=${missingMcp.join(',')}`); return; } // Piece のモデル指定を解決 const resolvedModel = this.resolveModel(piece); const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000; const workerDefForLlm = this.getWorkerDef(); const isProxyWorker = workerDefForLlm.proxy === true; const llmClient = new OpenAICompatClient( this.endpoint, resolvedModel, workerDefForLlm.apiKey, this.config.provider.retry, timeoutMs, this.contextLimitTokens, this.config.safety?.promptGuardRatio, (line) => reporter.reportPromptPreflight(line), { proxy: isProxyWorker }, ); // ASK 再開の場合、resume_movement を使用 const pieceOptions = { resumeMovement: job.resumeMovement ?? undefined, askCount: job.askCount, maxAskPerJob: this.config.ask.maxPerJob, checkInterjections: isLocalTask && localTaskId !== null && !isSubTask ? async (movementName: string) => { const comments = await this.repo.getUninjectedComments(localTaskId); if (comments.length === 0) return []; const injected = comments.map(c => ({ id: c.id, body: c.body })); this.repo.markCommentsInjected(injected.map(c => c.id)); reporter.reportInterjectionAck(injected, movementName); return injected; } : undefined, spawnSubTask: job.subtaskDepth < this.config.subtasks.maxDepth ? async (params: { title: string; instruction: string; piece?: string }) => { const subJobs = await this.repo.getSubJobs(jobId); const subtaskIndex = subJobs.length + 1; if (subJobs.length >= this.config.subtasks.maxPerParent) { throw new Error(`サブタスク上限 (${this.config.subtasks.maxPerParent}) に達しました。これ以上のサブタスクは作成できません。`); } const subtaskWorkspace = join(workspacePath, 'subtasks', String(subtaskIndex)); mkdirSync(subtaskWorkspace, { recursive: true }); mkdirSync(join(subtaskWorkspace, 'output'), { recursive: true }); mkdirSync(join(subtaskWorkspace, 'logs'), { recursive: true }); // 親ジョブの role を継承 const subJobInstruction = [ `ui_profile: ${job.requiredRole}`, '', `# ${params.title}`, '', params.instruction, ].join('\n'); const subJob = await this.repo.createJob({ repo: `subtask/${jobId}`, issueNumber: subtaskIndex, instruction: subJobInstruction, pieceName: params.piece ?? 'general', parentJobId: jobId, subtaskDepth: job.subtaskDepth + 1, maxAttempts: 2, role: job.requiredRole, ownerId: job.ownerId, visibility: job.visibility, visibilityScopeOrgId: job.visibilityScopeOrgId, }); await this.repo.updateJob(subJob.id, { worktreePath: subtaskWorkspace }); logger.info(`[worker:${this.workerId}] spawned sub-task #${subtaskIndex} depth=${job.subtaskDepth + 1} job=${subJob.id}`); return { jobId: subJob.id, subtaskIndex, workspacePath: subtaskWorkspace }; } : undefined, }; const callbacks = this.buildPieceCallbacks( jobId, reporter, isLocalTask, localTaskId, workspacePath, // Seed the sticky-backend guard with whatever was already persisted // for this job (e.g. on retry / resume from ASK). Only matters for // proxy workers; direct workers never produce a backend event. isProxyWorker ? (job.lastBackendId ?? null) : null, ); // 開始コメント await reporter.reportMovementStart(`${piece.name} タスク開始`); // キャンセル用 AbortController const jobAbortController = new AbortController(); // キャンセルチェック: DB のジョブ状態が 'cancelled' になっていたら中断する const cancelCheck = (): boolean => { const isCancelled = this.repo.getJobStatusSync(jobId) === 'cancelled'; if (isCancelled) { jobAbortController.abort(); } return isCancelled; }; // ContextManager 初期化 const contextManager = new ContextManager(this.config.context ?? {}); contextManager.setContextLimit(this.contextLimitTokens); // Piece 実行(ハートビート開始: 5分ごとに updated_at を更新) heartbeatTimer = setInterval(() => { try { this.repo.touchJobUpdatedAt(jobId); } catch { /* ignore */ } }, 5 * 60 * 1000); // VLM 対応: worker の vlm=true なら vision 設定を worker 自身の endpoint/model で上書き const workerDef = this.getWorkerDef(); const toolsConfig = workerDef.vlm ? { ...this.config.tools, visionBaseUrl: this.endpoint, visionModel: this.model } : this.config.tools; logger.info(`[worker:${this.workerId}] job ${jobId} runPiece start`); // Browser session keying: resolve the session-task identity for the // ToolContext. Threaded into BrowseWeb / InteractiveBrowse so each // task gets its own noVNC session and the Captcha Pool stays // separate from per-task sessions. Subtasks walk up to find the // root local task ID. const sessionIdentity = await resolveSessionTaskId(this.repo, job); // ── Browser session profile binding ───────────────────────────── // If this job is bound to a browser_session_profile, decrypt the // captured Playwright storageState and pass it into runPiece so // BrowseWeb can inject it into BrowserContext. Owner-mismatch and // expired-profile checks fail-fast before the agent loop starts. let browserSessionState: object | undefined; let browserSessionProfileId: number | undefined; let browserSessionProfile: | { loggedInSelector: string | null; loginUrlPatterns: string[] } | undefined; let onAuthExpired: | ((profileId: number, reason: string) => void) | undefined; if (job.browserSessionProfileId) { const sessRepo = new BrowserSessionRepo(this.repo.getDb()); const profile = sessRepo.getProfileByIdUnsafe(job.browserSessionProfileId); if (!profile) { sessRepo.audit({ actorUserId: job.ownerId ?? null, ownerId: null, profileId: job.browserSessionProfileId, action: 'use', result: 'error', reason: 'profile not found', jobId: job.id, }); throw new Error(`Browser session profile ${job.browserSessionProfileId} not found`); } // Fail-closed owner check: a job with null/missing ownerId must not // be allowed to decrypt any profile, even if the profile id would // otherwise resolve. Helper audits + throws on rejection. Extracted // to src/engine/browser-session-auth.ts so the contract is unit // tested in isolation from the Worker class. assertProfileOwner(profile, job, sessRepo); if (profile.status !== 'active' || !profile.encryptedStateBlob) { sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id, action: 'use', result: 'error', reason: `status=${profile.status}`, jobId: job.id, }); throw new Error(`AUTH_SESSION_EXPIRED: profile ${profile.id} status=${profile.status}`); } const masterKeyPath = this.config.secrets?.masterKeyPath ?? './data/secrets/master.key'; const master = initMasterKey(masterKeyPath); const encDek = sessRepo.getUserDek(profile.ownerId); if (!encDek) { sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id, action: 'decrypt', result: 'error', reason: 'user DEK missing', jobId: job.id, }); throw new Error('user DEK missing for browser session profile'); } let dek: Buffer; try { dek = decryptUserDek(master, encDek); } catch (e) { sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id, action: 'decrypt', result: 'error', reason: `dek decrypt failed: ${(e as Error).message}`, jobId: job.id, }); throw e; } let stateJson: string; try { stateJson = decryptStateBlob(dek, profile.encryptedStateBlob); } catch (e) { sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id, action: 'decrypt', result: 'error', reason: `state decrypt failed: ${(e as Error).message}`, jobId: job.id, }); throw e; } browserSessionState = JSON.parse(stateJson) as object; browserSessionProfileId = profile.id; browserSessionProfile = { loggedInSelector: profile.loggedInSelector, loginUrlPatterns: profile.loginUrlPatterns, }; onAuthExpired = (pid, reason) => { sessRepo.markProfileStatus(pid, 'expired', reason); sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: pid, action: 'expire', result: 'success', reason, jobId: job.id, }); // Best-effort task-level notification. Subtask jobs and // gitea-issue jobs may not have a numeric local_task id. if (localTaskId !== null) { this.repo.addLocalTaskComment( localTaskId, 'agent', `⚠️ Browser session "${profile.label}" expired: ${reason}. Re-login from Settings → Browser Sessions.`, 'progress', ).catch(() => { /* ignore — comment posting is best-effort */ }); } }; sessRepo.audit({ actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id, action: 'use', result: 'success', jobId: job.id, }); sessRepo.touchUsed(profile.id); } // Piece handoff: when this job continues an earlier one in the same // local_task, agent-loop injects a "this is a continuation of piece X" // block into the system prompt. We resolve the prev piece name + the // most recent agent result/ask comment as the LLM-visible carry-over. let handoffContext: import('./engine/agent-loop.js').HandoffContext | undefined; if (job.continuedFromJobId && isLocalTask && localTaskId !== null) { const prevJob = await this.repo.getJob(job.continuedFromJobId); if (prevJob) { const prevResultComment = await this.repo.getLatestResultComment(localTaskId); handoffContext = { prevPiece: prevJob.pieceName, prevResult: prevResultComment?.body ?? null, }; } else { logger.warn(`[worker:${this.workerId}] continued_from_job_id=${job.continuedFromJobId} not found for job ${jobId}; skipping handoff context`); } } // Shared-knowledge notes: construct once per job, forwarded into // ToolContext so agent-loop can inject "## Subscribed Notes" into // the system prompt. Only active when the job has a known owner. let notesService: NotesService | undefined; let notesInjectConfig: NotesInjectConfig | undefined; let notesUserOrgIds: string[] | undefined; let notesUserRole: 'admin' | 'user' | undefined; if (job.ownerId) { try { const notesRepo = new NotesRepository(this.repo.getDb()); const userFolderRoot = this.config.userFolderRoot ?? './data/users'; notesService = new NotesService({ db: this.repo.getDb(), repo: notesRepo, userFolderRoot, getUserOrgIds: (uid) => this.repo.listUserGiteaOrgs(uid).map((o) => o.orgId), audit: (action, actor, target) => { try { this.repo.addAuditLog(jobId, action, actor, { target }); } catch (err) { logger.warn(`[notes-audit] failed: ${(err as Error).message}`); } }, }); const cfgNotes = this.config.notes?.inject ?? {}; notesInjectConfig = { perNoteMaxKb: cfgNotes.perNoteMaxKb ?? DEFAULT_NOTES_INJECT.perNoteMaxKb, totalMaxKb: cfgNotes.totalMaxKb ?? DEFAULT_NOTES_INJECT.totalMaxKb, overBudgetStrategy: cfgNotes.overBudgetStrategy ?? DEFAULT_NOTES_INJECT.overBudgetStrategy, }; notesUserOrgIds = this.repo.listUserGiteaOrgs(job.ownerId).map((o) => o.orgId); const ownerRow = this.repo.getUserById(job.ownerId); notesUserRole = ownerRow?.role === 'admin' ? 'admin' : 'user'; } catch (err) { logger.warn(`[worker:${this.workerId}] job ${jobId} notes setup failed: ${(err as Error).message}`); } } // Parse per-task options from job payload (e.g. { options: { mcpDisabled, skillsDisabled } }). let jobPayloadOptions: Record = {}; if (job.payload) { try { const parsed = JSON.parse(job.payload) as Record; if (parsed?.options && typeof parsed.options === 'object' && !Array.isArray(parsed.options)) { jobPayloadOptions = parsed.options as Record; } } catch { logger.warn(`[worker:${this.workerId}] job ${jobId} failed to parse payload JSON`); } } const mcpDisabled = jobPayloadOptions.mcpDisabled === true; const skillsDisabled = jobPayloadOptions.skillsDisabled === true; const result = await runPiece(piece, enrichedInstruction, llmClient, workspacePath, callbacks, toolsConfig, { ...pieceOptions, cancelCheck, abortController: jobAbortController, safetyConfig: this.config.safety, searchFilter: this.config.searchFilter, customPiecesDir: customPieceDirs, contextManager, vlmEnabled: workerDef.vlm === true, jobId, // Phase 5: subtask handoff parent identity handoffContext, // Phase 5 PR2: when this run IS a subtask, pass parent identity + // child workspace path so the runner emits a memory-delta.json on // completion. Subtask workspaces follow `/subtasks/` // where N is the subtask job's issueNumber. parentJobId: isSubTask && parentJobId ? parentJobId : undefined, childWorkspaceRelative: isSubTask ? `subtasks/${issueNumber}` : undefined, // Mission Brief: only wire IO when this run is bound to a local // task (the brief is per-LocalTask, not per-job). Subtask runs // and gitea-issue runs leave it unset → MissionUpdate degrades // to a no-op and the system prompt MISSION block is skipped. missionBrief: isLocalTask && localTaskId !== null && !isSubTask ? this.repo.makeMissionBriefIO(localTaskId) : undefined, taskId: sessionIdentity.taskId, userId: sessionIdentity.userId, browserSessionState, browserSessionProfileId, browserSessionProfile, onAuthExpired, ownerId: job.ownerId, mcpConfig: mergeMcpConfig(this.config.mcp), notesService, notesInjectConfig, notesUserOrgIds, notesUserRole, skillCatalog: this.skillCatalog ?? undefined, mcpDisabled, skillsDisabled, }); logger.info(`[worker:${this.workerId}] job ${jobId} runPiece done: status=${result.status}`); this.writeRunDiagnostics(workspacePath, result); await this.handlePieceResult(result, job, reporter, workspacePath, isLocalTask, isSubTask, parentJobId); // Phase 3b: capture the terminal status for the jobs_total label. // result.status uses piece-runner's own enum // ('completed'|'aborted'|'error'|'waiting_human'|'waiting_subtasks'|'cancelled'); map to the // metric enum (waiting_subtasks stays "succeeded" for the metric // because the job pauses cleanly — not a failure). switch (result.status) { case 'completed': metricFinalStatus = 'succeeded'; break; case 'aborted': metricFinalStatus = 'aborted'; break; case 'cancelled': metricFinalStatus = 'cancelled'; break; case 'waiting_human': metricFinalStatus = 'waiting_human'; break; case 'waiting_subtasks': metricFinalStatus = 'succeeded'; break; case 'error': default: metricFinalStatus = 'failed'; break; } } catch (err) { const errorMsg = err instanceof Error ? err.message : String(err); const errorStack = err instanceof Error && err.stack ? err.stack : '(no stack)'; if (errorMsg.includes('Piece not found')) { logger.error(`[worker:${this.workerId}] job ${jobId} loadPiece failed piece=${job.pieceName} customPiecesDir=${this.config.customPiecesDir ?? 'none'} error=${errorMsg}`); } logger.error(`[worker:${this.workerId}] job ${jobId} failed: ${errorMsg}`); // Always log the stack so opaque errors (e.g. SqliteError: FOREIGN KEY // constraint failed) can be traced to the offending insert/update. logger.error(`[worker:${this.workerId}] job ${jobId} stack: ${errorStack}`); const retryDisposition = await this.scheduleRetryOrFail(job, errorMsg, workspacePath, 'worker_exception'); if (retryDisposition !== 'requeued_unhealthy') { await reporter.reportError(errorMsg); } await this.repo.addAuditLog(jobId, 'job_error', 'worker', { error: errorMsg }); } finally { if (heartbeatTimer) clearInterval(heartbeatTimer); // Phase 3b: emit job lifecycle counters + duration histogram. The // active gauge always decrements (matching the inc at start) so a // process restart can't leak active_jobs > 0 forever. if (this.workerMetrics) { try { this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).dec(); this.workerMetrics.jobsTotal.labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile }).inc(); this.workerMetrics.jobDurationSeconds .labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile }) .observe((Date.now() - jobStartedAtMs) / 1000); } catch { /* metrics never affect business logic */ } } await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, inflightJobs: this.inflight, availableModels: [...this.availableModels], }); await this.repo.unlockIssue(repoName, issueNumber); } } private buildPieceCallbacks( jobId: string, reporter: LocalProgressReporter, isLocalTask: boolean, localTaskId: number | null, workspacePath: string, /** * Initial value of jobs.last_backend_id from the DB. Used to seed the * sticky guard so callers don't repeatedly write the same value on * every LLM iteration. Falsy/null = no backend resolved yet. */ initialLastBackendId: string | null = null, ): PieceRunCallbacks { let movementStartTime = Date.now(); const toolUsageCounts = new Map(); // Sticky-backend per design Open Question #3: take the first proxy // backend the job sees and never overwrite it. Subsequent calls that // happen to land on a different deployment are ignored at this layer // so the UI Pet doesn't flicker between sprites. The resolver also // guarantees that if the DB persist fails, the local sticky stays // unset so the next event can retry (otherwise a transient DB error // would orphan the worker → backend mapping for the lifetime of the // job). See src/worker/sticky-backend.ts. const workerIdLocal = this.workerId; const onBackendResolvedHandler = createStickyBackendResolver({ initial: initialLastBackendId, persist: (backendId) => this.repo.updateJob(jobId, { lastBackendId: backendId }), logger: { debug: (m) => logger.debug(m), info: (m) => logger.info(m), warn: (m) => logger.warn(m), }, workerId: workerIdLocal, jobId, }); // Phase 3b: local copy of the sticky backend so the LLM-call metric // has a stable backend_id label even before the persist returns. // Direct workers (non-proxy) never fire onBackendResolved, so we // fall back to the worker id (`gpu-rtx-a`) as the backend identity. let metricBackendId = initialLastBackendId ?? workerIdLocal; const metricModel = this.model ?? 'unknown'; const metricsRef = this.workerMetrics; // Pending tool calls awaiting result, keyed by callId. // On onToolResult, we pair via callId and persist a single tool_call comment. const pendingToolCalls = new Map; movement: string }>(); let currentMovementForTools = ''; const ARG_PREVIEW_CAP = 8 * 1024; const RESULT_PREVIEW_CAP = 16 * 1024; const truncate = (s: string, cap: number): string => s.length > cap ? s.slice(0, cap) + `\n…[truncated ${s.length - cap} bytes]` : s; return { onMovementStart: (name) => { movementStartTime = Date.now(); toolUsageCounts.clear(); currentMovementForTools = name; this.repo.updateJob(jobId, { currentMovement: name, currentActivity: null }); reporter.reportMovementStart(name); }, onToolUse: (toolName, input, callId) => { toolUsageCounts.set(toolName, (toolUsageCounts.get(toolName) ?? 0) + 1); const summary = summarizeToolInput(toolName, input); this.repo.updateJob(jobId, { currentActivity: `${toolName}: ${summary}`.slice(0, 200) }); reporter.reportToolUse(toolName, input); if (callId) { pendingToolCalls.set(callId, { name: toolName, args: input, movement: currentMovementForTools }); } if (jobEventBus.hasListeners(jobId)) { jobEventBus.emitJob(jobId, { type: 'tool_use', toolName, toolInput: summary, callId: callId ?? '', }); } }, onToolCallDelta: (callId, name, chunk) => { if (jobEventBus.hasListeners(jobId)) { jobEventBus.emitJob(jobId, { type: 'tool_use_delta', callId, name, chunk }); } }, onText: (text) => { if (jobEventBus.hasListeners(jobId)) { jobEventBus.emitJob(jobId, { type: 'text', text }); } }, onPromptProgress: (progress) => { if (jobEventBus.hasListeners(jobId)) { jobEventBus.emitJob(jobId, { type: 'prompt_progress', processed: progress.processed, total: progress.total, timeMs: progress.timeMs, cache: progress.cache, }); } }, onTextPreview: (movementName, preview) => { reporter.reportAssistantPreview(movementName, preview); }, onContextAction: (action) => { reporter.reportContextAction(action); }, onContextUpdate: (payload) => { this.repo.updateJobContext(jobId, payload).catch(err => { logger.warn(`[worker:${this.workerId}] failed to persist context for job ${jobId}: ${err}`); }); }, onLLMCall: (info) => { reporter.reportLLMCall(info); if (metricsRef) { try { metricsRef.llmCallsTotal .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel }) .inc(); metricsRef.llmCallDurationSeconds .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel }) .observe(info.durationMs / 1000); } catch { /* metrics best-effort */ } } }, onBackendResolved: (info) => { // Phase 3b: update the sticky backend id used for LLM-call // metrics. We capture every event (not just the first) so a // routing change mid-job is reflected in the next iteration's // counters; the DB-side sticky still preserves the first. if (info.backendId) { metricBackendId = info.backendId; } // Fire-and-forget: agent-loop's onBackendResolved signature is // sync (void). The resolver handles persist errors internally; // we just attach a final guard to log any unexpected throw. // cacheKey is observed but not persisted at the job level — // Phase B's NodeStatusWidget will track cache hits out-of-band. onBackendResolvedHandler(info).catch(err => { logger.warn(`[worker:${this.workerId}] sticky backend resolver threw for job ${jobId}: ${err}`); }); }, onMovementComplete: (movementName, result) => { const durationMs = Date.now() - movementStartTime; const tools: Record = {}; for (const [name, count] of toolUsageCounts) { tools[name] = count; } reporter.reportMovementComplete(movementName, result.output, result.next); if (isLocalTask) { const isTerminal = result.next === 'COMPLETE' || result.next === 'ABORT' || result.next === 'ASK'; const summary = !isTerminal ? (result.output?.trim() || undefined) : undefined; const progressBody = JSON.stringify({ movement: movementName, tools, durationMs, ...(summary ? { summary } : {}) }); this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress') .catch(err => logger.warn(`[worker:${this.workerId}] failed to insert progress comment: ${err}`)); if (isTerminal && jobEventBus.hasListeners(jobId)) { jobEventBus.emitJob(jobId, { type: 'done' }); } } }, onToolResult: (toolName, info, callId) => { const { isError } = info; reporter.reportToolResult(toolName, info); // Pair with pending tool_use via callId, then persist as comment + emit SSE. const pending = callId ? pendingToolCalls.get(callId) : undefined; if (callId) pendingToolCalls.delete(callId); if (isLocalTask && callId && pending) { let argsStr: string; try { argsStr = truncate(JSON.stringify(pending.args), ARG_PREVIEW_CAP); } catch { argsStr = '""'; } const toolCallBody = JSON.stringify({ type: 'tool_call', callId, movement: pending.movement, name: toolName, args: argsStr, result: truncate(info.result, RESULT_PREVIEW_CAP), isError, durationMs: info.durationMs, cacheHit: info.cacheHit, }); this.repo.addLocalTaskComment(localTaskId!, 'agent', toolCallBody, 'progress') .catch(err => logger.warn(`[worker:${this.workerId}] tool_call comment failed: ${err}`)); } if (jobEventBus.hasListeners(jobId) && callId) { jobEventBus.emitJob(jobId, { type: 'tool_result', toolName, callId, toolOutput: truncate(info.result, 2 * 1024), toolIsError: isError, }); } // Phase 3b: count every tool invocation. success label is the // string form so Grafana queries can group by it. Same // best-effort guard as the LLM emission above. // // Phase 3b post-review: normalize the tool_name label so a // piece firing arbitrary mcp__*/user-defined names doesn't // explode label cardinality. MCP tools collapse to a single // `mcp` bucket; unknown names land in `unknown`. The full // tool_name is still visible in the activity log + reporter, // so the metric drop only affects Prometheus dimensions. if (metricsRef) { try { metricsRef.toolCallsTotal .labels({ tool_name: normalizeToolNameForMetric(toolName), success: isError ? 'false' : 'true' }) .inc(); } catch { /* metrics best-effort */ } } if (isLocalTask && !isError && (toolName === 'CheckItem' || toolName === 'CreateChecklist')) { try { const checklistDir = join(workspacePath, 'logs', 'checklists'); if (existsSync(checklistDir)) { const files = readdirSync(checklistDir).filter(f => f.endsWith('.json')); if (files.length > 0) { let latestFile = files[0]!; let latestMtime = 0; for (const file of files) { try { const { mtimeMs } = statSync(join(checklistDir, file)); if (mtimeMs > latestMtime) { latestMtime = mtimeMs; latestFile = file; } } catch { /* skip */ } } const data = JSON.parse(readFileSync(join(checklistDir, latestFile), 'utf-8')); const progressBody = JSON.stringify({ type: 'checklist', name: data.name, items: data.items, summary: data.summary, }); this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress') .catch(err => logger.warn(`[worker:${this.workerId}] checklist progress comment failed: ${err}`)); } } } catch (err) { logger.warn(`[worker:${this.workerId}] checklist read failed: ${err}`); } } }, }; } private async handleReflectionJob(job: Job): Promise { const { runReflectionJob } = await import('./engine/reflection/reflection-runner.js'); try { const outcome = await runReflectionJob( { repo: this.repo, config: this.config, llmEndpoint: this.endpoint, llmModel: this.model, }, job ); await this.repo.updateJob(job.id, { status: outcome === 'failed' ? 'failed' : 'succeeded', currentActivity: null, }); } catch (e) { logger.error(`[reflection] runner threw job=${job.id} err=${String(e)}`); await this.repo.updateJob(job.id, { status: 'failed', currentActivity: null }); } } private async handlePieceResult( result: PieceRunResult, job: Job, reporter: LocalProgressReporter, workspacePath: string, isLocalTask: boolean, isSubTask: boolean, parentJobId: string | null, ): Promise { const { repo: repoName, issueNumber, id: jobId } = job; const localTaskId = getLocalTaskId(repoName); if (result.status === 'completed') { if (isLocalTask) { await this.commitLocalWorkspace(issueNumber, workspacePath); } await this.repo.updateJob(jobId, { status: 'succeeded', currentActivity: null }); this.enqueuePush(job, 'succeeded'); await maybeEnqueueReflection(this.repo, job, 'succeeded', this.config.reflection, this.config.provider.workers); let resultBody = result.finalOutput; if (resultBody) { resultBody = ensureKeepaGraphs(resultBody); } await reporter.reportFinalResult('completed', resultBody); } else if (result.status === 'waiting_human') { if (isSubTask && parentJobId) { await this.repo.updateJob(jobId, { status: 'waiting_human', resumeMovement: result.resumeMovement ?? null, askCount: job.askCount + 1 }); // Sub-task ASK is auto-answered below, so we don't notify on it. reporter.reportToolUse('ASK', { question: result.finalOutput }); await this.repo.addAuditLog(jobId, 'job_ask_subtask', 'worker', { question: result.finalOutput, resumeMovement: result.resumeMovement }); try { const answer = await this.answerSubtaskAsk(job, parentJobId, result.finalOutput); logger.info(`[worker:${this.workerId}] answered subtask ASK for job ${jobId}: ${answer.slice(0, 100)}`); const newJob = await this.repo.createJob({ repo: repoName, issueNumber, instruction: answer, pieceName: job.pieceName, askCount: job.askCount + 1, resumeMovement: result.resumeMovement, parentJobId: job.parentJobId, subtaskDepth: job.subtaskDepth, maxAttempts: 2, role: job.requiredRole, ownerId: job.ownerId, visibility: job.visibility, visibilityScopeOrgId: job.visibilityScopeOrgId, }); await this.repo.updateJob(newJob.id, { worktreePath: workspacePath }); await this.repo.addAuditLog(newJob.id, 'job_queued_subtask_ask_answer', 'worker', { originalJobId: jobId, question: result.finalOutput }); } catch (askErr) { logger.warn(`[worker:${this.workerId}] failed to answer subtask ASK, leaving as waiting_human: ${askErr}`); } } else { await this.repo.updateJob(jobId, { status: 'waiting_human', resumeMovement: result.resumeMovement ?? null, askCount: job.askCount + 1, }); this.enqueuePush(job, 'waiting_human'); await reporter.reportAsk(result.finalOutput); await this.repo.addAuditLog(jobId, 'job_ask', 'worker', { question: result.finalOutput, resumeMovement: result.resumeMovement, }); } } else if (result.status === 'waiting_subtasks') { const subJobs = await this.repo.getSubJobs(jobId); if (subJobs.length === 0) { if (result.resumeMovement) { logger.warn(`[worker:${this.workerId}] job ${jobId} waiting_subtasks but no sub-jobs exist, re-queuing to ${result.resumeMovement}`); await this.repo.updateJob(jobId, { status: 'queued', resumeMovement: result.resumeMovement, }); } else { logger.error(`[worker:${this.workerId}] job ${jobId} waiting_subtasks with no sub-jobs and no resumeMovement, failing`); await this.repo.updateJob(jobId, { status: 'failed' }); } await this.repo.addAuditLog(jobId, 'job_requeued_no_subtasks', 'worker', { resumeMovement: result.resumeMovement, action: result.resumeMovement ? 'requeued' : 'failed', }); } else { await this.repo.updateJob(jobId, { status: 'waiting_subtasks', resumeMovement: result.resumeMovement ?? null, }); await reporter.reportMovementStart('サブタスク待機中...'); await this.repo.addAuditLog(jobId, 'job_waiting_subtasks', 'worker', { resumeMovement: result.resumeMovement, }); } } else if (result.status === 'cancelled') { logger.info(`[worker:${this.workerId}] job ${jobId} cancelled`); await reporter.reportFinalResult('cancelled', result.finalOutput); } else { const retryDisposition = await this.scheduleRetryOrFail(job, result.finalOutput, workspacePath, result.abortReason ?? null); if (retryDisposition !== 'requeued_unhealthy') { await reporter.reportFinalResult(result.status, result.finalOutput); } if (retryDisposition === 'failed') { const outcome = result.status === 'aborted' ? 'aborted' : 'failed'; await maybeEnqueueReflection(this.repo, job, outcome, this.config.reflection, this.config.provider.workers); } } // サブタスク完了時(終端ステータスのみ): 全兄弟ジョブが完了なら親ジョブを再キュー const SUBTASK_TERMINAL = ['completed', 'error', 'aborted', 'cancelled']; if (isSubTask && parentJobId && SUBTASK_TERMINAL.includes(result.status)) { try { const parentJob = await this.repo.getJob(parentJobId); if (parentJob?.worktreePath) { const resultDir = join(parentJob.worktreePath, 'subtasks', String(issueNumber)); mkdirSync(resultDir, { recursive: true }); writeFileSync( join(resultDir, 'result.md'), `# サブタスク #${issueNumber} 結果\n\nステータス: ${result.status}\n\n${result.finalOutput}\n`, 'utf-8', ); } const requeued = await this.repo.requeueParentJobIfAllSubtasksDone(parentJobId); if (requeued) { logger.info(`[worker:${this.workerId}] all sub-tasks done, re-queued parent job ${parentJobId}`); } } catch (subErr) { logger.warn(`[worker:${this.workerId}] sub-task parent re-queue error: ${subErr}`); } } await this.repo.addAuditLog(jobId, `job_${result.status}`, 'worker', { movementCount: result.movementHistory.length, abortReason: result.abortReason ?? null, contextActionCount: result.contextActions.length, latestContextAction: result.contextActions[result.contextActions.length - 1] ?? null, }); } private resolveModel(piece: PieceDef): string | undefined { if (piece.model) { if (this.availableModels.size === 0 || this.availableModels.has(piece.model)) { return piece.model; } logger.warn(`[worker:${this.workerId}] piece model "${piece.model}" not available, falling back to ${this.model ?? ''}`); } // If the configured model is not in available models, auto-select the first available one if (this.model && this.availableModels.size > 0 && !this.availableModels.has(this.model)) { const autoModel = [...this.availableModels][0]!; logger.info(`[worker:${this.workerId}] configured model "${this.model}" not available, auto-selecting "${autoModel}"`); return autoModel; } return this.model; } private async scheduleRetryOrFail( job: Job, errorMsg: string, workspacePath?: string, abortReason: string | null = null, ): Promise<'requeued_unhealthy' | 'retry' | 'failed'> { const { id: jobId, attempt, maxAttempts } = job; const isLlmConnectionFatal = /connection error:\s*fetch failed|econnrefused|enotfound|etimedout|network error/i.test(errorMsg); if (isLlmConnectionFatal) { this.healthy = false; this.lastHealthError = errorMsg; this.availableModels.clear(); await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: false, lastError: errorMsg, inflightJobs: this.inflight, availableModels: [], }); await this.repo.updateJob(jobId, { status: 'queued', workerId: null, errorSummary: errorMsg, abortReason, nextRetryAt: null, }); writeRetryHandoffSummary({ workspacePath: workspacePath ?? job.worktreePath, job, errorMsg, nextRetryAt: null, disposition: 'requeued_unhealthy', }); logger.warn(`[worker:${this.workerId}] job ${jobId} requeued after LLM connection error; worker marked unhealthy`); return 'requeued_unhealthy'; } if (attempt < maxAttempts) { const backoffIndex = Math.min(attempt - 1, this.config.retry.backoffSeconds.length - 1); const backoffSec = this.config.retry.backoffSeconds[backoffIndex] ?? this.config.retry.backoffSeconds[this.config.retry.backoffSeconds.length - 1] ?? 60; const nextRetryAt = new Date(Date.now() + backoffSec * 1000).toISOString(); await this.repo.updateJob(jobId, { status: 'retry', attempt: attempt + 1, nextRetryAt, errorSummary: errorMsg, abortReason, }); writeRetryHandoffSummary({ workspacePath: workspacePath ?? job.worktreePath, job, errorMsg, nextRetryAt, disposition: 'retry', }); logger.info(`[worker:${this.workerId}] job ${jobId} scheduled for retry ${attempt + 1}/${maxAttempts} at ${nextRetryAt}`); return 'retry'; } else { await this.repo.updateJob(jobId, { status: 'failed', errorSummary: errorMsg, abortReason }); // V2 push: only on terminal fail. Intermediate retry attempts are // silenced (matches V1's 4-second debounce intent). this.enqueuePush(job, 'failed'); writeRetryHandoffSummary({ workspacePath: workspacePath ?? job.worktreePath, job, errorMsg, nextRetryAt: null, disposition: 'failed', }); logger.info(`[worker:${this.workerId}] job ${jobId} failed permanently after ${maxAttempts} attempts`); return 'failed'; } } private listDir(dirPath: string): string[] { try { return readdirSync(dirPath); } catch { return []; } } private async commitLocalWorkspace( taskId: number, workspacePath: string, commitMessage?: string, ): Promise { const result = await commitWorkspaceChanges({ workspacePath, branchName: 'main', commitMessage: commitMessage?.trim() || `agent: update task #${taskId}`, ignoreEntries: ['input/', 'logs/'], }); if (!result.changed) { logger.info(`[worker:${this.workerId}] no local changes to commit for task #${taskId}`); return; } if (result.committed) { logger.info(`[worker:${this.workerId}] committed local workspace changes for task #${taskId}`); } if (result.pushed) { logger.info(`[worker:${this.workerId}] pushed local workspace changes for task #${taskId}`); } } }