diff --git a/config.yaml.example b/config.yaml.example index aabfdb4..ea4bef2 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -39,7 +39,10 @@ llm: # # model はワーカーごとに明示。`default_model` は廃止された。 # roles: 用途別 (auto / fast / quality / title / reflection 等) のフィルタ。 - # max_concurrency: ワーカー単位の並列度。 + # max_concurrency: 1ワーカーが同時実行するジョブ数 (A1スロット方式)。デフォルト1。 + # >1 はオプトイン。注意: shutdown drain は粗く、fairness は低下し、 + # LLM provider 側の rate limit は別途。browser を使うワーカーで >1 にする場合は + # セッション作成は mutex 済みだが本番投入前に dogfooding すること。 # vlm: true で画像入力に対応 (ReadImage は VLM ワーカーを優先)。 workers: - id: local-ollama @@ -189,6 +192,7 @@ subtasks: # safety: # max_iterations: 200 # 1 movement 内の最大イテレーション # max_revisits: 3 # 同一 movement の最大再訪問 +# max_tool_loop_repeats: 5 # 全く同じツール呼び出し(名前+引数)を連続で繰り返した回数がこの値に達したらループとみなし強制中断 (2以上) # prompt_guard_ratio: 0.8 # コンテキスト上限の何 % まで prompt を許容するか (0.5–0.95) # history_summarization: # 古い turn を構造化要約に置換して粘る (Opencode 方式) # enabled: true # default true diff --git a/docs/tools/slide.md b/docs/tools/slide.md index 3d8104a..0699544 100644 --- a/docs/tools/slide.md +++ b/docs/tools/slide.md @@ -57,8 +57,9 @@ AddSlide({ **title**: `{ title, subtitle?, author?, date? }` **section**: `{ number?: "01", title }` -**bullets**: `{ title, bullets: string[], footnote? }` -**two-column**: `{ title, left: {heading?, bullets?, text?}, right: {...} }` +**bullets**: `{ title, bullets: Bullet[], footnote? }` + - `Bullet` = `string` または `{ text, bold?, bullets?: Bullet[] }`。`bullets` をネストするとサブ項目(インデント)になる。基本は文字列配列で十分 +**two-column**: `{ title, left: {heading?, bullets?: Bullet[], text?}, right: {...} }` **image-right** / **image-left**: `{ title, body: string | string[], image: { path, alt? } }` **image-full**: `{ image: { path }, caption? 
}` **table**: `{ title, headers: string[], rows: string[][], col_widths?: number[] }` diff --git a/docs/tools/webfetch.md b/docs/tools/webfetch.md index 942a1d6..c1890a6 100644 --- a/docs/tools/webfetch.md +++ b/docs/tools/webfetch.md @@ -67,3 +67,11 @@ WebFetch がエラーを返した場合、以下の原則で `BrowseWeb` にリ | `Just a moment...` 等 Cloudflare challenge | **する** — ブラウザで JS challenge を通過できる | リトライ時は同じ URL を `BrowseWeb({ url: "..." })` に渡すだけでよい。`BrowseWeb` はジョブ内で Cookie・セッションを保持するので、複数回呼んでもログイン状態は引き継がれる。 + +## バイナリの扱い + +WebFetch は取得 body の先頭 8KB を sniff し(magic byte / NUL / 不正 UTF-8 / 制御文字比率)、 +バイナリと判定したら**コンテキストに展開せずブロック**する。Content-Type が欠落・詐称 +(例: `.xls` を `text/plain`)していても実バイトで検出する。バイナリを処理したい場合は +`DownloadFile` で `input/` に保存し、`ReadExcel` / `ReadPdf` 等を使う。テキスト body は +5MB で打ち切られる(末尾に `[truncated: body exceeded 5MB]`)。 diff --git a/src/config.ts b/src/config.ts index f3e9e13..f8a2cf1 100644 --- a/src/config.ts +++ b/src/config.ts @@ -223,6 +223,13 @@ export interface HistorySummarizationConfig { export interface SafetyConfig { maxIterations?: number; maxRevisits?: number; + /** + * Tool-call loop detection threshold. If the LLM emits the byte-identical + * regular tool-call batch (same tool names + same arguments) on this many + * consecutive iterations within a single movement, the movement is + * force-aborted as a loop. Must be an integer >= 2. Default: 5. + */ + maxToolLoopRepeats?: number; /** * Fraction of the model context budget that the prompt is allowed to fill * before guardPromptBeforeSend triggers compaction/summarization. 
@@ -521,6 +528,7 @@ const defaults: AppConfig = { safety: { maxIterations: 200, maxRevisits: 3, + maxToolLoopRepeats: 5, bashSandbox: 'auto', }, reflection: { ...DEFAULT_REFLECTION }, @@ -815,6 +823,11 @@ export function validateConfig(config: AppConfig): string[] { errors.push('safety.maxRevisits must be a positive integer if defined'); } } + if (config.safety.maxToolLoopRepeats !== undefined) { + if (!Number.isInteger(config.safety.maxToolLoopRepeats) || config.safety.maxToolLoopRepeats < 2) { + errors.push('safety.maxToolLoopRepeats must be an integer >= 2 if defined'); + } + } if (config.safety.promptGuardRatio !== undefined) { const r = config.safety.promptGuardRatio; if (typeof r !== 'number' || !Number.isFinite(r) || r < 0.5 || r > 0.95) { diff --git a/src/engine/agent-loop.tool-loop.test.ts b/src/engine/agent-loop.tool-loop.test.ts new file mode 100644 index 0000000..8d4604e --- /dev/null +++ b/src/engine/agent-loop.tool-loop.test.ts @@ -0,0 +1,162 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { LLMEvent, ToolDef } from '../llm/openai-compat.js'; +import type { ToolContext } from './tools/index.js'; + +const { executeToolMock, getToolDefsMock } = vi.hoisted(() => ({ + executeToolMock: vi.fn(), + getToolDefsMock: vi.fn(), +})); + +vi.mock('./tools/index.js', () => ({ + executeTool: executeToolMock, + getToolDefs: getToolDefsMock, +})); + +import { executeMovement, type Movement } from './agent-loop.js'; + +function makeMovement(allowedTools: string[]): Movement { + return { + name: 'execute', + edit: false, + persona: 'worker', + instruction: 'Do the work.', + allowedTools, + rules: [{ condition: 'done', next: 'COMPLETE' }], + defaultNext: 'COMPLETE', + }; +} + +function makeToolDefs(names: string[]): ToolDef[] { + return names.map((name) => ({ + type: 'function', + function: { + name, + description: name, + parameters: { type: 'object', properties: {}, required: [] }, + }, + })); +} + +function makeContext(): ToolContext { + 
return { + workspacePath: '/tmp/agent-loop-tool-loop-test', + editAllowed: false, + }; +} + +/** Yields the supplied response scripts; once exhausted, yields empty batches. */ +class FakeClient { + private index = 0; + constructor(private readonly responses: LLMEvent[][]) {} + async *chat(_messages: unknown, _tools?: unknown): AsyncGenerator { + const response = this.responses[this.index++] ?? [{ type: 'done' } as LLMEvent]; + for (const event of response) yield event; + } +} + +/** One iteration that calls Read on the same path with the same args. */ +function identicalReadBatch(): LLMEvent[] { + return [ + { type: 'tool_use', id: 'read', name: 'Read', input: { file_path: 'input/a.txt' } }, + { type: 'done' }, + ]; +} + +describe('executeMovement tool-call loop detection', () => { + afterEach(() => { + executeToolMock.mockReset(); + getToolDefsMock.mockReset(); + }); + + it('aborts when the identical tool-call batch repeats up to the limit', async () => { + getToolDefsMock.mockResolvedValue(makeToolDefs(['Read'])); + executeToolMock.mockResolvedValue({ output: 'same content', isError: false }); + + // 6 identical batches available, but the default limit (5) should fire + // on the 5th before dispatching it → only 4 Read executions. 
+ const client = new FakeClient(Array.from({ length: 6 }, identicalReadBatch)); + + const result = await executeMovement(makeMovement(['Read']), 'task', client as never, makeContext()); + + expect(result.next).toBe('ABORT'); + expect(result.abortCode).toBe('tool_loop_detected'); + expect(executeToolMock).toHaveBeenCalledTimes(4); + }); + + it('honours a custom maxToolLoopRepeats from safetyConfig', async () => { + getToolDefsMock.mockResolvedValue(makeToolDefs(['Read'])); + executeToolMock.mockResolvedValue({ output: 'same content', isError: false }); + + const client = new FakeClient(Array.from({ length: 6 }, identicalReadBatch)); + + const result = await executeMovement( + makeMovement(['Read']), + 'task', + client as never, + makeContext(), + { safetyConfig: { maxToolLoopRepeats: 3 } }, + ); + + expect(result.next).toBe('ABORT'); + expect(result.abortCode).toBe('tool_loop_detected'); + // Limit 3 → aborts before the 3rd dispatch → 2 Read executions. + expect(executeToolMock).toHaveBeenCalledTimes(2); + }); + + it('does not abort when identical calls stay under the limit, then completes', async () => { + getToolDefsMock.mockResolvedValue(makeToolDefs(['Read'])); + executeToolMock.mockResolvedValue({ output: 'same content', isError: false }); + + // 4 identical Read batches (under the default limit of 5), then complete. 
+ const client = new FakeClient([ + ...Array.from({ length: 4 }, identicalReadBatch), + [{ type: 'tool_use', id: 'c', name: 'complete', input: { status: 'success', result: 'done' } }, { type: 'done' }], + ]); + + const result = await executeMovement(makeMovement(['Read']), 'task', client as never, makeContext()); + + expect(result.next).toBe('COMPLETE'); + expect(executeToolMock).toHaveBeenCalledTimes(4); + }); + + it('resets the counter when the tool-call args change (no false positive)', async () => { + getToolDefsMock.mockResolvedValue(makeToolDefs(['Read'])); + executeToolMock.mockResolvedValue({ output: 'content', isError: false }); + + // 8 Read calls but each on a different file → fingerprint changes every + // iteration → never reaches the consecutive-repeat limit. Then complete. + const varied: LLMEvent[][] = Array.from({ length: 8 }, (_, i) => [ + { type: 'tool_use', id: `read-${i}`, name: 'Read', input: { file_path: `input/file-${i}.txt` } }, + { type: 'done' }, + ]); + const client = new FakeClient([ + ...varied, + [{ type: 'tool_use', id: 'c', name: 'complete', input: { status: 'success', result: 'done' } }, { type: 'done' }], + ]); + + const result = await executeMovement(makeMovement(['Read']), 'task', client as never, makeContext()); + + expect(result.next).toBe('COMPLETE'); + expect(executeToolMock).toHaveBeenCalledTimes(8); + }); + + it('lets transition/complete win even if it shares a batch with a repeated call', async () => { + getToolDefsMock.mockResolvedValue(makeToolDefs(['Read'])); + executeToolMock.mockResolvedValue({ output: 'same content', isError: false }); + + // 4 identical Read batches (warning fires but no abort), then a batch that + // repeats the same Read AND completes — complete must take precedence. 
/**
 * Stable fingerprint of a batch of regular tool calls, used by the tool-call
 * loop detector.
 *
 * Two batches produce the same string only when they contain the same tool
 * names, in the same call order, with semantically equal JSON arguments.
 * Arguments are canonicalized before comparison: object keys are sorted at
 * EVERY nesting level, so key order and insignificant whitespace do not
 * matter, while any difference in a nested value still changes the result.
 * Arguments that are not valid JSON fall back to the raw string.
 *
 * Why a recursive canonicalizer rather than `JSON.stringify(value, keys)`:
 * an array replacer is an allow-list applied to all nested objects, so passing
 * the top-level keys silently drops nested properties and makes calls that
 * differ only in nested arguments look identical (a false loop detection).
 *
 * @param calls Regular (non flow-control) tool calls emitted in one iteration.
 * @returns `name(canonicalArgs)` for each call, joined with `|`; `''` for an
 *          empty batch.
 */
function fingerprintToolCalls(calls: ToolCall[]): string {
  // Rebuild a parsed JSON value with sorted keys on every object. Array
  // element order is preserved because it is semantically significant.
  const canonicalize = (value: unknown): unknown => {
    if (Array.isArray(value)) return value.map(canonicalize);
    if (value !== null && typeof value === 'object') {
      const source = value as Record<string, unknown>;
      // Null-prototype target so a literal "__proto__" key stays an own
      // property instead of being swallowed by the prototype setter.
      const sorted: Record<string, unknown> = Object.create(null);
      for (const key of Object.keys(source).sort()) {
        sorted[key] = canonicalize(source[key]);
      }
      return sorted;
    }
    return value;
  };

  const stable = (raw: string): string => {
    try {
      return JSON.stringify(canonicalize(JSON.parse(raw)));
    } catch {
      // Unparseable arguments: compare the raw text as-is.
      return raw;
    }
  };

  return calls.map((c) => `${c.function.name}(${stable(c.function.arguments)})`).join('|');
}

/**
 * Builds the abort message reported when the tool-call loop detector trips.
 *
 * @param movementName Name of the movement being aborted.
 * @param repeats How many consecutive iterations emitted the identical batch.
 * @param limit The configured threshold (safety.maxToolLoopRepeats).
 * @param calls The repeated batch; only the tool names are included.
 * @returns A single-line, space-joined explanation.
 */
function buildToolLoopAbortMessage(
  movementName: string,
  repeats: number,
  limit: number,
  calls: ToolCall[],
): string {
  const toolNames = calls.map((c) => c.function.name).join(', ');
  return [
    `Aborted: movement "${movementName}" repeated the identical tool call(s) [${toolNames}] ${repeats} times in a row (limit: ${limit}) without making progress.`,
    'This is a tool-call loop: the same action produced no change in state. Adjust safety.maxToolLoopRepeats to tune the threshold.',
  ].join(' ');
}
Existing @@ -2204,6 +2260,50 @@ export async function executeMovement( ); const classified = classifyTerminalCalls(flowControlCalls); + // Tool-call loop detection. Only when this batch is purely regular tool + // calls — if the LLM also asked to transition/complete, the movement is + // ending anyway and takes precedence. `injectLoopWarning` is deferred + // until AFTER the tool-result messages are appended below, so we never + // wedge a user message between an assistant tool_call and its tool_result + // (which the provider would reject). + let injectLoopWarning = false; + if (regularCalls.length > 0 && flowControlCalls.length === 0) { + const fp = fingerprintToolCalls(regularCalls); + if (fp === lastRegularFingerprint) { + consecutiveToolRepeats += 1; + } else { + lastRegularFingerprint = fp; + consecutiveToolRepeats = 1; + toolLoopWarned = false; + } + + if (consecutiveToolRepeats >= maxToolLoopRepeats) { + logger.warn(`[agent-loop] movement=${movement.name} tool-call loop detected: identical batch repeated ${consecutiveToolRepeats}x (limit=${maxToolLoopRepeats}) tools=[${regularCalls.map((c) => c.function.name).join(',')}]`); + movementEvents.emit('tool_loop_detected', { + repeats: consecutiveToolRepeats, + limit: maxToolLoopRepeats, + tools: regularCalls.map((c) => c.function.name), + fingerprint: fp.substring(0, 500), + }, { iteration }); + return finishMovement({ + next: 'ABORT', + output: buildToolLoopAbortMessage(movement.name, consecutiveToolRepeats, maxToolLoopRepeats, regularCalls), + toolsUsed, + abortCode: 'tool_loop_detected', + }); + } + + // Progressive warning ahead of the hard limit (but never on the very + // first repeat). For the default limit of 5 this fires on the 3rd + // identical batch, leaving two iterations to self-correct. Fires at + // most once per loop streak. 
+ const warnAt = Math.max(2, maxToolLoopRepeats - 2); + if (consecutiveToolRepeats >= warnAt && !toolLoopWarned) { + injectLoopWarning = true; + toolLoopWarned = true; + } + } + const dispatch = await dispatchRegularToolCalls( regularCalls, regularTools, @@ -2300,6 +2400,20 @@ export async function executeMovement( } } + // Tool-loop progressive warning. Injected here (not before dispatch) so + // it lands AFTER every tool_result for this iteration — keeping the + // assistant/tool_result pairing intact. Gives the LLM one chance to + // change course before the hard abort on the next identical batch. + if (injectLoopWarning) { + const remaining = maxToolLoopRepeats - consecutiveToolRepeats; + messages.push({ + role: 'user', + content: `[loop watchdog] 同じツール呼び出し(${regularCalls.map((c) => c.function.name).join(', ')})を ${consecutiveToolRepeats} 回連続で繰り返しています。同じ引数で呼び続けると、あと ${remaining} 回で強制中断されます。\n直前の結果を読み直し、(1) 別のアプローチ・別の引数を試す、(2) 既に十分な情報が揃っているなら transition / complete で次に進む、のいずれかを選んでください。同じ呼び出しを繰り返さないでください。`, + }); + logger.info(`[agent-loop] movement=${movement.name} tool-loop watchdog nudge at repeats=${consecutiveToolRepeats}/${maxToolLoopRepeats}`); + movementEvents.emit('watchdog_fire', { kind2: 'tool_loop', iteration, repeats: consecutiveToolRepeats }, { iteration }); + } + // Phase 6a §2.5 (post-6b): select winner from classified terminals. // Only `native_winner` exists now — the legacy shim path was removed. 
const winner = selectTerminalWinner(classified); diff --git a/src/engine/async-mutex.test.ts b/src/engine/async-mutex.test.ts new file mode 100644 index 0000000..dbbafcc --- /dev/null +++ b/src/engine/async-mutex.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from 'vitest'; +import { AsyncMutex } from './async-mutex.js'; + +const tick = () => new Promise((r) => setTimeout(r, 5)); + +describe('AsyncMutex', () => { + it('runs exclusive sections serially, never overlapping', async () => { + const m = new AsyncMutex(); + const log: string[] = []; + const section = (name: string) => m.runExclusive(async () => { + log.push(`${name}:enter`); + await tick(); + log.push(`${name}:exit`); + }); + await Promise.all([section('A'), section('B'), section('C')]); + // every enter is immediately followed by its own exit (no interleave) + expect(log).toEqual(['A:enter', 'A:exit', 'B:enter', 'B:exit', 'C:enter', 'C:exit']); + }); + + it('continues the chain after a rejection', async () => { + const m = new AsyncMutex(); + const a = m.runExclusive(async () => { throw new Error('boom'); }); + await expect(a).rejects.toThrow('boom'); + const b = await m.runExclusive(async () => 42); + expect(b).toBe(42); + }); + + it('returns the section result', async () => { + const m = new AsyncMutex(); + expect(await m.runExclusive(async () => 'ok')).toBe('ok'); + }); +}); diff --git a/src/engine/async-mutex.ts b/src/engine/async-mutex.ts new file mode 100644 index 0000000..d7cc013 --- /dev/null +++ b/src/engine/async-mutex.ts @@ -0,0 +1,16 @@ +/** + * Minimal async mutex: serializes async sections via a promise chain. Each + * runExclusive call waits for the previous one to settle before starting, so + * critical sections never interleave across awaits. Rejections are isolated — + * a failing section does not break the chain for later callers. 
/**
 * Minimal async mutex: serializes async sections through a promise chain.
 *
 * Each `runExclusive` call starts only after the previously queued section
 * has settled, so critical sections never interleave across awaits and run
 * in the order they were queued. A rejecting section is isolated: its error
 * reaches only its own caller and later sections still run.
 */
export class AsyncMutex {
  /** Settles when the most recently queued section finishes; never rejects. */
  private tail: Promise<void> = Promise.resolve();

  /**
   * Queues `fn` to run after every previously queued section has settled.
   *
   * @param fn The critical section. It is invoked inside a `then` callback,
   *           so a synchronous throw surfaces as a rejection of the result.
   * @returns A promise that settles with the outcome of `fn`.
   */
  runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(() => fn());
    // Swallow the outcome on the chain only (the caller still sees it via
    // `result`), so one failure does not poison later sections.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    );
    return result;
  }
}
+ return this.creationMutex.runExclusive(() => this.doCreateSessionInternal(opts)); + } + /** * @deprecated 新規コードでは createPoolSession() / getOrCreateTaskSession() を使う。 * 後方互換: 既存の InteractiveBrowse / browser-api.createBrowserApi が直接呼んでいる。 diff --git a/src/engine/tools/binary-detect.test.ts b/src/engine/tools/binary-detect.test.ts new file mode 100644 index 0000000..92e866b --- /dev/null +++ b/src/engine/tools/binary-detect.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it } from 'vitest'; +import { looksLikeBinaryBytes, decodeText } from './binary-detect.js'; + +const b = (...bytes: number[]) => Buffer.from(bytes); +const txt = (s: string) => Buffer.from(s, 'utf-8'); + +describe('looksLikeBinaryBytes — magic signatures (binary)', () => { + it('detects OLE2 (.xls/.doc)', () => { + expect(looksLikeBinaryBytes(b(0xD0, 0xCF, 0x11, 0xE0, 0xA1, 0xB1, 0x1A, 0xE1, 0x00, 0x01))) + .toEqual({ binary: true, reason: 'magic:ole2' }); + }); + it('detects ZIP (.xlsx/.docx)', () => { + expect(looksLikeBinaryBytes(b(0x50, 0x4B, 0x03, 0x04, 0x14, 0x00)).binary).toBe(true); + }); + it('detects PDF', () => { + expect(looksLikeBinaryBytes(txt('%PDF-1.7\n...')).binary).toBe(true); + }); + it('detects PNG / JPEG / GIF / gzip / RIFF', () => { + expect(looksLikeBinaryBytes(b(0x89, 0x50, 0x4E, 0x47)).binary).toBe(true); + expect(looksLikeBinaryBytes(b(0xFF, 0xD8, 0xFF, 0xE0)).binary).toBe(true); + expect(looksLikeBinaryBytes(b(0x47, 0x49, 0x46, 0x38)).binary).toBe(true); + expect(looksLikeBinaryBytes(b(0x1F, 0x8B, 0x08)).binary).toBe(true); + expect(looksLikeBinaryBytes(b(0x52, 0x49, 0x46, 0x46, 1, 2, 3, 4, 0x57, 0x45, 0x42, 0x50)).binary).toBe(true); + }); + it('RIFF reason is magic:riff', () => { + expect(looksLikeBinaryBytes(b(0x52, 0x49, 0x46, 0x46, 1, 2, 3, 4, 0x57, 0x45, 0x42, 0x50))) + .toEqual({ binary: true, reason: 'magic:riff' }); + }); +}); + +describe('looksLikeBinaryBytes — byte heuristics', () => { + it('NUL byte in head => binary', () => { + 
expect(looksLikeBinaryBytes(b(0x68, 0x69, 0x00, 0x68))).toEqual({ binary: true, reason: 'nul-byte' }); + }); + it('invalid UTF-8 => binary', () => { + expect(looksLikeBinaryBytes(b(0xC0, 0xC1, 0x80, 0x81, 0xF8, 0xF9)).binary).toBe(true); + }); + it('high control-char ratio => binary', () => { + expect(looksLikeBinaryBytes(b(0x01, 0x02, 0x03, 0x04, 0x05, 0x41)).binary).toBe(true); + }); +}); + +describe('looksLikeBinaryBytes — text (must NOT false-positive)', () => { + it('plain UTF-8 HTML', () => { + expect(looksLikeBinaryBytes(txt('hi'))).toEqual({ binary: false, encoding: 'utf-8' }); + }); + it('JSON and CSV', () => { + expect(looksLikeBinaryBytes(txt('{"a":1,"b":[2,3]}')).binary).toBe(false); + expect(looksLikeBinaryBytes(txt('col1,col2\n1,2\n3,4\n')).binary).toBe(false); + }); + it('SVG / XML stays text', () => { + expect(looksLikeBinaryBytes(txt('')).binary).toBe(false); + }); + it('emoji-heavy UTF-8 stays text', () => { + expect(looksLikeBinaryBytes(txt('hello 😀🎉🚀 world ✨')).binary).toBe(false); + }); + it('UTF-8 BOM => text(utf-8)', () => { + expect(looksLikeBinaryBytes(b(0xEF, 0xBB, 0xBF, 0x68, 0x69))).toEqual({ binary: false, encoding: 'utf-8' }); + }); + it('UTF-16LE BOM (contains NUL) => text(utf-16le), NOT blocked', () => { + expect(looksLikeBinaryBytes(b(0xFF, 0xFE, 0x68, 0x00, 0x69, 0x00))) + .toEqual({ binary: false, encoding: 'utf-16le' }); + }); + it('UTF-16BE BOM => text(utf-16be)', () => { + expect(looksLikeBinaryBytes(b(0xFE, 0xFF, 0x00, 0x68, 0x00, 0x69))) + .toEqual({ binary: false, encoding: 'utf-16be' }); + }); + it('does not flag a clean 8KB cut through a multibyte char', () => { + const head = Buffer.concat([Buffer.alloc(4095, 0x61), b(0xC3)]); + expect(looksLikeBinaryBytes(head).binary).toBe(false); + }); + it('empty buffer => text', () => { + expect(looksLikeBinaryBytes(Buffer.alloc(0))).toEqual({ binary: false, encoding: 'utf-8' }); + }); +}); + +describe('decodeText', () => { + it('decodes valid utf-8', () => { + 
expect(decodeText(txt('héllo 😀'), 'utf-8')).toBe('héllo 😀'); + }); + it('returns null on invalid utf-8', () => { + expect(decodeText(b(0x41, 0xC0, 0xC1, 0x42), 'utf-8')).toBeNull(); + }); + it('tolerates a truncated trailing multibyte sequence', () => { + expect(decodeText(Buffer.concat([txt('a'), b(0xC3)]), 'utf-8')).toBe('a'); + }); + it('decodes utf-16le', () => { + expect(decodeText(b(0x68, 0x00, 0x69, 0x00), 'utf-16le')).toBe('hi'); + }); + it('decodes utf-16be (or byte-swap fallback)', () => { + expect(decodeText(b(0x00, 0x68, 0x00, 0x69), 'utf-16be')).toBe('hi'); + }); +}); diff --git a/src/engine/tools/binary-detect.ts b/src/engine/tools/binary-detect.ts new file mode 100644 index 0000000..9386e3e --- /dev/null +++ b/src/engine/tools/binary-detect.ts @@ -0,0 +1,135 @@ +/** + * Shared, pure byte-level binary detector. No file/network IO — callers pass a + * head Buffer (Read reads it from disk, WebFetch streams it from the response). + * + * Detection order (highest precision first): + * 1. magic-byte signature → binary + * 2. UTF-8 BOM → text(utf-8) + * 3. UTF-16 BOM → text(utf-16le|be) + * 4. NUL byte in head → binary + * 5. strict UTF-8 decode failure → binary + * 6. control-char ratio > 0.30 → binary + * 7. otherwise → text(utf-8) + * + * Note: BOMless UTF-16 is treated as binary (NUL bytes trip step 4); only BOM-tagged UTF-16 is recognized as text. 
/**
 * Pure byte-level binary detector. It performs no file or network IO; the
 * caller supplies the leading bytes of the content as a Buffer.
 *
 * Detection order (first match wins):
 *   1. magic-byte signature          -> binary
 *   2. UTF-8 BOM                     -> text (utf-8)
 *   3. UTF-32LE BOM (FF FE 00 00)    -> binary (cannot be decoded here)
 *   4. UTF-16 BOM                    -> text (utf-16le | utf-16be)
 *   5. NUL byte                      -> binary
 *   6. strict UTF-8 decode failure   -> binary
 *   7. control-char ratio > 0.30     -> binary
 *   8. otherwise                     -> text (utf-8)
 *
 * BOM-less UTF-16 and UTF-32BE are reported as binary because their NUL bytes
 * trip step 5.
 */

/** Number of leading bytes callers are expected to sniff. */
export const SNIFF_HEAD_BYTES = 8 * 1024;
/** Fraction of control bytes above which content is considered binary. */
export const CONTROL_CHAR_RATIO_THRESHOLD = 0.3;

export type TextEncodingLabel = 'utf-8' | 'utf-16le' | 'utf-16be';

export type BinaryVerdict =
  | { binary: true; reason: string }
  | { binary: false; encoding: TextEncodingLabel };

// Checked in order; the first signature that prefixes the buffer wins.
const MAGIC_SIGNATURES: Array<{ reason: string; bytes: number[] }> = [
  { reason: 'magic:ole2', bytes: [0xd0, 0xcf, 0x11, 0xe0, 0xa1, 0xb1, 0x1a, 0xe1] },
  { reason: 'magic:zip', bytes: [0x50, 0x4b, 0x03, 0x04] },
  { reason: 'magic:zip', bytes: [0x50, 0x4b, 0x05, 0x06] },
  { reason: 'magic:zip', bytes: [0x50, 0x4b, 0x07, 0x08] },
  { reason: 'magic:pdf', bytes: [0x25, 0x50, 0x44, 0x46, 0x2d] },
  { reason: 'magic:png', bytes: [0x89, 0x50, 0x4e, 0x47] },
  { reason: 'magic:jpeg', bytes: [0xff, 0xd8, 0xff] },
  { reason: 'magic:gif', bytes: [0x47, 0x49, 0x46, 0x38] },
  { reason: 'magic:gzip', bytes: [0x1f, 0x8b] },
  { reason: 'magic:7z', bytes: [0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c] },
  { reason: 'magic:rar', bytes: [0x52, 0x61, 0x72, 0x21] },
  { reason: 'magic:riff', bytes: [0x52, 0x49, 0x46, 0x46] },
];

/** True when `head` begins with exactly the bytes in `sig`. */
function startsWith(head: Buffer, sig: number[]): boolean {
  if (head.length < sig.length) return false;
  return sig.every((byte, i) => head[i] === byte);
}

/** Returns the reason label of the first matching magic signature, or null. */
function matchMagic(head: Buffer): string | null {
  const hit = MAGIC_SIGNATURES.find(({ bytes }) => startsWith(head, bytes));
  return hit ? hit.reason : null;
}

/**
 * Fraction of bytes that are C0 control characters or DEL, excluding
 * TAB (0x09), LF (0x0a) and CR (0x0d). Returns 0 for an empty buffer.
 */
function controlCharRatio(head: Buffer): number {
  if (head.length === 0) return 0;
  let ctrl = 0;
  for (const byte of head) {
    if (
      byte <= 0x08 ||
      byte === 0x0b ||
      byte === 0x0c ||
      (byte >= 0x0e && byte <= 0x1f) ||
      byte === 0x7f
    ) {
      ctrl++;
    }
  }
  return ctrl / head.length;
}

/**
 * Strict text decode.
 *
 * Decoding runs in streaming mode, so a multibyte sequence that is merely
 * truncated at the end of the buffer is tolerated; malformed bytes elsewhere
 * make the decode fail.
 *
 * @param buf Bytes to decode. Never mutated.
 * @param encoding Encoding to decode with.
 * @returns The decoded string, or null when the bytes are invalid.
 */
export function decodeText(buf: Buffer, encoding: TextEncodingLabel): string | null {
  if (encoding === 'utf-16be' && !utf16beSupported()) {
    // Runtime lacks a 'utf-16be' decoder: byte-swap BE -> LE on a copy and
    // decode that as utf-16le instead.
    const swapped = Buffer.from(buf);
    for (let i = 0; i + 1 < swapped.length; i += 2) {
      const tmp = swapped[i];
      swapped[i] = swapped[i + 1];
      swapped[i + 1] = tmp;
    }
    return decodeStrict(swapped, 'utf-16le');
  }
  return decodeStrict(buf, encoding);
}

/** Fatal-mode streaming decode; null instead of throwing on invalid input. */
function decodeStrict(buf: Buffer, encoding: TextEncodingLabel): string | null {
  try {
    return new TextDecoder(encoding, { fatal: true }).decode(buf, { stream: true });
  } catch {
    return null;
  }
}

// Memoized probe result: null until the first call to utf16beSupported().
let utf16beSupportedCache: boolean | null = null;

/** Whether this runtime can construct a 'utf-16be' TextDecoder (probed once). */
function utf16beSupported(): boolean {
  if (utf16beSupportedCache === null) {
    try {
      new TextDecoder('utf-16be');
      utf16beSupportedCache = true;
    } catch {
      utf16beSupportedCache = false;
    }
  }
  return utf16beSupportedCache;
}

/**
 * Classifies a head buffer as binary or text.
 *
 * @param head Leading bytes of the content. An empty buffer is text (utf-8).
 * @returns `{ binary: true, reason }`, or `{ binary: false, encoding }` with
 *          the encoding the caller should decode the body with.
 */
export function looksLikeBinaryBytes(head: Buffer): BinaryVerdict {
  const magic = matchMagic(head);
  if (magic) return { binary: true, reason: magic };

  if (startsWith(head, [0xef, 0xbb, 0xbf])) return { binary: false, encoding: 'utf-8' };

  // Must precede the UTF-16LE check: the UTF-32LE BOM starts with FF FE, and
  // decoding UTF-32 as UTF-16 would interleave every character with NULs.
  if (startsWith(head, [0xff, 0xfe, 0x00, 0x00])) return { binary: true, reason: 'bom:utf-32le' };

  if (startsWith(head, [0xff, 0xfe])) return { binary: false, encoding: 'utf-16le' };
  if (startsWith(head, [0xfe, 0xff])) return { binary: false, encoding: 'utf-16be' };

  if (head.includes(0)) return { binary: true, reason: 'nul-byte' };

  if (head.length > 0 && decodeText(head, 'utf-8') === null) {
    return { binary: true, reason: 'utf8-decode-fail' };
  }

  const ratio = controlCharRatio(head);
  if (ratio > CONTROL_CHAR_RATIO_THRESHOLD) {
    return { binary: true, reason: `control-ratio:${ratio.toFixed(2)}` };
  }

  return { binary: false, encoding: 'utf-8' };
}
expect(items.map(i => i.text)).toEqual(['parent', 'c1', 'c2']); + expect(items[0].options.indentLevel).toBeUndefined(); + expect(items[1].options.indentLevel).toBe(1); + expect(items[2].options.indentLevel).toBe(1); + }); + + it('honors bold on object bullets', () => { + const items = toBulletItems([{ text: 'strong', bold: true }, { text: 'normal' }]); + expect(items[0].options.bold).toBe(true); + expect(items[1].options.bold).toBeUndefined(); + }); + + it('renders a parentless sub-bullets object without an empty line', () => { + const items = toBulletItems([{ bullets: ['only-child'] }]); + expect(items.map(i => i.text)).toEqual(['only-child']); + expect(items[0].options.indentLevel).toBe(1); + }); + + it('falls back to JSON (never "[object Object]") for a text-less junk object', () => { + const items = toBulletItems([{ foo: 1 }]); + expect(items).toHaveLength(1); + expect(items[0].text).not.toContain('[object Object]'); + expect(items[0].text).toContain('foo'); + }); +}); + describe('layouts: two-column / image-right / image-left / image-full', () => { const theme = resolveTheme('corporate-blue', {}); diff --git a/src/engine/tools/slide/layouts.ts b/src/engine/tools/slide/layouts.ts index 2b10faf..59a5587 100644 --- a/src/engine/tools/slide/layouts.ts +++ b/src/engine/tools/slide/layouts.ts @@ -14,6 +14,67 @@ const LAYOUTS_WITHOUT_PAGINATION: LayoutName[] = ['title', 'section', 'closing'] type Slide = PptxGenJS.Slide; +const BULLET_CODE = '25CF'; + +interface BulletItem { + text: string; + options: { bullet: { code: string }; indentLevel?: number; bold?: boolean }; +} + +// Extract a display string from one bullet. The documented contract is a plain +// string, but models often pass objects (sub-bullets, emphasis). We pull a +// text-like field instead of coercing the object — `String({...})` would yield +// the literal "[object Object]" in the slide. Returns '' when no text-like +// field exists so the caller can decide on a fallback. 
function bulletText(b: unknown): string {
  // Primitives that have an unambiguous textual form.
  if (typeof b === 'string') return b;
  if (typeof b === 'number' || typeof b === 'boolean') return String(b);
  if (b == null) return '';
  // A bare array has no text of its own. Coercing it with String() would join
  // its elements and turn any object element into the literal
  // "[object Object]"; subBullets() exposes the array as a nested list instead.
  if (Array.isArray(b)) return '';
  if (typeof b === 'object') {
    const record = b as Record<string, unknown>;
    // First non-empty string among the text-like keys wins, in priority order.
    for (const key of ['text', 'label', 'title', 'content']) {
      const value = record[key];
      if (typeof value === 'string' && value) return value;
    }
    return '';
  }
  // Remaining primitives (bigint, symbol) and functions stringify safely.
  return String(b);
}

// The nested bullet list carried by a bullet, or null when there is none.
// - Object bullet: the first non-nullish of `bullets` / `children` / `items` /
//   `sub`, provided it is a non-empty array.
// - Bare array bullet (e.g. `['a', ['b', 'c']]`): the array itself, so its
//   elements are treated as sub-bullets rather than being string-coerced.
// Empty arrays yield null in both cases.
function subBullets(b: unknown): unknown[] | null {
  if (Array.isArray(b)) return b.length > 0 ? b : null;
  if (b && typeof b === 'object') {
    const record = b as Record<string, unknown>;
    const nested = record['bullets'] ?? record['children'] ?? record['items'] ?? record['sub'];
    if (Array.isArray(nested) && nested.length > 0) return nested;
  }
  return null;
}

// Build pptxgenjs bullet items from a (possibly mixed) bullets array. Accepts
// strings, `{ text, bold?, bullets? }` objects, and nested sub-bullets (rendered
// at a deeper indentLevel). Never emits "[object Object]": an object with no
// text-like field falls back to a single JSON line so content is not silently
// lost.
+export function toBulletItems(bullets: unknown[], level = 0): BulletItem[] { + const items: BulletItem[] = []; + for (const b of bullets) { + const sub = subBullets(b); + let text = bulletText(b); + if (!text && !sub && b && typeof b === 'object') { + try { text = JSON.stringify(b); } catch { text = ''; } + } + if (text) { + const options: BulletItem['options'] = { bullet: { code: BULLET_CODE } }; + if (level > 0) options.indentLevel = level; + if (b && typeof b === 'object' && (b as Record)['bold'] === true) { + options.bold = true; + } + items.push({ text, options }); + } + if (sub) items.push(...toBulletItems(sub, level + 1)); + } + return items; +} + export function renderSlide( slide: Slide, layout: LayoutName, @@ -125,7 +186,7 @@ function renderSection(slide: Slide, c: Record, theme: Resolved function renderBullets(slide: Slide, c: Record, theme: ResolvedTheme): void { drawTitleBar(slide, String(c['title'] ?? ''), theme); const bullets = (Array.isArray(c['bullets']) ? c['bullets'] : []) as unknown[]; - const items = bullets.map((b) => ({ text: String(b), options: { bullet: { code: '25CF' } } })); + const items = toBulletItems(bullets); slide.addText(items as any, { x: SAFE.x, y: 1.8, w: SAFE.w, h: 4.7, fontSize: theme.body_size, color: stripHash(theme.text), @@ -183,7 +244,7 @@ function renderTwoColumn(slide: Slide, c: Record, theme: Resolv }); } if (bullets.length > 0) { - const items = bullets.map((b) => ({ text: String(b), options: { bullet: { code: '25CF' } } })); + const items = toBulletItems(bullets); slide.addText(items as any, { x, y: 2.5, w: colW, h: 4.0, fontSize: theme.body_size, color: stripHash(theme.text), @@ -218,7 +279,7 @@ function renderImageSide( const body = c['body']; const bullets = Array.isArray(body) ? 
body as unknown[] : null; if (bullets) { - const items = bullets.map((b) => ({ text: String(b), options: { bullet: { code: '25CF' } } })); + const items = toBulletItems(bullets); slide.addText(items as any, { x: textX, y: blockY, w: textW, h: blockH, fontSize: theme.body_size, color: stripHash(theme.text), diff --git a/src/engine/tools/web.binary.test.ts b/src/engine/tools/web.binary.test.ts new file mode 100644 index 0000000..47d4970 --- /dev/null +++ b/src/engine/tools/web.binary.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from 'vitest'; +import { sniffAndDecodeBody } from './web.js'; + +function fakeResponse(chunks: Uint8Array[]): { body: ReadableStream } { + let i = 0; + const body = new ReadableStream({ + pull(controller) { + if (i < chunks.length) controller.enqueue(chunks[i++]); + else controller.close(); + }, + }); + return { body }; +} +const u8 = (...bytes: number[]) => Uint8Array.from(bytes); +const u8txt = (s: string) => new TextEncoder().encode(s); + +describe('sniffAndDecodeBody', () => { + it('returns text for plain HTML', async () => { + const r = await sniffAndDecodeBody(fakeResponse([u8txt('hi')]) as never); + expect(r).toEqual({ binary: false, text: 'hi', truncated: false }); + }); + it('blocks an OLE2 (.xls) body even split across chunks', async () => { + const r = await sniffAndDecodeBody( + fakeResponse([u8(0xD0, 0xCF, 0x11, 0xE0), u8(0xA1, 0xB1, 0x1A, 0xE1, 0x00)]) as never, + ); + expect(r).toEqual({ binary: true, reason: 'magic:ole2' }); + }); + it('blocks a body with NUL bytes', async () => { + const r = await sniffAndDecodeBody(fakeResponse([u8(0x68, 0x69, 0x00, 0x68)]) as never); + expect(r).toEqual({ binary: true, reason: 'nul-byte' }); + }); + it('caps an oversized text body and marks it truncated', async () => { + const big = u8txt('a'.repeat(6 * 1024 * 1024)); + const r = await sniffAndDecodeBody(fakeResponse([big]) as never); + expect(r.binary).toBe(false); + if (!r.binary) { + expect(r.truncated).toBe(true); + 
expect(r.text.length).toBeLessThanOrEqual(5 * 1024 * 1024); + } + }); + it('handles an empty body as empty text', async () => { + const r = await sniffAndDecodeBody(fakeResponse([]) as never); + expect(r).toEqual({ binary: false, text: '', truncated: false }); + }); + it('blocks bytes that decode-fail as utf-8 (no NUL, no BOM)', async () => { + const r = await sniffAndDecodeBody(fakeResponse([u8(0x41, 0xC0, 0xC1, 0x80, 0x42)]) as never); + expect(r.binary).toBe(true); + }); + it('passes a UTF-16LE BOM body through as text', async () => { + // FF FE BOM + "hi" in UTF-16LE + const r = await sniffAndDecodeBody(fakeResponse([u8(0xFF, 0xFE, 0x68, 0x00, 0x69, 0x00)]) as never); + expect(r).toEqual({ binary: false, text: 'hi', truncated: false }); + }); + it('blocks a small (<8KB) body via control-char ratio (post-loop re-sniff path)', async () => { + const r = await sniffAndDecodeBody(fakeResponse([u8(0x01, 0x02, 0x03, 0x04, 0x05, 0x41)]) as never); + expect(r.binary).toBe(true); + }); +}); diff --git a/src/engine/tools/web.ts b/src/engine/tools/web.ts index fe54006..e0f7c6a 100644 --- a/src/engine/tools/web.ts +++ b/src/engine/tools/web.ts @@ -8,6 +8,12 @@ import { htmlToText } from './shared/html.js'; import * as fs from 'fs'; import * as path from 'path'; import * as crypto from 'crypto'; +import { + looksLikeBinaryBytes, + decodeText, + SNIFF_HEAD_BYTES, + type BinaryVerdict, +} from './binary-detect.js'; const BINARY_CONTENT_TYPE_PREFIXES = [ 'application/pdf', @@ -20,6 +26,9 @@ const BINARY_CONTENT_TYPE_PREFIXES = [ 'video/', ]; +// WebFetch text body は最大 5MB で打ち切る(巨大 HTML による context 膨張防止) +const MAX_WEBFETCH_BODY_BYTES = 5 * 1024 * 1024; + // --- ツール定義 --- const WEBSEARCH_DEF: ToolDef = { @@ -737,6 +746,66 @@ function formatResults(results: SearchResult[]): string { // --- WebFetch 実装 --- +export type SniffResult = + | { binary: true; reason: string } + | { binary: false; text: string; truncated: boolean }; + +/** + * Stream a fetch Response body, sniff the first 
SNIFF_HEAD_BYTES for binary + * content, and either block (binary) or strict-decode the (capped) text body. + * Never uses response.text() — that silently produces U+FFFD from binary. + */ +export async function sniffAndDecodeBody( + response: { body: ReadableStream | null }, +): Promise { + const reader = response.body?.getReader(); + if (!reader) return { binary: false, text: '', truncated: false }; + + const chunks: Buffer[] = []; + let total = 0; + let verdict: BinaryVerdict | null = null; + let truncated = false; + + try { + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + if (!value || value.byteLength === 0) continue; + chunks.push(Buffer.from(value)); + total += value.byteLength; + + if (!verdict && total >= SNIFF_HEAD_BYTES) { + const head = Buffer.concat(chunks).subarray(0, SNIFF_HEAD_BYTES); + verdict = looksLikeBinaryBytes(head); + if (verdict.binary) { + await reader.cancel(); + return { binary: true, reason: verdict.reason }; + } + } + + if (total >= MAX_WEBFETCH_BODY_BYTES) { + truncated = true; + await reader.cancel(); + break; + } + } + } finally { + try { reader.releaseLock(); } catch { /* cancel() may or may not have released the lock depending on runtime */ } + } + + let full = Buffer.concat(chunks); + if (truncated) full = full.subarray(0, MAX_WEBFETCH_BODY_BYTES); + + if (!verdict) { + verdict = looksLikeBinaryBytes(full.subarray(0, SNIFF_HEAD_BYTES)); + if (verdict.binary) return { binary: true, reason: verdict.reason }; + } + + const text = decodeText(full, verdict.encoding); + if (text === null) return { binary: true, reason: 'utf8-decode-fail' }; + return { binary: false, text, truncated }; +} + async function executeWebFetch( input: Record, ctx: ToolContext, @@ -848,8 +917,25 @@ async function executeWebFetch( }; } - const html = await response.text(); - const text = htmlToText(html); + const sniffed = await sniffAndDecodeBody(response); + if (sniffed.binary) { + appendWebFetchHistory(ctx, { + timestamp: new 
Date().toISOString(), + url: rawUrl, + selector, + status: response.status, + contentType, + outcome: 'binary_blocked', + error: `binary content detected (${sniffed.reason})`, + }); + return { + output: `WebFetch blocked binary content from "${rawUrl}" (detected: ${sniffed.reason}). コンテキストに展開していません。DownloadFile で input/ に保存し、ReadExcel/ReadPdf 等で処理してください。`, + isError: true, + }; + } + const text = + htmlToText(sniffed.text) + + (sniffed.truncated ? '\n\n[truncated: body exceeded 5MB]' : ''); // vlmEnabled 時はファーストビューのスクショを並行取得して画像を添付する。 // 失敗時は警告ログのみで WebFetch 自体は成功扱いとする。 diff --git a/src/worker.concurrency.test.ts b/src/worker.concurrency.test.ts new file mode 100644 index 0000000..6de7af5 --- /dev/null +++ b/src/worker.concurrency.test.ts @@ -0,0 +1,100 @@ +import { describe, expect, it, vi } from 'vitest'; +import { Worker } from './worker.js'; +import type { AppConfig } from './config.js'; +import type { Job } from './db/repository.js'; + +function makeConfig(maxConcurrency?: number): AppConfig { + return { + provider: { + model: 'test-model', + workers: [{ id: 'worker-1', endpoint: 'http://localhost:11434/v1', ...(maxConcurrency ? 
{ maxConcurrency } : {}) }], + }, + worktreeDir: '/tmp/worker-test', + concurrency: 1, + maxMovements: 30, + retry: { maxAttempts: 3, backoffSeconds: [60, 300, 900] }, + ask: { maxPerJob: 2 }, + subtasks: { maxDepth: 2 }, + tools: { + searxngUrl: 'http://localhost:8080', visionModel: 'v', visionTimeout: 60, + visionMaxTokens: 1024, webfetchTimeout: 30, websearchTimeout: 15, webfetchAllowedHosts: [], + }, + } as AppConfig; +} +function makeJob(id: string): Job { + return { + id, repo: 'acme/demo', issueNumber: 1, prNumber: null, status: 'running', + pieceName: 'general', requiredRole: 'auto', requiredProfile: 'auto', currentMovement: null, + instruction: 'x', branchName: null, worktreePath: null, attempt: 1, maxAttempts: 3, + nextRetryAt: null, errorSummary: null, resumeMovement: null, askCount: 0, workerId: 'worker-1', + parentJobId: null, subtaskDepth: 0, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), + } as Job; +} +// Build a worker whose heavy parts (initialize, executeJob) are stubbed; executeJob is gated. +function buildWorker(maxConcurrency?: number) { + const jobs = [makeJob('j1'), makeJob('j2'), makeJob('j3')]; + let idx = 0; + const repo = { + recoverStuckRunningJobs: vi.fn(), + claimNextRetryJob: vi.fn().mockResolvedValue(null), + claimNextJob: vi.fn().mockImplementation(async () => jobs[idx++] ?? 
null), + updateWorkerNodeHealth: vi.fn().mockResolvedValue(undefined), + }; + const worker = new Worker('worker-1', 'http://localhost:11434/v1', 'm', repo as never, makeConfig(maxConcurrency)); + const started: string[] = []; + const resolvers: Array<() => void> = []; + (worker as never as Record)['initialize'] = vi.fn().mockResolvedValue(true); + (worker as never as Record)['running'] = true; + (worker as never as Record)['executeJob'] = vi.fn().mockImplementation((job: Job) => + new Promise((resolve) => { started.push(job.id); resolvers.push(resolve); })); + const get = (k: string) => (worker as never as Record)[k]; + const call = (k: string) => (get(k) as () => Promise).call(worker); + return { worker, repo, started, resolvers, get, call }; +} +const tick = () => new Promise((r) => setTimeout(r, 0)); + +describe('Worker max_concurrency (A1 slot-based)', () => { + it('runs up to maxConcurrency jobs concurrently and refills a freed slot', async () => { + const w = buildWorker(2); + await w.call('processNext'); + expect(w.started).toEqual(['j1', 'j2']); // 2 slots filled + expect(w.get('inflight')).toBe(2); + expect(w.repo.claimNextJob).toHaveBeenCalledTimes(2); // stopped at max, did not over-claim + + w.resolvers[0](); // j1 finishes + await tick(); + expect(w.get('inflight')).toBe(1); + await w.call('processNext'); // refill + expect(w.started).toEqual(['j1', 'j2', 'j3']); + expect(w.get('inflight')).toBe(2); + }); + + it('is single-flight when maxConcurrency is unset (backward compatible)', async () => { + const w = buildWorker(undefined); + await w.call('processNext'); + expect(w.started).toEqual(['j1']); + expect(w.get('inflight')).toBe(1); + expect(w.repo.claimNextJob).toHaveBeenCalledTimes(1); + }); + + it('waitForCompletion resolves only after all inflight jobs finish', async () => { + const w = buildWorker(2); + await w.call('processNext'); + let done = false; + const wait = (w.worker.waitForCompletion(2000)).then((r) => { done = r; }); + await tick(); + 
expect(done).toBe(false); + w.resolvers[0](); w.resolvers[1](); await tick(); await tick(); + await wait; + expect(done).toBe(true); + expect(w.get('inflight')).toBe(0); + }); + + it('decrements inflight even if a job throws', async () => { + const w = buildWorker(2); + (w.get('executeJob') as { mockImplementation: (f: unknown) => void }).mockImplementation(async () => { throw new Error('boom'); }); + await w.call('processNext'); + await tick(); + expect(w.get('inflight')).toBe(0); // both threw, counter restored + }); +}); diff --git a/src/worker.ts b/src/worker.ts index 2432cf9..817e0f0 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -325,7 +325,8 @@ export async function maybeEnqueueReflection( export class Worker { private running = false; - private processing = false; + private inflight = 0; + private polling = false; private stopped = false; private pollInterval: ReturnType | null = null; private healthInterval: ReturnType | null = null; @@ -460,7 +461,7 @@ export class Worker { healthy: false, roles: this.getSupportedRoles(), availableModels: [], - inflightJobs: 0, + inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: 'disabled by config', }); @@ -509,7 +510,7 @@ export class Worker { healthy: true, roles: this.getSupportedRoles(), availableModels: [...this.availableModels], - inflightJobs: this.processing ? 
1 : 0, + inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: null, }); @@ -550,7 +551,7 @@ export class Worker { healthy: false, roles: this.getSupportedRoles(), availableModels: [], - inflightJobs: 0, + inflightJobs: this.inflight, maxConcurrency: this.getMaxConcurrency(), lastError: errorMessage, }); @@ -592,22 +593,22 @@ export class Worker { } async waitForCompletion(timeoutMs = 30000): Promise { - if (!this.processing) return true; + if (this.inflight === 0) return true; const start = Date.now(); - while (this.processing && (Date.now() - start) < timeoutMs) { + while (this.inflight > 0 && (Date.now() - start) < timeoutMs) { await new Promise(resolve => setTimeout(resolve, 500)); } - return !this.processing; + return this.inflight === 0; } get id(): string { return this.workerId; } private async processNext(): Promise { - if (!isExecutionWorker(this.getWorkerDef())) return; - if (this.processing || !this.running || this.stopped) return; - this.processing = true; + if (!isExecutionWorker(this.getWorkerDef()) || !this.running || this.stopped) return; + if (this.polling) return; // claim loop is single-flight (prevents over-claim) + this.polling = true; try { - // スタックジョブの watchdog: LLM タイムアウトの2倍を閾値にする + // スタックジョブ watchdog: LLM タイムアウトの2倍を閾値にする try { const staleMinutes = Math.max(20, (this.config.provider.timeoutMinutes ?? 10) * 2); this.repo.recoverStuckRunningJobs(staleMinutes); @@ -616,19 +617,47 @@ export class Worker { } const available = await this.initialize(); - if (!available) { - return; - } + if (!available) return; - // リトライジョブを優先的に取得 - const job = await this.repo.claimNextRetryJob(this.workerId) - ?? await this.repo.claimNextJob(this.workerId); - if (!job) return; - await this.executeJob(job); + const max = this.getMaxConcurrency(); + while (this.inflight < max && this.running && !this.stopped) { + // リトライジョブを優先 + const job = await this.repo.claimNextRetryJob(this.workerId) + ?? 
await this.repo.claimNextJob(this.workerId); + if (!job) break; + this.inflight++; + void this.runJobTracked(job); // 並行実行: await しない + } } catch (err) { logger.error(`[worker:${this.workerId}] processNext error: ${err}`); } finally { - this.processing = false; + this.polling = false; + } + } + + /** Run one job to completion, always restoring the inflight counter. */ + private async runJobTracked(job: Job): Promise { + try { + await this.executeJob(job); + } catch (err) { + logger.error(`[worker:${this.workerId}] runJobTracked error job=${job.id}: ${err}`); + } finally { + this.inflight--; + await this.reportInflight(); + } + } + + /** Push the live inflight count (and current health) to the worker_nodes row. */ + private async reportInflight(): Promise { + try { + await this.repo.updateWorkerNodeHealth(this.workerId, { + healthy: this.healthy, + lastError: this.lastHealthError, + inflightJobs: this.inflight, + availableModels: [...this.availableModels], + }); + } catch (err) { + logger.warn(`[worker:${this.workerId}] reportInflight failed: ${err}`); } } @@ -827,7 +856,7 @@ export class Worker { await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, - inflightJobs: 1, + inflightJobs: this.inflight, availableModels: [...this.availableModels], }); @@ -850,7 +879,7 @@ export class Worker { await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, - inflightJobs: 0, + inflightJobs: this.inflight, availableModels: [...this.availableModels], }); await this.repo.unlockIssue(repoName, issueNumber); @@ -1339,7 +1368,7 @@ export class Worker { await this.repo.updateWorkerNodeHealth(this.workerId, { healthy: this.healthy, lastError: this.lastHealthError, - inflightJobs: 0, + inflightJobs: this.inflight, availableModels: [...this.availableModels], }); await this.repo.unlockIssue(repoName, issueNumber); @@ -1785,7 +1814,7 @@ export class Worker { await 
this.repo.updateWorkerNodeHealth(this.workerId, { healthy: false, lastError: errorMsg, - inflightJobs: 0, + inflightJobs: this.inflight, availableModels: [], }); await this.repo.updateJob(jobId, { diff --git a/ui/src/components/chat/ChatPane.tsx b/ui/src/components/chat/ChatPane.tsx index 2f1b786..dfc7dd7 100644 --- a/ui/src/components/chat/ChatPane.tsx +++ b/ui/src/components/chat/ChatPane.tsx @@ -37,6 +37,13 @@ export function ChatPane({ task, comments, onSubmit, onCancel, onOpenDetail }: C const [sendError, setSendError] = useState(null); const scrollRef = useRef(null); const fileInputRef = useRef(null); + // Snapshot of comments.length at submit start. We hold "submitting" until + // (a) the new user comment is reflected in the list AND (b) the job is + // visibly busy (= picked up by a worker). Without this, the gap between the + // POST resolving and the worker dispatching the job lets the user fire + // multiple sends in a row. + const submitBaselineRef = useRef(null); + const submitTimeoutRef = useRef | null>(null); const [isAtBottom, setIsAtBottom] = useState(true); const [newMessageCount, setNewMessageCount] = useState(0); const prevCommentCountRef = useRef(comments.length); @@ -90,18 +97,32 @@ export function ChatPane({ task, comments, onSubmit, onCancel, onOpenDetail }: C setAttachments(prev => prev.filter(a => a.name !== name)); }; + const releaseSubmitting = () => { + if (submitTimeoutRef.current) { + clearTimeout(submitTimeoutRef.current); + submitTimeoutRef.current = null; + } + submitBaselineRef.current = null; + setSubmitting(false); + }; + const handleSubmit = async () => { if ((!draft.trim() && attachments.length === 0) || submitting) return; setSendError(null); setSubmitting(true); + submitBaselineRef.current = comments.length; try { await onSubmit(draft, attachments.length > 0 ? attachments : undefined); setDraft(''); setAttachments([]); + // Hold the lock until the agent is visibly responding (see effect below). 
+ // Safety net: if the worker never picks the job up (queue stuck, server + // crash, etc.), release the lock after 10s so the user isn't trapped. + if (submitTimeoutRef.current) clearTimeout(submitTimeoutRef.current); + submitTimeoutRef.current = setTimeout(releaseSubmitting, 10000); } catch (e) { setSendError(e instanceof Error && e.message ? e.message : '送信に失敗しました'); - } finally { - setSubmitting(false); + releaseSubmitting(); } }; @@ -165,6 +186,26 @@ export function ChatPane({ task, comments, onSubmit, onCancel, onOpenDetail }: C const canInterject = jobStatus === 'running' || jobStatus === 'waiting_subtasks'; const inputLocked = jobStatus === 'dispatching'; + // Release the submit lock once the agent is visibly responding: the new user + // comment is reflected in the list AND the job has been picked up by a worker + // (isBusy=true). This bridges the queued->dispatching gap where a stale + // re-enabled send button would otherwise allow a double submit. + useEffect(() => { + if (!submitting) return; + const baseline = submitBaselineRef.current; + if (baseline === null) return; + if (comments.length > baseline && isBusy) { + releaseSubmitting(); + } + }, [submitting, comments.length, isBusy]); + + // Clear the safety-net timeout on unmount. + useEffect(() => { + return () => { + if (submitTimeoutRef.current) clearTimeout(submitTimeoutRef.current); + }; + }, []); + // During an active run, suppress the trailing thinking comment so the // live SSE preview is the single source of truth for in-flight text. // We keep the comment in history (MovementGroup will render it once the @@ -387,7 +428,7 @@ export function ChatPane({ task, comments, onSubmit, onCancel, onOpenDetail }: C />