maestro/src/scheduler.ts
oss-sync 0f75bdfbab
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (0e09596)
2026-06-08 03:23:19 +00:00

458 lines
18 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { CronExpressionParser } from 'cron-parser';
import { mkdirSync, writeFileSync } from 'fs';
import { join } from 'path';
import { Repository, type ScheduledTask } from './db/repository.js';
import type { BrowserSessionRepo } from './db/browser-session-repo.js';
import { logger } from './logger.js';
import { loadConfig } from './config.js';
import { resolveAndRunUserScript } from './user-folder/script-orchestrator.js';
// Job statuses treated as "still in progress": while the previous job is in one
// of these, the next scheduled run is skipped.
// Only states that are genuinely executing, or transient states that recover on
// their own, belong here:
//   queued / dispatching / running ... executing
//   waiting_subtasks ... waiting for parallel subtasks to finish;
//                        requeueWaitingSubtasks recovers it automatically
// waiting_human (ASK) is deliberately NOT included. It has neither automatic
// recovery nor a timeout, so an unattended scheduled task that raised ASK once
// would have every later run skipped forever, effectively killing the schedule.
// Each scheduled run is an independent new task, so the earlier waiting_human
// task is left in place (the user can answer it later or ignore it) and a fresh
// run is started.
const IN_PROGRESS_STATUSES: ReadonlySet<string> = new Set<string>([
  'queued',
  'dispatching',
  'running',
  'waiting_subtasks',
]);
/**
 * Raw schedule fields that convertToCron() turns into a cron expression.
 * Every field is optional; which ones are read depends on the schedule type.
 */
export interface ScheduleInput {
/** Value placed in the cron hour field; convertToCron() uses 0 when omitted. */
hour?: number;
/** Value placed in the cron minute field; convertToCron() uses 0 when omitted. */
minute?: number;
/** Read only for 'weekly'; placed verbatim in the cron day-of-week field (0 when omitted). */
dayOfWeek?: number;
/** Read only for 'monthly'; placed verbatim in the cron day-of-month field (1 when omitted). */
dayOfMonth?: number;
/** Read only for 'cron', where it is required and returned unchanged. */
cronExpression?: string;
}
/**
 * Converts a schedule preset (daily / weekly / monthly) into a cron expression.
 *
 * The 'cron' type returns the supplied expression unchanged, and 'once'
 * returns the literal string 'once'.
 *
 * @param scheduleType One of 'daily', 'weekly', 'monthly', 'cron' or 'once'.
 * @param input Schedule fields; missing minute/hour/dayOfWeek default to 0 and
 *   a missing dayOfMonth defaults to 1.
 * @returns A five-field cron expression, or 'once'.
 * @throws Error when the type is 'cron' without a cronExpression, or when the
 *   type is not recognised.
 */
export function convertToCron(scheduleType: string, input: ScheduleInput): string {
  const minute = input.minute ?? 0;
  const hour = input.hour ?? 0;

  if (scheduleType === 'daily') {
    return `${minute} ${hour} * * *`;
  }

  if (scheduleType === 'weekly') {
    const weekday = input.dayOfWeek ?? 0;
    return `${minute} ${hour} * * ${weekday}`;
  }

  if (scheduleType === 'monthly') {
    const monthDay = input.dayOfMonth ?? 1;
    return `${minute} ${hour} ${monthDay} * *`;
  }

  if (scheduleType === 'cron') {
    const expression = input.cronExpression;
    if (!expression) {
      throw new Error('cronExpression is required for cron type');
    }
    return expression;
  }

  if (scheduleType === 'once') {
    return 'once';
  }

  throw new Error(`Unknown schedule type: ${scheduleType}`);
}
/**
 * Formats a Date the way SQLite's datetime() does: 'YYYY-MM-DD HH:MM:SS'.
 *
 * SQLite's datetime('now') yields UTC in that layout, so the 'T' separator,
 * the millisecond fraction and the trailing 'Z' of toISOString() are removed
 * to make the two comparable.
 *
 * @param date The instant to format.
 * @returns The UTC timestamp without 'T', milliseconds or 'Z'.
 */
