maestro/src/worker.ts
oss-sync b5831943a4
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (8bc400c)
2026-06-10 10:08:28 +00:00

2057 lines
85 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Repository, Job, localTaskRepoName, type JobRole } from './db/repository.js';
import { userPiecesDir } from './user-folder/paths.js';
import { BrowserSessionRepo } from './db/browser-session-repo.js';
import { assertProfileOwner } from './engine/browser-session-auth.js';
import { initMasterKey, decryptUserDek, decryptStateBlob } from './crypto/sessions.js';
import { OpenAICompatClient } from './llm/openai-compat.js';
import { llmRoutingKey, shouldRequeueForModelMismatch } from './llm/routing-key.js';
import { loadPiece, runPiece, PieceRunCallbacks, PieceDef, type PieceRunResult } from './engine/piece-runner.js';
import { LocalProgressReporter } from './progress/local-reporter.js';
import { buildLocalConversationContext } from './engine/local-context.js';
import { AppConfig, isExecutionWorker, type WorkerDef, type ReflectionConfig, DEFAULT_NOTES_INJECT, type NotesInjectConfig } from './config.js';
import { existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from 'fs';
import { join } from 'path';
import { logger } from './logger.js';
import { commitWorkspaceChanges, ensureWorkspaceGitRepo } from './git/workspace-manager.js';
import { ContextManager, fetchOllamaContextLimit } from './engine/context-manager.js';
import { summarizeToolInput, type ActivityLogMetadata } from './progress/log-format.js';
import { ensureKeepaGraphs } from './engine/tools/amazon.js';
import type { McpTokenManager } from './mcp/token-manager.js';
import { mergeMcpConfig } from './mcp/config.js';
import { NotesService } from './notes/notes-service.js';
import { NotesRepository } from './notes/notes-repository.js';
import { createStickyBackendResolver } from './worker/sticky-backend.js';
import { pickIdlerIndex } from './worker/idle-routing.js';
import { jobEventBus } from './bridge/job-events.js';
import { normalizeToolNameForMetric } from './metrics/tool-name-allowlist.js';
/** Character budget applied to the retry handoff summary, both when it is written and when it is read back. */
const RETRY_HANDOFF_MAX_LENGTH = 8_000;
/** Character budget for the final-output preview line in the previous-run diagnostics section. */
const RETRY_DIAGNOSTICS_PREVIEW_LENGTH = 1_200;
/** Only this many of the most recent non-empty lines of lessons.jsonl are carried into a handoff. */
const RETRY_LESSONS_MAX_LINES = 12;
/**
 * Renders a small markdown section that states the current wall-clock time
 * in UTC (ISO-8601) and in Asia/Tokyo (ja-JP formatting), followed by an
 * instruction telling the agent to use it as the reference for
 * time-relative reasoning.
 *
 * @returns The section text, terminated by a newline.
 */
function buildTimeContextBlock(): string {
  const current = new Date();
  const tokyoFormatter = new Intl.DateTimeFormat('ja-JP', {
    weekday: 'short',
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
    hour: '2-digit',
    minute: '2-digit',
    second: '2-digit',
    hour12: false,
    timeZone: 'Asia/Tokyo',
  });
  const sectionLines: string[] = [];
  sectionLines.push('## 実行時刻コンテキスト');
  sectionLines.push(`- Current time (UTC): ${current.toISOString()}`);
  sectionLines.push(`- Current time (JST): ${tokyoFormatter.format(current)}`);
  sectionLines.push('- 時刻依存の判断(今日/昨日/直近◯時間/最新ニュース等)は必ずこの時刻を基準に行うこと。');
  // A trailing newline separates this section from whatever follows it.
  return `${sectionLines.join('\n')}\n`;
}
/**
 * Extracts the numeric task id from a repo name of the exact form
 * `local/task-<digits>`.
 *
 * @param repoName Repo name stored on a job.
 * @returns The parsed id, or null when the name has any other shape.
 */
function getLocalTaskId(repoName: string): number | null {
  const parsed = repoName.match(/^local\/task-(\d+)$/);
  return parsed === null ? null : Number(parsed[1]);
}
/**
 * Extracts the parent job id from a repo name of the exact form
 * `subtask/<36 chars of lowercase hex or '-'>`.
 *
 * @param repoName Repo name stored on a job.
 * @returns The 36-character id, or null when the name has any other shape.
 */
function getSubTaskParentJobId(repoName: string): string | null {
  const parsed = repoName.match(/^subtask\/([0-9a-f-]{36})$/);
  return parsed?.[1] ?? null;
}
/**
 * Resolves the "logical task id" used to key a browser session for a job.
 *
 * - Job running directly on a `local/task-<n>` repo: that task's id, as a string.
 * - Sub-task job: walks up the parent chain for at most 5 hops and uses the
 *   first ancestor whose repo is a local task. (Original author's note:
 *   subtaskDepth is capped at 2 in config, so 5 leaves headroom.)
 * - No local-task ancestor found (missing parent, non-local parent such as a
 *   gitea issue job, or hop limit reached): `taskId` is `undefined`.
 *   (Original author's note: BrowseWeb then cannot use noVNC mode and falls
 *   back to the legacy behaviour — not verifiable from this file.)
 *
 * @param repo Repository used to load ancestor jobs.
 * @param job The job being executed.
 * @returns The resolved task id (if any) together with the owning user id.
 */
async function resolveSessionTaskId(
  repo: Repository,
  job: Job,
): Promise<{ taskId: string | undefined; userId: string | undefined }> {
  const jobOwner = job.ownerId ?? undefined;

  const ownTaskId = getLocalTaskId(job.repo);
  if (ownTaskId !== null) {
    return { taskId: String(ownTaskId), userId: jobOwner };
  }

  let ancestorId: string | null = getSubTaskParentJobId(job.repo);
  for (let depth = 0; ancestorId && depth < 5; depth++) {
    const ancestor = await repo.getJob(ancestorId);
    if (!ancestor) break;

    const ancestorTaskId = getLocalTaskId(ancestor.repo);
    if (ancestorTaskId !== null) {
      // The owner should match the ancestor's, but fall back to the job's own owner just in case.
      return { taskId: String(ancestorTaskId), userId: ancestor.ownerId ?? jobOwner };
    }
    ancestorId = ancestor.parentJobId ?? getSubTaskParentJobId(ancestor.repo);
  }

  return { taskId: undefined, userId: jobOwner };
}
/**
 * Builds the `---`-delimited UI metadata header for a job.
 *
 * `ui_profile` is the job's required role. The other three values are taken
 * from `ui_*:` directives found in the job instruction (matched
 * case-insensitively, emitted lower-cased), with defaults of `markdown`,
 * `low` and `medium` when a directive is absent or has an unrecognised value.
 *
 * @param job Job whose role and instruction are inspected.
 * @returns The metadata block without a trailing newline.
 */
function buildUiMetadataBlock(job: Job): string {
  const directive = (pattern: RegExp, fallback: string): string =>
    pattern.exec(job.instruction)?.[1]?.toLowerCase() ?? fallback;

  const outputFormat = directive(/ui_output_format:\s*(text|markdown|json)/i, 'markdown');
  const askPolicy = directive(/ui_ask_policy:\s*(low|high)/i, 'low');
  const priority = directive(/ui_priority:\s*(low|medium|high)/i, 'medium');

  const block = [
    '---',
    `ui_profile: ${job.requiredRole}`,
    `ui_output_format: ${outputFormat}`,
    `ui_ask_policy: ${askPolicy}`,
    `ui_priority: ${priority}`,
    '---',
  ];
  return block.join('\n');
}
/**
 * Trims `text` and, when the trimmed text is longer than `maxLength` UTF-16
 * code units, cuts it and appends `...`.
 *
 * The cut never lands between the two halves of a surrogate pair: if the last
 * kept unit would be a high surrogate whose low surrogate is being dropped,
 * the high surrogate is dropped too. Otherwise the result would end in a lone
 * surrogate, which turns into U+FFFD when the text is written as UTF-8.
 *
 * @param text Raw text to shorten.
 * @param maxLength Maximum number of code units kept before the ellipsis.
 * @returns The trimmed text, or its prefix followed by `...`.
 */
function truncateRetryText(text: string, maxLength: number): string {
  const trimmed = text.trim();
  if (trimmed.length <= maxLength) return trimmed;

  let cut = maxLength;
  if (cut > 0) {
    const lastKept = trimmed.charCodeAt(cut - 1);
    const firstDropped = trimmed.charCodeAt(cut);
    const splitsPair =
      lastKept >= 0xd800 && lastKept <= 0xdbff && firstDropped >= 0xdc00 && firstDropped <= 0xdfff;
    if (splitsPair) cut -= 1;
  }
  return `${trimmed.slice(0, cut)}...`;
}
/**
 * Reads `<workspace>/logs/lessons.jsonl` and turns its most recent
 * non-empty lines (at most RETRY_LESSONS_MAX_LINES) into markdown bullets.
 *
 * Each line is parsed as JSON with optional `movement` and `lessons` fields;
 * a line that fails to parse is used verbatim. Every bullet body is capped at
 * 500 characters, and bullets that end up empty are dropped.
 *
 * @param workspacePath Workspace root of the job.
 * @returns Bullet lines, or an empty array when the file is missing or unreadable.
 */
function readRetryLessons(workspacePath: string): string[] {
  const lessonsFile = join(workspacePath, 'logs', 'lessons.jsonl');
  if (!existsSync(lessonsFile)) return [];

  let rawLines: string[];
  try {
    rawLines = readFileSync(lessonsFile, 'utf-8').split('\n');
  } catch {
    return [];
  }

  const bullets: string[] = [];
  for (const raw of rawLines.filter(Boolean).slice(-RETRY_LESSONS_MAX_LINES)) {
    let bullet: string;
    try {
      const entry = JSON.parse(raw) as { movement?: string; lessons?: string };
      const prefix = entry.movement ? `[${entry.movement}] ` : '';
      bullet = `- ${prefix}${truncateRetryText(String(entry.lessons ?? ''), 500)}`;
    } catch {
      // Not valid JSON (or not an object): keep the raw line as the lesson text.
      bullet = `- ${truncateRetryText(raw, 500)}`;
    }
    if (bullet.trim() !== '-') bullets.push(bullet);
  }
  return bullets;
}
/**
 * Reads `<workspace>/logs/last-run-diagnostics.json` and summarises it as
 * markdown bullets: previous status / abort reason, one line per movement
 * (with tools used and a 300-character output preview), a preview of the
 * final output, and the last three context actions.
 *
 * @param workspacePath Workspace root of the job.
 * @returns Bullet lines, or an empty array when the file is missing,
 *   unreadable, or does not have the expected shape.
 */
function readLastRunDiagnostics(workspacePath: string): string[] {
  type MovementRecord = {
    name?: string;
    next?: string | null;
    toolsUsed?: string[];
    outputPreview?: string;
  };
  type DiagnosticsFile = {
    status?: string;
    abortReason?: string | null;
    finalOutput?: string;
    movementHistory?: MovementRecord[];
    contextActions?: unknown[];
  };

  const diagnosticsFile = join(workspacePath, 'logs', 'last-run-diagnostics.json');
  if (!existsSync(diagnosticsFile)) return [];

  try {
    const report = JSON.parse(readFileSync(diagnosticsFile, 'utf-8')) as DiagnosticsFile;
    const { status, abortReason, finalOutput, contextActions } = report;
    const summary: string[] = [];

    if (status || abortReason) {
      const reasonSuffix = abortReason ? ` (${abortReason})` : '';
      summary.push(`- 前回ステータス: ${status ?? 'unknown'}${reasonSuffix}`);
    }

    for (const step of report.movementHistory ?? []) {
      let toolSuffix = '';
      if (step.toolsUsed && step.toolsUsed.length > 0) {
        toolSuffix = ` tools=${step.toolsUsed.join(',')}`;
      }
      summary.push(`- movement ${step.name ?? 'unknown'} -> ${step.next ?? 'unknown'}${toolSuffix}`);
      if (step.outputPreview) {
        summary.push(`  - output: ${truncateRetryText(step.outputPreview, 300)}`);
      }
    }

    if (finalOutput) {
      summary.push(`- 最終出力プレビュー: ${truncateRetryText(finalOutput, RETRY_DIAGNOSTICS_PREVIEW_LENGTH)}`);
    }
    if (contextActions && contextActions.length > 0) {
      summary.push(`- context action: ${JSON.stringify(contextActions.slice(-3))}`);
    }
    return summary;
  } catch {
    // Any read, parse or shape problem means "no diagnostics available".
    return [];
  }
}
/**
 * Composes the markdown "Retry Handoff" document for a job that failed or
 * was requeued.
 *
 * Sections, in order: header (generation time, job id, disposition, attempt
 * counter, optional next retry time), failure reason (capped at 2,000
 * characters), previous-run diagnostics and accumulated lessons (each only
 * when available in the workspace logs), and fixed guidance for the next
 * agent. The whole document is capped at RETRY_HANDOFF_MAX_LENGTH.
 *
 * @param params.workspacePath Workspace whose `logs/` directory is read.
 * @param params.job Job being handed off.
 * @param params.errorMsg Failure message for this attempt.
 * @param params.nextRetryAt Scheduled retry time, when known.
 * @param params.disposition What is happening to the job.
 * @returns The document text, terminated by a newline.
 */
export function buildRetryHandoffSummary(params: {
  workspacePath: string;
  job: Job;
  errorMsg: string;
  nextRetryAt?: string | null;
  disposition: 'requeued_unhealthy' | 'retry' | 'failed';
}): string {
  const { workspacePath, job, errorMsg, nextRetryAt, disposition } = params;

  const header: string[] = [
    '# Retry Handoff',
    '',
    `Generated: ${new Date().toISOString()}`,
    `Job: ${job.id}`,
    `Disposition: ${disposition}`,
    `Attempt: ${job.attempt}/${job.maxAttempts}`,
    ...(nextRetryAt ? [`Next retry at: ${nextRetryAt}`] : []),
  ];
  const failure: string[] = ['', '## 失敗理由', truncateRetryText(errorMsg, 2_000)];

  const diagnostics = readLastRunDiagnostics(workspacePath);
  const diagnosticsSection = diagnostics.length > 0 ? ['', '## 前回実行の要約', ...diagnostics] : [];

  const lessons = readRetryLessons(workspacePath);
  const lessonsSection = lessons.length > 0 ? ['', '## これまでの lessons', ...lessons] : [];

  const guidance: string[] = [
    '',
    '## 次のエージェントへの指示',
    '- 前回の失敗理由と movement の進捗を踏まえ、同じ探索や同じ失敗を繰り返さないこと。',
    '- 既に完了している作業・生成済みファイル・確認済み事項は再実行前に workspace とログで確認すること。',
    '- 必要な情報が不足している場合は、全体を読み直すのではなく targeted Read / Grep / Bash で範囲を絞ること。',
  ];

  const document = [...header, ...failure, ...diagnosticsSection, ...lessonsSection, ...guidance].join('\n');
  return `${truncateRetryText(document, RETRY_HANDOFF_MAX_LENGTH)}\n`;
}
/**
 * Writes the retry handoff document to `<workspace>/logs/retry-summary.md`,
 * creating the logs directory when needed.
 *
 * Best-effort: does nothing when there is no workspace path, and any
 * filesystem error is logged as a warning instead of being thrown.
 *
 * @param params Same fields as buildRetryHandoffSummary, except that
 *   `workspacePath` may be null or undefined.
 */
function writeRetryHandoffSummary(params: {
  workspacePath: string | null | undefined;
  job: Job;
  errorMsg: string;
  nextRetryAt?: string | null;
  disposition: 'requeued_unhealthy' | 'retry' | 'failed';
}): void {
  const { workspacePath, job, errorMsg, nextRetryAt, disposition } = params;
  if (!workspacePath) return;

  try {
    const logsDir = join(workspacePath, 'logs');
    mkdirSync(logsDir, { recursive: true });
    const targetFile = join(logsDir, 'retry-summary.md');
    const document = buildRetryHandoffSummary({ workspacePath, job, errorMsg, nextRetryAt, disposition });
    writeFileSync(targetFile, document, 'utf-8');
  } catch (err) {
    logger.warn(`[worker] failed to write retry handoff summary: ${err}`);
  }
}
/**
 * Loads the retry handoff document left by a previous attempt and wraps it
 * in a prompt section for the next run.
 *
 * @param workspacePath Workspace whose `logs/retry-summary.md` is read.
 * @param job Job about to run.
 * @returns The prompt section, or an empty string when this is a first
 *   attempt with no recorded error, or when the summary file is missing,
 *   unreadable or empty.
 */
function buildRetryHandoffContext(workspacePath: string, job: Job): string {
  const isCleanFirstAttempt = job.attempt <= 1 && !job.errorSummary;
  if (isCleanFirstAttempt) return '';

  const summaryFile = join(workspacePath, 'logs', 'retry-summary.md');
  if (!existsSync(summaryFile)) return '';

  let handoff: string;
  try {
    handoff = truncateRetryText(readFileSync(summaryFile, 'utf-8'), RETRY_HANDOFF_MAX_LENGTH);
  } catch {
    return '';
  }
  if (!handoff) return '';

  const section = [
    '## Retry 復帰用引き継ぎ',
    'このジョブは前回実行からの retry / 再キューです。以下を前提に、重複作業を避けて継続してください。',
    '',
    handoff,
  ];
  return section.join('\n');
}
/**
 * Enqueues a follow-up `reflection` job for a finished job, subject to the
 * reflection configuration.
 *
 * Nothing is enqueued when reflection is disabled, when the finished job is
 * itself a reflection job, when `workerRequired` is set and no worker lists
 * the `reflection` role, or when the owner's reflection token usage since
 * 00:00 UTC today has reached the per-user daily budget.
 *
 * @param repo Repository used for the budget lookup and job creation.
 * @param job The job that just finished.
 * @param outcome Terminal outcome recorded in the reflection payload.
 * @param cfg Reflection settings (enable flag, worker requirement, daily budget).
 * @param workers Configured workers, consulted only for the worker requirement.
 */
export async function maybeEnqueueReflection(
  repo: Repository,
  job: Job,
  outcome: 'succeeded' | 'failed' | 'aborted',
  cfg: Pick<ReflectionConfig, 'enabled' | 'workerRequired' | 'perUserDailyBudgetTokens'>,
  workers: WorkerDef[] = [],
): Promise<void> {
  if (!cfg.enabled || job.taskKind === 'reflection') return;

  // No-auth mode runs every job with ownerId=null. Reflection data is
  // per-user, so use the same 'local' namespace as the rest of the no-auth
  // path; without this fallback reflection never ran in no-auth deployments.
  const owner = job.ownerId ?? 'local';

  // worker_required: at least one worker must list 'reflection' among its roles.
  if (cfg.workerRequired) {
    let reflectionCapable = false;
    for (const candidate of workers) {
      if (Array.isArray(candidate.roles) && candidate.roles.includes('reflection')) {
        reflectionCapable = true;
        break;
      }
    }
    if (!reflectionCapable) {
      logger.warn(`[reflection] enqueue skipped reason=no_reflection_worker user=${owner}`);
      return;
    }
  }

  // Per-user daily token budget. A cap of 0 (or unset) means "no limit".
  const dailyCap = cfg.perUserDailyBudgetTokens ?? 0;
  if (dailyCap > 0) {
    const utcNow = new Date();
    const utcMidnightMs = Date.UTC(utcNow.getUTCFullYear(), utcNow.getUTCMonth(), utcNow.getUTCDate());
    const usage = repo.aggregateReflectionMetrics(owner, utcMidnightMs);
    const used = usage.tokensIn + usage.tokensOut;
    if (used >= dailyCap) {
      const inMillions = (tokens: number): string => (tokens / 1_000_000).toFixed(1);
      logger.info(`[reflection] enqueue skipped reason=budget user=${owner} spent=${inMillions(used)}M cap=${inMillions(dailyCap)}M`);
      return;
    }
  }

  const payload = JSON.stringify({
    originalJobId: job.id,
    userId: owner,
    pieceName: job.pieceName,
    outcome,
  });
  const reflectionJob = {
    repo: `local/reflection-${job.id}`,
    issueNumber: 0,
    instruction: '',
    pieceName: 'reflection',
    role: 'reflection',
    ownerId: owner,
    visibility: 'private',
    taskKind: 'reflection',
    payload,
  };
  // The cast is kept from the original call site, which passes this shape as `any`.
  await repo.createJob(reflectionJob as any);
  logger.info(`[reflection] enqueued original=${job.id} owner=${owner} piece=${job.pieceName} outcome=${outcome}`);
}
export class Worker {
/** Set by start(), cleared by stop(); polling is a no-op while false. */
private running = false;
/** Number of claimed jobs whose runJobTracked() has not finished yet. */
private inflight = 0;
/** Single-flight guard for processNext(): true while a claim pass is in progress. */
private polling = false;
/** Set by stop() and not reset anywhere in the code shown here. */
private stopped = false;
/** Timer that triggers processNext(); created in start(), cleared in stop(). */
private pollInterval: ReturnType<typeof setInterval> | null = null;
/** Timer that triggers initialize() as a periodic health check; created in start(), cleared in stop(). */
private healthInterval: ReturnType<typeof setInterval> | null = null;
/** Live sibling list, injected by WorkerManager for idle-preferring claims. */
private siblingsAccessor: (() => Worker[]) | null = null;
/** Last initialize() result — whether we'd actually claim if poked. */
private lastAvailable = false;
/** Job id we deferred to an idler last round (safety net against stuck yields). */
private lastYieldedJobId: string | null = null;
/** Id used to look up this worker's definition in config.provider.workers. */
private workerId: string;
/** Base URL of the LLM backend this worker talks to. */
private endpoint: string;
/** Model name for this worker; may be undefined (initialize() then tolerates a missing model list). */
private model: string | undefined;
/** Model names discovered by the last successful initialize(); cleared on failure. */
private availableModels: Set<string> = new Set();
/** Result of the most recent initialize() health probe. */
private healthy = false;
/** Error message of the most recent failed probe, or null after a success. */
private lastHealthError: string | null = null;
/** Context window size in tokens; overwritten by initialize() from config or backend detection. */
private contextLimitTokens: number = 128_000;
/** Optional MCP token manager, installed via setMcpTokenManager(). */
private mcpTokenManager: McpTokenManager | null = null;
/**
* Phase 3b: optional Prometheus metrics handle. When set, the worker
* emits jobs_total / active_jobs / job_duration_seconds in
* executeJob's start/finally, llm_calls_total via the AgentLoop
* onLLMCall callback, and tool_calls_total via the new onToolMetric
* callback in agent-loop.ts. Wired by WorkerManager after the metric
* registry exists.
*/
private workerMetrics: import('./metrics/worker-metrics.js').WorkerMetrics | null = null;
private skillCatalog: import('./engine/skills.js').SkillCatalog | null = null;
/**
* V2 Web Push notification service. Null when push is disabled via
* config or when the worker is built without one (tests). Hooks fire
* via enqueue (fire-and-forget) so a slow push service can't block job
* execution.
*
* Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md.
*/
private pushService: import('./push-service.js').PushService | null = null;
/**
 * @param workerId Id of this worker's definition in `config.provider.workers`.
 * @param endpoint Base URL of the LLM backend.
 * @param model Model name, or undefined when none is configured.
 * @param repo Job / worker-node repository.
 * @param config Application config; replaceable later via updateConfig().
 */
constructor(
  workerId: string,
  endpoint: string,
  model: string | undefined,
  private repo: Repository,
  private config: AppConfig,
) {
  this.model = model;
  this.endpoint = endpoint;
  this.workerId = workerId;
}
/** Installs the MCP token manager; pass null to clear it. */
public setMcpTokenManager(tm: McpTokenManager | null): void {
  this.mcpTokenManager = tm;
}
/** Installs the skill catalog held by this worker. */
public setSkillCatalog(catalog: import('./engine/skills.js').SkillCatalog): void {
  this.skillCatalog = catalog;
}
/** Installs the push notification service; null disables push for this worker. */
public setPushService(svc: import('./push-service.js').PushService | null): void {
  this.pushService = svc;
}
/**
 * Replaces the global config on a worker that keeps running.
 *
 * Intended for WorkerManager's differential rebuild: when a config change
 * does not touch this worker's own definition, the worker (and any in-flight
 * job) stays alive and only the config used for future reads is refreshed.
 * Values derived from the worker definition (roles, max concurrency) are
 * looked up through getWorkerDef() on every use, so they follow the new
 * config automatically.
 *
 * @param config The new application config.
 */
public updateConfig(config: AppConfig): void {
  this.config = config;
}
/** Number of jobs this worker has claimed and not yet finished running. */
public get inflightCount(): number {
  return this.inflight;
}
/** Execution slots free right now: max concurrency minus in-flight jobs, never below zero. */
public get freeSlots(): number {
  const unused = this.getMaxConcurrency() - this.inflight;
  return Math.max(0, unused);
}
/**
 * Whether this worker would pick up a job if poked: it is running, not
 * stopped, its last initialize() succeeded, and its definition is an
 * execution worker.
 */
public get availableForClaim(): boolean {
  if (!this.running || this.stopped || !this.lastAvailable) {
    return false;
  }
  return isExecutionWorker(this.getWorkerDef());
}
/** Public view of supportsRole(): whether this worker's configured roles include `role`. */
public canClaimRole(role: string): boolean {
  return this.supportsRole(role);
}
/**
 * Nudges this worker to run a poll pass immediately (used by a sibling that
 * yields a job to an idler worker). Fire-and-forget.
 *
 * processNext() resolves this worker's definition before entering its own
 * try/catch, so it can reject when the definition is missing from the
 * current config. The rejection is logged here so that a fire-and-forget
 * poke never surfaces as an unhandled promise rejection.
 */
public pokePoll(): void {
  void this.processNext().catch((err: unknown) => {
    logger.error(`[worker:${this.workerId}] pokePoll error: ${err}`);
  });
}
/**
 * Installs the accessor that returns the live list of workers, so that claim
 * passes can prefer handing a job to an idler sibling.
 */
public setSiblingsAccessor(fn: () => Worker[]): void {
  this.siblingsAccessor = fn;
}
/**
 * Asks pickIdlerIndex() whether some other worker should take a job of
 * `role` instead of this one, given each peer's free slots, claim
 * availability and role support.
 *
 * @param role Required role of the job under consideration.
 * @returns The chosen peer, or null when pickIdlerIndex() selects nobody
 *   (this worker should then claim the job itself).
 */
private findIdlerCompetitor(role: string): Worker | null {
  const peers: Worker[] = [];
  for (const sibling of this.siblingsAccessor?.() ?? []) {
    if (sibling !== this) peers.push(sibling);
  }

  const ownFreeSlots = this.freeSlots;
  const peerSnapshots = peers.map((peer) => ({
    freeSlots: peer.freeSlots,
    availableForClaim: peer.availableForClaim,
    servesRole: peer.canClaimRole(role),
  }));

  const chosen = pickIdlerIndex(ownFreeSlots, peerSnapshots);
  if (chosen < 0) return null;
  return peers[chosen]!;
}
/**
 * Queues a push notification for a job status transition. Hands the event to
 * the push service's enqueue() and returns without awaiting anything.
 *
 * Silently skipped when:
 *  - no push service is installed,
 *  - the job is a reflection job (internal mechanism, not user-facing work),
 *  - the job has no owner,
 *  - the job's repo is not a direct local task (`local/task-<n>`), which
 *    includes sub-task jobs.
 *
 * @param job Job whose status changed.
 * @param event The transition being announced.
 */
private enqueuePush(
  job: Job,
  event: 'running' | 'succeeded' | 'failed' | 'waiting_human',
): void {
  const push = this.pushService;
  if (!push || job.taskKind === 'reflection' || !job.ownerId) return;

  const localTaskId = getLocalTaskId(job.repo);
  if (localTaskId === null) return;

  // Single-row title lookup through the raw db handle. A generic label is an
  // acceptable fallback when the lookup fails or the title is empty.
  let taskTitle = `Task #${localTaskId}`;
  try {
    const statement = this.repo.getDb().prepare('SELECT title FROM local_tasks WHERE id = ?');
    const row = statement.get(localTaskId) as { title: string | null } | undefined;
    if (row?.title) {
      taskTitle = row.title;
    }
  } catch {
    // best-effort; keep the default title
  }

  push.enqueue({
    event,
    taskId: localTaskId,
    taskTitle,
    pieceName: job.pieceName,
    ownerId: job.ownerId,
  });
}
/**
 * Installs the Prometheus metrics handle used by this worker, or removes it
 * when called with null. Calling it again with the same handle simply
 * stores the same value.
 *
 * @param metrics Metrics handle, or null to clear.
 */
public setWorkerMetrics(
  metrics: import('./metrics/worker-metrics.js').WorkerMetrics | null,
): void {
  this.workerMetrics = metrics;
}
/**
 * Looks up this worker's definition in the current config by id.
 *
 * @returns The first definition whose id equals this worker's id.
 * @throws Error when the current config has no definition for this worker.
 */
private getWorkerDef(): WorkerDef {
  for (const candidate of this.config.provider.workers) {
    if (candidate.id === this.workerId) return candidate;
  }
  throw new Error(`Worker config not found: ${this.workerId}`);
}
/** Roles from the worker definition, defaulting to auto / fast / quality when none are configured. */
private getSupportedRoles(): string[] {
  const configuredRoles = this.getWorkerDef().roles;
  return configuredRoles ?? ['auto', 'fast', 'quality'];
}
/** Configured max concurrency for this worker (default 1), clamped to at least 1. */
private getMaxConcurrency(): number {
  const configured = this.getWorkerDef().maxConcurrency ?? 1;
  return Math.max(1, configured);
}
/**
 * Health probe and capability discovery for this worker.
 *
 * - Disabled by config: records the node as disabled/unhealthy and returns false.
 * - Otherwise: discovers the backend's model list (Ollama `/api/tags` first,
 *   then `<endpoint>/models`), records the node as healthy, and refreshes the
 *   context-limit setting.
 * - Any error on that path: clears the model list, records the node as
 *   unhealthy with the error message, and returns false.
 *
 * Log lines about health are only emitted when the state changes.
 *
 * @returns true when the worker is enabled and the probe succeeded.
 */
async initialize(): Promise<boolean> {
  // One place that writes this worker's node row; only the health-related
  // fields differ between the three outcomes.
  const publishNode = (state: {
    enabled: boolean;
    healthy: boolean;
    availableModels: string[];
    lastError: string | null;
  }) =>
    this.repo.upsertWorkerNode({
      workerId: this.workerId,
      endpoint: this.endpoint,
      enabled: state.enabled,
      healthy: state.healthy,
      roles: this.getSupportedRoles(),
      availableModels: state.availableModels,
      inflightJobs: this.inflight,
      maxConcurrency: this.getMaxConcurrency(),
      lastError: state.lastError,
    });

  if (this.getWorkerDef().enabled === false) {
    await publishNode({ enabled: false, healthy: false, availableModels: [], lastError: 'disabled by config' });
    this.healthy = false;
    this.lastHealthError = 'disabled by config';
    logger.info(`[worker:${this.workerId}] disabled by config; skipping polling`);
    return false;
  }

  try {
    const ollamaBase = this.endpoint.replace(/\/v1\/?$/, '');
    // Forward `Authorization: Bearer <apiKey>` on the discovery probes when
    // the worker has a key. They used to be sent unauthenticated, which
    // produced a 401 on every health interval against gateways that require
    // Bearer auth on /v1/models and left `availableModels` empty
    // (found during 2026-05-20 dogfooding on production aao).
    const apiKey = this.getWorkerDef().apiKey;
    const probeInit: RequestInit = apiKey
      ? { headers: { Authorization: `Bearer ${apiKey}` } }
      : {};

    // Try Ollama /api/tags first, then fall back to OpenAI-compatible /models.
    let discovered: string[] = [];
    const tagsRes = await fetch(`${ollamaBase}/api/tags`, probeInit).catch(() => null);
    if (tagsRes?.ok) {
      const tagsBody = await tagsRes.json() as { models?: Array<{ name: string }> };
      discovered = (tagsBody.models ?? []).map((entry: { name: string }) => entry.name);
    } else {
      const openaiBase = this.endpoint.replace(/\/?$/, '');
      const listRes = await fetch(`${openaiBase}/models`, probeInit).catch(() => null);
      if (listRes?.ok) {
        const listBody = await listRes.json() as { data?: Array<{ id: string }> };
        discovered = (listBody.data ?? []).map((entry: { id: string }) => entry.id);
      } else if (this.model) {
        throw new Error(`failed to fetch model list from both /api/tags and /v1/models`);
      }
      // llama-server compatibility: with no model configured the model-list
      // API is not mandatory, so continue with an empty list.
    }
    this.availableModels = new Set(discovered);

    await publishNode({
      enabled: true,
      healthy: true,
      availableModels: [...this.availableModels],
      lastError: null,
    });
    // Only announce the model list when recovering from an unhealthy state.
    if (!this.healthy || this.lastHealthError !== null) {
      logger.info(`[worker:${this.workerId}] available models: ${[...this.availableModels].join(', ')}`);
    }
    this.healthy = true;
    this.lastHealthError = null;

    const configuredLimit = this.config.context?.limitTokens;
    if (configuredLimit) {
      this.contextLimitTokens = configuredLimit;
    } else {
      // Not configured: ask the backend. With no model configured an empty
      // model name is passed (original note: this targets llama.cpp's
      // /props endpoint — behaviour of the helper is not visible here).
      const detectedLimit = await fetchOllamaContextLimit(this.endpoint, this.model ? this.model : '');
      if (detectedLimit !== this.contextLimitTokens) {
        logger.info(`[worker:${this.workerId}] context limit updated: ${detectedLimit} tokens`);
        this.contextLimitTokens = detectedLimit;
      }
    }
    return true;
  } catch (e) {
    const errorMessage = e instanceof Error ? e.message : String(e);
    this.availableModels.clear();
    await publishNode({ enabled: true, healthy: false, availableModels: [], lastError: errorMessage });
    // Warn only on a healthy -> unhealthy transition or when the error text changes.
    if (this.healthy || this.lastHealthError !== errorMessage) {
      logger.warn(`[worker:${this.workerId}] failed to fetch model list: ${e}`);
    }
    this.healthy = false;
    this.lastHealthError = errorMessage;
    return false;
  }
}
/**
 * Starts the worker: runs one poll pass immediately, then polls every
 * 5 seconds plus a random jitter of up to 2 seconds (fixed per worker at
 * start), and re-runs initialize() as a health check every
 * `healthcheckIntervalSeconds` (default 30, minimum 10). No-op when already
 * running.
 *
 * Both timers fire promises nobody awaits. processNext() resolves the worker
 * definition before its own try/catch, and initialize() writes to the
 * repository inside its catch path, so either can reject. Each fire-and-forget
 * call therefore gets an explicit rejection handler; otherwise a rejection
 * would surface as an unhandled promise rejection.
 */
start(): void {
  if (this.running) return;
  this.running = true;
  logger.info(`[worker:${this.workerId}] started`);

  const tick = (): void => {
    void this.processNext().catch((err: unknown) => {
      logger.error(`[worker:${this.workerId}] poll tick failed: ${err}`);
    });
  };
  tick();

  const baseInterval = 5000;
  const jitter = Math.floor(Math.random() * 2000);
  this.pollInterval = setInterval(tick, baseInterval + jitter);

  const healthIntervalSeconds = Math.max(10, this.getWorkerDef().healthcheckIntervalSeconds ?? 30);
  this.healthInterval = setInterval(() => {
    void this.initialize().catch((err: unknown) => {
      logger.warn(`[worker:${this.workerId}] health check failed: ${err}`);
    });
  }, healthIntervalSeconds * 1000);
}
/**
 * Stops polling and health checks: clears both timers and marks the worker
 * as stopped. Jobs that are already in flight are not interrupted here.
 */
stop(): void {
  this.running = false;
  this.stopped = true;

  const { pollInterval, healthInterval } = this;
  this.pollInterval = null;
  this.healthInterval = null;
  if (pollInterval) clearInterval(pollInterval);
  if (healthInterval) clearInterval(healthInterval);

  logger.info(`[worker:${this.workerId}] stopped`);
}
/**
 * Waits until no jobs are in flight, checking every 500 ms.
 *
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @returns true when the in-flight count is zero on return, false on timeout.
 */
async waitForCompletion(timeoutMs = 30000): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (this.inflight > 0 && Date.now() < deadline) {
    await new Promise((wake) => setTimeout(wake, 500));
  }
  return this.inflight === 0;
}
/** This worker's id. */
get id(): string {
  return this.workerId;
}
/**
 * One claim pass. Single-flight: a pass that starts while another is still
 * running returns immediately, which prevents over-claiming.
 *
 * Steps: recover stuck running jobs, re-run initialize() as an availability
 * check, then claim jobs (retry jobs first) until the concurrency limit is
 * reached or the queue is empty. Claimed jobs run detached; this method does
 * not wait for them.
 */
private async processNext(): Promise<void> {
  const eligible = isExecutionWorker(this.getWorkerDef()) && this.running && !this.stopped;
  if (!eligible || this.polling) return;

  this.polling = true;
  try {
    // Stuck-job watchdog: the staleness threshold is twice the LLM timeout, at least 20 minutes.
    try {
      const llmTimeoutMinutes = this.config.provider.timeoutMinutes ?? 10;
      this.repo.recoverStuckRunningJobs(Math.max(20, llmTimeoutMinutes * 2));
    } catch (err) {
      logger.warn(`[worker:${this.workerId}] recoverStuckRunningJobs error: ${err}`);
    }

    const available = await this.initialize();
    this.lastAvailable = available;
    if (!available) return;

    const slotLimit = this.getMaxConcurrency();
    while (this.inflight < slotLimit && this.running && !this.stopped) {
      // Idle-preferring gate (most-free-wins): when a strictly idler sibling
      // serves the next job's role, nudge that worker and stop claiming
      // instead of piling on. Safety net: if the very job we deferred last
      // round is still the next one, the idler did not take it — claim it
      // ourselves so a job never gets stuck.
      //
      // The gate is consulted only when there are siblings to defer to and
      // the repository supports peeking; single-worker setups skip the extra
      // query entirely.
      const roster = this.siblingsAccessor?.();
      if (roster && roster.length > 1 && this.repo.peekNextClaimable) {
        const upcoming = await this.repo.peekNextClaimable(this.workerId);
        if (upcoming && upcoming.id !== this.lastYieldedJobId) {
          const idler = this.findIdlerCompetitor(upcoming.requiredRole);
          if (idler) {
            this.lastYieldedJobId = upcoming.id;
            idler.pokePoll();
            break;
          }
        }
        this.lastYieldedJobId = null;
      }

      // Retry jobs take priority over fresh ones.
      const claimed = (await this.repo.claimNextRetryJob(this.workerId))
        ?? (await this.repo.claimNextJob(this.workerId));
      if (!claimed) break;

      this.inflight += 1;
      void this.runJobTracked(claimed); // concurrent execution: intentionally not awaited
    }
  } catch (err) {
    logger.error(`[worker:${this.workerId}] processNext error: ${err}`);
  } finally {
    this.polling = false;
  }
}
/**
 * Runs a claimed job to completion. Errors from executeJob() are logged, not
 * rethrown, and the in-flight counter is always decremented and reported
 * afterwards.
 */
private async runJobTracked(job: Job): Promise<void> {
  try {
    await this.executeJob(job);
  } catch (err) {
    logger.error(`[worker:${this.workerId}] runJobTracked error job=${job.id}: ${err}`);
  } finally {
    this.inflight -= 1;
    await this.reportInflight();
  }
}
/**
 * Writes the current in-flight count, health flag, last error and model list
 * to this worker's node record. A failure to write is logged as a warning
 * and otherwise ignored.
 */
private async reportInflight(): Promise<void> {
  const snapshot = {
    healthy: this.healthy,
    lastError: this.lastHealthError,
    inflightJobs: this.inflight,
    availableModels: [...this.availableModels],
  };
  try {
    await this.repo.updateWorkerNodeHealth(this.workerId, snapshot);
  } catch (err) {
    logger.warn(`[worker:${this.workerId}] reportInflight failed: ${err}`);
  }
}
/** Whether `role` is among this worker's supported roles. */
private supportsRole(role: string): boolean {
  const roles = this.getSupportedRoles();
  return roles.includes(role);
}
/** Metadata attached to activity-log entries: this worker's id plus the job role as `mode`. */
private buildLogMetadata(role: JobRole): ActivityLogMetadata {
  const metadata: ActivityLogMetadata = { workerId: this.workerId, mode: role };
  return metadata;
}
/**
 * Answers a question raised by a sub-task on the user's behalf, by asking
 * the LLM with the parent job's instruction as context.
 *
 * @param subtaskJob The sub-task that asked the question.
 * @param parentJobId Id of the parent job whose instruction gives the original intent.
 * @param question The sub-task's question.
 * @returns The trimmed answer, or a fixed "no particular constraints, use
 *   your own judgement" reply when the LLM returns nothing.
 * @throws Error when the LLM stream reports an error event.
 */
private async answerSubtaskAsk(subtaskJob: Job, parentJobId: string, question: string): Promise<string> {
  const parent = await this.repo.getJob(parentJobId);
  const originalRequest = parent?.instruction ?? '(不明)';

  const def = this.getWorkerDef();
  const viaGateway = def.proxy === true;
  // Gateway routes by the subtask's tier; direct keeps the worker's model.
  const routedModel = llmRoutingKey({
    isGateway: viaGateway,
    role: subtaskJob.requiredRole,
    resolveDirectModel: () => this.model,
  });
  const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000;
  const client = new OpenAICompatClient(
    this.endpoint,
    routedModel,
    def.apiKey,
    this.config.provider.retry,
    timeoutMs,
    this.contextLimitTokens,
    this.config.safety?.promptGuardRatio,
    undefined,
    { proxy: viaGateway },
  );

  const systemPrompt = [
    'あなたはタスクを管理する親エージェントです。',
    'サブタスクがユーザーに確認を求めていますが、あなたが代わりに回答してください。',
    '元の依頼の意図を汲み取り、サブタスクが作業を継続できるよう具体的に回答してください。',
    '回答のみを簡潔に返してください。',
  ].join('\n');
  const userPrompt = [
    '## 元の依頼',
    originalRequest,
    '',
    '## サブタスクの指示',
    subtaskJob.instruction,
    '',
    '## サブタスクからの質問',
    question,
  ].join('\n');
  const messages: import('./llm/openai-compat.js').Message[] = [
    { role: 'system', content: systemPrompt },
    { role: 'user', content: userPrompt },
  ];

  let reply = '';
  for await (const event of client.chat(messages)) {
    if (event.type === 'error') {
      throw new Error(`LLM error: ${event.error}`);
    }
    if (event.type === 'text') {
      reply += event.text;
    }
  }
  return reply.trim() || '特に制約はありません。あなたの判断で進めてください。';
}
/**
 * Writes a compact snapshot of a piece run to
 * `<workspace>/logs/last-run-diagnostics.json`: status, abort reason, final
 * output, per-movement summary (name, next movement, tools used, first 600
 * characters of output) and context actions.
 *
 * Best-effort: any error is logged as a warning and not rethrown.
 *
 * @param workspacePath Workspace root of the job.
 * @param result Result returned by the piece runner.
 */
private writeRunDiagnostics(workspacePath: string, result: PieceRunResult): void {
  try {
    const logsDir = join(workspacePath, 'logs');
    mkdirSync(logsDir, { recursive: true });

    const generatedAt = new Date().toISOString();
    const movementHistory = result.movementHistory.map(({ name, result: outcome }) => ({
      name,
      next: outcome.next,
      toolsUsed: outcome.toolsUsed,
      outputPreview: outcome.output.slice(0, 600),
    }));
    const snapshot = {
      generatedAt,
      status: result.status,
      abortReason: result.abortReason ?? null,
      finalOutput: result.finalOutput,
      movementHistory,
      contextActions: result.contextActions,
    };

    const serialized = JSON.stringify(snapshot, null, 2);
    writeFileSync(join(logsDir, 'last-run-diagnostics.json'), `${serialized}\n`, 'utf-8');
  } catch (err) {
    logger.warn(`[worker:${this.workerId}] failed to write run diagnostics: ${err}`);
  }
}
/**
 * Resolves a job's workspace directory, creates its directory tree and git
 * repository, and stores the resolved path on the job record.
 *
 * - Local task: `<worktreeDir>/local/<taskId>` with `input/`, `output/` and
 *   `logs/`; the path is also stored on the local-task record.
 * - Sub-task: the job's existing `worktreePath` (expected to have been set
 *   when the sub-task was spawned) with `output/` and `logs/`.
 *
 * @param job Job being prepared.
 * @param isLocalTask Whether the job runs directly on a local task.
 * @param isSubTask Whether the job is a sub-task.
 * @param localTaskId Local task id, when the job is a local task.
 * @returns The workspace path.
 * @throws Error for a sub-task without `worktreePath`, and for jobs that are
 *   neither local tasks nor sub-tasks.
 */
private async prepareJobWorkspace(
  job: Job,
  isLocalTask: boolean,
  isSubTask: boolean,
  localTaskId: number | null,
): Promise<string> {
  const { repo: repoName, id: jobId } = job;
  const makeDirs = (base: string, children: string[]): void => {
    mkdirSync(base, { recursive: true });
    for (const child of children) {
      mkdirSync(join(base, child), { recursive: true });
    }
  };

  let workspacePath: string;
  if (isLocalTask) {
    workspacePath = join(this.config.worktreeDir, 'local', String(localTaskId));
    makeDirs(workspacePath, ['input', 'output', 'logs']);
    await ensureWorkspaceGitRepo(workspacePath);
    if (localTaskId !== null) {
      await this.repo.updateLocalTask(localTaskId, { workspacePath });
    }
  } else if (isSubTask) {
    // worktreePath is expected to have been set by SpawnSubTask.
    if (!job.worktreePath) {
      throw new Error(`Sub-task job ${jobId} has no worktreePath set`);
    }
    workspacePath = job.worktreePath;
    makeDirs(workspacePath, ['output', 'logs']);
    await ensureWorkspaceGitRepo(workspacePath);
  } else {
    throw new Error(`Unsupported job type: repo="${repoName}" is neither a local task nor a sub-task`);
  }

  await this.repo.updateJob(jobId, { worktreePath: workspacePath });
  return workspacePath;
}
/**
 * Runs the two gates a claimed job must pass before execution: this worker
 * must support the job's required role, and the issue lock for the job's
 * repo / issue number must be obtained.
 *
 * When a gate fails the job is put back to `queued` with no worker assigned,
 * an audit entry is written, and false is returned so the caller stops.
 *
 * @param job The claimed job.
 * @returns true when both gates passed and the issue lock is held.
 */
private async acquireJobOrRequeue(job: Job): Promise<boolean> {
  const { repo: repoName, issueNumber, id: jobId } = job;
  const requiredRole = job.requiredRole;

  if (!this.supportsRole(requiredRole)) {
    await this.repo.updateJob(jobId, { status: 'queued', workerId: null });
    await this.repo.addAuditLog(jobId, 'job_requeued_capability_mismatch', 'worker', {
      workerId: this.workerId,
      requiredRole,
    });
    logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to role mismatch (role=${requiredRole})`);
    return false;
  }

  const gotLock = await this.repo.lockIssue(repoName, issueNumber, jobId);
  if (gotLock) return true;

  await this.repo.updateJob(jobId, { status: 'queued', workerId: null });
  await this.repo.addAuditLog(jobId, 'job_requeued_issue_locked', 'worker', {
    workerId: this.workerId,
  });
  logger.info(`[worker:${this.workerId}] job ${jobId}: issue ${repoName}#${issueNumber} already locked, skipping`);
  return false;
}
/**
 * Execute one claimed job from start to finish.
 *
 * Flow, as implemented below:
 *  1. Gate on role capability + issue lock (acquireJobOrRequeue); return if
 *     the job was requeued.
 *  2. Increment the active-jobs gauge, publish worker health, write the
 *     `job_started` audit entry and, on the first attempt only, enqueue the
 *     "running" push notification.
 *  3. Reflection jobs (`taskKind === 'reflection'`) are delegated to
 *     handleReflectionJob and return early.
 *  4. Otherwise: prepare the workspace, assemble the instruction, load the
 *     piece, pass the model-mismatch and MCP-auth gates, wire up the LLM
 *     client, callbacks, browser-session state, notes and per-task options,
 *     then call runPiece.
 *  5. The run result is handed to handlePieceResult; any exception thrown
 *     inside the main try block goes to scheduleRetryOrFail.
 *  6. The finally block stops the heartbeat, emits lifecycle metrics,
 *     republishes worker health and releases the issue lock.
 *
 * @param job The job row claimed by this worker.
 */
private async executeJob(job: Job): Promise<void> {
  const { repo: repoName, issueNumber, id: jobId } = job;
  // A non-null id marks the job as a local task / a sub-task respectively.
  const localTaskId = getLocalTaskId(repoName);
  const isLocalTask = localTaskId !== null;
  const parentJobId = getSubTaskParentJobId(repoName);
  const isSubTask = parentJobId !== null;
  const logMetadata = this.buildLogMetadata(job.requiredRole);
  if (!(await this.acquireJobOrRequeue(job))) return;
  // Phase 3b: job-lifecycle metrics. Inc active_jobs at start; capture
  // a terminal status + duration in the finally block. `profile` maps
  // to the assigned required role (the multi-profile / multi-piece
  // operator-facing dimension).
  const metricPiece = job.pieceName ?? 'unknown';
  const metricProfile = job.requiredRole ?? 'unknown';
  const jobStartedAtMs = Date.now();
  // Stays 'error' unless a later step records a more specific outcome.
  let metricFinalStatus: 'succeeded' | 'failed' | 'aborted' | 'cancelled' | 'waiting_human' | 'error' = 'error';
  if (this.workerMetrics) {
    try {
      this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).inc();
    } catch { /* metrics never affect business logic */ }
  }
  await this.repo.updateWorkerNodeHealth(this.workerId, {
    healthy: this.healthy,
    lastError: this.lastHealthError,
    inflightJobs: this.inflight,
    availableModels: [...this.availableModels],
  });
  // claimNextJob has already set status = 'running'.
  await this.repo.addAuditLog(jobId, 'job_started', 'worker', {});
  // V2 push: notify on the first time a job transitions queued→running.
  // Retry runs are intentionally silent — V1's 4s debounce relied on this
  // and we keep the same UX (one running notification, not one per retry).
  if (job.attempt === 1) {
    this.enqueuePush(job, 'running');
  }
  // Reflection jobs bypass workspace preparation and the agent / LLM loop.
  // task_kind='agent' (default) keeps the pre-existing piece-runner path.
  if (job.taskKind === 'reflection') {
    try {
      await this.handleReflectionJob(job);
    } finally {
      // Balance the activeJobs.inc() above. This early-return path never
      // reaches the main finally block, so without this the gauge would
      // stay incremented after every reflection job.
      if (this.workerMetrics) {
        try {
          this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).dec();
        } catch { /* metrics never affect business logic */ }
      }
      await this.repo.updateWorkerNodeHealth(this.workerId, {
        healthy: this.healthy,
        lastError: this.lastHealthError,
        inflightJobs: this.inflight,
        availableModels: [...this.availableModels],
      });
      await this.repo.unlockIssue(repoName, issueNumber);
    }
    return;
  }
  // NOTE(review): this call sits outside the try/finally below, so if it
  // throws, the issue lock and the active-jobs gauge are not released here —
  // confirm the caller cleans up.
  const workspacePath = await this.prepareJobWorkspace(job, isLocalTask, isSubTask, localTaskId);
  // Progress reporter.
  // Both local tasks and sub-tasks write activity.log.
  // For sub-tasks, isSubTask=true is passed so DB comment writes are skipped.
  const reporter = new LocalProgressReporter(this.repo, localTaskId ?? issueNumber, workspacePath, logMetadata, isSubTask);
  // Assemble the conversation context.
  let enrichedInstruction = `${buildTimeContextBlock()}${job.instruction}`;
  if (isLocalTask) {
    try {
      const comments = await this.repo.listLocalTaskComments(localTaskId);
      const outputFiles = this.listDir(join(workspacePath, 'output'));
      const inputFiles = this.listDir(join(workspacePath, 'input'));
      const contextBody = buildLocalConversationContext({
        comments,
        jobInstruction: job.instruction,
        inputFiles,
        outputFiles,
      });
      enrichedInstruction = `${buildTimeContextBlock()}${contextBody}`;
    } catch (err) {
      // Best-effort: on failure the plain instruction built above is used.
      logger.warn(`[worker:${this.workerId}] failed to build local context: ${err}`);
    }
  }
  const retryHandoffContext = buildRetryHandoffContext(workspacePath, job);
  if (retryHandoffContext) {
    enrichedInstruction = `${enrichedInstruction}\n\n${retryHandoffContext}`;
  }
  // Avoid watchdog false positives: refresh updated_at periodically while runPiece runs.
  let heartbeatTimer: ReturnType<typeof setInterval> | undefined;
  try {
    // Piece loading: search per-user custom dir → global custom dir → builtin, in that order.
    // No-auth jobs (ownerId null) resolve pieces from data/users/local/pieces, matching
    // where no-auth POST now writes (LOCAL_OWNER='local' in pieces-api.ts).
    const userFolderRoot = this.config.userFolderRoot ?? './data/users';
    const ownerForPieces = job.ownerId ?? 'local';
    const customPieceDirs = [
      userPiecesDir(userFolderRoot, ownerForPieces),
      this.config.customPiecesDir,
    ].filter((d): d is string => !!d);
    logger.info(`[worker:${this.workerId}] job ${jobId} loadPiece piece=${job.pieceName} customDirs=[${customPieceDirs.join(', ') || 'none'}] piecesDir=pieces`);
    const piece = loadPiece(job.pieceName, 'pieces', customPieceDirs);
    // Model-mismatch requeue gate (direct mode only — gateway routes by
    // role, see shouldRequeueForModelMismatch).
    if (
      shouldRequeueForModelMismatch({
        isGateway: this.getWorkerDef().proxy === true,
        pieceModel: piece.model,
        availableModels: this.availableModels,
        workerModel: this.model,
      })
    ) {
      await this.repo.updateJob(jobId, {
        status: 'queued',
        workerId: null,
        errorSummary: `Required model ${piece.model} is not available on ${this.workerId}`,
      });
      await this.repo.addAuditLog(jobId, 'job_requeued_model_mismatch', 'worker', {
        workerId: this.workerId,
        requiredModel: piece.model,
        availableModels: [...this.availableModels],
      });
      logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to model mismatch (${piece.model})`);
      // NOTE(review): this requeue is emitted to jobs_total with the default
      // 'error' label because the metric enum has no "requeued" value — confirm
      // that is intended.
      return;
    }
    // MCP auth gate: park the job when a token is missing for any server
    // listed in piece.required_mcp.
    const missingMcp = (piece.required_mcp ?? []).filter(
      (serverId) => !this.mcpTokenManager || !this.mcpTokenManager.hasToken(job.ownerId ?? '', serverId),
    );
    if (missingMcp.length > 0) {
      await this.repo.updateJob(jobId, {
        status: 'waiting_human',
        waitReason: 'mcp_auth_required',
        resumeMovement: piece.initial_movement ?? null,
      });
      if (localTaskId !== null) {
        await this.repo.addLocalTaskComment(
          localTaskId,
          'system',
          `この piece は MCP サーバー「${missingMcp.join(', ')}」との連携が必要です。Settings → MCP 接続から連携してください。`,
          'event',
        );
      }
      logger.info(`[worker:${this.workerId}] mcp gate parked job=${jobId} missing=${missingMcp.join(',')}`);
      // The job was parked as waiting_human, so report that status in the
      // lifecycle metrics instead of the default 'error'.
      metricFinalStatus = 'waiting_human';
      return;
    }
    const workerDefForLlm = this.getWorkerDef();
    const isProxyWorker = workerDefForLlm.proxy === true;
    // Gateway routes by role; direct resolves the worker's model. The
    // resolver thunk runs only in direct mode (no auto-select via gateway).
    const resolvedModel = llmRoutingKey({
      isGateway: isProxyWorker,
      role: job.requiredRole,
      resolveDirectModel: () => this.resolveModel(piece),
    });
    // Provider timeout is configured in minutes (default 10); the client takes milliseconds.
    const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000;
    const llmClient = new OpenAICompatClient(
      this.endpoint,
      resolvedModel,
      workerDefForLlm.apiKey,
      this.config.provider.retry,
      timeoutMs,
      this.contextLimitTokens,
      this.config.safety?.promptGuardRatio,
      (line) => reporter.reportPromptPreflight(line),
      { proxy: isProxyWorker },
    );
    // When resuming from ASK, use resume_movement.
    const pieceOptions = {
      resumeMovement: job.resumeMovement ?? undefined,
      askCount: job.askCount,
      maxAskPerJob: this.config.ask.maxPerJob,
      // Interjections are only wired for top-level local tasks.
      checkInterjections: isLocalTask && localTaskId !== null && !isSubTask
        ? async (movementName: string) => {
          const comments = await this.repo.getUninjectedComments(localTaskId);
          if (comments.length === 0) return [];
          const injected = comments.map(c => ({ id: c.id, body: c.body }));
          this.repo.markCommentsInjected(injected.map(c => c.id));
          reporter.reportInterjectionAck(injected, movementName);
          return injected;
        }
        : undefined,
      // Sub-task spawning is offered only while below the configured max depth.
      spawnSubTask: job.subtaskDepth < this.config.subtasks.maxDepth
        ? async (params: { title: string; instruction: string; piece?: string }) => {
          const subJobs = await this.repo.getSubJobs(jobId);
          const subtaskIndex = subJobs.length + 1;
          if (subJobs.length >= this.config.subtasks.maxPerParent) {
            throw new Error(`サブタスク上限 (${this.config.subtasks.maxPerParent}) に達しました。これ以上のサブタスクは作成できません。`);
          }
          const subtaskWorkspace = join(workspacePath, 'subtasks', String(subtaskIndex));
          mkdirSync(subtaskWorkspace, { recursive: true });
          mkdirSync(join(subtaskWorkspace, 'output'), { recursive: true });
          mkdirSync(join(subtaskWorkspace, 'logs'), { recursive: true });
          // Inherit the parent job's role.
          const subJobInstruction = [
            `ui_profile: ${job.requiredRole}`,
            '',
            `# ${params.title}`,
            '',
            params.instruction,
          ].join('\n');
          const subJob = await this.repo.createJob({
            repo: `subtask/${jobId}`,
            issueNumber: subtaskIndex,
            instruction: subJobInstruction,
            pieceName: params.piece ?? 'general',
            parentJobId: jobId,
            subtaskDepth: job.subtaskDepth + 1,
            maxAttempts: 2,
            role: job.requiredRole,
            ownerId: job.ownerId,
            visibility: job.visibility,
            visibilityScopeOrgId: job.visibilityScopeOrgId,
          });
          await this.repo.updateJob(subJob.id, { worktreePath: subtaskWorkspace });
          logger.info(`[worker:${this.workerId}] spawned sub-task #${subtaskIndex} depth=${job.subtaskDepth + 1} job=${subJob.id}`);
          return { jobId: subJob.id, subtaskIndex, workspacePath: subtaskWorkspace };
        }
        : undefined,
    };
    const callbacks = this.buildPieceCallbacks(
      jobId,
      reporter,
      isLocalTask,
      localTaskId,
      workspacePath,
      // Seed the backend tracker with whatever was already persisted
      // for this job (e.g. on retry / resume from ASK). Only matters for
      // proxy workers; direct workers never produce a backend event.
      isProxyWorker ? (job.lastBackendId ?? null) : null,
      llmClient,
      logMetadata,
    );
    // Start-of-task comment.
    await reporter.reportMovementStart(`${piece.name} タスク開始`);
    // AbortController used for cancellation.
    const jobAbortController = new AbortController();
    // Cancel check: abort once the job's DB status has become 'cancelled'.
    const cancelCheck = (): boolean => {
      const isCancelled = this.repo.getJobStatusSync(jobId) === 'cancelled';
      if (isCancelled) {
        jobAbortController.abort();
      }
      return isCancelled;
    };
    // Initialise the ContextManager.
    const contextManager = new ContextManager(this.config.context ?? {});
    contextManager.setContextLimit(this.contextLimitTokens);
    // Piece execution (start the heartbeat: refresh updated_at every 5 minutes).
    heartbeatTimer = setInterval(() => {
      try { this.repo.touchJobUpdatedAt(jobId); } catch { /* ignore */ }
    }, 5 * 60 * 1000);
    // VLM support: when the worker has vlm=true, override the vision settings
    // with the worker's own endpoint/model.
    const workerDef = this.getWorkerDef();
    const toolsConfig = workerDef.vlm
      ? { ...this.config.tools, visionBaseUrl: this.endpoint, visionModel: this.model }
      : this.config.tools;
    logger.info(`[worker:${this.workerId}] job ${jobId} runPiece start`);
    // Browser session keying: resolve the session-task identity for the
    // ToolContext. Threaded into BrowseWeb / InteractiveBrowse so each
    // task gets its own noVNC session and the Captcha Pool stays
    // separate from per-task sessions. Subtasks walk up to find the
    // root local task ID.
    const sessionIdentity = await resolveSessionTaskId(this.repo, job);
    // ── Browser session profile binding ─────────────────────────────
    // If this job is bound to a browser_session_profile, decrypt the
    // captured Playwright storageState and pass it into runPiece so
    // BrowseWeb can inject it into BrowserContext. Owner-mismatch and
    // expired-profile checks fail-fast before the agent loop starts.
    let browserSessionState: object | undefined;
    let browserSessionProfileId: number | undefined;
    let browserSessionProfile:
      | { loggedInSelector: string | null; loginUrlPatterns: string[] }
      | undefined;
    let onAuthExpired:
      | ((profileId: number, reason: string) => void)
      | undefined;
    if (job.browserSessionProfileId) {
      const sessRepo = new BrowserSessionRepo(this.repo.getDb());
      const profile = sessRepo.getProfileByIdUnsafe(job.browserSessionProfileId);
      if (!profile) {
        sessRepo.audit({
          actorUserId: job.ownerId ?? null,
          ownerId: null,
          profileId: job.browserSessionProfileId,
          action: 'use',
          result: 'error',
          reason: 'profile not found',
          jobId: job.id,
        });
        throw new Error(`Browser session profile ${job.browserSessionProfileId} not found`);
      }
      // Fail-closed owner check: a job with null/missing ownerId must not
      // be allowed to decrypt any profile, even if the profile id would
      // otherwise resolve. Helper audits + throws on rejection. Extracted
      // to src/engine/browser-session-auth.ts so the contract is unit
      // tested in isolation from the Worker class.
      assertProfileOwner(profile, job, sessRepo);
      if (profile.status !== 'active' || !profile.encryptedStateBlob) {
        sessRepo.audit({
          actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id,
          action: 'use', result: 'error', reason: `status=${profile.status}`, jobId: job.id,
        });
        throw new Error(`AUTH_SESSION_EXPIRED: profile ${profile.id} status=${profile.status}`);
      }
      // Decrypt chain: master key → per-user DEK → state blob. Each failing
      // step is audited before the error is (re)thrown.
      const masterKeyPath = this.config.secrets?.masterKeyPath ?? './data/secrets/master.key';
      const master = initMasterKey(masterKeyPath);
      const encDek = sessRepo.getUserDek(profile.ownerId);
      if (!encDek) {
        sessRepo.audit({
          actorUserId: job.ownerId,
          ownerId: profile.ownerId,
          profileId: profile.id,
          action: 'decrypt',
          result: 'error',
          reason: 'user DEK missing',
          jobId: job.id,
        });
        throw new Error('user DEK missing for browser session profile');
      }
      let dek: Buffer;
      try {
        dek = decryptUserDek(master, encDek);
      } catch (e) {
        sessRepo.audit({
          actorUserId: job.ownerId,
          ownerId: profile.ownerId,
          profileId: profile.id,
          action: 'decrypt',
          result: 'error',
          reason: `dek decrypt failed: ${(e as Error).message}`,
          jobId: job.id,
        });
        throw e;
      }
      let stateJson: string;
      try {
        stateJson = decryptStateBlob(dek, profile.encryptedStateBlob);
      } catch (e) {
        sessRepo.audit({
          actorUserId: job.ownerId,
          ownerId: profile.ownerId,
          profileId: profile.id,
          action: 'decrypt',
          result: 'error',
          reason: `state decrypt failed: ${(e as Error).message}`,
          jobId: job.id,
        });
        throw e;
      }
      browserSessionState = JSON.parse(stateJson) as object;
      browserSessionProfileId = profile.id;
      browserSessionProfile = {
        loggedInSelector: profile.loggedInSelector,
        loginUrlPatterns: profile.loginUrlPatterns,
      };
      // Callback handed to runPiece: marks the profile expired and audits it.
      onAuthExpired = (pid, reason) => {
        sessRepo.markProfileStatus(pid, 'expired', reason);
        sessRepo.audit({
          actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: pid,
          action: 'expire', result: 'success', reason, jobId: job.id,
        });
        // Best-effort task-level notification. Subtask jobs and
        // gitea-issue jobs may not have a numeric local_task id.
        if (localTaskId !== null) {
          this.repo.addLocalTaskComment(
            localTaskId,
            'agent',
            `⚠️ Browser session "${profile.label}" expired: ${reason}. Re-login from Settings → Browser Sessions.`,
            'progress',
          ).catch(() => { /* ignore — comment posting is best-effort */ });
        }
      };
      sessRepo.audit({
        actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id,
        action: 'use', result: 'success', jobId: job.id,
      });
      sessRepo.touchUsed(profile.id);
    }
    // Piece handoff: when this job continues an earlier one in the same
    // local_task, agent-loop injects a "this is a continuation of piece X"
    // block into the system prompt. We resolve the prev piece name + the
    // most recent agent result/ask comment as the LLM-visible carry-over.
    let handoffContext: import('./engine/agent-loop.js').HandoffContext | undefined;
    if (job.continuedFromJobId && isLocalTask && localTaskId !== null) {
      const prevJob = await this.repo.getJob(job.continuedFromJobId);
      if (prevJob) {
        const prevResultComment = await this.repo.getLatestResultComment(localTaskId);
        handoffContext = {
          prevPiece: prevJob.pieceName,
          prevResult: prevResultComment?.body ?? null,
        };
      } else {
        logger.warn(`[worker:${this.workerId}] continued_from_job_id=${job.continuedFromJobId} not found for job ${jobId}; skipping handoff context`);
      }
    }
    // Shared-knowledge notes: construct once per job, forwarded into
    // ToolContext so agent-loop can inject "## Subscribed Notes" into
    // the system prompt. Only active when the job has a known owner.
    let notesService: NotesService | undefined;
    let notesInjectConfig: NotesInjectConfig | undefined;
    let notesUserOrgIds: string[] | undefined;
    let notesUserRole: 'admin' | 'user' | undefined;
    if (job.ownerId) {
      try {
        const notesRepo = new NotesRepository(this.repo.getDb());
        const userFolderRoot = this.config.userFolderRoot ?? './data/users';
        notesService = new NotesService({
          db: this.repo.getDb(),
          repo: notesRepo,
          userFolderRoot,
          getUserOrgIds: (uid) => this.repo.listUserGiteaOrgs(uid).map((o) => o.orgId),
          audit: (action, actor, target) => {
            try {
              // addAuditLog is awaited everywhere else in this file, so a
              // failure may surface as a promise rejection that the
              // surrounding try/catch cannot see. Attach a handler so the
              // best-effort audit never becomes an unhandled rejection.
              void Promise.resolve(this.repo.addAuditLog(jobId, action, actor, { target })).catch((err) => {
                logger.warn(`[notes-audit] failed: ${(err as Error).message}`);
              });
            } catch (err) {
              logger.warn(`[notes-audit] failed: ${(err as Error).message}`);
            }
          },
        });
        // Per-field fallback to the defaults exported from config.
        const cfgNotes = this.config.notes?.inject ?? {};
        notesInjectConfig = {
          perNoteMaxKb: cfgNotes.perNoteMaxKb ?? DEFAULT_NOTES_INJECT.perNoteMaxKb,
          totalMaxKb: cfgNotes.totalMaxKb ?? DEFAULT_NOTES_INJECT.totalMaxKb,
          overBudgetStrategy: cfgNotes.overBudgetStrategy ?? DEFAULT_NOTES_INJECT.overBudgetStrategy,
        };
        notesUserOrgIds = this.repo.listUserGiteaOrgs(job.ownerId).map((o) => o.orgId);
        const ownerRow = this.repo.getUserById(job.ownerId);
        notesUserRole = ownerRow?.role === 'admin' ? 'admin' : 'user';
      } catch (err) {
        // Notes are optional: log and continue with whatever was set so far.
        logger.warn(`[worker:${this.workerId}] job ${jobId} notes setup failed: ${(err as Error).message}`);
      }
    }
    // Parse per-task options from job payload (e.g. { options: { mcpDisabled, skillsDisabled } }).
    let jobPayloadOptions: Record<string, unknown> = {};
    if (job.payload) {
      try {
        const parsed = JSON.parse(job.payload) as Record<string, unknown>;
        if (parsed?.options && typeof parsed.options === 'object' && !Array.isArray(parsed.options)) {
          jobPayloadOptions = parsed.options as Record<string, unknown>;
        }
      } catch {
        logger.warn(`[worker:${this.workerId}] job ${jobId} failed to parse payload JSON`);
      }
    }
    // Only a literal `true` disables the feature; anything else leaves it on.
    const mcpDisabled = jobPayloadOptions.mcpDisabled === true;
    const skillsDisabled = jobPayloadOptions.skillsDisabled === true;
    const result = await runPiece(piece, enrichedInstruction, llmClient, workspacePath, callbacks, toolsConfig, {
      ...pieceOptions,
      cancelCheck,
      abortController: jobAbortController,
      safetyConfig: this.config.safety,
      searchFilter: this.config.searchFilter,
      customPiecesDir: customPieceDirs,
      contextManager,
      vlmEnabled: workerDef.vlm === true,
      jobId, // Phase 5: subtask handoff parent identity
      handoffContext,
      // Phase 5 PR2: when this run IS a subtask, pass parent identity +
      // child workspace path so the runner emits a memory-delta.json on
      // completion. Subtask workspaces follow `<parent>/subtasks/<N>`
      // where N is the subtask job's issueNumber.
      parentJobId: isSubTask && parentJobId ? parentJobId : undefined,
      childWorkspaceRelative: isSubTask ? `subtasks/${issueNumber}` : undefined,
      // Mission Brief: only wire IO when this run is bound to a local
      // task (the brief is per-LocalTask, not per-job). Subtask runs
      // and gitea-issue runs leave it unset → MissionUpdate degrades
      // to a no-op and the system prompt MISSION block is skipped.
      missionBrief: isLocalTask && localTaskId !== null && !isSubTask
        ? this.repo.makeMissionBriefIO(localTaskId)
        : undefined,
      taskId: sessionIdentity.taskId,
      userId: sessionIdentity.userId,
      browserSessionState,
      browserSessionProfileId,
      browserSessionProfile,
      onAuthExpired,
      ownerId: job.ownerId,
      mcpConfig: mergeMcpConfig(this.config.mcp),
      notesService,
      notesInjectConfig,
      notesUserOrgIds,
      notesUserRole,
      skillCatalog: this.skillCatalog ?? undefined,
      mcpDisabled,
      skillsDisabled,
    });
    logger.info(`[worker:${this.workerId}] job ${jobId} runPiece done: status=${result.status}`);
    this.writeRunDiagnostics(workspacePath, result);
    await this.handlePieceResult(result, job, reporter, workspacePath, isLocalTask, isSubTask, parentJobId);
    // Phase 3b: capture the terminal status for the jobs_total label.
    // result.status uses piece-runner's own enum
    // ('completed'|'aborted'|'error'|'waiting_human'|'waiting_subtasks'|'cancelled'); map to the
    // metric enum (waiting_subtasks stays "succeeded" for the metric
    // because the job pauses cleanly — not a failure).
    switch (result.status) {
      case 'completed': metricFinalStatus = 'succeeded'; break;
      case 'aborted': metricFinalStatus = 'aborted'; break;
      case 'cancelled': metricFinalStatus = 'cancelled'; break;
      case 'waiting_human': metricFinalStatus = 'waiting_human'; break;
      case 'waiting_subtasks': metricFinalStatus = 'succeeded'; break;
      case 'error':
      default: metricFinalStatus = 'failed'; break;
    }
  } catch (err) {
    const errorMsg = err instanceof Error ? err.message : String(err);
    const errorStack = err instanceof Error && err.stack ? err.stack : '(no stack)';
    if (errorMsg.includes('Piece not found')) {
      logger.error(`[worker:${this.workerId}] job ${jobId} loadPiece failed piece=${job.pieceName} customPiecesDir=${this.config.customPiecesDir ?? 'none'} error=${errorMsg}`);
    }
    logger.error(`[worker:${this.workerId}] job ${jobId} failed: ${errorMsg}`);
    // Always log the stack so opaque errors (e.g. SqliteError: FOREIGN KEY
    // constraint failed) can be traced to the offending insert/update.
    logger.error(`[worker:${this.workerId}] job ${jobId} stack: ${errorStack}`);
    const retryDisposition = await this.scheduleRetryOrFail(job, errorMsg, workspacePath, 'worker_exception');
    // The error report is skipped only for the 'requeued_unhealthy' disposition.
    if (retryDisposition !== 'requeued_unhealthy') {
      await reporter.reportError(errorMsg);
    }
    await this.repo.addAuditLog(jobId, 'job_error', 'worker', { error: errorMsg });
  } finally {
    if (heartbeatTimer) clearInterval(heartbeatTimer);
    // Phase 3b: emit job lifecycle counters + duration histogram. The
    // active gauge always decrements (matching the inc at start) so a
    // process restart can't leak active_jobs > 0 forever.
    if (this.workerMetrics) {
      try {
        this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).dec();
        this.workerMetrics.jobsTotal.labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile }).inc();
        this.workerMetrics.jobDurationSeconds
          .labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile })
          .observe((Date.now() - jobStartedAtMs) / 1000);
      } catch { /* metrics never affect business logic */ }
    }
    await this.repo.updateWorkerNodeHealth(this.workerId, {
      healthy: this.healthy,
      lastError: this.lastHealthError,
      inflightJobs: this.inflight,
      availableModels: [...this.availableModels],
    });
    await this.repo.unlockIssue(repoName, issueNumber);
  }
}
/**
 * Build the PieceRunCallbacks object handed to runPiece for one job.
 *
 * The callbacks update the job row (current movement / activity), forward
 * events to the progress reporter, emit events on jobEventBus when the job
 * has listeners, persist progress / tool_call / checklist comments for local
 * tasks, track the resolved backend, and emit LLM / tool metrics.
 *
 * All callbacks are synchronous from the caller's point of view; DB writes
 * are fire-and-forget with a logged rejection handler.
 */
