maestro/src/worker.ts
oss-sync c526adddc2
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (402599f)
2026-06-04 13:41:33 +00:00

1934 lines
80 KiB
TypeScript

import { Repository, Job, localTaskRepoName, type JobRole } from './db/repository.js';
import { userPiecesDir } from './user-folder/paths.js';
import { BrowserSessionRepo } from './db/browser-session-repo.js';
import { assertProfileOwner } from './engine/browser-session-auth.js';
import { initMasterKey, decryptUserDek, decryptStateBlob } from './crypto/sessions.js';
import { OpenAICompatClient } from './llm/openai-compat.js';
import { loadPiece, runPiece, PieceRunCallbacks, PieceDef, type PieceRunResult } from './engine/piece-runner.js';
import { LocalProgressReporter } from './progress/local-reporter.js';
import { buildLocalConversationContext } from './engine/local-context.js';
import { AppConfig, isExecutionWorker, type WorkerDef, type ReflectionConfig, DEFAULT_NOTES_INJECT, type NotesInjectConfig } from './config.js';
import { existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from 'fs';
import { join } from 'path';
import { logger } from './logger.js';
import { commitWorkspaceChanges, ensureWorkspaceGitRepo } from './git/workspace-manager.js';
import { ContextManager, fetchOllamaContextLimit } from './engine/context-manager.js';
import { summarizeToolInput, type ActivityLogMetadata } from './progress/log-format.js';
import { ensureKeepaGraphs } from './engine/tools/amazon.js';
import type { McpTokenManager } from './mcp/token-manager.js';
import { mergeMcpConfig } from './mcp/config.js';
import { NotesService } from './notes/notes-service.js';
import { NotesRepository } from './notes/notes-repository.js';
import { createStickyBackendResolver } from './worker/sticky-backend.js';
import { jobEventBus } from './bridge/job-events.js';
import { normalizeToolNameForMetric } from './metrics/tool-name-allowlist.js';
/** Cap (UTF-16 code units, via string length) for the retry handoff summary text. */
const RETRY_HANDOFF_MAX_LENGTH = 8000;
/** Cap for the final-output preview copied from last-run diagnostics into the handoff. */
const RETRY_DIAGNOSTICS_PREVIEW_LENGTH = 1200;
/** How many trailing entries of logs/lessons.jsonl are carried into the handoff. */
const RETRY_LESSONS_MAX_LINES = 12;
/**
 * Build a prompt preamble that anchors the agent to the real wall-clock time.
 *
 * Produces a Markdown section with the current instant in UTC (ISO-8601) and
 * as Asia/Tokyo local time, followed by an instruction that every time-relative
 * judgement must be based on these values. The text ends with a newline so a
 * caller can append the job instruction directly.
 */
function buildTimeContextBlock(): string {
  const current = new Date();
  const tokyoFormatter = new Intl.DateTimeFormat('ja-JP', {
    timeZone: 'Asia/Tokyo',
    year: 'numeric',
    month: '2-digit',
    day: '2-digit',
    hour: '2-digit',
    minute: '2-digit',
    second: '2-digit',
    hour12: false,
    weekday: 'short',
  });
  const heading = '## 実行時刻コンテキスト';
  const utcLine = `- Current time (UTC): ${current.toISOString()}`;
  const jstLine = `- Current time (JST): ${tokyoFormatter.format(current)}`;
  const rule = '- 時刻依存の判断(今日/昨日/直近◯時間/最新ニュース等)は必ずこの時刻を基準に行うこと。';
  return `${heading}\n${utcLine}\n${jstLine}\n${rule}\n`;
}
/**
 * Extract the numeric local-task id from a job repo name of the exact form
 * `local/task-<digits>`.
 *
 * @returns the parsed id, or null when the repo name has any other shape.
 */
function getLocalTaskId(repoName: string): number | null {
  const digits = repoName.match(/^local\/task-(\d+)$/)?.[1];
  return digits === undefined ? null : Number(digits);
}
/**
 * Extract the parent job id from a sub-task repo name of the form
 * `subtask/<id>`, where the id is exactly 36 characters of lowercase hex
 * digits and dashes (UUID-shaped).
 *
 * @returns the parent job id, or null when the repo name does not match.
 */
function getSubTaskParentJobId(repoName: string): string | null {
  const found = repoName.match(/^subtask\/([0-9a-f-]{36})$/);
  return found?.[1] ?? null;
}
/**
 * Resolve the "logical task id" used to key browser sessions for a job.
 *
 * - A job that runs a local task directly resolves to that task's id (as a string).
 * - A sub-task walks up its ancestry, at most 5 hops, and resolves to the first
 *   ancestor that is a local task. (Config caps subtaskDepth at 2, so 5 leaves headroom.)
 * - If no local-task ancestor is found (the root is e.g. a Gitea issue job, a parent
 *   row is missing, or the hop budget runs out), taskId is undefined. In that case
 *   BrowseWeb cannot use noVNC mode and falls back to its older behaviour.
 *
 * userId is the job's owner. When a local-task ancestor is found, it is that
 * ancestor's owner, falling back to the job's owner.
 */
async function resolveSessionTaskId(
  repo: Repository,
  job: Job,
): Promise<{ taskId: string | undefined; userId: string | undefined }> {
  const jobOwner = job.ownerId ?? undefined;
  const ownTaskId = getLocalTaskId(job.repo);
  if (ownTaskId !== null) {
    return { taskId: String(ownTaskId), userId: jobOwner };
  }
  let nextParentId: string | null = getSubTaskParentJobId(job.repo);
  for (let hop = 0; nextParentId && hop < 5; hop++) {
    const ancestor = await repo.getJob(nextParentId);
    if (!ancestor) break;
    const ancestorTaskId = getLocalTaskId(ancestor.repo);
    if (ancestorTaskId !== null) {
      // The owner should match the ancestor's; fall back to the job's own owner just in case.
      return { taskId: String(ancestorTaskId), userId: ancestor.ownerId ?? jobOwner };
    }
    nextParentId = ancestor.parentJobId ?? getSubTaskParentJobId(ancestor.repo);
  }
  return { taskId: undefined, userId: jobOwner };
}
/**
 * Render a `---`-fenced, YAML-style metadata block describing UI hints for a job.
 *
 * `ui_profile` is the job's required role. The other keys are read
 * case-insensitively from `key: value` markers inside the job instruction and
 * lower-cased. Missing markers fall back to `markdown` (output format), `low`
 * (ask policy) and `medium` (priority).
 */
function buildUiMetadataBlock(job: Job): string {
  const pick = (pattern: RegExp, fallback: string): string => {
    const hit = pattern.exec(job.instruction);
    return hit?.[1]?.toLowerCase() ?? fallback;
  };
  const outputFormat = pick(/ui_output_format:\s*(text|markdown|json)/i, 'markdown');
  const askPolicy = pick(/ui_ask_policy:\s*(low|high)/i, 'low');
  const priority = pick(/ui_priority:\s*(low|medium|high)/i, 'medium');
  return `---\nui_profile: ${job.requiredRole}\nui_output_format: ${outputFormat}\nui_ask_policy: ${askPolicy}\nui_priority: ${priority}\n---`;
}
/**
 * Trim surrounding whitespace and cap the text at `maxLength` UTF-16 code units,
 * appending `...` when anything was cut.
 *
 * The `...` marker is added on top of the cap, so a truncated result can be up
 * to `maxLength + 3` long. This matches the previous behaviour.
 *
 * The cut never splits a surrogate pair. If the last kept code unit is a high
 * surrogate, it is dropped as well, so the result never contains a lone
 * surrogate (which would turn into U+FFFD when the text is written as UTF-8).
 * A negative `maxLength` is treated as 0.
 *
 * @param text      text to trim and possibly truncate
 * @param maxLength maximum number of code units kept before the marker
 * @returns the trimmed text, or its truncated prefix followed by `...`
 */
function truncateRetryText(text: string, maxLength: number): string {
  const trimmed = text.trim();
  if (trimmed.length <= maxLength) return trimmed;
  let end = Math.max(0, maxLength);
  if (end > 0) {
    const lastKept = trimmed.charCodeAt(end - 1);
    // 0xD800-0xDBFF is a high (leading) surrogate; its partner would be cut off.
    if (lastKept >= 0xd800 && lastKept <= 0xdbff) end -= 1;
  }
  return `${trimmed.slice(0, end)}...`;
}
/**
 * Load recent lessons from `<workspace>/logs/lessons.jsonl` as Markdown bullets
 * for the retry handoff.
 *
 * Only the last RETRY_LESSONS_MAX_LINES non-empty lines are used. A JSON line
 * becomes `- [movement] lessons` (movement prefix only when present; lessons
 * capped at 500 chars). A line that fails to parse is included verbatim (also
 * capped). Bullets with no text are dropped. A missing file or read error yields [].
 */
function readRetryLessons(workspacePath: string): string[] {
  const lessonsFile = join(workspacePath, 'logs', 'lessons.jsonl');
  if (!existsSync(lessonsFile)) return [];
  try {
    const recent = readFileSync(lessonsFile, 'utf-8')
      .split('\n')
      .filter((entry) => entry !== '')
      .slice(-RETRY_LESSONS_MAX_LINES);
    const bullets: string[] = [];
    for (const entry of recent) {
      let bullet: string;
      try {
        const parsed = JSON.parse(entry) as { movement?: string; lessons?: string };
        const prefix = parsed.movement ? `[${parsed.movement}] ` : '';
        bullet = `- ${prefix}${truncateRetryText(String(parsed.lessons ?? ''), 500)}`;
      } catch {
        bullet = `- ${truncateRetryText(entry, 500)}`;
      }
      if (bullet.trim() !== '-') bullets.push(bullet);
    }
    return bullets;
  } catch {
    return [];
  }
}
/**
 * Condense `<workspace>/logs/last-run-diagnostics.json` into Markdown bullet
 * lines for the retry handoff.
 *
 * Lines are emitted in this order:
 * - the previous status (with abort reason) when either is present;
 * - one line per movement (`name -> next`, plus the tools used), followed by a
 *   300-char output preview when available;
 * - a final-output preview capped at RETRY_DIAGNOSTICS_PREVIEW_LENGTH;
 * - the last three context actions as JSON.
 *
 * A missing file or any parse/shape error yields [].
 */
function readLastRunDiagnostics(workspacePath: string): string[] {
  const diagnosticsFile = join(workspacePath, 'logs', 'last-run-diagnostics.json');
  if (!existsSync(diagnosticsFile)) return [];
  try {
    const snapshot = JSON.parse(readFileSync(diagnosticsFile, 'utf-8')) as {
      status?: string;
      abortReason?: string | null;
      finalOutput?: string;
      movementHistory?: Array<{
        name?: string;
        next?: string | null;
        toolsUsed?: string[];
        outputPreview?: string;
      }>;
      contextActions?: unknown[];
    };
    const summary: string[] = [];
    if (snapshot.status || snapshot.abortReason) {
      const reasonSuffix = snapshot.abortReason ? ` (${snapshot.abortReason})` : '';
      summary.push(`- 前回ステータス: ${snapshot.status ?? 'unknown'}${reasonSuffix}`);
    }
    for (const step of snapshot.movementHistory ?? []) {
      const tools = step.toolsUsed;
      const toolsSuffix = tools && tools.length > 0 ? ` tools=${tools.join(',')}` : '';
      summary.push(`- movement ${step.name ?? 'unknown'} -> ${step.next ?? 'unknown'}${toolsSuffix}`);
      if (step.outputPreview) {
        summary.push(` - output: ${truncateRetryText(step.outputPreview, 300)}`);
      }
    }
    if (snapshot.finalOutput) {
      summary.push(`- 最終出力プレビュー: ${truncateRetryText(snapshot.finalOutput, RETRY_DIAGNOSTICS_PREVIEW_LENGTH)}`);
    }
    const actions = snapshot.contextActions;
    if (actions && actions.length > 0) {
      summary.push(`- context action: ${JSON.stringify(actions.slice(-3))}`);
    }
    return summary;
  } catch {
    return [];
  }
}
/**
 * Compose the Markdown retry handoff written to `logs/retry-summary.md`.
 *
 * The document contains:
 * - a header with generation time, job id, disposition and attempt counter
 *   (plus the next retry time when given);
 * - the failure reason (capped at 2,000 chars);
 * - the previous-run diagnostics and recorded lessons, each only when non-empty;
 * - fixed guidance telling the next agent not to repeat work or failures.
 *
 * The whole text is capped at RETRY_HANDOFF_MAX_LENGTH and terminated with a newline.
 */
export function buildRetryHandoffSummary(params: {
  workspacePath: string;
  job: Job;
  errorMsg: string;
  nextRetryAt?: string | null;
  disposition: 'requeued_unhealthy' | 'retry' | 'failed';
}): string {
  const { workspacePath, job, errorMsg, nextRetryAt, disposition } = params;
  const header = [
    '# Retry Handoff',
    '',
    `Generated: ${new Date().toISOString()}`,
    `Job: ${job.id}`,
    `Disposition: ${disposition}`,
    `Attempt: ${job.attempt}/${job.maxAttempts}`,
    ...(nextRetryAt ? [`Next retry at: ${nextRetryAt}`] : []),
  ];
  const sections: string[][] = [['## 失敗理由', truncateRetryText(errorMsg, 2000)]];
  const diagnostics = readLastRunDiagnostics(workspacePath);
  if (diagnostics.length > 0) sections.push(['## 前回実行の要約', ...diagnostics]);
  const lessons = readRetryLessons(workspacePath);
  if (lessons.length > 0) sections.push(['## これまでの lessons', ...lessons]);
  sections.push([
    '## 次のエージェントへの指示',
    '- 前回の失敗理由と movement の進捗を踏まえ、同じ探索や同じ失敗を繰り返さないこと。',
    '- 既に完了している作業・生成済みファイル・確認済み事項は再実行前に workspace とログで確認すること。',
    '- 必要な情報が不足している場合は、全体を読み直すのではなく targeted Read / Grep / Bash で範囲を絞ること。',
  ]);
  // Each section is preceded by a blank line.
  const body = [...header, ...sections.flatMap((section) => ['', ...section])].join('\n');
  return `${truncateRetryText(body, RETRY_HANDOFF_MAX_LENGTH)}\n`;
}
/**
 * Write the retry handoff to `<workspace>/logs/retry-summary.md`, creating
 * `logs/` if needed. This is a no-op when no workspace path is known. Failures
 * are logged as warnings and never thrown.
 */
function writeRetryHandoffSummary(params: {
  workspacePath: string | null | undefined;
  job: Job;
  errorMsg: string;
  nextRetryAt?: string | null;
  disposition: 'requeued_unhealthy' | 'retry' | 'failed';
}): void {
  const { workspacePath, job, errorMsg, nextRetryAt, disposition } = params;
  if (!workspacePath) return;
  try {
    const logsDir = join(workspacePath, 'logs');
    mkdirSync(logsDir, { recursive: true });
    const summary = buildRetryHandoffSummary({ workspacePath, job, errorMsg, nextRetryAt, disposition });
    writeFileSync(join(logsDir, 'retry-summary.md'), summary, 'utf-8');
  } catch (err) {
    logger.warn(`[worker] failed to write retry handoff summary: ${err}`);
  }
}
/**
 * Build the prompt section that briefs a retried job with the saved handoff.
 *
 * Returns '' for a clean first attempt (attempt <= 1 and no errorSummary), when
 * `logs/retry-summary.md` is missing or blank, or when reading it fails.
 * Otherwise the summary (capped at RETRY_HANDOFF_MAX_LENGTH) is wrapped under a
 * heading that tells the agent to continue without duplicating work.
 */
function buildRetryHandoffContext(workspacePath: string, job: Job): string {
  const isCleanFirstAttempt = job.attempt <= 1 && !job.errorSummary;
  if (isCleanFirstAttempt) return '';
  const summaryFile = join(workspacePath, 'logs', 'retry-summary.md');
  if (!existsSync(summaryFile)) return '';
  try {
    const summary = truncateRetryText(readFileSync(summaryFile, 'utf-8'), RETRY_HANDOFF_MAX_LENGTH);
    if (summary === '') return '';
    const heading = '## Retry 復帰用引き継ぎ';
    const preface = 'このジョブは前回実行からの retry / 再キューです。以下を前提に、重複作業を避けて継続してください。';
    return `${heading}\n${preface}\n\n${summary}`;
  } catch {
    return '';
  }
}
/**
 * Enqueue a follow-up `reflection` job for a finished job, if every gate passes.
 *
 * Skips silently when reflection is disabled or the job is itself a reflection
 * (prevents recursion). Otherwise it skips with a log line when:
 * - the job has no owner;
 * - `workerRequired` is set but no worker lists the `reflection` role;
 * - the owner's reflection token usage since 00:00 UTC today (tokensIn +
 *   tokensOut from repo.aggregateReflectionMetrics) has reached
 *   `perUserDailyBudgetTokens`. A cap of 0 or unset means unlimited.
 *
 * The enqueued job is private, owned by the same user, runs the `reflection`
 * piece under repo `local/reflection-<jobId>`, and carries a JSON payload with
 * the original job id, owner, piece name and outcome.
 */
export async function maybeEnqueueReflection(
  repo: Repository,
  job: Job,
  outcome: 'succeeded' | 'failed' | 'aborted',
  cfg: Pick<ReflectionConfig, 'enabled' | 'workerRequired' | 'perUserDailyBudgetTokens'>,
  workers: WorkerDef[] = [],
): Promise<void> {
  if (!cfg.enabled || job.taskKind === 'reflection') return;
  const owner = job.ownerId;
  if (!owner) {
    logger.warn(`[reflection] skip enqueue job=${job.id} reason=no_owner`);
    return;
  }
  const lacksReflectionWorker = !workers.some(
    (w) => Array.isArray(w.roles) && w.roles.includes('reflection'),
  );
  if (cfg.workerRequired && lacksReflectionWorker) {
    logger.warn(`[reflection] enqueue skipped reason=no_reflection_worker user=${owner}`);
    return;
  }
  const dailyCap = cfg.perUserDailyBudgetTokens ?? 0;
  if (dailyCap > 0) {
    const now = new Date();
    const utcMidnightMs = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate());
    const usage = repo.aggregateReflectionMetrics(owner, utcMidnightMs);
    const spent = usage.tokensIn + usage.tokensOut;
    if (spent >= dailyCap) {
      const toMillions = (tokens: number): string => (tokens / 1_000_000).toFixed(1);
      logger.info(`[reflection] enqueue skipped reason=budget user=${owner} spent=${toMillions(spent)}M cap=${toMillions(dailyCap)}M`);
      return;
    }
  }
  const payload = JSON.stringify({ originalJobId: job.id, userId: owner, pieceName: job.pieceName, outcome });
  await repo.createJob({
    repo: `local/reflection-${job.id}`,
    issueNumber: 0,
    instruction: '',
    pieceName: 'reflection',
    role: 'reflection',
    ownerId: owner,
    visibility: 'private',
    taskKind: 'reflection',
    payload,
  } as any);
  logger.info(`[reflection] enqueued original=${job.id} owner=${owner} piece=${job.pieceName} outcome=${outcome}`);
}
export class Worker {
// True between start() and stop(); processNext() does nothing while false.
private running = false;
// Number of claimed jobs currently executing: incremented in processNext(), decremented in runJobTracked().
private inflight = 0;
// Single-flight guard so overlapping poll ticks cannot run the claim loop concurrently.
private polling = false;
// Set by stop() and never cleared; processNext() will not claim new jobs once it is true.
private stopped = false;
// Timers created by start() and cleared by stop().
private pollInterval: ReturnType<typeof setInterval> | null = null;
private healthInterval: ReturnType<typeof setInterval> | null = null;
// Identity and endpoint, assigned once in the constructor.
private workerId: string;
private endpoint: string;
// Default model passed to the LLM client; undefined when the constructor received none.
private model: string | undefined;
// Model names from the most recent successful probe in initialize(); cleared when a probe fails.
private availableModels: Set<string> = new Set();
// Outcome of the last initialize() probe. Together with lastHealthError it is
// used to log health changes only when the state or error message changes.
private healthy = false;
private lastHealthError: string | null = null;
// Context window handed to the LLM client. Stays 128k until initialize() reads
// config.context.limitTokens or auto-detects a limit from the endpoint.
private contextLimitTokens: number = 128_000;
// Token store for MCP servers. When null, every server listed in a piece's
// required_mcp counts as missing and such jobs are parked.
private mcpTokenManager: McpTokenManager | null = null;
/**
* Phase 3b: optional Prometheus metrics handle. When set, the worker
* emits jobs_total / active_jobs / job_duration_seconds in
* executeJob's start/finally, llm_calls_total via the AgentLoop
* onLLMCall callback, and tool_calls_total via the new onToolMetric
* callback in agent-loop.ts. Wired by WorkerManager after the metric
* registry exists.
*/
private workerMetrics: import('./metrics/worker-metrics.js').WorkerMetrics | null = null;
// Skill catalog installed via setSkillCatalog(). NOTE(review): its consumer is
// outside this section; presumably used when building agents for a job — confirm.
private skillCatalog: import('./engine/skills.js').SkillCatalog | null = null;
/**
* V2 Web Push notification service. Null when push is disabled via
* config or when the worker is built without one (tests). Hooks fire
* via enqueue (fire-and-forget) so a slow push service can't block job
* execution.
*
* Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md.
*/
private pushService: import('./push-service.js').PushService | null = null;
/**
 * @param workerId id of this worker. It must match an entry in
 *                 `config.provider.workers`, or getWorkerDef() will throw.
 * @param endpoint base URL of the worker's LLM endpoint (Ollama / OpenAI-compatible).
 * @param model    default model name, or undefined when none is configured.
 * @param repo     persistence layer for jobs, worker nodes and audit logs.
 * @param config   global application config; can be swapped later via updateConfig().
 */
constructor(
  workerId: string,
  endpoint: string,
  model: string | undefined,
  private repo: Repository,
  private config: AppConfig,
) {
  this.model = model;
  this.endpoint = endpoint;
  this.workerId = workerId;
}
/**
 * Install the MCP token manager, or remove it with null. Without a manager,
 * every piece that declares `required_mcp` is parked by executeJob's MCP auth gate.
 */
public setMcpTokenManager(tm: McpTokenManager | null): void {
  this.mcpTokenManager = tm;
}
/** Install the skill catalog this worker keeps for its jobs (replaces any previous one). */
public setSkillCatalog(catalog: import('./engine/skills.js').SkillCatalog): void {
  this.skillCatalog = catalog;
}
/** Install the Web Push service, or pass null to disable push (enqueuePush then returns early). */
public setPushService(svc: import('./push-service.js').PushService | null): void {
  this.pushService = svc;
}
/**
 * Replace the global config on a live worker without restarting it.
 *
 * WorkerManager's differential rebuild uses this when a config change does not
 * touch this worker's own definition (e.g. a tool size limit changed). The
 * worker, and any job it is running, stays alive. Roles and maxConcurrency are
 * re-read through getWorkerDef(), so they follow the new config. The endpoint
 * is the value given to the constructor and is not re-read.
 *
 * NOTE(review): executeJob reads `this.config` at several points after awaits,
 * so a job already running may observe the new config for reads made after the swap.
 */
public updateConfig(config: AppConfig): void {
  this.config = config;
}
/** Number of jobs this worker is currently running in detached runJobTracked() calls. */
public get inflightCount(): number {
  const running = this.inflight;
  return running;
}
/**
 * Hand a Web Push notification for a job status change to the push service,
 * without awaiting it.
 *
 * Nothing is sent when:
 * - no push service is installed;
 * - the job is a reflection job (internal bookkeeping, not user work);
 * - the job has no owner;
 * - the job is not a direct local task (`local/task-<id>`). Sub-task jobs and
 *   other jobs are skipped here.
 *
 * The title is looked up best-effort from `local_tasks`. If the lookup fails or
 * the title is empty, the label `Task #<id>` is used instead.
 */
private enqueuePush(
  job: Job,
  event: 'running' | 'succeeded' | 'failed' | 'waiting_human',
): void {
  const push = this.pushService;
  if (!push || job.taskKind === 'reflection' || !job.ownerId) return;
  const taskId = getLocalTaskId(job.repo);
  if (taskId === null) return;
  let taskTitle = `Task #${taskId}`;
  try {
    const statement = this.repo.getDb().prepare('SELECT title FROM local_tasks WHERE id = ?');
    const row = statement.get(taskId) as { title: string | null } | undefined;
    if (row?.title) taskTitle = row.title;
  } catch {
    // The title is cosmetic; keep the generic label.
  }
  push.enqueue({
    event,
    taskId,
    taskTitle,
    pieceName: job.pieceName,
    ownerId: job.ownerId,
  });
}
/**
 * Install the Phase 3b Prometheus metrics handle, or clear it with null
 * (e.g. when reconfiguring at runtime). Installing the same handle again
 * just overwrites the field, so repeated calls are harmless.
 */
public setWorkerMetrics(
  metrics: import('./metrics/worker-metrics.js').WorkerMetrics | null,
): void {
  this.workerMetrics = metrics;
}
/** Look up this worker's definition in the current config. Throws if the entry is gone. */
private getWorkerDef(): WorkerDef {
  const def = this.config.provider.workers.find((candidate) => candidate.id === this.workerId);
  if (!def) throw new Error(`Worker config not found: ${this.workerId}`);
  return def;
}
/** Roles this worker accepts; defaults to auto/fast/quality when the def lists none. */
private getSupportedRoles(): string[] {
  const { roles } = this.getWorkerDef();
  return roles ?? ['auto', 'fast', 'quality'];
}
/** Maximum concurrent jobs for this worker (configured value, default 1, never below 1). */
private getMaxConcurrency(): number {
  const configured = this.getWorkerDef().maxConcurrency ?? 1;
  return Math.max(1, configured);
}
/**
* Probe this worker's endpoint and publish the result to the worker_nodes row.
*
* - A worker disabled in config (`enabled: false`) is recorded as
*   disabled/unhealthy and returns false without probing.
* - Otherwise the model list is fetched from Ollama `/api/tags`, falling back to
*   `<endpoint>/models` (OpenAI-compatible). If both fail and a model is
*   configured, the worker is recorded as unhealthy. With no configured model,
*   an empty list is accepted (llama-server compatibility).
* - On success the context limit is refreshed: `config.context.limitTokens`
*   wins when set; otherwise it is detected via fetchOllamaContextLimit().
*
* Health log lines are emitted only when the healthy flag or the error message
* changes, because this runs on every poll tick (processNext) and on every
* health interval (start).
*
* NOTE(review): getWorkerDef() (first line) and the upsertWorkerNode() call in
* the catch block are not guarded, so their errors propagate to the caller.
*
* @returns true when the worker is enabled and the probe succeeded.
*/
async initialize(): Promise<boolean> {
const workerDef = this.getWorkerDef();
const enabled = workerDef.enabled !== false;
// Disabled by config: publish a disabled, unhealthy row and skip probing.
if (!enabled) {
await this.repo.upsertWorkerNode({
workerId: this.workerId,
endpoint: this.endpoint,
enabled: false,
healthy: false,
roles: this.getSupportedRoles(),
availableModels: [],
inflightJobs: this.inflight,
maxConcurrency: this.getMaxConcurrency(),
lastError: 'disabled by config',
});
this.healthy = false;
this.lastHealthError = 'disabled by config';
logger.info(`[worker:${this.workerId}] disabled by config; skipping polling`);
return false;
}
try {
// Ollama's native API lives at the server root, so strip a trailing /v1 (or /v1/).
const ollamaBase = this.endpoint.replace(/\/v1\/?$/, '');
// Try Ollama /api/tags first, then fall back to OpenAI-compatible /v1/models.
//
// Forward `Authorization: Bearer <apiKey>` when the worker has one
// configured. The discovery probes (/api/tags / /v1/models) were
// previously sent un-authenticated, which caused 30s-interval 401
// floods against AAO Gateway endpoints (gateway requires Bearer auth
// on /v1/models) — the worker then logged "failed to fetch model
// list" indefinitely and `availableModels` stayed empty.
// Discovered during 2026-05-20 dogfooding on production aao.
const apiKey = this.getWorkerDef().apiKey;
const init: RequestInit = apiKey
? { headers: { Authorization: `Bearer ${apiKey}` } }
: {};
let models: string[] = [];
// Network errors become null so they fall through to the next probe.
// NOTE(review): the probes set no explicit timeout.
const ollamaRes = await fetch(`${ollamaBase}/api/tags`, init).catch(() => null);
if (ollamaRes?.ok) {
const data = await ollamaRes.json() as { models?: Array<{ name: string }> };
models = (data.models ?? []).map((m: { name: string }) => m.name);
} else {
// Drop at most one trailing slash, then append /models. This hits /v1/models
// only when the endpoint already ends in /v1 — presumably the usual configuration; confirm.
const openaiBase = this.endpoint.replace(/\/?$/, '');
const openaiRes = await fetch(`${openaiBase}/models`, init).catch(() => null);
if (openaiRes?.ok) {
const data = await openaiRes.json() as { data?: Array<{ id: string }> };
models = (data.data ?? []).map((m: { id: string }) => m.id);
} else if (this.model) {
// A configured model cannot be verified without a model list, so treat the worker as unhealthy.
throw new Error(`failed to fetch model list from both /api/tags and /v1/models`);
}
// llama-server compat: with no model configured, the model-list API is optional, so continue with an empty list.
}
this.availableModels = new Set(models);
await this.repo.upsertWorkerNode({
workerId: this.workerId,
endpoint: this.endpoint,
enabled: true,
healthy: true,
roles: this.getSupportedRoles(),
availableModels: [...this.availableModels],
inflightJobs: this.inflight,
maxConcurrency: this.getMaxConcurrency(),
lastError: null,
});
// Log the model list only on a transition into the healthy state.
if (!this.healthy || this.lastHealthError !== null) {
logger.info(`[worker:${this.workerId}] available models: ${[...this.availableModels].join(', ')}`);
}
this.healthy = true;
this.lastHealthError = null;
// Auto-detect context limit from Ollama if not configured
if (!this.config.context?.limitTokens) {
if (this.model) {
const contextLimit = await fetchOllamaContextLimit(this.endpoint, this.model);
if (contextLimit !== this.contextLimitTokens) {
logger.info(`[worker:${this.workerId}] context limit updated: ${contextLimit} tokens`);
this.contextLimitTokens = contextLimit;
}
} else {
// No model configured — try llama.cpp /props endpoint for context limit
const contextLimit = await fetchOllamaContextLimit(this.endpoint, '');
if (contextLimit !== this.contextLimitTokens) {
logger.info(`[worker:${this.workerId}] context limit updated: ${contextLimit} tokens`);
this.contextLimitTokens = contextLimit;
}
}
} else {
// An explicitly configured limit always wins over auto-detection.
this.contextLimitTokens = this.config.context.limitTokens;
}
return true;
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e);
this.availableModels.clear();
await this.repo.upsertWorkerNode({
workerId: this.workerId,
endpoint: this.endpoint,
enabled: true,
healthy: false,
roles: this.getSupportedRoles(),
availableModels: [],
inflightJobs: this.inflight,
maxConcurrency: this.getMaxConcurrency(),
lastError: errorMessage,
});
// Warn only when becoming unhealthy or when the error message changes, to avoid log floods.
if (this.healthy || this.lastHealthError !== errorMessage) {
logger.warn(`[worker:${this.workerId}] failed to fetch model list: ${e}`);
}
this.healthy = false;
this.lastHealthError = errorMessage;
return false;
}
}
/**
 * Begin polling for jobs and periodically refreshing worker health.
 *
 * Does nothing if already running. One poll happens immediately. Further polls
 * repeat every 5 s plus a random jitter of up to 2 s, chosen once per start so
 * several workers do not claim in lockstep. Health checks repeat every
 * `healthcheckIntervalSeconds` (default 30, never below 10).
 *
 * Both timers launch async work fire-and-forget. Their rejections are caught
 * and logged here, so a throwing tick cannot become an unhandled promise
 * rejection. Examples of such throws: getWorkerDef() after this worker was
 * removed from config, or a DB error while recording health.
 *
 * Note: stop() sets a permanent `stopped` flag, so calling start() again after
 * stop() restarts the timers, but processNext() will not claim jobs.
 */