export function toSqliteDatetime(date: Date): string {
  const iso = date.toISOString();
  // Drop the '.mmmZ' tail first, then swap the date/time separator for a space.
  const withoutFraction = iso.replace(/\.\d{3}Z$/, '');
  return withoutFraction.replace('T', ' ');
}
/**
 * Computes the next run time for a cron expression as a SQLite
 * datetime-compatible UTC string.
 *
 * @param cronExpression A cron expression, or the literal 'once'.
 * @returns The next occurrence formatted by toSqliteDatetime(), or null when
 *   the expression is 'once'.
 */
export function calcNextRun(cronExpression: string): string | null {
  if (cronExpression !== 'once') {
    // The expression is evaluated in UTC.
    const schedule = CronExpressionParser.parse(cronExpression, { tz: 'UTC' });
    const upcoming = schedule.next().toDate();
    return toSqliteDatetime(upcoming);
  }
  return null;
}
/** Optional collaborators and settings accepted by the Scheduler constructor. */
export interface SchedulerOptions {
/**
* Classifier that resolves the piece name 'auto' to a real piece name. Pass the
* same function the UI path (local-tasks-api) uses. When it is not supplied, or
* when it throws, the scheduler falls back to 'chat'.
* Without it, a schedule with pieceName='auto' would always fail in the worker
* with `Piece not found: auto`.
*/
selectPiece?: (body: string, fileNames: string[], userId?: string) => Promise<string>;
/**
* Dependencies for task_kind='script'. The scheduler throws for a script-kind
* run when userFolderRoot is missing; sessRepo and masterKeyPath are forwarded
* unchanged to resolveAndRunUserScript().
*/
sessRepo?: BrowserSessionRepo;
masterKeyPath?: string;
userFolderRoot?: string;
/**
* Override how the user-script security gate reads config. Production
* leaves this undefined (defaults to loadConfig()). Tests inject their own
* to avoid relying on a real config.yaml on disk, since vitest workers
* can't process.chdir().
*/
getUserScriptGate?: () => { enabled: boolean; allowUserids?: string[] };
}
/**
 * Polls the repository for due scheduled tasks and runs each one.
 *
 * Agent-kind schedules become a new local task plus a job; script-kind
 * schedules run the user script inline and record the outcome on a job row.
 * After start(), a tick runs immediately and then again `pollIntervalMs`
 * after each tick completes.
 */
export class Scheduler {
  private timer: ReturnType<typeof setTimeout> | null = null;
  private running = false;
  private readonly pollIntervalMs = 60_000;
  private readonly selectPiece?: (body: string, fileNames: string[], userId?: string) => Promise<string>;
  private readonly sessRepo?: BrowserSessionRepo;
  private readonly masterKeyPath?: string;
  private readonly userFolderRoot?: string;
  private readonly getUserScriptGate: () => { enabled: boolean; allowUserids?: string[] };

  /**
   * @param repo Repository used for scheduled tasks, local tasks, jobs and audit logs.
   * @param worktreeDir Root directory under which `local/<taskId>` workspaces are created.
   * @param options Optional classifier, script-run dependencies and config gate override.
   */
  constructor(
    private readonly repo: Repository,
    private readonly worktreeDir: string,
    options?: SchedulerOptions,
  ) {
    this.selectPiece = options?.selectPiece;
    this.sessRepo = options?.sessRepo;
    this.masterKeyPath = options?.masterKeyPath;
    this.userFolderRoot = options?.userFolderRoot;
    // Default gate reads the config on every call; callers may inject their own.
    this.getUserScriptGate = options?.getUserScriptGate ?? (() => {
      const cfg = loadConfig();
      return {
        enabled: cfg.tools?.userScriptsEnabled === true,
        allowUserids: cfg.tools?.userScriptsAllowUserids,
      };
    });
  }