private buildPieceCallbacks(
  jobId: string,
  reporter: LocalProgressReporter,
  isLocalTask: boolean,
  localTaskId: number | null,
  workspacePath: string,
  /**
   * Initial value of jobs.last_backend_id from the DB. Seeds the backend
   * tracker (and the sticky-routing hint) so a resumed/retried job goes
   * back to the backend that already holds its KV cache.
   * Falsy/null = no backend resolved yet.
   */
  initialLastBackendId: string | null = null,
  /** LLM client of this job — receives the sticky-routing hint per switch. */
  llmClient?: { setPreferredBackendId(backendId: string | null): void },
  /**
   * The reporter's metadata object (shared by reference): mutating
   * `backendId` here makes every subsequent activity.log line carry
   * `[backend:...]` so the Progress tab can show the physical backend
   * behind a proxy worker.
   */
  logMetadata?: ActivityLogMetadata,
): PieceRunCallbacks {
  // Per-movement state, reset in onMovementStart.
  let movementStartTime = Date.now();
  const toolUsageCounts = new Map<string, number>();
  // Backend tracker (follow-current semantics, 2026-06): persists
  // jobs.last_backend_id whenever the resolved backend CHANGES so the UI
  // (pet, badges) follows where the job actually runs. Switches are rare
  // because the gateway honors the x-aao-preferred-backend sticky hint
  // (KV-cache reuse) — they only happen when the preferred backend goes
  // offline or saturates. The tracker still guarantees that a failed DB
  // persist leaves the in-memory value unchanged so the next event
  // retries. See src/worker/sticky-backend.ts.
  const workerIdLocal = this.workerId;
  const backendTracker = createStickyBackendResolver({
    initial: initialLastBackendId,
    persist: (backendId) => this.repo.updateJob(jobId, { lastBackendId: backendId }),
    logger: {
      debug: (m) => logger.debug(m),
      info: (m) => logger.info(m),
      warn: (m) => logger.warn(m),
    },
    workerId: workerIdLocal,
    jobId,
  });
  // Seed the sticky-routing hint + activity-log backend tag from the DB
  // value (resume/retry goes straight back to the cache-warm backend).
  if (initialLastBackendId) {
    llmClient?.setPreferredBackendId(initialLastBackendId);
    if (logMetadata) logMetadata.backendId = initialLastBackendId;
  }
  // Phase 3b: local copy of the sticky backend so the LLM-call metric
  // has a stable backend_id label even before the persist returns.
  // Direct workers (non-proxy) never fire onBackendResolved, so we
  // fall back to the worker id (`gpu-rtx-a`) as the backend identity.
  let metricBackendId = initialLastBackendId ?? workerIdLocal;
  const metricModel = this.model ?? 'unknown';
  const metricsRef = this.workerMetrics;
  // Pending tool calls awaiting result, keyed by callId.
  // On onToolResult, we pair via callId and persist a single tool_call comment.
  const pendingToolCalls = new Map<string, { name: string; args: Record<string, unknown>; movement: string }>();
  let currentMovementForTools = '';
  // Size caps applied to the args / result stored in tool_call comments.
  const ARG_PREVIEW_CAP = 8 * 1024;
  const RESULT_PREVIEW_CAP = 16 * 1024;
  const truncate = (s: string, cap: number): string =>
    s.length > cap ? s.slice(0, cap) + `\n…[truncated ${s.length - cap} bytes]` : s;
  // Guard for fire-and-forget work started from the synchronous callbacks:
  // the work has already been kicked off by the time it is passed in; this
  // only attaches a rejection handler so a failed DB/report write is logged
  // instead of surfacing as an unhandled promise rejection. Promise.resolve
  // makes it safe for values that are not promises.
  const logRejection = (what: string, pending: unknown): void => {
    void Promise.resolve(pending).catch((err) => {
      logger.warn(`[worker:${workerIdLocal}] ${what} failed for job ${jobId}: ${err}`);
    });
  };
  return {
    onMovementStart: (name) => {
      movementStartTime = Date.now();
      toolUsageCounts.clear();
      currentMovementForTools = name;
      logRejection(
        'current movement update',
        this.repo.updateJob(jobId, { currentMovement: name, currentActivity: null }),
      );
      logRejection('movement start report', reporter.reportMovementStart(name));
    },
    onToolUse: (toolName, input, callId) => {
      toolUsageCounts.set(toolName, (toolUsageCounts.get(toolName) ?? 0) + 1);
      const summary = summarizeToolInput(toolName, input);
      // currentActivity is capped at 200 characters.
      logRejection(
        'current activity update',
        this.repo.updateJob(jobId, { currentActivity: `${toolName}: ${summary}`.slice(0, 200) }),
      );
      reporter.reportToolUse(toolName, input);
      if (callId) {
        pendingToolCalls.set(callId, { name: toolName, args: input, movement: currentMovementForTools });
      }
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, {
          type: 'tool_use',
          toolName,
          toolInput: summary,
          callId: callId ?? '',
        });
      }
    },
    onToolCallDelta: (callId, name, chunk) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, { type: 'tool_use_delta', callId, name, chunk });
      }
    },
    onText: (text) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, { type: 'text', text });
      }
    },
    onPromptProgress: (progress) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, {
          type: 'prompt_progress',
          processed: progress.processed,
          total: progress.total,
          timeMs: progress.timeMs,
          cache: progress.cache,
        });
      }
    },
    onTextPreview: (movementName, preview) => {
      reporter.reportAssistantPreview(movementName, preview);
    },
    onContextAction: (action) => {
      reporter.reportContextAction(action);
    },
    onContextUpdate: (payload) => {
      this.repo.updateJobContext(jobId, payload).catch(err => {
        logger.warn(`[worker:${this.workerId}] failed to persist context for job ${jobId}: ${err}`);
      });
    },
    onLLMCall: (info) => {
      reporter.reportLLMCall(info);
      if (metricsRef) {
        try {
          metricsRef.llmCallsTotal
            .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel })
            .inc();
          metricsRef.llmCallDurationSeconds
            .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel })
            .observe(info.durationMs / 1000);
        } catch { /* metrics best-effort */ }
      }
    },
    onBackendResolved: (info) => {
      // Phase 3b: update the backend id used for LLM-call metrics.
      if (info.backendId) {
        metricBackendId = info.backendId;
        // Sticky routing: ask the gateway to keep using this backend on
        // the next request (KV-cache affinity).
        llmClient?.setPreferredBackendId(info.backendId);
        // Tag subsequent activity.log lines with the physical backend so
        // the Progress tab shows more than the proxy worker's name.
        if (logMetadata) logMetadata.backendId = info.backendId;
      }
      // Fire-and-forget: agent-loop's onBackendResolved signature is
      // sync (void). The tracker handles persist errors internally;
      // we just attach a final guard to log any unexpected throw.
      // cacheKey is observed but not persisted at the job level —
      // Phase B's NodeStatusWidget will track cache hits out-of-band.
      backendTracker.onEvent(info).catch(err => {
        logger.warn(`[worker:${this.workerId}] backend tracker threw for job ${jobId}: ${err}`);
      });
    },
    onMovementComplete: (movementName, result) => {
      const durationMs = Date.now() - movementStartTime;
      // Snapshot the per-movement tool counts into a plain object for JSON.
      const tools: Record<string, number> = {};
      for (const [name, count] of toolUsageCounts) {
        tools[name] = count;
      }
      reporter.reportMovementComplete(movementName, result.output, result.next);
      if (isLocalTask) {
        const isTerminal = result.next === 'COMPLETE' || result.next === 'ABORT' || result.next === 'ASK';
        // The summary is attached only for non-terminal movements with non-empty output.
        const summary = !isTerminal ? (result.output?.trim() || undefined) : undefined;
        const progressBody = JSON.stringify({ movement: movementName, tools, durationMs, ...(summary ? { summary } : {}) });
        this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress')
          .catch(err => logger.warn(`[worker:${this.workerId}] failed to insert progress comment: ${err}`));
        if (isTerminal && jobEventBus.hasListeners(jobId)) {
          jobEventBus.emitJob(jobId, { type: 'done' });
        }
      }
    },
    onToolResult: (toolName, info, callId) => {
      const { isError } = info;
      reporter.reportToolResult(toolName, info);
      // Pair with pending tool_use via callId, then persist as comment + emit SSE.
      const pending = callId ? pendingToolCalls.get(callId) : undefined;
      if (callId) pendingToolCalls.delete(callId);
      if (isLocalTask && callId && pending) {
        let argsStr: string;
        try { argsStr = truncate(JSON.stringify(pending.args), ARG_PREVIEW_CAP); }
        catch { argsStr = '"<unserializable>"'; }
        const toolCallBody = JSON.stringify({
          type: 'tool_call',
          callId,
          movement: pending.movement,
          name: toolName,
          args: argsStr,
          result: truncate(info.result, RESULT_PREVIEW_CAP),
          isError,
          durationMs: info.durationMs,
          cacheHit: info.cacheHit,
        });
        this.repo.addLocalTaskComment(localTaskId!, 'agent', toolCallBody, 'progress')
          .catch(err => logger.warn(`[worker:${this.workerId}] tool_call comment failed: ${err}`));
      }
      if (jobEventBus.hasListeners(jobId) && callId) {
        jobEventBus.emitJob(jobId, {
          type: 'tool_result',
          toolName,
          callId,
          toolOutput: truncate(info.result, 2 * 1024),
          toolIsError: isError,
        });
      }
      // Phase 3b: count every tool invocation. success label is the
      // string form so Grafana queries can group by it. Same
      // best-effort guard as the LLM emission above.
      //
      // Phase 3b post-review: normalize the tool_name label so a
      // piece firing arbitrary mcp__*/user-defined names doesn't
      // explode label cardinality. MCP tools collapse to a single
      // `mcp` bucket; unknown names land in `unknown`. The full
      // tool_name is still visible in the activity log + reporter,
      // so the metric drop only affects Prometheus dimensions.
      if (metricsRef) {
        try {
          metricsRef.toolCallsTotal
            .labels({ tool_name: normalizeToolNameForMetric(toolName), success: isError ? 'false' : 'true' })
            .inc();
        } catch { /* metrics best-effort */ }
      }
      // After a successful checklist tool call, publish the most recently
      // modified checklist JSON from <workspace>/logs/checklists as a
      // progress comment.
      if (isLocalTask && !isError && (toolName === 'CheckItem' || toolName === 'CreateChecklist')) {
        try {
          const checklistDir = join(workspacePath, 'logs', 'checklists');
          if (existsSync(checklistDir)) {
            const files = readdirSync(checklistDir).filter(f => f.endsWith('.json'));
            if (files.length > 0) {
              let latestFile = files[0]!;
              let latestMtime = 0;
              for (const file of files) {
                try {
                  const { mtimeMs } = statSync(join(checklistDir, file));
                  if (mtimeMs > latestMtime) {
                    latestMtime = mtimeMs;
                    latestFile = file;
                  }
                } catch { /* skip */ }
              }
              const data = JSON.parse(readFileSync(join(checklistDir, latestFile), 'utf-8'));
              const progressBody = JSON.stringify({
                type: 'checklist',
                name: data.name,
                items: data.items,
                summary: data.summary,
              });
              this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress')
                .catch(err => logger.warn(`[worker:${this.workerId}] checklist progress comment failed: ${err}`));
            }
          }
        } catch (err) {
          logger.warn(`[worker:${this.workerId}] checklist read failed: ${err}`);
        }
      }
    },
  };
}
/**
 * Run a reflection job through the reflection runner and record its outcome.
 *
 * The runner module is loaded lazily. A runner outcome of 'failed' marks the
 * job `failed`; any other outcome marks it `succeeded`. If anything inside
 * the try block throws, the error is logged and the job is marked `failed`.
 * In every case `currentActivity` is cleared.
 */