start(): void {
  if (this.running) return;
  this.running = true;
  logger.info(`[worker:${this.workerId}] started`);
  const tick = (): void => {
    this.processNext().catch((err: unknown) => {
      logger.error(`[worker:${this.workerId}] poll tick failed: ${err}`);
    });
  };
  tick();
  const baseInterval = 5000;
  const jitter = Math.floor(Math.random() * 2000);
  this.pollInterval = setInterval(tick, baseInterval + jitter);
  const healthIntervalSeconds = Math.max(10, this.getWorkerDef().healthcheckIntervalSeconds ?? 30);
  this.healthInterval = setInterval(() => {
    this.initialize().catch((err: unknown) => {
      logger.warn(`[worker:${this.workerId}] health check failed: ${err}`);
    });
  }, healthIntervalSeconds * 1000);
}
/**
 * Stop polling and health checks. Jobs already running are not cancelled; use
 * waitForCompletion() to wait for them. `stopped` is never reset, so
 * processNext() claims nothing afterwards.
 */
stop(): void {
  this.running = false;
  this.stopped = true;
  for (const timer of [this.pollInterval, this.healthInterval]) {
    if (timer) clearInterval(timer);
  }
  this.pollInterval = null;
  this.healthInterval = null;
  logger.info(`[worker:${this.workerId}] stopped`);
}
/**
 * Check every 500 ms until no jobs are in flight or `timeoutMs` has elapsed.
 *
 * @returns true when all in-flight jobs finished, false on timeout.
 */
