From 25c087067ad1534d9228c1419d148ccfd5ca2d17 Mon Sep 17 00:00:00 2001 From: oss-sync Date: Wed, 10 Jun 2026 09:09:39 +0000 Subject: [PATCH] sync: update from private repo (c8b6d29) --- src/bridge/auth-login.html | 135 ++++++++++++++++-- src/gateway/router.test.ts | 61 ++++++++ src/gateway/router.ts | 25 +++- src/gateway/stream-proxy.ts | 8 +- src/llm/openai-compat.ts | 18 +++ src/progress/log-format.test.ts | 24 ++++ src/progress/log-format.ts | 5 + src/worker.ts | 61 +++++--- src/worker/sticky-backend.test.ts | 76 +++++----- src/worker/sticky-backend.ts | 91 ++++++------ .../components/activity/ActivityEventCard.tsx | 2 +- ui/src/lib/utils.ts | 15 +- 12 files changed, 400 insertions(+), 121 deletions(-) diff --git a/src/bridge/auth-login.html b/src/bridge/auth-login.html index 50d6170..0832257 100644 --- a/src/bridge/auth-login.html +++ b/src/bridge/auth-login.html @@ -199,6 +199,34 @@ line-height: 1.5; } + .auth-error { + background: #fef2f2; + border: 1px solid #fecaca; + color: #b91c1c; + border-radius: 8px; + padding: 10px 14px; + font-size: 0.8125rem; + line-height: 1.5; + margin-bottom: 16px; + } + + .view-toggle { + margin-top: 16px; + font-size: 0.8125rem; + color: #64748b; + text-align: center; + } + + .view-toggle a { + color: #4f46e5; + font-weight: 600; + text-decoration: none; + } + + .view-toggle a:hover { + text-decoration: underline; + } + /* RESPONSIVE */ @media (max-width: 768px) { .container { @@ -261,6 +289,20 @@ .footer-note { color: #475569; } + + .auth-error { + background: rgba(239, 68, 68, 0.12); + border-color: rgba(239, 68, 68, 0.35); + color: #fca5a5; + } + + .view-toggle { + color: #94a3b8; + } + + .view-toggle a { + color: #818cf8; + } } @@ -291,9 +333,12 @@
+ diff --git a/src/gateway/router.test.ts b/src/gateway/router.test.ts index 9360e23..0188a10 100644 --- a/src/gateway/router.test.ts +++ b/src/gateway/router.test.ts @@ -104,6 +104,67 @@ describe('createRouter.pick', () => { expect(counts).toEqual({ a: 2, b: 2, c: 2 }); }); + // ── Sticky routing (x-aao-preferred-backend) ──────────────────────────── + + it('honors the preferred backend even when another is idler', () => { + const r = createRouter({ + getBackends: () => [bk('a', 'qwen3:8b'), bk('b', 'qwen3:8b')], + registry: makeRegistry([ + status('a', true, 3, 4), // busier, but preferred (KV cache lives here) + status('b', true, 0, 4), // idler + ]), + }); + expect(r.pick('qwen3:8b', 'a')?.id).toBe('a'); + }); + + it('falls back to normal ranking when the preferred backend is saturated', () => { + const r = createRouter({ + getBackends: () => [bk('a', 'qwen3:8b'), bk('b', 'qwen3:8b')], + registry: makeRegistry([ + status('a', true, 4, 4), // preferred but full + status('b', true, 1, 4), + ]), + }); + expect(r.pick('qwen3:8b', 'a')?.id).toBe('b'); + }); + + it('falls back when the preferred backend is offline', () => { + const r = createRouter({ + getBackends: () => [bk('a', 'qwen3:8b'), bk('b', 'qwen3:8b')], + registry: makeRegistry([ + status('a', false, 0, 4), + status('b', true, 1, 4), + ]), + }); + expect(r.pick('qwen3:8b', 'a')?.id).toBe('b'); + }); + + it('ignores a preferred backend that does not serve the requested model/role', () => { + const r = createRouter({ + getBackends: () => [bk('a', 'qwen3:8b'), bk('x', 'llama:70b')], + registry: makeRegistry([ + status('a', true, 2, 4), + status('x', true, 0, 4), // idle but wrong model — must not leak across pools + ]), + }); + expect(r.pick('qwen3:8b', 'x')?.id).toBe('a'); + }); + + it('preferred pick still reserves an inflight slot', () => { + const inflight = createBackendInflightCounter(); + const r = createRouter({ + getBackends: () => [bk('a', 'qwen3:8b', 2)], + registry: makeRegistry([status('a', true, 0, 2)]), + inflight, + }); + expect(r.pick('qwen3:8b', 'a')?.id).toBe('a'); + expect(inflight.get('a')).toBe(1); + expect(r.pick('qwen3:8b', 'a')?.id).toBe('a'); + expect(inflight.get('a')).toBe(2); + // Saturated by reservations → preferred no longer admitted → null + expect(r.pick('qwen3:8b', 'a')).toBeNull(); + }); + it('treats registry-cold backends as idle (avoids startup outage)', () => { // Registry has nothing yet — first request still routes. const r = createRouter({ diff --git a/src/gateway/router.ts b/src/gateway/router.ts index f73d1f2..880babc 100644 --- a/src/gateway/router.ts +++ b/src/gateway/router.ts @@ -44,7 +44,13 @@ export interface Router { * backend matches the model OR all matching backends are * offline/saturated. */ - pick(model: string): GatewayBackendConfig | null; + /** + * Pick a backend for the routing key. `preferredBackendId` is the sticky + * routing hint (x-aao-preferred-backend): honored when that backend is an + * admitted candidate (right role/model, online, free capacity), otherwise + * the normal least-busy ranking applies. + */ + pick(model: string, preferredBackendId?: string | null): GatewayBackendConfig | null; /** * Inspection helper — returns the registry-augmented view of every * configured backend (regardless of model). Useful for /v1/models @@ -118,7 +124,7 @@ export function createRouter(deps: CreateRouterDeps): Router { } return { - pick(model: string): GatewayBackendConfig | null { + pick(model: string, preferredBackendId?: string | null): GatewayBackendConfig | null { // Note registry activity so the polling cadence tightens while // we're actively routing. Without this, a single-page burst of // requests would still see the idle 30s cadence. @@ -190,6 +196,21 @@ export function createRouter(deps: CreateRouterDeps): Router { const candidates = warm.length > 0 ? warm : cold; if (candidates.length === 0) return null; + // Sticky routing (x-aao-preferred-backend): if the client asks for a + // specific backend and that backend is an ADMITTED candidate (right + // role/model, online, has free capacity), serve it without ranking — + // the job's KV cache lives there, so a least-busy re-pick would trade + // a large prefill cost for a small load-balance win. When the + // preferred backend is saturated or offline it simply isn't in the + // candidate list and we fall through to the normal ranking. + if (preferredBackendId) { + const preferred = candidates.find(c => c.backend.id === preferredBackendId); + if (preferred) { + deps.inflight?.inc(preferred.backend.id); + return preferred.backend; + } + } + // Stable ascending sort by ratio. candidates.sort((a, b) => a.ratio - b.ratio); const bestRatio = candidates[0]!.ratio; diff --git a/src/gateway/stream-proxy.ts b/src/gateway/stream-proxy.ts index f2e4656..ca4d1d6 100644 --- a/src/gateway/stream-proxy.ts +++ b/src/gateway/stream-proxy.ts @@ -297,7 +297,13 @@ export function buildChatCompletionsHandler(deps: StreamProxyDeps) { return; } - const backend = deps.router.pick(model); + // Sticky routing hint from the worker (see router.pick): prefer the + // backend that already holds this job's KV cache when it has capacity. + const rawPreferred = req.headers['x-aao-preferred-backend']; + const preferredBackendId = typeof rawPreferred === 'string' && rawPreferred.trim().length > 0 + ? rawPreferred.trim() + : null; + const backend = deps.router.pick(model, preferredBackendId); if (!backend) { emitRequestMetric(deps.metrics, { team: team0, backend: 'none', model, status: 'no_backend', durationMs: 0, diff --git a/src/llm/openai-compat.ts b/src/llm/openai-compat.ts index 112935e..6547a3d 100644 --- a/src/llm/openai-compat.ts +++ b/src/llm/openai-compat.ts @@ -281,6 +281,17 @@ export class OpenAICompatClient { return `Request timed out (${mins} minutes)`; } + /** + * Backend the next request should prefer (gateway sticky routing for + * KV-cache reuse). Updated by the worker whenever the resolved backend + * changes; per-client so concurrent jobs never share affinity. + */ + private preferredBackendId: string | null = null; + + setPreferredBackendId(backendId: string | null): void { + this.preferredBackendId = backendId; + } + async *chat(messages: Message[], tools?: ToolDef[], externalSignal?: AbortSignal): AsyncGenerator { const controller = new AbortController(); // アイドルタイムアウト: チャンク受信のたびにリセットされる @@ -308,6 +319,13 @@ export class OpenAICompatClient { if (this.apiKey) { headers['Authorization'] = `Bearer ${this.apiKey}`; } + // Sticky routing hint: ask the gateway to keep serving this job from + // the backend that already holds its KV cache. The gateway only honors + // it while that backend is online with free capacity; otherwise it + // re-routes normally. Direct (non-proxy) backends ignore the header. + if (this.preferredBackendId) { + headers['x-aao-preferred-backend'] = this.preferredBackendId; + } const body: Record = { messages, diff --git a/src/progress/log-format.test.ts b/src/progress/log-format.test.ts index 68bb9b6..d9251f9 100644 --- a/src/progress/log-format.test.ts +++ b/src/progress/log-format.test.ts @@ -12,6 +12,7 @@ describe('activity log format', () => { expect(parseActivityLogMetadata('[2026-03-13T00:00:00.000Z] [worker:worker-074] [mode:fast] Read: {"file_path":"input/a.png"}')).toEqual({ workerId: 'worker-074', mode: 'fast', + backendId: null, }); }); @@ -20,6 +21,7 @@ describe('activity log format', () => { expect(parseActivityLogMetadata('[2026-03-13T00:00:00.000Z] final: completed')).toEqual({ workerId: null, mode: null, + backendId: null, }); }); @@ -27,6 +29,7 @@ describe('activity log format', () => { expect(parseActivityLogMetadata('[2026-03-13T00:00:00.000Z] [worker:worker-148] [mode:quality] [execute] preview: checked draft')).toEqual({ workerId: 'worker-148', mode: 'quality', + backendId: null, }); }); @@ -54,3 +57,24 @@ describe('activity log format', () => { expect(formatDuration(Number.POSITIVE_INFINITY)).toBe('?'); }); }); + +describe('backend tag', () => { + it('formats and parses [backend:...] alongside worker/mode', () => { + const line = formatActivityLogEntry('LLM call done', { + workerId: 'aao-gateway', + mode: 'quality', + backendId: 'gpu-rtx-a', + }); + expect(line).toContain('[worker:aao-gateway]'); + expect(line).toContain('[backend:gpu-rtx-a]'); + const parsed = parseActivityLogMetadata(line); + expect(parsed.workerId).toBe('aao-gateway'); + expect(parsed.backendId).toBe('gpu-rtx-a'); + }); + + it('omits the backend tag when backendId is unset', () => { + const line = formatActivityLogEntry('entry', { workerId: 'w1', mode: 'auto' }); + expect(line).not.toContain('[backend:'); + expect(parseActivityLogMetadata(line).backendId).toBeNull(); + }); +}); diff --git a/src/progress/log-format.ts b/src/progress/log-format.ts index 4113901..fad8005 100644 --- a/src/progress/log-format.ts +++ b/src/progress/log-format.ts @@ -1,6 +1,8 @@ export interface ActivityLogMetadata { workerId?: string | null; mode?: string | null; + /** Physical backend behind a proxy worker (gateway-resolved), if known. */ + backendId?: string | null; } const TOOL_SUMMARY_KEYS = [ @@ -24,6 +26,7 @@ export function formatActivityLogMetadata(metadata?: ActivityLogMetadata): strin const segments: string[] = []; if (metadata.workerId) segments.push(`[worker:${metadata.workerId}]`); if (metadata.mode) segments.push(`[mode:${metadata.mode}]`); + if (metadata.backendId) segments.push(`[backend:${metadata.backendId}]`); return segments.join(' '); } @@ -35,9 +38,11 @@ export function formatActivityLogEntry(entry: string, metadata?: ActivityLogMeta export function parseActivityLogMetadata(line: string): ActivityLogMetadata { const workerMatch = /\[worker:([^\]]+)\]/.exec(line); const modeMatch = /\[mode:([^\]]+)\]/.exec(line); + const backendMatch = /\[backend:([^\]]+)\]/.exec(line); return { workerId: workerMatch?.[1] ?? null, mode: modeMatch?.[1] ?? null, + backendId: backendMatch?.[1] ?? null, }; } diff --git a/src/worker.ts b/src/worker.ts index 4a0e3fe..b98a9f3 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1164,10 +1164,12 @@ export class Worker { isLocalTask, localTaskId, workspacePath, - // Seed the sticky-backend guard with whatever was already persisted + // Seed the backend tracker with whatever was already persisted // for this job (e.g. on retry / resume from ASK). Only matters for // proxy workers; direct workers never produce a backend event. isProxyWorker ? (job.lastBackendId ?? null) : null, + llmClient, + logMetadata, ); // 開始コメント @@ -1500,24 +1502,34 @@ export class Worker { localTaskId: number | null, workspacePath: string, /** - * Initial value of jobs.last_backend_id from the DB. Used to seed the - * sticky guard so callers don't repeatedly write the same value on - * every LLM iteration. Falsy/null = no backend resolved yet. + * Initial value of jobs.last_backend_id from the DB. Seeds the backend + * tracker (and the sticky-routing hint) so a resumed/retried job goes + * back to the backend that already holds its KV cache. + * Falsy/null = no backend resolved yet. */ initialLastBackendId: string | null = null, + /** LLM client of this job — receives the sticky-routing hint per switch. */ + llmClient?: { setPreferredBackendId(backendId: string | null): void }, + /** + * The reporter's metadata object (shared by reference): mutating + * `backendId` here makes every subsequent activity.log line carry + * `[backend:...]` so the Progress tab can show the physical backend + * behind a proxy worker. + */ + logMetadata?: ActivityLogMetadata, ): PieceRunCallbacks { let movementStartTime = Date.now(); const toolUsageCounts = new Map(); - // Sticky-backend per design Open Question #3: take the first proxy - // backend the job sees and never overwrite it. Subsequent calls that - // happen to land on a different deployment are ignored at this layer - // so the UI Pet doesn't flicker between sprites. The resolver also - // guarantees that if the DB persist fails, the local sticky stays - // unset so the next event can retry (otherwise a transient DB error - // would orphan the worker → backend mapping for the lifetime of the - // job). See src/worker/sticky-backend.ts. + // Backend tracker (follow-current semantics, 2026-06): persists + // jobs.last_backend_id whenever the resolved backend CHANGES so the UI + // (pet, badges) follows where the job actually runs. Switches are rare + // because the gateway honors the x-aao-preferred-backend sticky hint + // (KV-cache reuse) — they only happen when the preferred backend goes + // offline or saturates. The tracker still guarantees that a failed DB + // persist leaves the in-memory value unchanged so the next event + // retries. See src/worker/sticky-backend.ts. const workerIdLocal = this.workerId; - const onBackendResolvedHandler = createStickyBackendResolver({ + const backendTracker = createStickyBackendResolver({ initial: initialLastBackendId, persist: (backendId) => this.repo.updateJob(jobId, { lastBackendId: backendId }), logger: { @@ -1528,6 +1540,12 @@ export class Worker { workerId: workerIdLocal, jobId, }); + // Seed the sticky-routing hint + activity-log backend tag from the DB + // value (resume/retry goes straight back to the cache-warm backend). + if (initialLastBackendId) { + llmClient?.setPreferredBackendId(initialLastBackendId); + if (logMetadata) logMetadata.backendId = initialLastBackendId; + } // Phase 3b: local copy of the sticky backend so the LLM-call metric // has a stable backend_id label even before the persist returns. // Direct workers (non-proxy) never fire onBackendResolved, so we @@ -1616,20 +1634,23 @@ export class Worker { } }, onBackendResolved: (info) => { - // Phase 3b: update the sticky backend id used for LLM-call - // metrics. We capture every event (not just the first) so a - // routing change mid-job is reflected in the next iteration's - // counters; the DB-side sticky still preserves the first. + // Phase 3b: update the backend id used for LLM-call metrics. if (info.backendId) { metricBackendId = info.backendId; + // Sticky routing: ask the gateway to keep using this backend on + // the next request (KV-cache affinity). + llmClient?.setPreferredBackendId(info.backendId); + // Tag subsequent activity.log lines with the physical backend so + // the Progress tab shows more than the proxy worker's name. + if (logMetadata) logMetadata.backendId = info.backendId; } // Fire-and-forget: agent-loop's onBackendResolved signature is - // sync (void). The resolver handles persist errors internally; + // sync (void). The tracker handles persist errors internally; // we just attach a final guard to log any unexpected throw. // cacheKey is observed but not persisted at the job level — // Phase B's NodeStatusWidget will track cache hits out-of-band. - onBackendResolvedHandler(info).catch(err => { - logger.warn(`[worker:${this.workerId}] sticky backend resolver threw for job ${jobId}: ${err}`); + backendTracker.onEvent(info).catch(err => { + logger.warn(`[worker:${this.workerId}] backend tracker threw for job ${jobId}: ${err}`); }); }, onMovementComplete: (movementName, result) => { diff --git a/src/worker/sticky-backend.test.ts b/src/worker/sticky-backend.test.ts index 572cc8a..e02af98 100644 --- a/src/worker/sticky-backend.test.ts +++ b/src/worker/sticky-backend.test.ts @@ -13,11 +13,11 @@ function makeLogger(): StickyBackendLogger & { }; } -describe('createStickyBackendResolver', () => { - it('persists the first backend, sets sticky, and logs at info', async () => { +describe('createStickyBackendResolver (follow-current semantics)', () => { + it('persists the first backend, advances current, and logs at info', async () => { const logger = makeLogger(); const persist = vi.fn().mockResolvedValue(undefined); - const resolve = createStickyBackendResolver({ + const tracker = createStickyBackendResolver({ initial: null, persist, logger, @@ -25,18 +25,19 @@ describe('createStickyBackendResolver', () => { jobId: 'j1', }); - await resolve({ backendId: 'gpu-a', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: null }); expect(persist).toHaveBeenCalledTimes(1); expect(persist).toHaveBeenCalledWith('gpu-a'); + expect(tracker.current()).toBe('gpu-a'); expect(logger.calls.info).toHaveLength(1); expect(logger.calls.info[0]).toContain('gpu-a'); }); - it('short-circuits subsequent events once sticky is set', async () => { + it('follows backend switches: each CHANGE persists; repeats do not', async () => { const logger = makeLogger(); const persist = vi.fn().mockResolvedValue(undefined); - const resolve = createStickyBackendResolver({ + const tracker = createStickyBackendResolver({ initial: null, persist, logger, @@ -44,20 +45,24 @@ describe('createStickyBackendResolver', () => { jobId: 'j1', }); - await resolve({ backendId: 'gpu-a', cacheKey: null }); - await resolve({ backendId: 'gpu-b', cacheKey: 'sha:xyz' }); - await resolve({ backendId: 'gpu-a', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: 'sha:1' }); // same — no persist + await tracker.onEvent({ backendId: 'gpu-b', cacheKey: null }); // switch — persists + await tracker.onEvent({ backendId: 'gpu-b', cacheKey: null }); // same — no persist - expect(persist).toHaveBeenCalledTimes(1); - // gpu-b ≠ sticky → debug; gpu-a == sticky → no log - expect(logger.calls.debug).toHaveLength(1); - expect(logger.calls.debug[0]).toContain('gpu-b'); + expect(persist).toHaveBeenCalledTimes(2); + expect(persist).toHaveBeenNthCalledWith(1, 'gpu-a'); + expect(persist).toHaveBeenNthCalledWith(2, 'gpu-b'); + expect(tracker.current()).toBe('gpu-b'); + // The switch log mentions both ends + expect(logger.calls.info[1]).toContain('gpu-a'); + expect(logger.calls.info[1]).toContain('gpu-b'); }); - it('honors initial sticky from DB without re-persisting', async () => { + it('honors initial value from DB: same backend does not re-persist, a switch does', async () => { const logger = makeLogger(); const persist = vi.fn().mockResolvedValue(undefined); - const resolve = createStickyBackendResolver({ + const tracker = createStickyBackendResolver({ initial: 'gpu-seed', persist, logger, @@ -65,21 +70,22 @@ describe('createStickyBackendResolver', () => { jobId: 'j1', }); - await resolve({ backendId: 'gpu-other', cacheKey: null }); - await resolve({ backendId: 'gpu-seed', cacheKey: null }); - + expect(tracker.current()).toBe('gpu-seed'); + await tracker.onEvent({ backendId: 'gpu-seed', cacheKey: null }); expect(persist).not.toHaveBeenCalled(); - expect(logger.calls.debug).toHaveLength(1); - expect(logger.calls.debug[0]).toContain('gpu-other'); + + await tracker.onEvent({ backendId: 'gpu-other', cacheKey: null }); + expect(persist).toHaveBeenCalledTimes(1); + expect(tracker.current()).toBe('gpu-other'); }); - it('does NOT set sticky when persist fails — next event retries', async () => { + it('does NOT advance when persist fails — next event retries', async () => { const logger = makeLogger(); const persist = vi .fn() .mockRejectedValueOnce(new Error('SQLITE_BUSY')) .mockResolvedValueOnce(undefined); - const resolve = createStickyBackendResolver({ + const tracker = createStickyBackendResolver({ initial: null, persist, logger, @@ -87,23 +93,19 @@ describe('createStickyBackendResolver', () => { jobId: 'j1', }); - // First call: DB write fails → sticky unset → warn logged - await resolve({ backendId: 'gpu-a', cacheKey: null }); + // First call: DB write fails → current stays null → warn logged + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: null }); expect(persist).toHaveBeenCalledTimes(1); + expect(tracker.current()).toBeNull(); expect(logger.calls.warn).toHaveLength(1); expect(logger.calls.warn[0]).toContain('SQLITE_BUSY'); expect(logger.calls.info).toHaveLength(0); - // Second call: DB write succeeds → sticky set - await resolve({ backendId: 'gpu-b', cacheKey: null }); + // Second call (same backend again): retries because current ≠ backendId + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: null }); expect(persist).toHaveBeenCalledTimes(2); - expect(persist).toHaveBeenLastCalledWith('gpu-b'); + expect(tracker.current()).toBe('gpu-a'); expect(logger.calls.info).toHaveLength(1); - expect(logger.calls.info[0]).toContain('gpu-b'); - - // Third call: sticky is now set → no further persist - await resolve({ backendId: 'gpu-c', cacheKey: null }); - expect(persist).toHaveBeenCalledTimes(2); }); it('retries on every event until persist succeeds (multiple failures)', async () => { @@ -113,7 +115,7 @@ describe('createStickyBackendResolver', () => { .mockRejectedValueOnce(new Error('fail 1')) .mockRejectedValueOnce(new Error('fail 2')) .mockResolvedValueOnce(undefined); - const resolve = createStickyBackendResolver({ + const tracker = createStickyBackendResolver({ initial: null, persist, logger, @@ -121,13 +123,13 @@ describe('createStickyBackendResolver', () => { jobId: 'j1', }); - await resolve({ backendId: 'gpu-a', cacheKey: null }); - await resolve({ backendId: 'gpu-b', cacheKey: null }); - await resolve({ backendId: 'gpu-c', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-a', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-b', cacheKey: null }); + await tracker.onEvent({ backendId: 'gpu-c', cacheKey: null }); expect(persist).toHaveBeenCalledTimes(3); expect(logger.calls.warn).toHaveLength(2); expect(logger.calls.info).toHaveLength(1); - expect(logger.calls.info[0]).toContain('gpu-c'); + expect(tracker.current()).toBe('gpu-c'); }); }); diff --git a/src/worker/sticky-backend.ts b/src/worker/sticky-backend.ts index 54de4a5..517c979 100644 --- a/src/worker/sticky-backend.ts +++ b/src/worker/sticky-backend.ts @@ -1,24 +1,23 @@ /** - * Sticky-backend resolver for proxy worker jobs. + * Backend tracker for proxy worker jobs. * - * Design (per - * docs/superpowers/specs/2026-05-18-multi-team-gpu-pool-and-node-status-design.md - * Open Question #3 case 1, "最初に確定したら以後 update しない"): + * History: originally "first backend wins" (2026-05-18 design, Open Question + * #3 case 1) to keep the UI pet from flickering while the gateway rebalanced + * every request. As of 2026-06 the gateway honors `x-aao-preferred-backend` + * (client-side sticky routing for KV-cache reuse), so backend switches are + * RARE — they only happen when the preferred backend goes offline or + * saturates. The tracker therefore now follows the CURRENT backend: * - * - For a proxy worker, every LLM call may resolve to a different - * physical backend (LiteLLM rebalances per request). The UI pet should - * not flicker, so we record only the FIRST backend a job sees. - * - Persistence happens via `updateJob({ lastBackendId })`. If that DB - * write FAILS, we must remain in the unset state so the next - * `onBackendResolved` event has a chance to retry. If we set the local - * sticky variable BEFORE persisting, a transient DB error would lose - * the binding permanently for the lifetime of the job (sticky check - * short-circuits all subsequent events) and the UI would never see - * the worker → backend mapping. + * - `jobs.last_backend_id` is updated whenever the resolved backend CHANGES, + * so the UI (pet, badges) tracks where the job actually runs. + * - Persistence happens via `updateJob({ lastBackendId })`. If that DB write + * FAILS, the in-memory value is left unchanged so the next + * `onBackendResolved` event retries the persist (a transient DB error must + * not permanently lose the worker → backend mapping). * - * This module isolates the "set sticky only after persist succeeds" - * invariant from `Worker.buildPieceRunCallbacks`, which already has a - * dozen other concerns and is hard to unit-test in isolation. + * This module isolates the "advance only after persist succeeds" invariant + * from `Worker.buildPieceCallbacks`, which already has a dozen other + * concerns and is hard to unit-test in isolation. */ export interface StickyBackendLogger { @@ -32,16 +31,20 @@ export interface StickyBackendEvent { cacheKey: string | null; } +export interface BackendTracker { + /** The onBackendResolved callback for the agent loop (fire-and-forget safe). */ + onEvent: (event: StickyBackendEvent) => Promise; + /** + * The most recently persisted backend id (or the initial DB value). + * Used as the `x-aao-preferred-backend` hint on the next LLM request. + */ + current: () => string | null; +} + /** - * Build the `onBackendResolved` callback. `persist(backendId)` is the - * DB write (typically `repo.updateJob(jobId, { lastBackendId })`); it - * must reject on failure so we can keep sticky unset for retry. - * - * Returns an async function the agent-loop can call without awaiting — - * errors are caught internally and logged. The function resolves once - * either: (a) sticky was already set and we short-circuited, (b) the - * persist succeeded and sticky is now set, or (c) the persist failed - * and sticky remains unset for the next event to retry. + * Build the backend tracker. `persist(backendId)` is the DB write + * (typically `repo.updateJob(jobId, { lastBackendId })`); it must reject on + * failure so the in-memory value stays put for retry. */ export function createStickyBackendResolver(opts: { initial: string | null; @@ -49,33 +52,31 @@ export function createStickyBackendResolver(opts: { logger: StickyBackendLogger; workerId: string; jobId: string; -}): (event: StickyBackendEvent) => Promise { +}): BackendTracker { const { initial, persist, logger, workerId, jobId } = opts; - let sticky: string | null = initial; + let current: string | null = initial; - return async function onBackendResolved({ backendId, cacheKey }: StickyBackendEvent): Promise { - if (sticky) { - if (sticky !== backendId) { - logger.debug( - `[worker:${workerId}] job ${jobId} backend re-resolved to ${backendId} (sticky=${sticky}, cache=${cacheKey ?? 'miss'}); keeping sticky`, - ); - } - return; - } + async function onEvent({ backendId, cacheKey }: StickyBackendEvent): Promise { + if (current === backendId) return; // unchanged — nothing to persist try { await persist(backendId); - // Only set sticky AFTER persist succeeds. If we set first and persist - // failed, the next event would short-circuit on the sticky check and - // we'd never recover — the UI would render "no backend" forever. - sticky = backendId; + // Only advance AFTER persist succeeds. If we advanced first and the + // persist failed, the next identical event would short-circuit on the + // equality check and the DB would stay stale forever. + const previous = current; + current = backendId; logger.info( - `[worker:${workerId}] job ${jobId} backend resolved: ${backendId} cache=${cacheKey ?? 'miss'}`, + previous + ? `[worker:${workerId}] job ${jobId} backend switched: ${previous} → ${backendId} cache=${cacheKey ?? 'miss'}` + : `[worker:${workerId}] job ${jobId} backend resolved: ${backendId} cache=${cacheKey ?? 'miss'}`, ); } catch (err) { logger.warn( - `[worker:${workerId}] failed to persist lastBackendId for job ${jobId}: ${err} — sticky left unset for retry`, + `[worker:${workerId}] failed to persist lastBackendId for job ${jobId}: ${err} — keeping ${current ?? 'unset'} for retry`, ); - // Intentionally do NOT set sticky. Next event retries. + // Intentionally do NOT advance. Next event retries. } - }; + } + + return { onEvent, current: () => current }; } diff --git a/ui/src/components/activity/ActivityEventCard.tsx b/ui/src/components/activity/ActivityEventCard.tsx index 302450b..2f52ab2 100644 --- a/ui/src/components/activity/ActivityEventCard.tsx +++ b/ui/src/components/activity/ActivityEventCard.tsx @@ -22,7 +22,7 @@ const KIND_COLORS: Record diff --git a/ui/src/lib/utils.ts b/ui/src/lib/utils.ts index 6084656..4a7b390 100644 --- a/ui/src/lib/utils.ts +++ b/ui/src/lib/utils.ts @@ -115,10 +115,17 @@ export interface ActivityEvent { timestamp: string | null; workerId: string | null; mode: string | null; + /** Physical backend behind a proxy worker (from [backend:...] tags). */ + backendId: string | null; } -export function formatActivityMeta(workerId: string | null, mode: string | null): string { - return [workerId ? `worker: ${workerPill(workerId)}` : '', mode ? `mode: ${mode}` : ''].filter(Boolean).join(' · '); +export function formatActivityMeta(workerId: string | null, mode: string | null, backendId?: string | null): string { + return [ + workerId ? `worker: ${workerPill(workerId)}` : '', + // Show the physical backend when it differs from the (proxy) worker name. + backendId && backendId !== workerId ? `backend: ${workerPill(backendId)}` : '', + mode ? `mode: ${mode}` : '', + ].filter(Boolean).join(' · '); } export function parseActivityLog(logText: string): ActivityEvent[] { @@ -130,13 +137,15 @@ export function parseActivityLog(logText: string): ActivityEvent[] { const timestamp = timestampMatch?.[1] ?? null; const workerId = /\[worker:([^\]]+)\]/.exec(rawLine)?.[1] ?? null; const mode = /\[mode:([^\]]+)\]/.exec(rawLine)?.[1] ?? null; + const backendId = /\[backend:([^\]]+)\]/.exec(rawLine)?.[1] ?? null; const line = rawLine .replace(/^\[[^\]]+\]\s+/, '') .replace(/\[worker:[^\]]+\]\s*/g, '') .replace(/\[mode:[^\]]+\]\s*/g, '') + .replace(/\[backend:[^\]]+\]\s*/g, '') .trim(); - const base = { id: `${timestamp ?? 'line'}-${index}`, timestamp, workerId, mode }; + const base = { id: `${timestamp ?? 'line'}-${index}`, timestamp, workerId, mode, backendId }; const movementStart = /^\[([^\]]+)\] (?:start|ステップ開始)$/.exec(line); if (movementStart) {