private async handleReflectionJob(job: Job): Promise<void> {
  const { runReflectionJob: runReflection } = await import('./engine/reflection/reflection-runner.js');
  try {
    // Gateway mode routes by role: send the reflection tier as the key
    // (job.requiredRole is 'reflection'), not the worker's model name.
    const routingKey = llmRoutingKey({
      isGateway: this.getWorkerDef().proxy === true,
      role: job.requiredRole,
      resolveDirectModel: () => this.model,
      roleFallback: 'reflection',
    });
    const runnerDeps = {
      repo: this.repo,
      config: this.config,
      llmEndpoint: this.endpoint,
      llmModel: routingKey,
      // Same credential as normal task LLM calls — a key-enforcing
      // gateway 401s reflection without it.
      llmApiKey: this.getWorkerDef().apiKey,
    };
    const outcome = await runReflection(runnerDeps, job);

    let finalStatus: 'failed' | 'succeeded' = 'succeeded';
    if (outcome === 'failed') {
      finalStatus = 'failed';
    }
    await this.repo.updateJob(job.id, { status: finalStatus, currentActivity: null });
  } catch (e) {
    logger.error(`[reflection] runner threw job=${job.id} err=${String(e)}`);
    await this.repo.updateJob(job.id, { status: 'failed', currentActivity: null });
  }
}
/**
 * Persists the outcome of a piece run and triggers the follow-up actions that
 * depend on `result.status`.
 *
 * Branches, in order:
 * - 'completed': commits the workspace (local tasks only), marks the job
 *   'succeeded', enqueues a push, possibly enqueues a reflection job, and
 *   reports the final output after passing it through `ensureKeepaGraphs`.
 * - 'waiting_human': for a sub-task with a parent, the question is answered
 *   via `answerSubtaskAsk` and a follow-up job carrying the answer is created;
 *   otherwise the job is marked 'waiting_human', a push is enqueued and the
 *   question is reported.
 * - 'waiting_subtasks': stores the waiting state, or re-queues / fails the job
 *   when no sub-jobs exist.
 * - 'cancelled': only logs and reports; no job status is written here.
 * - anything else: delegates to `scheduleRetryOrFail`.
 *
 * Afterwards, for sub-tasks whose result status is in `SUBTASK_TERMINAL`, a
 * `result.md` is written under the parent's worktree and the repository is
 * asked to re-queue the parent. An audit log entry `job_<status>` is always
 * written last.
 *
 * @param result - Result returned by the piece run.
 * @param job - The job that was executed.
 * @param reporter - Progress reporter used for user-visible updates.
 * @param workspacePath - Workspace directory used by the run.
 * @param isLocalTask - Whether the workspace should be committed on completion.
 * @param isSubTask - Whether this job is a sub-task.
 * @param parentJobId - Id of the parent job, or null when there is none.
 */