  /** Starts polling. Calling it while already running is a no-op. */
  start(): void {
    if (this.running) return;
    this.running = true;
    logger.info('[scheduler] started');
    // Tick once right away so anything that piled up while the process was
    // down gets handled without waiting for the first poll interval.
    void this.tick().catch(err => logger.error(`[scheduler] initial tick error: ${err}`));
    this.scheduleTick();
  }

  /** Stops polling and cancels the pending timer, if any. */
  stop(): void {
    this.running = false;
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    logger.info('[scheduler] stopped');
  }

  /** Arms the next poll; the following one is armed only after the tick finishes. */
  private scheduleTick(): void {
    if (!this.running) return;
    this.timer = setTimeout(async () => {
      try {
        await this.tick();
      } catch (err) {
        logger.error(`[scheduler] tick error: ${err}`);
      }
      this.scheduleTick();
    }, this.pollIntervalMs);
  }

  /**
   * Processes every scheduled task that is currently due.
   *
   * A task whose last job is still in progress is skipped and only has its
   * next run time advanced. A failure on one task is logged and does not stop
   * the remaining tasks.
   *
   * @returns The number of tasks that were executed without throwing.
   */
  async tick(): Promise<number> {
    const dueItems = await this.repo.getScheduledTasksDue();
    let executed = 0;
    for (const item of dueItems) {
      try {
        // Skip when the previous job is still in progress.
        if (item.lastJobId) {
          const lastJob = await this.repo.getJob(item.lastJobId);
          if (lastJob && IN_PROGRESS_STATUSES.has(lastJob.status)) {
            logger.info(`[scheduler] skipping scheduled_task=${item.id}: last job ${item.lastJobId} is ${lastJob.status}`);
            // Only move next_run_at forward to the following occurrence.
            await this.rescheduleFromCron(item);
            continue;
          }
        }
        await this.executeScheduledTask(item);
        executed++;
      } catch (err) {
        logger.error(`[scheduler] failed to execute scheduled_task=${item.id}: ${err}`);
        // According to the original note, claiming a task sets next_run_at to
        // 9999-12-31 (that happens outside this class). After a failure the
        // next run time is restored so the task does not stay locked forever.
        // NOTE(review): 'once' schedules have no next occurrence, so nothing is
        // restored for them here — confirm that is the intended outcome.
        try {
          if (await this.rescheduleFromCron(item)) {
            logger.info(`[scheduler] restored next_run_at for scheduled_task=${item.id}`);
          }
        } catch (restoreErr) {
          logger.error(`[scheduler] failed to restore next_run_at for scheduled_task=${item.id}: ${restoreErr}`);
        }
      }
    }
    return executed;
  }

  /**
   * Sets next_run_at to the next cron occurrence.
   *
   * @returns true when an update was written, false when the schedule has no
   *   next occurrence (calcNextRun returned null).
   */
  private async rescheduleFromCron(item: ScheduledTask): Promise<boolean> {
    const nextRun = calcNextRun(item.cronExpression);
    if (!nextRun) return false;
    await this.repo.updateScheduledTask(item.id, { nextRunAt: nextRun });
    return true;
  }

  /** Extracts a printable message from anything that can be thrown. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** Formats the current time (month/day hour:minute, Asia/Tokyo) for task titles. */
  private formatRunStamp(): string {
    return new Date().toLocaleString('ja-JP', {
      timeZone: 'Asia/Tokyo',
      month: '2-digit',
      day: '2-digit',
      hour: '2-digit',
      minute: '2-digit',
    });
  }

  /** Creates the input/output/logs directories for a task and returns the workspace path. */
  private prepareWorkspace(taskId: number | string): string {
    const workspacePath = join(this.worktreeDir, 'local', String(taskId));
    for (const sub of ['input', 'output', 'logs']) {
      mkdirSync(join(workspacePath, sub), { recursive: true });
    }
    return workspacePath;
  }

  /** Dispatches to the script or agent execution path based on taskKind. */
  private async executeScheduledTask(item: ScheduledTask): Promise<void> {
    if (item.taskKind === 'script') {
      await this.executeScriptScheduledTask(item);
      return;
    }
    await this.executeAgentScheduledTask(item);
  }