async waitForCompletion(timeoutMs = 30000): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (this.inflight > 0 && Date.now() < deadline) {
    await new Promise<void>((resolve) => setTimeout(resolve, 500));
  }
  return this.inflight === 0;
}
/** Configured id of this worker. */
get id(): string {
  return this.workerId;
}
/**
 * One polling tick: run the stuck-job watchdog, refresh health, then claim and
 * start jobs until the concurrency limit is reached or the queue is empty.
 * Retry jobs are claimed before fresh ones.
 *
 * Single-flight: a tick that arrives while a claim loop is running returns
 * immediately, so the worker cannot over-claim. Claimed jobs run detached in
 * runJobTracked(), which restores the inflight counter.
 *
 * Does not reject on its own errors; they are logged. This includes a missing
 * worker definition (getWorkerDef() throws once this worker's entry is removed
 * from config), which is now checked inside the guarded region.
 */
private async processNext(): Promise<void> {
  if (!this.running || this.stopped) return;
  if (this.polling) return; // claim loop is single-flight (prevents over-claim)
  this.polling = true;
  try {
    // Only execution workers claim jobs. Evaluated here, not before the try,
    // because getWorkerDef() throws when the worker config has been removed.
    if (!isExecutionWorker(this.getWorkerDef())) return;
    // Stuck-job watchdog: the threshold is twice the LLM timeout, with a floor of 20 minutes.
    try {
      const staleMinutes = Math.max(20, (this.config.provider.timeoutMinutes ?? 10) * 2);
      this.repo.recoverStuckRunningJobs(staleMinutes);
    } catch (err) {
      logger.warn(`[worker:${this.workerId}] recoverStuckRunningJobs error: ${err}`);
    }
    // A disabled or unhealthy worker does not claim.
    const available = await this.initialize();
    if (!available) return;
    const max = this.getMaxConcurrency();
    while (this.inflight < max && this.running && !this.stopped) {
      // Retry jobs take priority over fresh ones.
      const job = await this.repo.claimNextRetryJob(this.workerId)
        ?? await this.repo.claimNextJob(this.workerId);
      if (!job) break;
      this.inflight++;
      void this.runJobTracked(job); // run concurrently: intentionally not awaited
    }
  } catch (err) {
    logger.error(`[worker:${this.workerId}] processNext error: ${err}`);
  } finally {
    this.polling = false;
  }
}
/**
 * Drive one claimed job to completion and always release its concurrency slot.
 * executeJob failures are logged, not rethrown. Afterwards the inflight counter
 * is decremented and the new count is published via reportInflight().
 */