private async handlePieceResult(
result: PieceRunResult,
job: Job,
reporter: LocalProgressReporter,
workspacePath: string,
isLocalTask: boolean,
isSubTask: boolean,
parentJobId: string | null,
): Promise<void> {
const { repo: repoName, issueNumber, id: jobId } = job;
// NOTE(review): localTaskId is not referenced again in this method — confirm
// whether the call is still needed.
const localTaskId = getLocalTaskId(repoName);
if (result.status === 'completed') {
if (isLocalTask) {
await this.commitLocalWorkspace(issueNumber, workspacePath);
}
await this.repo.updateJob(jobId, { status: 'succeeded', currentActivity: null });
this.enqueuePush(job, 'succeeded');
await maybeEnqueueReflection(this.repo, job, 'succeeded', this.config.reflection, this.config.provider.workers);
let resultBody = result.finalOutput;
// Only non-empty output is passed through ensureKeepaGraphs.
if (resultBody) {
resultBody = ensureKeepaGraphs(resultBody);
}
await reporter.reportFinalResult('completed', resultBody);
} else if (result.status === 'waiting_human') {
if (isSubTask && parentJobId) {
await this.repo.updateJob(jobId, { status: 'waiting_human', resumeMovement: result.resumeMovement ?? null, askCount: job.askCount + 1 });
// Sub-task ASK is auto-answered below, so we don't notify on it.
reporter.reportToolUse('ASK', { question: result.finalOutput });
await this.repo.addAuditLog(jobId, 'job_ask_subtask', 'worker', { question: result.finalOutput, resumeMovement: result.resumeMovement });
try {
const answer = await this.answerSubtaskAsk(job, parentJobId, result.finalOutput);
logger.info(`[worker:${this.workerId}] answered subtask ASK for job ${jobId}: ${answer.slice(0, 100)}`);
// The answer becomes the instruction of a new job that copies the
// original job's piece, role, ownership and visibility fields and resumes
// at result.resumeMovement.
const newJob = await this.repo.createJob({
repo: repoName,
issueNumber,
instruction: answer,
pieceName: job.pieceName,
askCount: job.askCount + 1,
resumeMovement: result.resumeMovement,
parentJobId: job.parentJobId,
subtaskDepth: job.subtaskDepth,
maxAttempts: 2,
role: job.requiredRole,
ownerId: job.ownerId,
visibility: job.visibility,
visibilityScopeOrgId: job.visibilityScopeOrgId,
});
// The follow-up job is pointed at the same workspace as this run.
await this.repo.updateJob(newJob.id, { worktreePath: workspacePath });
await this.repo.addAuditLog(newJob.id, 'job_queued_subtask_ask_answer', 'worker', { originalJobId: jobId, question: result.finalOutput });
} catch (askErr) {
// Best effort: on failure the job simply stays in 'waiting_human'.
logger.warn(`[worker:${this.workerId}] failed to answer subtask ASK, leaving as waiting_human: ${askErr}`);
}
} else {
await this.repo.updateJob(jobId, {
status: 'waiting_human',
resumeMovement: result.resumeMovement ?? null,
askCount: job.askCount + 1,
});
this.enqueuePush(job, 'waiting_human');
await reporter.reportAsk(result.finalOutput);
await this.repo.addAuditLog(jobId, 'job_ask', 'worker', {
question: result.finalOutput,
resumeMovement: result.resumeMovement,
});
}
} else if (result.status === 'waiting_subtasks') {
const subJobs = await this.repo.getSubJobs(jobId);
if (subJobs.length === 0) {
// Nothing to wait for: re-queue at the resume movement when one is known,
// otherwise fail the job.
if (result.resumeMovement) {
logger.warn(`[worker:${this.workerId}] job ${jobId} waiting_subtasks but no sub-jobs exist, re-queuing to ${result.resumeMovement}`);
await this.repo.updateJob(jobId, {
status: 'queued',
resumeMovement: result.resumeMovement,
});
} else {
logger.error(`[worker:${this.workerId}] job ${jobId} waiting_subtasks with no sub-jobs and no resumeMovement, failing`);
await this.repo.updateJob(jobId, { status: 'failed' });
}
// The same audit action name is used for both outcomes; the `action` field
// distinguishes 'requeued' from 'failed'.
await this.repo.addAuditLog(jobId, 'job_requeued_no_subtasks', 'worker', {
resumeMovement: result.resumeMovement,
action: result.resumeMovement ? 'requeued' : 'failed',
});
} else {
await this.repo.updateJob(jobId, {
status: 'waiting_subtasks',
resumeMovement: result.resumeMovement ?? null,
});
await reporter.reportMovementStart('サブタスク待機中...');
await this.repo.addAuditLog(jobId, 'job_waiting_subtasks', 'worker', {
resumeMovement: result.resumeMovement,
});
}
} else if (result.status === 'cancelled') {
// No job status update is performed in this branch.
logger.info(`[worker:${this.workerId}] job ${jobId} cancelled`);
await reporter.reportFinalResult('cancelled', result.finalOutput);
} else {
const retryDisposition = await this.scheduleRetryOrFail(job, result.finalOutput, workspacePath, result.abortReason ?? null);
// A job re-queued because the worker became unhealthy gets no final report.
if (retryDisposition !== 'requeued_unhealthy') {
await reporter.reportFinalResult(result.status, result.finalOutput);
}
// Reflection is only enqueued once the job has permanently failed.
if (retryDisposition === 'failed') {
const outcome = result.status === 'aborted' ? 'aborted' : 'failed';
await maybeEnqueueReflection(this.repo, job, outcome, this.config.reflection, this.config.provider.workers);
}
}
// When a sub-task finishes (terminal statuses only): re-queue the parent job
// if all sibling jobs are done.
// NOTE(review): this check uses result.status, so it also runs when the job
// was just scheduled for retry or re-queued above; presumably
// requeueParentJobIfAllSubtasksDone checks the persisted job states — confirm.
const SUBTASK_TERMINAL = ['completed', 'error', 'aborted', 'cancelled'];
if (isSubTask && parentJobId && SUBTASK_TERMINAL.includes(result.status)) {
try {
const parentJob = await this.repo.getJob(parentJobId);
if (parentJob?.worktreePath) {
// Write this sub-task's result to subtasks/<issueNumber>/result.md inside
// the parent's worktree.
const resultDir = join(parentJob.worktreePath, 'subtasks', String(issueNumber));
mkdirSync(resultDir, { recursive: true });
writeFileSync(
join(resultDir, 'result.md'),
`# サブタスク #${issueNumber} 結果\n\nステータス: ${result.status}\n\n${result.finalOutput}\n`,
'utf-8',
);
}
const requeued = await this.repo.requeueParentJobIfAllSubtasksDone(parentJobId);
if (requeued) {
logger.info(`[worker:${this.workerId}] all sub-tasks done, re-queued parent job ${parentJobId}`);
}
} catch (subErr) {
// Failures here are logged and do not prevent the audit log below.
logger.warn(`[worker:${this.workerId}] sub-task parent re-queue error: ${subErr}`);
}
}
// Always record an audit entry named after the raw result status.
await this.repo.addAuditLog(jobId, `job_${result.status}`, 'worker', {
movementCount: result.movementHistory.length,
abortReason: result.abortReason ?? null,
contextActionCount: result.contextActions.length,
latestContextAction: result.contextActions[result.contextActions.length - 1] ?? null,
});
}
/**
 * Picks the model name to use for a piece.
 *
 * Order of preference:
 * 1. The piece's own model, when the available-model set is empty or
 *    contains it.
 * 2. The first entry of the available-model set, when the worker's configured
 *    model is set but missing from a non-empty set.
 * 3. The worker's configured model (possibly undefined).
 *
 * @param piece - Piece definition whose optional `model` is considered first.
 * @returns The selected model name, or undefined when none is configured.
 */