  /**
   * Runs an agent-kind schedule: creates a local task, its workspace and a
   * job, writes an audit entry, then records the run on the schedule.
   */
  private async executeAgentScheduledTask(item: ScheduledTask): Promise<void> {
    const now = toSqliteDatetime(new Date());

    // Append the run date/time to the title.
    const dateStr = this.formatRunStamp();
    const title = item.title ? `${item.title} (${dateStr})` : `スケジュール実行 (${dateStr})`;

    // 'auto' is resolved to a real piece name by the piece classifier.
    // Per the original note there is no pieces/auto.yaml, so passing 'auto'
    // through would make the worker fail with `Piece not found: auto`.
    // The classifier is the same one the UI path (local-tasks-api) uses and is
    // injected through the constructor.
    let resolvedPiece = item.pieceName;
    if (resolvedPiece === 'auto') {
      if (this.selectPiece) {
        try {
          resolvedPiece = await this.selectPiece(item.body, [], item.ownerId ?? undefined);
        } catch (err) {
          logger.warn(`[scheduler] piece classification failed for scheduled_task=${item.id}: ${err}, falling back to 'chat'`);
          resolvedPiece = 'chat';
        }
      } else {
        logger.warn(`[scheduler] selectPiece not configured, falling back to 'chat' for scheduled_task=${item.id}`);
        resolvedPiece = 'chat';
      }
    }

    // Create the new local task.
    // Ownership and visibility are inherited from the schedule definition.
    // Per the original note, without this the task's owner_id would be NULL
    // (treated as system) and its visibility 'private' (admin-only), so the
    // creator could not see the results of their own scheduled runs.
    const task = await this.repo.createLocalTask({
      title,
      body: item.body,
      pieceName: resolvedPiece,
      profile: item.profile as 'auto' | 'fast' | 'quality',
      outputFormat: item.outputFormat as 'text' | 'markdown' | 'json',
      ownerId: item.ownerId,
      visibility: item.visibility,
      visibilityScopeOrgId: item.visibilityScopeOrgId,
      browserSessionProfileId: item.browserSessionProfileId ?? null,
    });

    // Create the workspace.
    const workspacePath = this.prepareWorkspace(task.id);
    await this.repo.updateLocalTask(task.id, { workspacePath });

    // Create the job. The instruction carries a trailing metadata block.
    const metadataBlock = [
      '---',
      `ui_profile: ${item.profile}`,
      `ui_output_format: ${item.outputFormat}`,
      `ui_ask_policy: low`,
      `ui_priority: medium`,
      '---',
    ].join('\n');
    const instruction = `${title}\n\n${item.body}\n\n${metadataBlock}`.trim();
    const job = await this.repo.createJob({
      repo: `local/task-${task.id}`,
      issueNumber: task.id,
      instruction,
      pieceName: resolvedPiece,
      role: item.profile as any,
      ownerId: item.ownerId,
      visibility: item.visibility,
      visibilityScopeOrgId: item.visibilityScopeOrgId,
      browserSessionProfileId: item.browserSessionProfileId ?? null,
    });
    await this.repo.addAuditLog(job.id, 'job_queued_scheduled', 'scheduler', {
      scheduledTaskId: item.id,
      taskId: task.id,
      requestedPiece: item.pieceName,
      resolvedPiece,
    });

    // Record the run on the schedule; a 'once' schedule is deactivated.
    const nextRun = calcNextRun(item.cronExpression);
    await this.repo.updateScheduledTask(item.id, {
      lastRunAt: now,
      lastJobId: job.id,
      nextRunAt: nextRun ?? item.nextRunAt,
      isActive: item.cronExpression === 'once' ? false : undefined,
    });
    logger.info(`[scheduler] executed scheduled_task=${item.id} → task=${task.id} job=${job.id}`);
  }