private async runJobTracked(job: Job): Promise<void> {
  try {
    await this.executeJob(job).catch((err: unknown) => {
      logger.error(`[worker:${this.workerId}] runJobTracked error job=${job.id}: ${err}`);
    });
  } finally {
    this.inflight--;
    await this.reportInflight();
  }
}
/**
 * Publish the current inflight count, health flag, last health error and model
 * list to this worker's worker_nodes row. Failures are logged, never thrown.
 */
private async reportInflight(): Promise<void> {
  const snapshot = {
    healthy: this.healthy,
    lastError: this.lastHealthError,
    inflightJobs: this.inflight,
    availableModels: Array.from(this.availableModels),
  };
  try {
    await this.repo.updateWorkerNodeHealth(this.workerId, snapshot);
  } catch (err) {
    logger.warn(`[worker:${this.workerId}] reportInflight failed: ${err}`);
  }
}
/** Whether `role` is among this worker's configured (or default) roles. */
private supportsRole(role: string): boolean {
  const roles = this.getSupportedRoles();
  return roles.some((candidate) => candidate === role);
}
/** Activity-log metadata for a job: this worker's id, with the job's role as the mode. */
private buildLogMetadata(role: JobRole): ActivityLogMetadata {
  const metadata: ActivityLogMetadata = { workerId: this.workerId, mode: role };
  return metadata;
}
/**
 * Have the LLM answer a sub-task's ASK on behalf of its parent job.
 *
 * The prompt casts the model as the managing parent agent. It supplies the
 * parent's original instruction (or a placeholder if the parent job is not
 * found), the sub-task's instruction, and the question. Streamed text chunks
 * are concatenated and trimmed; a streamed `error` event throws. If the model
 * returns only whitespace, a fixed reply telling the sub-task to proceed on its
 * own judgement is returned.
 *
 * @param subtaskJob  the sub-task that raised the question
 * @param parentJobId id of the parent job whose instruction provides context
 * @param question    the question to answer
 * @returns a non-empty answer string
 * @throws Error when the LLM stream reports an error
 */
private async answerSubtaskAsk(subtaskJob: Job, parentJobId: string, question: string): Promise<string> {
  const parentJob = await this.repo.getJob(parentJobId);
  const originalRequest = parentJob?.instruction ?? '(不明)';
  const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000;
  const answerWorkerDef = this.getWorkerDef();
  const client = new OpenAICompatClient(
    this.endpoint,
    this.model,
    answerWorkerDef.apiKey,
    this.config.provider.retry,
    timeoutMs,
    this.contextLimitTokens,
    this.config.safety?.promptGuardRatio,
    undefined,
    { proxy: answerWorkerDef.proxy === true },
  );
  const systemPrompt = [
    'あなたはタスクを管理する親エージェントです。',
    'サブタスクがユーザーに確認を求めていますが、あなたが代わりに回答してください。',
    '元の依頼の意図を汲み取り、サブタスクが作業を継続できるよう具体的に回答してください。',
    '回答のみを簡潔に返してください。',
  ].join('\n');
  const userPrompt = [
    '## 元の依頼',
    originalRequest,
    '',
    '## サブタスクの指示',
    subtaskJob.instruction,
    '',
    '## サブタスクからの質問',
    question,
  ].join('\n');
  const messages: import('./llm/openai-compat.js').Message[] = [
    { role: 'system', content: systemPrompt },
    { role: 'user', content: userPrompt },
  ];
  let reply = '';
  for await (const event of client.chat(messages)) {
    if (event.type === 'error') {
      throw new Error(`LLM error: ${event.error}`);
    }
    if (event.type === 'text') {
      reply += event.text;
    }
  }
  const trimmed = reply.trim();
  return trimmed || '特に制約はありません。あなたの判断で進めてください。';
}
/**
 * Persist a compact summary of a piece run to
 * `<workspace>/logs/last-run-diagnostics.json`. The file holds the status, abort
 * reason, final output, per-movement next/tools/600-char output preview, and
 * context actions; readLastRunDiagnostics() reads it back for retry handoffs.
 * Failures are logged and otherwise ignored.
 */
private writeRunDiagnostics(workspacePath: string, result: PieceRunResult): void {
  try {
    const logsDir = join(workspacePath, 'logs');
    mkdirSync(logsDir, { recursive: true });
    const generatedAt = new Date().toISOString();
    const movementHistory = result.movementHistory.map(({ name, result: step }) => ({
      name,
      next: step.next,
      toolsUsed: step.toolsUsed,
      outputPreview: step.output.slice(0, 600),
    }));
    const diagnostics = {
      generatedAt,
      status: result.status,
      abortReason: result.abortReason ?? null,
      finalOutput: result.finalOutput,
      movementHistory,
      contextActions: result.contextActions,
    };
    const diagnosticsFile = join(logsDir, 'last-run-diagnostics.json');
    writeFileSync(diagnosticsFile, `${JSON.stringify(diagnostics, null, 2)}\n`, 'utf-8');
  } catch (err) {
    logger.warn(`[worker:${this.workerId}] failed to write run diagnostics: ${err}`);
  }
}
/**
 * Resolve a job's workspace directory, make sure its layout and git repo exist,
 * and record the path on the job (and on the local task, for local tasks).
 *
 * - Local task: `<worktreeDir>/local/<taskId>` with `input/`, `output/`, `logs/`.
 * - Sub-task: the `worktreePath` set by SpawnSubTask, with `output/` and `logs/`.
 *   Throws if that path is missing.
 * - Anything else throws: the orchestrator does not create workspaces for raw
 *   repo/issue jobs.
 *
 * @returns the workspace path
 */
private async prepareJobWorkspace(
  job: Job,
  isLocalTask: boolean,
  isSubTask: boolean,
  localTaskId: number | null,
): Promise<string> {
  const { repo: repoName, issueNumber, id: jobId } = job;
  const { worktreeDir } = this.config;
  let workspacePath: string;
  if (isLocalTask) {
    workspacePath = join(worktreeDir, 'local', String(localTaskId));
  } else if (isSubTask) {
    workspacePath = job.worktreePath ?? join(worktreeDir, 'subtasks', jobId);
  } else {
    workspacePath = join(worktreeDir, repoName, String(issueNumber));
  }
  if (isLocalTask) {
    for (const dir of [workspacePath, join(workspacePath, 'input'), join(workspacePath, 'output'), join(workspacePath, 'logs')]) {
      mkdirSync(dir, { recursive: true });
    }
    await ensureWorkspaceGitRepo(workspacePath);
    if (localTaskId !== null) {
      await this.repo.updateLocalTask(localTaskId, { workspacePath });
    }
  } else if (isSubTask) {
    // Sub-tasks are expected to get their worktreePath from SpawnSubTask.
    const subtaskPath = job.worktreePath;
    if (!subtaskPath) {
      throw new Error(`Sub-task job ${jobId} has no worktreePath set`);
    }
    for (const dir of [subtaskPath, join(subtaskPath, 'output'), join(subtaskPath, 'logs')]) {
      mkdirSync(dir, { recursive: true });
    }
    await ensureWorkspaceGitRepo(subtaskPath);
  } else {
    throw new Error(`Unsupported job type: repo="${repoName}" is neither a local task nor a sub-task`);
  }
  await this.repo.updateJob(jobId, { worktreePath: workspacePath });
  return workspacePath;
}
/**
 * Pre-execution gates for a claimed job:
 *  1. this worker must support the job's required role;
 *  2. the job must obtain the lock on its (repo, issueNumber) pair.
 * If either gate fails, the job goes back to `queued` with no worker so another
 * worker can take it, an audit entry records why, and false is returned so
 * executeJob stops early.
 *
 * @returns true when the job may run on this worker
 */