private resolveModel(piece: PieceDef): string | undefined {
  const known = this.availableModels;
  const requested = piece.model;

  if (requested) {
    const requestedUsable = known.size === 0 || known.has(requested);
    if (requestedUsable) {
      return requested;
    }
    logger.warn(`[worker:${this.workerId}] piece model "${requested}" not available, falling back to ${this.model ?? '<none>'}`);
  }

  const configured = this.model;
  // A configured model that the non-empty available set does not list is
  // replaced by the first available entry.
  if (configured && known.size > 0 && !known.has(configured)) {
    const firstAvailable = Array.from(known)[0]!;
    logger.info(`[worker:${this.workerId}] configured model "${configured}" not available, auto-selecting "${firstAvailable}"`);
    return firstAvailable;
  }

  return configured;
}
/**
 * Decides the fate of a job whose run did not succeed and persists it.
 *
 * - When `errorMsg` matches the LLM connection error pattern, the worker is
 *   marked unhealthy, its available-model set is cleared, and the job goes
 *   back to 'queued' without a worker or retry time.
 * - Otherwise, when `attempt < maxAttempts`, the job is set to 'retry' with an
 *   incremented attempt and a next retry time taken from
 *   `config.retry.backoffSeconds` (last entry, then 60s, as fallbacks).
 * - Otherwise the job is marked 'failed' and a 'failed' push is enqueued.
 *
 * A retry handoff summary is written in every case.
 *
 * @param job - The job being handled.
 * @param errorMsg - Error text stored as the job's error summary.
 * @param workspacePath - Workspace for the handoff summary; falls back to
 *   `job.worktreePath` when omitted.
 * @param abortReason - Abort reason stored on the job, or null.
 * @returns The disposition that was applied.
 */