  /**
   * Runs a script-kind schedule inline: validates configuration and the
   * security gate, creates the task/workspace/job rows, executes the user
   * script, writes its output and logs into the workspace, then marks the job
   * succeeded or failed and records the run on the schedule.
   *
   * @throws Error when script_name is missing, userFolderRoot is not
   *   configured, user scripts are disabled, the owner is not allow-listed,
   *   or script_params is not valid JSON.
   */
  private async executeScriptScheduledTask(item: ScheduledTask): Promise<void> {
    const now = toSqliteDatetime(new Date());

    const scriptName = item.scriptName;
    if (!scriptName) {
      throw new Error(`scheduled_task=${item.id}: task_kind='script' but script_name is null`);
    }
    const rootDir = this.userFolderRoot;
    if (!rootDir) {
      throw new Error(`scheduled_task=${item.id}: task_kind='script' but Scheduler.userFolderRoot was not configured`);
    }

    // No-auth mode stores scheduled tasks with ownerId=null (scheduled-tasks-api
    // has no authenticated user). Scripts are per-user (data/users/{id}/scripts/),
    // so resolve to the same 'local' namespace the RunUserScript tool uses in
    // no-auth (ctx.userId='local'). In auth mode item.ownerId is always set, so
    // scriptOwner === item.ownerId and behaviour is unchanged.
    const scriptOwner = item.ownerId ?? 'local';

    // Same security gates as the LLM-facing RunUserScript tool: global config
    // toggle + optional per-user allowlist. A scheduled run is automated, so a
    // mis-enabled user could quietly exfiltrate via cron without the gate.
    const gate = this.getUserScriptGate();
    if (!gate.enabled) {
      throw new Error(
        `scheduled_task=${item.id}: user scripts are disabled (tools.user_scripts_enabled=false)`,
      );
    }
    if (Array.isArray(gate.allowUserids) && gate.allowUserids.length > 0 && !gate.allowUserids.includes(scriptOwner)) {
      throw new Error(
        `scheduled_task=${item.id}: owner "${scriptOwner}" is not in tools.user_scripts_allow_userids`,
      );
    }

    // Decode params; tolerate null / empty. Valid JSON that is not a plain
    // object (array, number, ...) is ignored and the script gets {}.
    let params: Record<string, unknown> = {};
    if (item.scriptParams) {
      try {
        const parsed: unknown = JSON.parse(item.scriptParams);
        if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
          params = parsed as Record<string, unknown>;
        }
      } catch (err) {
        throw new Error(`scheduled_task=${item.id}: malformed script_params JSON: ${Scheduler.describeError(err)}`);
      }
    }

    const dateStr = this.formatRunStamp();
    const title = item.title ? `${item.title} (${dateStr})` : `スクリプト実行: ${scriptName} (${dateStr})`;
    const task = await this.repo.createLocalTask({
      title,
      body: `script: ${scriptName}\nparams: ${item.scriptParams ?? '{}'}`,
      pieceName: 'script',
      profile: item.profile as 'auto' | 'fast' | 'quality',
      outputFormat: item.outputFormat as 'text' | 'markdown' | 'json',
      ownerId: item.ownerId,
      visibility: item.visibility,
      visibilityScopeOrgId: item.visibilityScopeOrgId,
      browserSessionProfileId: item.browserSessionProfileId ?? null,
    });
    const workspacePath = this.prepareWorkspace(task.id);
    await this.repo.updateLocalTask(task.id, { workspacePath });

    // The scheduler runs the script inline. The job row is still created so the
    // UI (which keys off jobs) shows the run in task history; its status is set
    // to succeeded/failed once the run finishes.
    // NOTE(review): no status is passed to createJob here, so the row keeps
    // whatever initial status createJob assigns until the run ends — confirm
    // the worker cannot pick it up in the meantime.
    const job = await this.repo.createJob({
      repo: `local/task-${task.id}`,
      issueNumber: task.id,
      instruction: `Scheduled script run: ${scriptName}`,
      pieceName: 'script',
      role: item.profile as any,
      ownerId: item.ownerId,
      visibility: item.visibility,
      visibilityScopeOrgId: item.visibilityScopeOrgId,
      browserSessionProfileId: item.browserSessionProfileId ?? null,
    });
    await this.repo.addAuditLog(job.id, 'job_queued_scheduled', 'scheduler', {
      scheduledTaskId: item.id,
      taskId: task.id,
      kind: 'script',
      scriptName,
    });