private async acquireJobOrRequeue(job: Job): Promise<boolean> {
  const { repo: repoName, issueNumber, id: jobId, requiredRole } = job;
  if (!this.supportsRole(requiredRole)) {
    await this.repo.updateJob(jobId, { status: 'queued', workerId: null });
    await this.repo.addAuditLog(jobId, 'job_requeued_capability_mismatch', 'worker', {
      workerId: this.workerId,
      requiredRole,
    });
    logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to role mismatch (role=${requiredRole})`);
    return false;
  }
  if (await this.repo.lockIssue(repoName, issueNumber, jobId)) {
    return true;
  }
  await this.repo.updateJob(jobId, { status: 'queued', workerId: null });
  await this.repo.addAuditLog(jobId, 'job_requeued_issue_locked', 'worker', {
    workerId: this.workerId,
  });
  logger.info(`[worker:${this.workerId}] job ${jobId}: issue ${repoName}#${issueNumber} already locked, skipping`);
  return false;
}
private async executeJob(job: Job): Promise<void> {
const { repo: repoName, issueNumber, id: jobId } = job;
const localTaskId = getLocalTaskId(repoName);
const isLocalTask = localTaskId !== null;
const parentJobId = getSubTaskParentJobId(repoName);
const isSubTask = parentJobId !== null;
const logMetadata = this.buildLogMetadata(job.requiredRole);
if (!(await this.acquireJobOrRequeue(job))) return;
// Phase 3b: job-lifecycle metrics. Inc active_jobs at start; capture
// a terminal status + duration in the finally block. `profile` maps
// to the assigned required role (the multi-profile / multi-piece
// operator-facing dimension).
const metricPiece = job.pieceName ?? 'unknown';
const metricProfile = job.requiredRole ?? 'unknown';
const jobStartedAtMs = Date.now();
let metricFinalStatus: 'succeeded' | 'failed' | 'aborted' | 'cancelled' | 'waiting_human' | 'error' = 'error';
if (this.workerMetrics) {
try {
this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).inc();
} catch { /* metrics never affect business logic */ }
}
await this.repo.updateWorkerNodeHealth(this.workerId, {
healthy: this.healthy,
lastError: this.lastHealthError,
inflightJobs: this.inflight,
availableModels: [...this.availableModels],
});
// claimNextJob がすでに status = 'running' にセット済み
await this.repo.addAuditLog(jobId, 'job_started', 'worker', {});
// V2 push: notify on the first time a job transitions queued→running.
// Retry runs are intentionally silent — V1's 4s debounce relied on this
// and we keep the same UX (one running notification, not one per retry).
if (job.attempt === 1) {
this.enqueuePush(job, 'running');
}
// Reflection jobs bypass workspace preparation and the agent / LLM loop.
// task_kind='agent' (default) keeps the pre-existing piece-runner path.
if (job.taskKind === 'reflection') {
try {
await this.handleReflectionJob(job);
} finally {
await this.repo.updateWorkerNodeHealth(this.workerId, {
healthy: this.healthy,
lastError: this.lastHealthError,
inflightJobs: this.inflight,
availableModels: [...this.availableModels],
});
await this.repo.unlockIssue(repoName, issueNumber);
}
return;
}
const workspacePath = await this.prepareJobWorkspace(job, isLocalTask, isSubTask, localTaskId);
// 進捗レポーター
// ローカルタスク・サブタスクともに activity.log を書き出す
// サブタスクでは isSubTask=true を渡し、DB コメント書き込みをスキップする
const reporter = new LocalProgressReporter(this.repo, localTaskId ?? issueNumber, workspacePath, logMetadata, isSubTask);
// 会話コンテキストの組み立て
let enrichedInstruction = `${buildTimeContextBlock()}${job.instruction}`;
if (isLocalTask) {
try {
const comments = await this.repo.listLocalTaskComments(localTaskId);
const outputFiles = this.listDir(join(workspacePath, 'output'));
const inputFiles = this.listDir(join(workspacePath, 'input'));
const contextBody = buildLocalConversationContext({
comments,
jobInstruction: job.instruction,
inputFiles,
outputFiles,
});
enrichedInstruction = `${buildTimeContextBlock()}${contextBody}`;
} catch (err) {
logger.warn(`[worker:${this.workerId}] failed to build local context: ${err}`);
}
}
const retryHandoffContext = buildRetryHandoffContext(workspacePath, job);
if (retryHandoffContext) {
enrichedInstruction = `${enrichedInstruction}\n\n${retryHandoffContext}`;
}
// watchdog 誤検知防止: runPiece 実行中に updated_at を定期更新
let heartbeatTimer: ReturnType<typeof setInterval> | undefined;
try {
// Piece 読み込み: per-user カスタムディレクトリ → global カスタムディレクトリ → builtin の順に探索
// No-auth jobs (ownerId null) resolve pieces from data/users/local/pieces, matching
// where no-auth POST now writes (LOCAL_OWNER='local' in pieces-api.ts).
const userFolderRoot = this.config.userFolderRoot ?? './data/users';
const ownerForPieces = job.ownerId ?? 'local';
const customPieceDirs = [
userPiecesDir(userFolderRoot, ownerForPieces),
this.config.customPiecesDir,
].filter((d): d is string => !!d);
logger.info(`[worker:${this.workerId}] job ${jobId} loadPiece piece=${job.pieceName} customDirs=[${customPieceDirs.join(', ') || 'none'}] piecesDir=pieces`);
const piece = loadPiece(job.pieceName, 'pieces', customPieceDirs);
if (
piece.model &&
this.availableModels.size > 0 &&
!this.availableModels.has(piece.model) &&
this.model !== piece.model
) {
await this.repo.updateJob(jobId, {
status: 'queued',
workerId: null,
errorSummary: `Required model ${piece.model} is not available on ${this.workerId}`,
});
await this.repo.addAuditLog(jobId, 'job_requeued_model_mismatch', 'worker', {
workerId: this.workerId,
requiredModel: piece.model,
availableModels: [...this.availableModels],
});
logger.info(`[worker:${this.workerId}] requeued job ${jobId} due to model mismatch (${piece.model})`);
return;
}
// MCP 認証ゲート: piece.required_mcp に記載されたサーバーのトークンがなければ park
const missingMcp = (piece.required_mcp ?? []).filter(
(serverId) => !this.mcpTokenManager || !this.mcpTokenManager.hasToken(job.ownerId ?? '', serverId),
);
if (missingMcp.length > 0) {
await this.repo.updateJob(jobId, {
status: 'waiting_human',
waitReason: 'mcp_auth_required',
resumeMovement: piece.initial_movement ?? null,
});
if (localTaskId !== null) {
await this.repo.addLocalTaskComment(
localTaskId,
'system',
`この piece は MCP サーバー「${missingMcp.join(', ')}」との連携が必要です。Settings → MCP 接続から連携してください。`,
'event',
);
}
logger.info(`[worker:${this.workerId}] mcp gate parked job=${jobId} missing=${missingMcp.join(',')}`);
return;
}
// Piece のモデル指定を解決
const resolvedModel = this.resolveModel(piece);
const timeoutMs = (this.config.provider.timeoutMinutes ?? 10) * 60 * 1000;
const workerDefForLlm = this.getWorkerDef();
const isProxyWorker = workerDefForLlm.proxy === true;
const llmClient = new OpenAICompatClient(
this.endpoint,
resolvedModel,
workerDefForLlm.apiKey,
this.config.provider.retry,
timeoutMs,
this.contextLimitTokens,
this.config.safety?.promptGuardRatio,
(line) => reporter.reportPromptPreflight(line),
{ proxy: isProxyWorker },
);
// ASK 再開の場合、resume_movement を使用
const pieceOptions = {
resumeMovement: job.resumeMovement ?? undefined,
askCount: job.askCount,
maxAskPerJob: this.config.ask.maxPerJob,
checkInterjections: isLocalTask && localTaskId !== null && !isSubTask
? async (movementName: string) => {
const comments = await this.repo.getUninjectedComments(localTaskId);
if (comments.length === 0) return [];
const injected = comments.map(c => ({ id: c.id, body: c.body }));
this.repo.markCommentsInjected(injected.map(c => c.id));
reporter.reportInterjectionAck(injected, movementName);
return injected;
}
: undefined,
spawnSubTask: job.subtaskDepth < this.config.subtasks.maxDepth
? async (params: { title: string; instruction: string; piece?: string }) => {
const subJobs = await this.repo.getSubJobs(jobId);
const subtaskIndex = subJobs.length + 1;
if (subJobs.length >= this.config.subtasks.maxPerParent) {
throw new Error(`サブタスク上限 (${this.config.subtasks.maxPerParent}) に達しました。これ以上のサブタスクは作成できません。`);
}
const subtaskWorkspace = join(workspacePath, 'subtasks', String(subtaskIndex));
mkdirSync(subtaskWorkspace, { recursive: true });
mkdirSync(join(subtaskWorkspace, 'output'), { recursive: true });
mkdirSync(join(subtaskWorkspace, 'logs'), { recursive: true });
// 親ジョブの role を継承
const subJobInstruction = [
`ui_profile: ${job.requiredRole}`,
'',
`# ${params.title}`,
'',
params.instruction,
].join('\n');
const subJob = await this.repo.createJob({
repo: `subtask/${jobId}`,
issueNumber: subtaskIndex,
instruction: subJobInstruction,
pieceName: params.piece ?? 'general',
parentJobId: jobId,
subtaskDepth: job.subtaskDepth + 1,
maxAttempts: 2,
role: job.requiredRole,
ownerId: job.ownerId,
visibility: job.visibility,
visibilityScopeOrgId: job.visibilityScopeOrgId,
});
await this.repo.updateJob(subJob.id, { worktreePath: subtaskWorkspace });
logger.info(`[worker:${this.workerId}] spawned sub-task #${subtaskIndex} depth=${job.subtaskDepth + 1} job=${subJob.id}`);
return { jobId: subJob.id, subtaskIndex, workspacePath: subtaskWorkspace };
}
: undefined,
};
const callbacks = this.buildPieceCallbacks(
jobId,
reporter,
isLocalTask,
localTaskId,
workspacePath,
// Seed the sticky-backend guard with whatever was already persisted
// for this job (e.g. on retry / resume from ASK). Only matters for
// proxy workers; direct workers never produce a backend event.
isProxyWorker ? (job.lastBackendId ?? null) : null,
);
// 開始コメント
await reporter.reportMovementStart(`${piece.name} タスク開始`);
// キャンセル用 AbortController
const jobAbortController = new AbortController();
// キャンセルチェック: DB のジョブ状態が 'cancelled' になっていたら中断する
const cancelCheck = (): boolean => {
const isCancelled = this.repo.getJobStatusSync(jobId) === 'cancelled';
if (isCancelled) {
jobAbortController.abort();
}
return isCancelled;
};
// ContextManager 初期化
const contextManager = new ContextManager(this.config.context ?? {});
contextManager.setContextLimit(this.contextLimitTokens);
// Piece 実行(ハートビート開始: 5分ごとに updated_at を更新)
heartbeatTimer = setInterval(() => {
try { this.repo.touchJobUpdatedAt(jobId); } catch { /* ignore */ }
}, 5 * 60 * 1000);
// VLM 対応: worker の vlm=true なら vision 設定を worker 自身の endpoint/model で上書き
const workerDef = this.getWorkerDef();
const toolsConfig = workerDef.vlm
? { ...this.config.tools, visionBaseUrl: this.endpoint, visionModel: this.model }
: this.config.tools;
logger.info(`[worker:${this.workerId}] job ${jobId} runPiece start`);
// Browser session keying: resolve the session-task identity for the
// ToolContext. Threaded into BrowseWeb / InteractiveBrowse so each
// task gets its own noVNC session and the Captcha Pool stays
// separate from per-task sessions. Subtasks walk up to find the
// root local task ID.
const sessionIdentity = await resolveSessionTaskId(this.repo, job);
// ── Browser session profile binding ─────────────────────────────
// If this job is bound to a browser_session_profile, decrypt the
// captured Playwright storageState and pass it into runPiece so
// BrowseWeb can inject it into BrowserContext. Owner-mismatch and
// expired-profile checks fail-fast before the agent loop starts.
let browserSessionState: object | undefined;
let browserSessionProfileId: number | undefined;
let browserSessionProfile:
| { loggedInSelector: string | null; loginUrlPatterns: string[] }
| undefined;
let onAuthExpired:
| ((profileId: number, reason: string) => void)
| undefined;
if (job.browserSessionProfileId) {
const sessRepo = new BrowserSessionRepo(this.repo.getDb());
const profile = sessRepo.getProfileByIdUnsafe(job.browserSessionProfileId);
if (!profile) {
sessRepo.audit({
actorUserId: job.ownerId ?? null,
ownerId: null,
profileId: job.browserSessionProfileId,
action: 'use',
result: 'error',
reason: 'profile not found',
jobId: job.id,
});
throw new Error(`Browser session profile ${job.browserSessionProfileId} not found`);
}
// Fail-closed owner check: a job with null/missing ownerId must not
// be allowed to decrypt any profile, even if the profile id would
// otherwise resolve. Helper audits + throws on rejection. Extracted
// to src/engine/browser-session-auth.ts so the contract is unit
// tested in isolation from the Worker class.
assertProfileOwner(profile, job, sessRepo);
if (profile.status !== 'active' || !profile.encryptedStateBlob) {
sessRepo.audit({
actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id,
action: 'use', result: 'error', reason: `status=${profile.status}`, jobId: job.id,
});
throw new Error(`AUTH_SESSION_EXPIRED: profile ${profile.id} status=${profile.status}`);
}
const masterKeyPath = this.config.secrets?.masterKeyPath ?? './data/secrets/master.key';
const master = initMasterKey(masterKeyPath);
const encDek = sessRepo.getUserDek(profile.ownerId);
if (!encDek) {
sessRepo.audit({
actorUserId: job.ownerId,
ownerId: profile.ownerId,
profileId: profile.id,
action: 'decrypt',
result: 'error',
reason: 'user DEK missing',
jobId: job.id,
});
throw new Error('user DEK missing for browser session profile');
}
let dek: Buffer;
try {
dek = decryptUserDek(master, encDek);
} catch (e) {
sessRepo.audit({
actorUserId: job.ownerId,
ownerId: profile.ownerId,
profileId: profile.id,
action: 'decrypt',
result: 'error',
reason: `dek decrypt failed: ${(e as Error).message}`,
jobId: job.id,
});
throw e;
}
let stateJson: string;
try {
stateJson = decryptStateBlob(dek, profile.encryptedStateBlob);
} catch (e) {
sessRepo.audit({
actorUserId: job.ownerId,
ownerId: profile.ownerId,
profileId: profile.id,
action: 'decrypt',
result: 'error',
reason: `state decrypt failed: ${(e as Error).message}`,
jobId: job.id,
});
throw e;
}
browserSessionState = JSON.parse(stateJson) as object;
browserSessionProfileId = profile.id;
browserSessionProfile = {
loggedInSelector: profile.loggedInSelector,
loginUrlPatterns: profile.loginUrlPatterns,
};
onAuthExpired = (pid, reason) => {
sessRepo.markProfileStatus(pid, 'expired', reason);
sessRepo.audit({
actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: pid,
action: 'expire', result: 'success', reason, jobId: job.id,
});
// Best-effort task-level notification. Subtask jobs and
// gitea-issue jobs may not have a numeric local_task id.
if (localTaskId !== null) {
this.repo.addLocalTaskComment(
localTaskId,
'agent',
`⚠️ Browser session "${profile.label}" expired: ${reason}. Re-login from Settings → Browser Sessions.`,
'progress',
).catch(() => { /* ignore — comment posting is best-effort */ });
}
};
sessRepo.audit({
actorUserId: job.ownerId, ownerId: profile.ownerId, profileId: profile.id,
action: 'use', result: 'success', jobId: job.id,
});
sessRepo.touchUsed(profile.id);
}
// Piece handoff: when this job continues an earlier one in the same
// local_task, agent-loop injects a "this is a continuation of piece X"
// block into the system prompt. We resolve the prev piece name + the
// most recent agent result/ask comment as the LLM-visible carry-over.
let handoffContext: import('./engine/agent-loop.js').HandoffContext | undefined;
if (job.continuedFromJobId && isLocalTask && localTaskId !== null) {
const prevJob = await this.repo.getJob(job.continuedFromJobId);
if (prevJob) {
const prevResultComment = await this.repo.getLatestResultComment(localTaskId);
handoffContext = {
prevPiece: prevJob.pieceName,
prevResult: prevResultComment?.body ?? null,
};
} else {
logger.warn(`[worker:${this.workerId}] continued_from_job_id=${job.continuedFromJobId} not found for job ${jobId}; skipping handoff context`);
}
}
// Shared-knowledge notes: construct once per job, forwarded into
// ToolContext so agent-loop can inject "## Subscribed Notes" into
// the system prompt. Only active when the job has a known owner.
let notesService: NotesService | undefined;
let notesInjectConfig: NotesInjectConfig | undefined;
let notesUserOrgIds: string[] | undefined;
let notesUserRole: 'admin' | 'user' | undefined;
if (job.ownerId) {
try {
const notesRepo = new NotesRepository(this.repo.getDb());
const userFolderRoot = this.config.userFolderRoot ?? './data/users';
notesService = new NotesService({
db: this.repo.getDb(),
repo: notesRepo,
userFolderRoot,
getUserOrgIds: (uid) => this.repo.listUserGiteaOrgs(uid).map((o) => o.orgId),
audit: (action, actor, target) => {
try {
this.repo.addAuditLog(jobId, action, actor, { target });
} catch (err) {
logger.warn(`[notes-audit] failed: ${(err as Error).message}`);
}
},
});
const cfgNotes = this.config.notes?.inject ?? {};
notesInjectConfig = {
perNoteMaxKb: cfgNotes.perNoteMaxKb ?? DEFAULT_NOTES_INJECT.perNoteMaxKb,
totalMaxKb: cfgNotes.totalMaxKb ?? DEFAULT_NOTES_INJECT.totalMaxKb,
overBudgetStrategy: cfgNotes.overBudgetStrategy ?? DEFAULT_NOTES_INJECT.overBudgetStrategy,
};
notesUserOrgIds = this.repo.listUserGiteaOrgs(job.ownerId).map((o) => o.orgId);
const ownerRow = this.repo.getUserById(job.ownerId);
notesUserRole = ownerRow?.role === 'admin' ? 'admin' : 'user';
} catch (err) {
logger.warn(`[worker:${this.workerId}] job ${jobId} notes setup failed: ${(err as Error).message}`);
}
}
// Parse per-task options from job payload (e.g. { options: { mcpDisabled, skillsDisabled } }).
let jobPayloadOptions: Record<string, unknown> = {};
if (job.payload) {
try {
const parsed = JSON.parse(job.payload) as Record<string, unknown>;
if (parsed?.options && typeof parsed.options === 'object' && !Array.isArray(parsed.options)) {
jobPayloadOptions = parsed.options as Record<string, unknown>;
}
} catch {
logger.warn(`[worker:${this.workerId}] job ${jobId} failed to parse payload JSON`);
}
}
const mcpDisabled = jobPayloadOptions.mcpDisabled === true;
const skillsDisabled = jobPayloadOptions.skillsDisabled === true;
const result = await runPiece(piece, enrichedInstruction, llmClient, workspacePath, callbacks, toolsConfig, {
...pieceOptions,
cancelCheck,
abortController: jobAbortController,
safetyConfig: this.config.safety,
searchFilter: this.config.searchFilter,
customPiecesDir: customPieceDirs,
contextManager,
vlmEnabled: workerDef.vlm === true,
jobId, // Phase 5: subtask handoff parent identity
handoffContext,
// Phase 5 PR2: when this run IS a subtask, pass parent identity +
// child workspace path so the runner emits a memory-delta.json on
// completion. Subtask workspaces follow `<parent>/subtasks/<N>`
// where N is the subtask job's issueNumber.
parentJobId: isSubTask && parentJobId ? parentJobId : undefined,
childWorkspaceRelative: isSubTask ? `subtasks/${issueNumber}` : undefined,
// Mission Brief: only wire IO when this run is bound to a local
// task (the brief is per-LocalTask, not per-job). Subtask runs
// and gitea-issue runs leave it unset → MissionUpdate degrades
// to a no-op and the system prompt MISSION block is skipped.
missionBrief: isLocalTask && localTaskId !== null && !isSubTask
? this.repo.makeMissionBriefIO(localTaskId)
: undefined,
taskId: sessionIdentity.taskId,
userId: sessionIdentity.userId,
browserSessionState,
browserSessionProfileId,
browserSessionProfile,
onAuthExpired,
ownerId: job.ownerId,
mcpConfig: mergeMcpConfig(this.config.mcp),
notesService,
notesInjectConfig,
notesUserOrgIds,
notesUserRole,
skillCatalog: this.skillCatalog ?? undefined,
mcpDisabled,
skillsDisabled,
});
logger.info(`[worker:${this.workerId}] job ${jobId} runPiece done: status=${result.status}`);
this.writeRunDiagnostics(workspacePath, result);
await this.handlePieceResult(result, job, reporter, workspacePath, isLocalTask, isSubTask, parentJobId);
// Phase 3b: capture the terminal status for the jobs_total label.
// result.status uses piece-runner's own enum
// ('completed'|'aborted'|'error'|'waiting_human'|'waiting_subtasks'|'cancelled'); map to the
// metric enum (waiting_subtasks stays "succeeded" for the metric
// because the job pauses cleanly — not a failure).
switch (result.status) {
case 'completed': metricFinalStatus = 'succeeded'; break;
case 'aborted': metricFinalStatus = 'aborted'; break;
case 'cancelled': metricFinalStatus = 'cancelled'; break;
case 'waiting_human': metricFinalStatus = 'waiting_human'; break;
case 'waiting_subtasks': metricFinalStatus = 'succeeded'; break;
case 'error':
default: metricFinalStatus = 'failed'; break;
}
} catch (err) {
const errorMsg = err instanceof Error ? err.message : String(err);
const errorStack = err instanceof Error && err.stack ? err.stack : '(no stack)';
if (errorMsg.includes('Piece not found')) {
logger.error(`[worker:${this.workerId}] job ${jobId} loadPiece failed piece=${job.pieceName} customPiecesDir=${this.config.customPiecesDir ?? 'none'} error=${errorMsg}`);
}
logger.error(`[worker:${this.workerId}] job ${jobId} failed: ${errorMsg}`);
// Always log the stack so opaque errors (e.g. SqliteError: FOREIGN KEY
// constraint failed) can be traced to the offending insert/update.
logger.error(`[worker:${this.workerId}] job ${jobId} stack: ${errorStack}`);
const retryDisposition = await this.scheduleRetryOrFail(job, errorMsg, workspacePath, 'worker_exception');
if (retryDisposition !== 'requeued_unhealthy') {
await reporter.reportError(errorMsg);
}
await this.repo.addAuditLog(jobId, 'job_error', 'worker', { error: errorMsg });
} finally {
if (heartbeatTimer) clearInterval(heartbeatTimer);
// Phase 3b: emit job lifecycle counters + duration histogram. The
// active gauge always decrements (matching the inc at start) so a
// process restart can't leak active_jobs > 0 forever.
if (this.workerMetrics) {
try {
this.workerMetrics.activeJobs.labels({ piece: metricPiece, profile: metricProfile }).dec();
this.workerMetrics.jobsTotal.labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile }).inc();
this.workerMetrics.jobDurationSeconds
.labels({ piece: metricPiece, status: metricFinalStatus, profile: metricProfile })
.observe((Date.now() - jobStartedAtMs) / 1000);
} catch { /* metrics never affect business logic */ }
}
await this.repo.updateWorkerNodeHealth(this.workerId, {
healthy: this.healthy,
lastError: this.lastHealthError,
inflightJobs: this.inflight,
availableModels: [...this.availableModels],
});
await this.repo.unlockIssue(repoName, issueNumber);
}
}
/**
 * Build the callback set handed to runPiece() for one job.
 *
 * The callbacks mirror piece-runner events into:
 *  - the jobs row (current movement / activity, context payload, and the
 *    sticky last_backend_id),
 *  - the LocalProgressReporter,
 *  - local-task comments (movement progress, paired tool_call records,
 *    checklist snapshots) when `isLocalTask` is true,
 *  - the SSE job-event bus (only when a listener is attached),
 *  - Prometheus counters/histograms when worker metrics are configured.
 *
 * Async side effects inside the callbacks are fire-and-forget (the callbacks
 * do not return them), so every repository write started here carries a
 * rejection handler that logs a warning instead of leaving an unhandled
 * promise rejection behind.
 *
 * @param jobId - job whose row / event stream is updated
 * @param reporter - progress reporter for this job
 * @param isLocalTask - enables posting of local-task comments
 * @param localTaskId - local task id; only dereferenced when `isLocalTask` is true
 * @param workspacePath - job workspace; `logs/checklists/*.json` is read from here
 * @param initialLastBackendId - persisted jobs.last_backend_id used to seed the sticky guard
 */