private async scheduleRetryOrFail(
  job: Job,
  errorMsg: string,
  workspacePath?: string,
  abortReason: string | null = null,
): Promise<'requeued_unhealthy' | 'retry' | 'failed'> {
  const jobId = job.id;
  const currentAttempt = job.attempt;
  const attemptLimit = job.maxAttempts;

  // Shared writer for the handoff summary; the workspace fallback is resolved
  // at the moment of writing.
  const writeHandoff = (
    disposition: 'requeued_unhealthy' | 'retry' | 'failed',
    nextRetryAt: string | null,
  ): void => {
    writeRetryHandoffSummary({
      workspacePath: workspacePath ?? job.worktreePath,
      job,
      errorMsg,
      nextRetryAt,
      disposition,
    });
  };

  const connectionErrorPattern = /connection error:\s*fetch failed|econnrefused|enotfound|etimedout|network error/i;
  if (connectionErrorPattern.test(errorMsg)) {
    // The LLM backend is unreachable: flag this worker and hand the job back
    // to the queue so it is not counted as a failed attempt.
    this.healthy = false;
    this.lastHealthError = errorMsg;
    this.availableModels.clear();
    await this.repo.updateWorkerNodeHealth(this.workerId, {
      healthy: false,
      lastError: errorMsg,
      inflightJobs: this.inflight,
      availableModels: [],
    });
    await this.repo.updateJob(jobId, {
      status: 'queued',
      workerId: null,
      errorSummary: errorMsg,
      abortReason,
      nextRetryAt: null,
    });
    writeHandoff('requeued_unhealthy', null);
    logger.warn(`[worker:${this.workerId}] job ${jobId} requeued after LLM connection error; worker marked unhealthy`);
    return 'requeued_unhealthy';
  }

  const attemptsRemain = currentAttempt < attemptLimit;
  if (!attemptsRemain) {
    await this.repo.updateJob(jobId, { status: 'failed', errorSummary: errorMsg, abortReason });
    // V2 push: only on terminal fail. Intermediate retry attempts are
    // silenced (matches V1's 4-second debounce intent).
    this.enqueuePush(job, 'failed');
    writeHandoff('failed', null);
    logger.info(`[worker:${this.workerId}] job ${jobId} failed permanently after ${attemptLimit} attempts`);
    return 'failed';
  }

  // Pick the backoff for this attempt, clamped to the last configured entry.
  const schedule = this.config.retry.backoffSeconds;
  const lastSlot = schedule.length - 1;
  const slot = Math.min(currentAttempt - 1, lastSlot);
  const delaySeconds = schedule[slot] ?? schedule[lastSlot] ?? 60;
  const retryAt = new Date(Date.now() + delaySeconds * 1000).toISOString();

  await this.repo.updateJob(jobId, {
    status: 'retry',
    attempt: currentAttempt + 1,
    nextRetryAt: retryAt,
    errorSummary: errorMsg,
    abortReason,
  });
  writeHandoff('retry', retryAt);
  logger.info(`[worker:${this.workerId}] job ${jobId} scheduled for retry ${currentAttempt + 1}/${attemptLimit} at ${retryAt}`);
  return 'retry';
}
/**
 * Lists the entries of a directory, best effort.
 *
 * @param dirPath - Directory to read.
 * @returns The entry names, or an empty array when reading throws.
 */
