From d95267c4b0763c40accabfcd76c050f36338bce9 Mon Sep 17 00:00:00 2001 From: oss-sync Date: Wed, 10 Jun 2026 01:00:05 +0000 Subject: [PATCH] sync: update from private repo (6be06e0) --- src/bridge/auth.test.ts | 11 +++++ src/bridge/auth.ts | 14 +++---- src/db/repository.ts | 43 +++++++++++++++++++ src/worker-manager.test.ts | 1 + src/worker-manager.ts | 4 ++ src/worker.ts | 74 +++++++++++++++++++++++++++++++++ src/worker/idle-routing.test.ts | 44 ++++++++++++++++++++ src/worker/idle-routing.ts | 39 +++++++++++++++++ 8 files changed, 223 insertions(+), 7 deletions(-) create mode 100644 src/worker/idle-routing.test.ts create mode 100644 src/worker/idle-routing.ts diff --git a/src/bridge/auth.test.ts b/src/bridge/auth.test.ts index b0fcf65..b0a9a0e 100644 --- a/src/bridge/auth.test.ts +++ b/src/bridge/auth.test.ts @@ -220,4 +220,15 @@ describe('isProviderActive (primaryProvider restriction)', () => { expect(isProviderActive(c, 'gitea')).toBe(true); // gitea still usable expect(isProviderActive(c, 'google')).toBe(false); // google not configured }); + it('does not throw when providers is entirely absent (local-only config)', () => { + // Saving "local only" from the Settings UI can persist an auth block with + // no providers key at all. The OAuth helpers + strategy registration must + // treat that as "no OAuth provider configured", not crash. + const c = { + sessionSecret: 's', sessionMaxAge: 1, secureCookie: false, adminEmails: [], + } as AuthConfig; + expect(() => isProviderActive(c, 'google')).not.toThrow(); + expect(isProviderActive(c, 'google')).toBe(false); + expect(isProviderActive(c, 'gitea')).toBe(false); + }); }); diff --git a/src/bridge/auth.ts b/src/bridge/auth.ts index 8bf47ed..5fca975 100644 --- a/src/bridge/auth.ts +++ b/src/bridge/auth.ts @@ -67,10 +67,10 @@ export function isProviderConfigured( * An invalid primary (pointing at an unconfigured provider) is ignored. */ export function isProviderActive(authConfig: AuthConfig, kind: 'google' | 'gitea'): boolean { - if (!isProviderConfigured(authConfig.providers[kind], kind)) return false; + if (!isProviderConfigured(authConfig.providers?.[kind], kind)) return false; const primary = authConfig.primaryProvider; - if (primary === 'google' && isProviderConfigured(authConfig.providers.google, 'google')) return kind === 'google'; - if (primary === 'gitea' && isProviderConfigured(authConfig.providers.gitea, 'gitea')) return kind === 'gitea'; + if (primary === 'google' && isProviderConfigured(authConfig.providers?.google, 'google')) return kind === 'google'; + if (primary === 'gitea' && isProviderConfigured(authConfig.providers?.gitea, 'gitea')) return kind === 'gitea'; // primary=local restricts login to local accounts → OAuth providers off. if (primary === 'local' && isLocalEnabled(authConfig)) return false; return true; @@ -132,8 +132,8 @@ export function buildChangePasswordHandler(repo: Repository): RequestHandler { */ function renderLoginPage(authConfig: AuthConfig, branding: LoginBranding = DEFAULT_LOGIN_BRANDING): string { const raw = readFileSync(path.join(__authDirname, 'auth-login.html'), 'utf-8'); - const googleConfigured = isProviderConfigured(authConfig.providers.google, 'google'); - const giteaConfigured = isProviderConfigured(authConfig.providers.gitea, 'gitea'); + const googleConfigured = isProviderConfigured(authConfig.providers?.google, 'google'); + const giteaConfigured = isProviderConfigured(authConfig.providers?.gitea, 'gitea'); const localEnabled = isLocalEnabled(authConfig); const allowSignup = authConfig.local?.allowSignup === true; // Ignore a primaryProvider that points to an unconfigured/disabled provider — @@ -413,7 +413,7 @@ export async function fetchGiteaOrgsForUser( // ── Strategy Registration ───────────────────────────────────────────────────── function registerGoogleStrategy(repo: Repository, authConfig: AuthConfig): void { - const googleConfig = authConfig.providers.google; + const googleConfig = authConfig.providers?.google; if (!isProviderConfigured(googleConfig, 'google')) return; if (!isProviderActive(authConfig, 'google')) return; @@ -445,7 +445,7 @@ function registerGoogleStrategy(repo: Repository, authConfig: AuthConfig): void } function registerGiteaStrategy(repo: Repository, authConfig: AuthConfig): void { - const giteaConfig = authConfig.providers.gitea; + const giteaConfig = authConfig.providers?.gitea; if (!isProviderConfigured(giteaConfig, 'gitea')) return; if (!isProviderActive(authConfig, 'gitea')) return; diff --git a/src/db/repository.ts b/src/db/repository.ts index 4bfe2db..8a87c01 100644 --- a/src/db/repository.ts +++ b/src/db/repository.ts @@ -2005,6 +2005,49 @@ export class Repository { return row ? rowToJob(row) : null; } + /** + * Read-only peek at the next job this worker WOULD claim (retry-priority, + * then oldest queued), without claiming it. Used by the idle-preferring + * claim gate to learn the next job's role before deciding whether to defer + * to an idler sibling. Mirrors the claimNext*Job WHERE clauses exactly. + */ + async peekNextClaimable(workerId: string): Promise { + const retry = this.db.prepare(` + SELECT j.* + FROM jobs j + JOIN worker_nodes w ON w.worker_id = ? + WHERE j.status = 'retry' + AND replace(j.next_retry_at, 'T', ' ') <= datetime('now') + AND w.enabled = 1 + AND w.healthy = 1 + AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0 + AND NOT EXISTS ( + SELECT 1 FROM issue_locks il + WHERE il.repo = j.repo AND il.issue_number = j.issue_number + ) + ORDER BY j.next_retry_at ASC + LIMIT 1 + `).get(workerId) as JobRow | undefined; + if (retry) return rowToJob(retry); + + const queued = this.db.prepare(` + SELECT j.* + FROM jobs j + JOIN worker_nodes w ON w.worker_id = ? + WHERE j.status = 'queued' + AND w.enabled = 1 + AND w.healthy = 1 + AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0 + AND NOT EXISTS ( + SELECT 1 FROM issue_locks il + WHERE il.repo = j.repo AND il.issue_number = j.issue_number + ) + ORDER BY j.created_at ASC + LIMIT 1 + `).get(workerId) as JobRow | undefined; + return queued ? rowToJob(queued) : null; + } + async getJobsByStatus(status: JobStatus): Promise { const rows = this.db .prepare('SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC') diff --git a/src/worker-manager.test.ts b/src/worker-manager.test.ts index 6b50752..e7335ae 100644 --- a/src/worker-manager.test.ts +++ b/src/worker-manager.test.ts @@ -31,6 +31,7 @@ class MockWorker { setWorkerMetrics(): void {} setSkillCatalog(): void {} setPushService(): void {} + setSiblingsAccessor(): void {} } vi.mock('./worker.js', () => ({ Worker: MockWorker })); diff --git a/src/worker-manager.ts b/src/worker-manager.ts index 748c6c9..fc9235d 100644 --- a/src/worker-manager.ts +++ b/src/worker-manager.ts @@ -240,6 +240,10 @@ export class WorkerManager { /** Build a single worker and wire up the optional collaborators. */ private createWorker(def: WorkerDef, config: AppConfig): Worker { const w = new Worker(def.id, def.endpoint, def.model, this.repo, config); + // Live sibling list for idle-preferring claims. The closure reads + // this.workers, which is reassigned on rebuild — so it always reflects + // the current pool (kept + fresh), excluding retired workers. + w.setSiblingsAccessor(() => this.workers); if (this.mcpTokenManager) w.setMcpTokenManager(this.mcpTokenManager); if (this.workerMetrics) w.setWorkerMetrics(this.workerMetrics); if (this.skillCatalog) w.setSkillCatalog(this.skillCatalog); diff --git a/src/worker.ts b/src/worker.ts index 91496c5..4a0e3fe 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -21,6 +21,7 @@ import { mergeMcpConfig } from './mcp/config.js'; import { NotesService } from './notes/notes-service.js'; import { NotesRepository } from './notes/notes-repository.js'; import { createStickyBackendResolver } from './worker/sticky-backend.js'; +import { pickIdlerIndex } from './worker/idle-routing.js'; import { jobEventBus } from './bridge/job-events.js'; import { normalizeToolNameForMetric } from './metrics/tool-name-allowlist.js'; @@ -334,6 +335,12 @@ export class Worker { private stopped = false; private pollInterval: ReturnType | null = null; private healthInterval: ReturnType | null = null; + /** Live sibling list, injected by WorkerManager for idle-preferring claims. */ + private siblingsAccessor: (() => Worker[]) | null = null; + /** Last initialize() result — whether we'd actually claim if poked. */ + private lastAvailable = false; + /** Job id we deferred to an idler last round (safety net against stuck yields). */ + private lastYieldedJobId: string | null = null; private workerId: string; private endpoint: string; private model: string | undefined; @@ -404,6 +411,50 @@ export class Worker { return this.inflight; } + /** Free execution slots right now (max_concurrency − inflight). */ + public get freeSlots(): number { + return Math.max(0, this.getMaxConcurrency() - this.inflight); + } + + /** True when this worker would actually pick up a job if poked. */ + public get availableForClaim(): boolean { + return this.running && !this.stopped && this.lastAvailable + && isExecutionWorker(this.getWorkerDef()); + } + + /** Whether this worker serves jobs of the given role. */ + public canClaimRole(role: string): boolean { + return this.supportsRole(role); + } + + /** Nudge this worker to poll immediately (hands a yielded job to an idler). */ + public pokePoll(): void { + void this.processNext(); + } + + /** WorkerManager injects the live sibling list so claims prefer idler workers. */ + public setSiblingsAccessor(fn: () => Worker[]): void { + this.siblingsAccessor = fn; + } + + /** + * Find the idlest sibling that has strictly more free slots than us and + * serves `role`. Returns null when we are (tied for) the most free, in which + * case we should claim the job ourselves. + */ + private findIdlerCompetitor(role: string): Worker | null { + const others = (this.siblingsAccessor?.() ?? []).filter((s) => s !== this); + const idx = pickIdlerIndex( + this.freeSlots, + others.map((s) => ({ + freeSlots: s.freeSlots, + availableForClaim: s.availableForClaim, + servesRole: s.canClaimRole(role), + })), + ); + return idx >= 0 ? others[idx]! : null; + } + /** * Fire a V2 push for a job status transition. Fire-and-forget — never * throws and never awaits the underlying queue. Skips silently when @@ -639,10 +690,33 @@ export class Worker { } const available = await this.initialize(); + this.lastAvailable = available; if (!available) return; const max = this.getMaxConcurrency(); while (this.inflight < max && this.running && !this.stopped) { + // Idle-preferring gate (most-free-wins): if a strictly-idler sibling + // serves the next job's role, hand it off (nudge that worker) instead + // of piling on. Safety net: if we already deferred this exact job last + // round and it is still here, the idler didn't take it (unhealthy / + // raced) — claim it ourselves so a job never gets stuck. + // + // Only consult the gate when there are sibling workers to defer to AND + // the repo supports peeking. Single-worker setups and unit tests skip + // it entirely — no extra query, no added latency, original claim timing. + const siblings = this.siblingsAccessor?.(); + if (siblings && siblings.length > 1 && this.repo.peekNextClaimable) { + const peek = await this.repo.peekNextClaimable(this.workerId); + if (peek && peek.id !== this.lastYieldedJobId) { + const idler = this.findIdlerCompetitor(peek.requiredRole); + if (idler) { + this.lastYieldedJobId = peek.id; + idler.pokePoll(); + break; + } + } + this.lastYieldedJobId = null; + } // リトライジョブを優先 const job = await this.repo.claimNextRetryJob(this.workerId) ?? await this.repo.claimNextJob(this.workerId); diff --git a/src/worker/idle-routing.test.ts b/src/worker/idle-routing.test.ts new file mode 100644 index 0000000..58e5e92 --- /dev/null +++ b/src/worker/idle-routing.test.ts @@ -0,0 +1,44 @@ +import { describe, it, expect } from 'vitest'; +import { pickIdlerIndex, type ClaimCandidate } from './idle-routing.js'; + +const c = (freeSlots: number, opts: Partial = {}): ClaimCandidate => ({ + freeSlots, + availableForClaim: opts.availableForClaim ?? true, + servesRole: opts.servesRole ?? true, +}); + +describe('pickIdlerIndex (most-free-wins)', () => { + it('returns -1 when there are no siblings', () => { + expect(pickIdlerIndex(0, [])).toBe(-1); + }); + + it('yields to a strictly-idler sibling (0/n beats a loaded worker)', () => { + // self has 0 free (fully loaded); sibling has 4 free. + expect(pickIdlerIndex(0, [c(4)])).toBe(0); + }); + + it('does not yield on a tie (claims itself)', () => { + expect(pickIdlerIndex(2, [c(2), c(2)])).toBe(-1); + }); + + it('does not yield when the caller is the most free', () => { + expect(pickIdlerIndex(4, [c(1), c(3)])).toBe(-1); + }); + + it('picks the idlest among several stricter competitors', () => { + expect(pickIdlerIndex(1, [c(2), c(5), c(3)])).toBe(1); // 5 free wins + }); + + it('ignores siblings that do not serve the role', () => { + expect(pickIdlerIndex(0, [c(8, { servesRole: false })])).toBe(-1); + }); + + it('ignores unavailable (unhealthy/stopped) siblings', () => { + expect(pickIdlerIndex(0, [c(8, { availableForClaim: false })])).toBe(-1); + }); + + it('skips a non-serving idler but still yields to a serving one', () => { + // idx0: very idle but wrong role; idx1: idle and serves → pick idx1. + expect(pickIdlerIndex(0, [c(9, { servesRole: false }), c(3)])).toBe(1); + }); +}); diff --git a/src/worker/idle-routing.ts b/src/worker/idle-routing.ts new file mode 100644 index 0000000..a8c8a6b --- /dev/null +++ b/src/worker/idle-routing.ts @@ -0,0 +1,39 @@ +/** + * Idle-preferring worker selection helper. + * + * Workers run as in-process instances that each poll the DB for jobs. Without + * coordination, whichever worker's poll timer fires first claims the next job — + * so a busy worker can grab work while an idle sibling sits at 0/n. This helper + * implements the "most-free-wins" rule: before claiming, a worker checks its + * siblings and yields to one that has STRICTLY more free slots (scoped to + * siblings that are available and actually serve the job's role). A 0/n idle + * worker therefore always beats a partially-loaded one. + * + * Returns the index of the idlest qualifying competitor, or -1 when the caller + * is already (tied for) the most free and should claim the job itself. + */ +export interface ClaimCandidate { + /** max_concurrency − inflight for this candidate. */ + freeSlots: number; + /** Running, healthy, enabled — would actually claim if poked. */ + availableForClaim: boolean; + /** Serves the role of the job about to be claimed. */ + servesRole: boolean; +} + +export function pickIdlerIndex( + selfFreeSlots: number, + candidates: readonly ClaimCandidate[], +): number { + let bestIdx = -1; + let bestFree = selfFreeSlots; // a competitor must STRICTLY exceed this to win + for (let i = 0; i < candidates.length; i++) { + const c = candidates[i]; + if (!c.availableForClaim || !c.servesRole) continue; + if (c.freeSlots > bestFree) { + bestIdx = i; + bestFree = c.freeSlots; + } + } + return bestIdx; +}