private buildPieceCallbacks(
  jobId: string,
  reporter: LocalProgressReporter,
  isLocalTask: boolean,
  localTaskId: number | null,
  workspacePath: string,
  /**
   * Initial value of jobs.last_backend_id from the DB. Used to seed the
   * sticky guard so callers don't repeatedly write the same value on
   * every LLM iteration. Falsy/null = no backend resolved yet.
   */
  initialLastBackendId: string | null = null,
): PieceRunCallbacks {
  let movementStartTime = Date.now();
  const toolUsageCounts = new Map<string, number>();
  const workerIdLocal = this.workerId;
  // Fire-and-forget guard: log a rejection rather than letting it surface as
  // an unhandled promise rejection. Promise.resolve() keeps this safe even if
  // the callee returns a non-promise value.
  const warnOnReject = (pending: unknown, what: string): void => {
    Promise.resolve(pending).catch((err) => {
      logger.warn(`[worker:${workerIdLocal}] ${what} failed for job ${jobId}: ${err}`);
    });
  };
  // Sticky-backend per design Open Question #3: take the first proxy
  // backend the job sees and never overwrite it. Subsequent calls that
  // happen to land on a different deployment are ignored at this layer
  // so the UI Pet doesn't flicker between sprites. The resolver also
  // guarantees that if the DB persist fails, the local sticky stays
  // unset so the next event can retry (otherwise a transient DB error
  // would orphan the worker → backend mapping for the lifetime of the
  // job). See src/worker/sticky-backend.ts.
  const onBackendResolvedHandler = createStickyBackendResolver({
    initial: initialLastBackendId,
    persist: (backendId) => this.repo.updateJob(jobId, { lastBackendId: backendId }),
    logger: {
      debug: (m) => logger.debug(m),
      info: (m) => logger.info(m),
      warn: (m) => logger.warn(m),
    },
    workerId: workerIdLocal,
    jobId,
  });
  // Phase 3b: local copy of the sticky backend so the LLM-call metric
  // has a stable backend_id label even before the persist returns.
  // Direct workers (non-proxy) never fire onBackendResolved, so we
  // fall back to the worker id (`gpu-rtx-a`) as the backend identity.
  let metricBackendId = initialLastBackendId ?? workerIdLocal;
  const metricModel = this.model ?? 'unknown';
  const metricsRef = this.workerMetrics;
  // Pending tool calls awaiting result, keyed by callId.
  // On onToolResult, we pair via callId and persist a single tool_call comment.
  const pendingToolCalls = new Map<string, { name: string; args: Record<string, unknown>; movement: string }>();
  let currentMovementForTools = '';
  const ARG_PREVIEW_CAP = 8 * 1024;
  const RESULT_PREVIEW_CAP = 16 * 1024;
  const truncate = (s: string, cap: number): string =>
    s.length > cap ? s.slice(0, cap) + `\n…[truncated ${s.length - cap} bytes]` : s;
  return {
    onMovementStart: (name) => {
      movementStartTime = Date.now();
      toolUsageCounts.clear();
      currentMovementForTools = name;
      warnOnReject(
        this.repo.updateJob(jobId, { currentMovement: name, currentActivity: null }),
        'currentMovement update',
      );
      warnOnReject(reporter.reportMovementStart(name), 'movement-start report');
    },
    onToolUse: (toolName, input, callId) => {
      toolUsageCounts.set(toolName, (toolUsageCounts.get(toolName) ?? 0) + 1);
      const summary = summarizeToolInput(toolName, input);
      warnOnReject(
        this.repo.updateJob(jobId, { currentActivity: `${toolName}: ${summary}`.slice(0, 200) }),
        'currentActivity update',
      );
      reporter.reportToolUse(toolName, input);
      if (callId) {
        pendingToolCalls.set(callId, { name: toolName, args: input, movement: currentMovementForTools });
      }
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, {
          type: 'tool_use',
          toolName,
          toolInput: summary,
          callId: callId ?? '',
        });
      }
    },
    onToolCallDelta: (callId, name, chunk) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, { type: 'tool_use_delta', callId, name, chunk });
      }
    },
    onText: (text) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, { type: 'text', text });
      }
    },
    onPromptProgress: (progress) => {
      if (jobEventBus.hasListeners(jobId)) {
        jobEventBus.emitJob(jobId, {
          type: 'prompt_progress',
          processed: progress.processed,
          total: progress.total,
          timeMs: progress.timeMs,
          cache: progress.cache,
        });
      }
    },
    onTextPreview: (movementName, preview) => {
      reporter.reportAssistantPreview(movementName, preview);
    },
    onContextAction: (action) => {
      reporter.reportContextAction(action);
    },
    onContextUpdate: (payload) => {
      this.repo.updateJobContext(jobId, payload).catch(err => {
        logger.warn(`[worker:${this.workerId}] failed to persist context for job ${jobId}: ${err}`);
      });
    },
    onLLMCall: (info) => {
      reporter.reportLLMCall(info);
      if (metricsRef) {
        try {
          metricsRef.llmCallsTotal
            .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel })
            .inc();
          metricsRef.llmCallDurationSeconds
            .labels({ worker_id: workerIdLocal, backend_id: metricBackendId, model: metricModel })
            .observe(info.durationMs / 1000);
        } catch { /* metrics best-effort */ }
      }
    },
    onBackendResolved: (info) => {
      // Phase 3b: update the backend id used as the LLM-call metric
      // label. Every event is captured (not just the first) so a
      // routing change mid-job is reflected in the next iteration's
      // counters; the DB-side sticky still preserves the first.
      if (info.backendId) {
        metricBackendId = info.backendId;
      }
      // Fire-and-forget: agent-loop's onBackendResolved signature is
      // sync (void). The resolver handles persist errors internally;
      // we just attach a final guard to log any unexpected throw.
      // cacheKey is observed but not persisted at the job level —
      // Phase B's NodeStatusWidget will track cache hits out-of-band.
      onBackendResolvedHandler(info).catch(err => {
        logger.warn(`[worker:${this.workerId}] sticky backend resolver threw for job ${jobId}: ${err}`);
      });
    },
    onMovementComplete: (movementName, result) => {
      const durationMs = Date.now() - movementStartTime;
      const tools: Record<string, number> = {};
      for (const [name, count] of toolUsageCounts) {
        tools[name] = count;
      }
      reporter.reportMovementComplete(movementName, result.output, result.next);
      if (isLocalTask) {
        // A summary is attached only to non-terminal movements; COMPLETE /
        // ABORT / ASK carry their output through the final-result path.
        const isTerminal = result.next === 'COMPLETE' || result.next === 'ABORT' || result.next === 'ASK';
        const summary = !isTerminal ? (result.output?.trim() || undefined) : undefined;
        const progressBody = JSON.stringify({ movement: movementName, tools, durationMs, ...(summary ? { summary } : {}) });
        this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress')
          .catch(err => logger.warn(`[worker:${this.workerId}] failed to insert progress comment: ${err}`));
        if (isTerminal && jobEventBus.hasListeners(jobId)) {
          jobEventBus.emitJob(jobId, { type: 'done' });
        }
      }
    },
    onToolResult: (toolName, info, callId) => {
      const { isError } = info;
      reporter.reportToolResult(toolName, info);
      // Pair with pending tool_use via callId, then persist as comment + emit SSE.
      const pending = callId ? pendingToolCalls.get(callId) : undefined;
      if (callId) pendingToolCalls.delete(callId);
      if (isLocalTask && callId && pending) {
        let argsStr: string;
        try { argsStr = truncate(JSON.stringify(pending.args), ARG_PREVIEW_CAP); }
        catch { argsStr = '"<unserializable>"'; }
        const toolCallBody = JSON.stringify({
          type: 'tool_call',
          callId,
          movement: pending.movement,
          name: toolName,
          args: argsStr,
          result: truncate(info.result, RESULT_PREVIEW_CAP),
          isError,
          durationMs: info.durationMs,
          cacheHit: info.cacheHit,
        });
        this.repo.addLocalTaskComment(localTaskId!, 'agent', toolCallBody, 'progress')
          .catch(err => logger.warn(`[worker:${this.workerId}] tool_call comment failed: ${err}`));
      }
      if (jobEventBus.hasListeners(jobId) && callId) {
        jobEventBus.emitJob(jobId, {
          type: 'tool_result',
          toolName,
          callId,
          toolOutput: truncate(info.result, 2 * 1024),
          toolIsError: isError,
        });
      }
      // Phase 3b: count every tool invocation. success label is the
      // string form so Grafana queries can group by it. Same
      // best-effort guard as the LLM emission above.
      //
      // Phase 3b post-review: normalize the tool_name label so a
      // piece firing arbitrary mcp__*/user-defined names doesn't
      // explode label cardinality. The full tool_name is still visible
      // in the activity log + reporter, so the normalization only
      // affects Prometheus dimensions.
      if (metricsRef) {
        try {
          metricsRef.toolCallsTotal
            .labels({ tool_name: normalizeToolNameForMetric(toolName), success: isError ? 'false' : 'true' })
            .inc();
        } catch { /* metrics best-effort */ }
      }
      // After a successful checklist tool, post a snapshot of the most
      // recently modified logs/checklists/*.json as a progress comment.
      if (isLocalTask && !isError && (toolName === 'CheckItem' || toolName === 'CreateChecklist')) {
        try {
          const checklistDir = join(workspacePath, 'logs', 'checklists');
          if (existsSync(checklistDir)) {
            const files = readdirSync(checklistDir).filter(f => f.endsWith('.json'));
            if (files.length > 0) {
              let latestFile = files[0]!;
              let latestMtime = 0;
              for (const file of files) {
                try {
                  const { mtimeMs } = statSync(join(checklistDir, file));
                  if (mtimeMs > latestMtime) {
                    latestMtime = mtimeMs;
                    latestFile = file;
                  }
                } catch { /* skip */ }
              }
              const data = JSON.parse(readFileSync(join(checklistDir, latestFile), 'utf-8'));
              const progressBody = JSON.stringify({
                type: 'checklist',
                name: data.name,
                items: data.items,
                summary: data.summary,
              });
              this.repo.addLocalTaskComment(localTaskId!, 'agent', progressBody, 'progress')
                .catch(err => logger.warn(`[worker:${this.workerId}] checklist progress comment failed: ${err}`));
            }
          }
        } catch (err) {
          logger.warn(`[worker:${this.workerId}] checklist read failed: ${err}`);
        }
      }
    },
  };
}
/**
 * Run a reflection job and record its terminal status on the job row.
 *
 * The reflection runner is loaded lazily with a dynamic import. That import
 * is inside the try, so a module-load failure also marks the job 'failed'.
 * Otherwise the error would propagate to the caller with the job still
 * lacking a terminal status.
 *
 * Outcome mapping: 'failed' → 'failed'; any other outcome → 'succeeded'.
 * Any exception (import, runner, or the success-path status update) is
 * logged and the job is marked 'failed'.
 */