    let runFailed = false;
    let errorMessage: string | null = null;
    try {
      const runResult = await resolveAndRunUserScript({
        rootDir,
        userId: scriptOwner,
        name: scriptName,
        params,
        sessRepo: this.sessRepo,
        masterKeyPath: this.masterKeyPath,
        timeoutMs: 60_000,
      });
      const summary = [
        `script: ${scriptName}`,
        `subdir: ${runResult.ok ? runResult.subdir : runResult.subdir ?? 'unresolved'}`,
        `ok: ${runResult.ok}`,
        runResult.ok ? `durationMs: ${runResult.durationMs}` : `error: ${runResult.error}`,
      ].join('\n');
      writeFileSync(join(workspacePath, 'logs', 'script-run.log'), summary + '\n', 'utf-8');
      if (runResult.ok) {
        // Strings are written as-is; anything else is pretty-printed as JSON.
        // JSON.stringify yields undefined for an undefined result, hence `?? ''`.
        const out = typeof runResult.result === 'string'
          ? runResult.result
          : JSON.stringify(runResult.result, null, 2);
        writeFileSync(join(workspacePath, 'output', 'script-output.txt'), out ?? '', 'utf-8');
        if (runResult.logs.length > 0) {
          writeFileSync(join(workspacePath, 'logs', 'script-stdout.log'), runResult.logs.join('\n') + '\n', 'utf-8');
        }
      } else {
        runFailed = true;
        errorMessage = runResult.error;
        writeFileSync(join(workspacePath, 'logs', 'script-error.log'), runResult.error + '\n', 'utf-8');
      }
    } catch (err) {
      runFailed = true;
      errorMessage = Scheduler.describeError(err);
      // Best effort only: if this write threw, the job would never be marked
      // failed below and would keep its initial status.
      try {
        writeFileSync(join(workspacePath, 'logs', 'script-error.log'), `unexpected error: ${errorMessage}\n`, 'utf-8');
      } catch (writeErr) {
        logger.warn(`[scheduler] could not write script-error.log for scheduled_task=${item.id}: ${writeErr}`);
      }
    }

    await this.repo.updateJob(job.id, {
      status: runFailed ? 'failed' : 'succeeded',
      ...(runFailed && errorMessage ? { errorSummary: errorMessage.slice(0, 1000) } : {}),
    });
    await this.repo.addAuditLog(job.id, 'user_script_run', 'scheduler', {
      scheduledTaskId: item.id,
      userId: scriptOwner,
      scriptName,
      ok: !runFailed,
      ...(runFailed && errorMessage ? { error: errorMessage.slice(0, 500) } : {}),
    });

    // Record the run on the schedule; a 'once' schedule is deactivated.
    const nextRun = calcNextRun(item.cronExpression);
    await this.repo.updateScheduledTask(item.id, {
      lastRunAt: now,
      lastJobId: job.id,
      nextRunAt: nextRun ?? item.nextRunAt,
      isActive: item.cronExpression === 'once' ? false : undefined,
    });
    if (runFailed) {
      logger.warn(`[scheduler] script scheduled_task=${item.id} (${scriptName}) failed: ${errorMessage}`);
    } else {
      logger.info(`[scheduler] executed script scheduled_task=${item.id} → task=${task.id} job=${job.id} script=${scriptName}`);
    }
  }

  /**
   * Manual trigger: immediately executes the schedule with the given ID.
   *
   * @throws Error when no scheduled task with that ID exists.
   */
  async executeById(id: number): Promise<void> {
    const item = await this.repo.getScheduledTask(id);
    if (!item) throw new Error(`Scheduled task ${id} not found`);
    await this.executeScheduledTask(item);
  }
}