private listDir(dirPath: string): string[] {
  let entries: string[] = [];
  try {
    entries = readdirSync(dirPath);
  } catch {
    // Reading failed: fall through with the empty list.
  }
  return entries;
}
/**
 * Commits pending changes of a local task workspace on branch 'main' and logs
 * what happened.
 *
 * @param taskId - Task number used in the default commit message and logs.
 * @param workspacePath - Workspace directory to commit.
 * @param commitMessage - Optional message; when missing or blank after
 *   trimming, `agent: update task #<taskId>` is used.
 */
private async commitLocalWorkspace(
  taskId: number,
  workspacePath: string,
  commitMessage?: string,
): Promise<void> {
  const trimmedMessage = commitMessage?.trim();
  // `||`-style fallback on purpose: an empty string must use the default.
  const message = trimmedMessage ? trimmedMessage : `agent: update task #${taskId}`;

  const outcome = await commitWorkspaceChanges({
    workspacePath,
    branchName: 'main',
    commitMessage: message,
    ignoreEntries: ['input/', 'logs/'],
  });

  if (outcome.changed) {
    if (outcome.committed) {
      logger.info(`[worker:${this.workerId}] committed local workspace changes for task #${taskId}`);
    }
    if (outcome.pushed) {
      logger.info(`[worker:${this.workerId}] pushed local workspace changes for task #${taskId}`);
    }
    return;
  }

  logger.info(`[worker:${this.workerId}] no local changes to commit for task #${taskId}`);
}
}