private async handleReflectionJob(job: Job): Promise<void> {
  try {
    const { runReflectionJob } = await import('./engine/reflection/reflection-runner.js');
    const outcome = await runReflectionJob(
      {
        repo: this.repo,
        config: this.config,
        llmEndpoint: this.endpoint,
        llmModel: this.model,
      },
      job
    );
    await this.repo.updateJob(job.id, {
      status: outcome === 'failed' ? 'failed' : 'succeeded',
      currentActivity: null,
    });
  } catch (e) {
    logger.error(`[reflection] runner threw job=${job.id} err=${String(e)}`);
    await this.repo.updateJob(job.id, { status: 'failed', currentActivity: null });
  }
}
/**
 * Persist the outcome of a runPiece() call and fan it out to the reporter,
 * audit log, push queue and reflection queue.
 *
 * Per `result.status`:
 *  - completed: commit the local workspace (local tasks only), mark the job
 *    'succeeded', enqueue push + reflection, and report the final output
 *    (passed through ensureKeepaGraphs).
 *  - waiting_human: for sub-tasks, park the job and immediately queue a
 *    follow-up job whose instruction is an auto-generated answer. Otherwise
 *    park the job, push-notify and report the question.
 *  - waiting_subtasks: park the job when sub-jobs exist. When none exist,
 *    re-queue at resumeMovement, or fail the job if there is no resumeMovement.
 *  - cancelled: log and report only; no job status update happens here.
 *  - anything else (error / aborted): delegate to scheduleRetryOrFail, and
 *    enqueue reflection once the job has definitively failed.
 *
 * For sub-tasks reaching a terminal status, this also writes
 * `<parent worktree>/subtasks/<issueNumber>/result.md` and asks the
 * repository to re-queue the parent when all siblings are done. A
 * `job_<status>` audit entry is always written last.
 */
private async handlePieceResult(
  result: PieceRunResult,
  job: Job,
  reporter: LocalProgressReporter,
  workspacePath: string,
  isLocalTask: boolean,
  isSubTask: boolean,
  parentJobId: string | null,
): Promise<void> {
  const { repo: repoName, issueNumber, id: jobId } = job;
  if (result.status === 'completed') {
    if (isLocalTask) {
      await this.commitLocalWorkspace(issueNumber, workspacePath);
    }
    await this.repo.updateJob(jobId, { status: 'succeeded', currentActivity: null });
    this.enqueuePush(job, 'succeeded');
    await maybeEnqueueReflection(this.repo, job, 'succeeded', this.config.reflection, this.config.provider.workers);
    let resultBody = result.finalOutput;
    if (resultBody) {
      resultBody = ensureKeepaGraphs(resultBody);
    }
    await reporter.reportFinalResult('completed', resultBody);
  } else if (result.status === 'waiting_human') {
    if (isSubTask && parentJobId) {
      await this.repo.updateJob(jobId, { status: 'waiting_human', resumeMovement: result.resumeMovement ?? null, askCount: job.askCount + 1 });
      // Sub-task ASK is auto-answered below, so we don't notify on it.
      reporter.reportToolUse('ASK', { question: result.finalOutput });
      await this.repo.addAuditLog(jobId, 'job_ask_subtask', 'worker', { question: result.finalOutput, resumeMovement: result.resumeMovement });
      try {
        const answer = await this.answerSubtaskAsk(job, parentJobId, result.finalOutput);
        logger.info(`[worker:${this.workerId}] answered subtask ASK for job ${jobId}: ${answer.slice(0, 100)}`);
        const newJob = await this.repo.createJob({
          repo: repoName,
          issueNumber,
          instruction: answer,
          pieceName: job.pieceName,
          askCount: job.askCount + 1,
          resumeMovement: result.resumeMovement,
          parentJobId: job.parentJobId,
          subtaskDepth: job.subtaskDepth,
          maxAttempts: 2,
          role: job.requiredRole,
          ownerId: job.ownerId,
          visibility: job.visibility,
          visibilityScopeOrgId: job.visibilityScopeOrgId,
        });
        await this.repo.updateJob(newJob.id, { worktreePath: workspacePath });
        await this.repo.addAuditLog(newJob.id, 'job_queued_subtask_ask_answer', 'worker', { originalJobId: jobId, question: result.finalOutput });
      } catch (askErr) {
        logger.warn(`[worker:${this.workerId}] failed to answer subtask ASK, leaving as waiting_human: ${askErr}`);
      }
    } else {
      await this.repo.updateJob(jobId, {
        status: 'waiting_human',
        resumeMovement: result.resumeMovement ?? null,
        askCount: job.askCount + 1,
      });
      this.enqueuePush(job, 'waiting_human');
      await reporter.reportAsk(result.finalOutput);
      await this.repo.addAuditLog(jobId, 'job_ask', 'worker', {
        question: result.finalOutput,
        resumeMovement: result.resumeMovement,
      });
    }
  } else if (result.status === 'waiting_subtasks') {
    const subJobs = await this.repo.getSubJobs(jobId);
    if (subJobs.length === 0) {
      if (result.resumeMovement) {
        logger.warn(`[worker:${this.workerId}] job ${jobId} waiting_subtasks but no sub-jobs exist, re-queuing to ${result.resumeMovement}`);
        await this.repo.updateJob(jobId, {
          status: 'queued',
          resumeMovement: result.resumeMovement,
        });
      } else {
        logger.error(`[worker:${this.workerId}] job ${jobId} waiting_subtasks with no sub-jobs and no resumeMovement, failing`);
        await this.repo.updateJob(jobId, {
          status: 'failed',
          // Record the reason on the job row so the failure is not opaque.
          errorSummary: 'waiting_subtasks returned with no sub-jobs and no resumeMovement',
        });
      }
      await this.repo.addAuditLog(jobId, 'job_requeued_no_subtasks', 'worker', {
        resumeMovement: result.resumeMovement,
        action: result.resumeMovement ? 'requeued' : 'failed',
      });
    } else {
      await this.repo.updateJob(jobId, {
        status: 'waiting_subtasks',
        resumeMovement: result.resumeMovement ?? null,
      });
      await reporter.reportMovementStart('サブタスク待機中...');
      await this.repo.addAuditLog(jobId, 'job_waiting_subtasks', 'worker', {
        resumeMovement: result.resumeMovement,
      });
    }
  } else if (result.status === 'cancelled') {
    logger.info(`[worker:${this.workerId}] job ${jobId} cancelled`);
    await reporter.reportFinalResult('cancelled', result.finalOutput);
  } else {
    const retryDisposition = await this.scheduleRetryOrFail(job, result.finalOutput, workspacePath, result.abortReason ?? null);
    if (retryDisposition !== 'requeued_unhealthy') {
      await reporter.reportFinalResult(result.status, result.finalOutput);
    }
    if (retryDisposition === 'failed') {
      const outcome = result.status === 'aborted' ? 'aborted' : 'failed';
      await maybeEnqueueReflection(this.repo, job, outcome, this.config.reflection, this.config.provider.workers);
    }
  }
  // On sub-task completion (terminal statuses only): write the result into
  // the parent's workspace and re-queue the parent once every sibling is done.
  const SUBTASK_TERMINAL: ReadonlyArray<PieceRunResult['status']> = ['completed', 'error', 'aborted', 'cancelled'];
  if (isSubTask && parentJobId && SUBTASK_TERMINAL.includes(result.status)) {
    try {
      const parentJob = await this.repo.getJob(parentJobId);
      if (parentJob?.worktreePath) {
        const resultDir = join(parentJob.worktreePath, 'subtasks', String(issueNumber));
        mkdirSync(resultDir, { recursive: true });
        writeFileSync(
          join(resultDir, 'result.md'),
          `# サブタスク #${issueNumber} 結果\n\nステータス: ${result.status}\n\n${result.finalOutput}\n`,
          'utf-8',
        );
      }
      const requeued = await this.repo.requeueParentJobIfAllSubtasksDone(parentJobId);
      if (requeued) {
        logger.info(`[worker:${this.workerId}] all sub-tasks done, re-queued parent job ${parentJobId}`);
      }
    } catch (subErr) {
      logger.warn(`[worker:${this.workerId}] sub-task parent re-queue error: ${subErr}`);
    }
  }
  await this.repo.addAuditLog(jobId, `job_${result.status}`, 'worker', {
    movementCount: result.movementHistory.length,
    abortReason: result.abortReason ?? null,
    contextActionCount: result.contextActions.length,
    latestContextAction: result.contextActions[result.contextActions.length - 1] ?? null,
  });
}
/**
 * Pick the model name to request from the LLM endpoint for `piece`.
 *
 * Preference order:
 *  1. `piece.model`, if the worker has not advertised any models yet or
 *     lists it. When it is unavailable, a fallback warning is logged.
 *  2. The first advertised model, when the worker's configured model is set
 *     but missing from the advertised list.
 *  3. The configured model unchanged (possibly undefined).
 */
private resolveModel(piece: PieceDef): string | undefined {
  const advertised = this.availableModels;
  const requested = piece.model;
  if (requested) {
    const requestedUsable = advertised.size === 0 || advertised.has(requested);
    if (requestedUsable) {
      return requested;
    }
    logger.warn(`[worker:${this.workerId}] piece model "${requested}" not available, falling back to ${this.model ?? '<none>'}`);
  }
  const configured = this.model;
  // Nothing to substitute: no configured model, no advertised list, or the
  // configured model is advertised.
  if (!configured || advertised.size === 0 || advertised.has(configured)) {
    return configured;
  }
  const firstAdvertised = Array.from(advertised)[0]!;
  logger.info(`[worker:${this.workerId}] configured model "${configured}" not available, auto-selecting "${firstAdvertised}"`);
  return firstAdvertised;
}
private async scheduleRetryOrFail(
  job: Job,
  errorMsg: string,
  workspacePath?: string,
  abortReason: string | null = null,
): Promise<'requeued_unhealthy' | 'retry' | 'failed'> {
  // Decide what happens to a job whose run did not succeed, persist that
  // decision, and write a retry handoff summary.
  //  - 'requeued_unhealthy': errorMsg looks like an LLM connection failure. The
  //    worker marks itself unhealthy and puts the job back to 'queued' without
  //    touching `attempt`.
  //  - 'retry': attempts remain. Schedules the next attempt using the configured
  //    backoff schedule.
  //  - 'failed': no attempts remain. Marks the job failed and enqueues a push.
  const jobId = job.id;
  const connectionFailure = /connection error:\s*fetch failed|econnrefused|enotfound|etimedout|network error/i;
  // The workspace path is resolved at call time, so `job.worktreePath` is read
  // at the same moment as it would be with an inline argument.
  const writeHandoff = (
    nextRetryAt: string | null,
    disposition: 'requeued_unhealthy' | 'retry' | 'failed',
  ): void => {
    writeRetryHandoffSummary({
      workspacePath: workspacePath ?? job.worktreePath,
      job,
      errorMsg,
      nextRetryAt,
      disposition,
    });
  };

  if (connectionFailure.test(errorMsg)) {
    // Update in-memory health state first, then persist it, then release the job.
    this.healthy = false;
    this.lastHealthError = errorMsg;
    this.availableModels.clear();
    await this.repo.updateWorkerNodeHealth(this.workerId, {
      healthy: false,
      lastError: errorMsg,
      inflightJobs: this.inflight,
      availableModels: [],
    });
    await this.repo.updateJob(jobId, {
      status: 'queued',
      workerId: null,
      errorSummary: errorMsg,
      abortReason,
      nextRetryAt: null,
    });
    writeHandoff(null, 'requeued_unhealthy');
    logger.warn(`[worker:${this.workerId}] job ${jobId} requeued after LLM connection error; worker marked unhealthy`);
    return 'requeued_unhealthy';
  }

  const { attempt, maxAttempts } = job;
  if (attempt < maxAttempts) {
    // Index the schedule by (attempt - 1), clamped to its last entry.
    // If the lookup misses, use the last entry; if the schedule is empty, use 60s.
    const schedule = this.config.retry.backoffSeconds;
    const lastIndex = schedule.length - 1;
    const backoffSec = schedule[Math.min(attempt - 1, lastIndex)] ?? schedule[lastIndex] ?? 60;
    const nextRetryAt = new Date(Date.now() + backoffSec * 1000).toISOString();
    await this.repo.updateJob(jobId, {
      status: 'retry',
      attempt: attempt + 1,
      nextRetryAt,
      errorSummary: errorMsg,
      abortReason,
    });
    writeHandoff(nextRetryAt, 'retry');
    logger.info(`[worker:${this.workerId}] job ${jobId} scheduled for retry ${attempt + 1}/${maxAttempts} at ${nextRetryAt}`);
    return 'retry';
  }

  await this.repo.updateJob(jobId, { status: 'failed', errorSummary: errorMsg, abortReason });
  // V2 push: only on terminal fail. Intermediate retry attempts are
  // silenced (matches V1's 4-second debounce intent).
  this.enqueuePush(job, 'failed');
  writeHandoff(null, 'failed');
  logger.info(`[worker:${this.workerId}] job ${jobId} failed permanently after ${maxAttempts} attempts`);
  return 'failed';
}
private listDir(dirPath: string): string[] {
  // Best-effort directory listing. Any error from readdirSync (for example a
  // missing directory) yields an empty array instead of throwing.
  let entries: string[] = [];
  try {
    entries = readdirSync(dirPath);
  } catch {
    // Intentionally ignored: callers treat an unreadable directory as empty.
  }
  return entries;
}
private async commitLocalWorkspace(
  taskId: number,
  workspacePath: string,
  commitMessage?: string,
): Promise<void> {
  // Commit the task workspace on branch 'main' via commitWorkspaceChanges,
  // excluding the 'input/' and 'logs/' entries, and log each outcome it
  // reports (no changes / committed / pushed).
  const defaultMessage = `agent: update task #${taskId}`;
  // `||` rather than `??`: a blank or whitespace-only message also falls back.
  const message = commitMessage?.trim() || defaultMessage;
  const { changed, committed, pushed } = await commitWorkspaceChanges({
    workspacePath,
    branchName: 'main',
    commitMessage: message,
    ignoreEntries: ['input/', 'logs/'],
  });
  if (!changed) {
    logger.info(`[worker:${this.workerId}] no local changes to commit for task #${taskId}`);
    return;
  }
  if (committed) {
    logger.info(`[worker:${this.workerId}] committed local workspace changes for task #${taskId}`);
  }
  if (pushed) {
    logger.info(`[worker:${this.workerId}] pushed local workspace changes for task #${taskId}`);
  }
}
}