import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import {
  createBackendStatusRegistry,
  type NodeStatus,
  type ProbeContext,
} from './backend-status-registry.js';
import type { WorkerDef } from '../config.js';

/** Timestamp shared by the status fixtures and the injected `now` clock. */
const FIXED_TIMESTAMP = '2026-05-18T00:00:00.000Z';

/**
 * Builds a NodeStatus fixture: an online, non-busy node with one slot and no
 * loaded model. The identifying fields are required; any other field in
 * `partial` overrides the default.
 */
function makeStatus(
  partial: Partial<NodeStatus> & {
    nodeId: string;
    workerId: string;
    source: 'direct' | 'proxy';
  },
): NodeStatus {
  return {
    online: true,
    busy: false,
    busySlots: 0,
    totalSlots: 1,
    loadedModel: null,
    throughputTps: null,
    lastSeen: FIXED_TIMESTAMP,
    ...partial,
  };
}

/** Returns a clock that always reports the same fixed timestamp string. */
function fixedClock(): () => string {
  return () => FIXED_TIMESTAMP;
}

describe('createBackendStatusRegistry', () => {
  // Every test starts on fake timers; tests that need real scheduling switch
  // back explicitly with vi.useRealTimers().
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it('probes direct workers on start and exposes the snapshot via getAll', async () => {
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1', model: 'qwen' }];
    const probeDirect = vi.fn().mockResolvedValue(
      makeStatus({
        nodeId: 'w1',
        workerId: 'w1',
        source: 'direct',
        loadedModel: 'qwen',
      }),
    );
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    const snap = reg.getAll();
    expect(snap).toHaveLength(1);
    expect(snap[0]!.nodeId).toBe('w1');
    expect(snap[0]!.loadedModel).toBe('qwen');

    await reg.stop();
  });

  it('expands proxy workers into multiple backends', async () => {
    const workers: WorkerDef[] = [{ id: 'pool', endpoint: 'http://litellm', proxy: true }];
    const probeProxy = vi.fn().mockResolvedValue([
      makeStatus({ nodeId: 'gpu-a', workerId: 'pool', source: 'proxy', loadedModel: 'qwen3:8b' }),
      makeStatus({ nodeId: 'gpu-b', workerId: 'pool', source: 'proxy', loadedModel: 'qwen3:32b' }),
    ]);
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect: vi.fn(),
      probeProxy,
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    const snap = reg.getAll();
    expect(snap.map(s => s.nodeId).sort()).toEqual(['gpu-a', 'gpu-b']);
    expect(snap.every(s => s.workerId === 'pool')).toBe(true);

    await reg.stop();
  });

  it('isolates probe failures: one node failing does not affect others', async () => {
    const workers: WorkerDef[] = [
      { id: 'w1', endpoint: 'http://w1' },
      { id: 'w2', endpoint: 'http://w2' },
    ];
    const probeDirect = vi.fn().mockImplementation(async (w: WorkerDef) => {
      if (w.id === 'w1') throw new Error('boom');
      return makeStatus({ nodeId: 'w2', workerId: 'w2', source: 'direct' });
    });
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    const snap = reg.getAll();
    expect(snap).toHaveLength(2);
    const w1 = snap.find(s => s.nodeId === 'w1')!;
    const w2 = snap.find(s => s.nodeId === 'w2')!;
    expect(w1.online).toBe(false);
    expect(w1.lastProbeError).toBe('boom');
    expect(w2.online).toBe(true);
    expect(w2.lastProbeError).toBeUndefined();

    await reg.stop();
  });

  it('subscribe() delivers current snapshot synchronously and on each tick', async () => {
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
    // Each probe reports its own ordinal as busySlots so deliveries can be
    // told apart.
    let count = 0;
    const probeDirect = vi.fn().mockImplementation(async () => {
      count++;
      return makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct', busySlots: count });
    });
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    const seen: number[] = [];
    const unsub = reg.subscribe(snap => {
      seen.push(snap[0]?.busySlots ?? -1);
    });

    // Synchronous delivery
    expect(seen).toEqual([1]);

    await reg.refresh();
    expect(seen).toEqual([1, 2]);

    unsub();
    await reg.refresh();
    // After unsubscribe, no further deliveries
    expect(seen).toEqual([1, 2]);

    await reg.stop();
  });

  it('skips overlapping ticks rather than stacking', async () => {
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
    let resolveFirst: (() => void) | null = null;
    const probeDirect = vi.fn().mockImplementation(
      () =>
        new Promise<NodeStatus>(resolve => {
          resolveFirst = () => resolve(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
        }),
    );
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 1000,
      now: fixedClock(),
    });

    reg.start();

    // Two parallel refresh calls should share the same inflight
    const a = reg.refresh();
    const b = reg.refresh();
    resolveFirst!();
    await Promise.all([a, b]);

    expect(probeDirect).toHaveBeenCalledTimes(1);

    await reg.stop();
  });

  it('respects maxConcurrency when probing many workers', async () => {
    vi.useRealTimers();
    const workers: WorkerDef[] = Array.from({ length: 6 }, (_, i) => ({
      id: `w${i}`,
      endpoint: `http://w${i}`,
    }));
    // Track the high-water mark of simultaneously running probes.
    let active = 0;
    let maxActive = 0;
    const probeDirect = vi.fn().mockImplementation(async (w: WorkerDef) => {
      active++;
      maxActive = Math.max(maxActive, active);
      await new Promise(r => setTimeout(r, 5));
      active--;
      return makeStatus({ nodeId: w.id, workerId: w.id, source: 'direct' });
    });
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      maxConcurrency: 2,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    expect(maxActive).toBeLessThanOrEqual(2);
    expect(reg.getAll()).toHaveLength(6);

    await reg.stop();
  });

  it('stop() aborts in-flight probes and resolves promptly (no shutdown hang)', async () => {
    vi.useRealTimers();
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
    // Probe that only resolves when its external AbortSignal fires —
    // simulates an upstream that would otherwise wedge until per-probe
    // timeout (3s in prod).
    let aborts = 0;
    const probeDirect = vi.fn().mockImplementation(
      (_w: WorkerDef, ctx?: ProbeContext) =>
        new Promise<NodeStatus>((_resolve, reject) => {
          const sig = ctx?.signal;
          if (!sig) {
            reject(new Error('test expected a signal'));
            return;
          }
          sig.addEventListener(
            'abort',
            () => {
              aborts++;
              reject(new Error('aborted'));
            },
            { once: true },
          );
        }),
    );
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();

    // Don't await refresh — refresh() resolves only after the probe
    // settles, and we want to confirm stop() drives that settlement.
    const refreshPromise = reg.refresh().catch(() => {
      /* expected */
    });

    // Yield one event-loop turn so runOnce attaches the abort listener.
    await new Promise(r => setImmediate(r));

    const before = Date.now();
    await reg.stop();
    const elapsed = Date.now() - before;

    // stop() should not have waited the full per-probe timeout (3s in
    // prod, but the test uses no timeout cap — without abort it would
    // hang forever). 500ms is a generous upper bound.
    expect(elapsed).toBeLessThan(500);
    expect(aborts).toBe(1);

    await refreshPromise;
  });

  it('refresh() shares the inflight cycle when called during the start() initial probe (race regression)', async () => {
    // Regression for the "scheduleNext vs refresh inflight" race noted
    // in PR #318 review. start() kicks off an immediate probe and
    // assigns it to `inflight`; a refresh() call landing before that
    // probe settles must reuse the same inflight promise rather than
    // spawning a parallel runOnce — otherwise two probe cycles race to
    // write `cache` and notify subscribers.
    vi.useRealTimers();
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
    let resolveProbe: ((s: NodeStatus) => void) | null = null;
    const probeDirect = vi.fn().mockImplementation(
      () =>
        new Promise<NodeStatus>(resolve => {
          // Capture only the FIRST probe's resolver. If refresh() spawned
          // a second runOnce, this mock would be invoked twice and the
          // captured resolver would point at the second invocation,
          // leaving the first cycle hanging — the test would time out.
          if (!resolveProbe) {
            resolveProbe = (s) => resolve(s);
          } else {
            // A duplicate invocation indicates the race fired; resolve
            // with a marker so the assertion below catches it instead of
            // hanging.
            resolve(makeStatus({ nodeId: 'DUPLICATE', workerId: 'w1', source: 'direct' }));
          }
        }),
    );
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();

    // refresh() lands while the start()-initiated probe is still in flight.
    const refreshPromise = reg.refresh();

    // Let the runtime schedule both call sites.
    await new Promise(r => setImmediate(r));

    // Exactly one probe must have been issued: the initial start() one,
    // shared by refresh().
    expect(probeDirect).toHaveBeenCalledTimes(1);

    resolveProbe!(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
    await refreshPromise;

    expect(probeDirect).toHaveBeenCalledTimes(1);
    expect(reg.getAll().map(s => s.nodeId)).toEqual(['w1']);

    await reg.stop();
  });

  describe('dynamic polling cadence', () => {
    it('uses the active interval when at least one listener is subscribed', async () => {
      vi.useFakeTimers();
      const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
      const probeDirect = vi
        .fn()
        .mockResolvedValue(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
      const reg = createBackendStatusRegistry({
        getWorkers: () => workers,
        probeDirect,
        probeProxy: vi.fn(),
        pollIntervalMs: 5_000,
        idlePollIntervalMs: 60_000,
        subscriberActiveWindowMs: 30_000,
        now: fixedClock(),
        monotonicNowMs: () => Date.now(),
      });

      reg.start();
      // Drain the initial probe so we're sitting at the first
      // scheduleNext setTimeout.
      await vi.advanceTimersByTimeAsync(0);
      await Promise.resolve();

      const unsub = reg.subscribe(() => {});
      const initialCalls = probeDirect.mock.calls.length;

      // After 5s the active-band tick should fire.
      await vi.advanceTimersByTimeAsync(5_001);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBeGreaterThan(initialCalls);

      unsub();
      await reg.stop();
    });

    it('falls back to the idle interval when no subscribers are active', async () => {
      vi.useFakeTimers();
      const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
      const probeDirect = vi
        .fn()
        .mockResolvedValue(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
      const reg = createBackendStatusRegistry({
        getWorkers: () => workers,
        probeDirect,
        probeProxy: vi.fn(),
        pollIntervalMs: 5_000,
        idlePollIntervalMs: 60_000,
        subscriberActiveWindowMs: 30_000,
        now: fixedClock(),
        monotonicNowMs: () => Date.now(),
      });

      reg.start();
      await vi.advanceTimersByTimeAsync(0);
      await Promise.resolve();
      const before = probeDirect.mock.calls.length;

      // Advance just past the active interval but well short of idle.
      // No subscribers ever, so the registry must NOT fire at 5s.
      await vi.advanceTimersByTimeAsync(10_000);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(before);

      // Now jump past the idle interval — one tick should fire.
      await vi.advanceTimersByTimeAsync(60_000);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(before + 1);

      await reg.stop();
    });

    it('noteSubscriberActivity() wakes the registry from idle to active cadence', async () => {
      vi.useFakeTimers();
      const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
      const probeDirect = vi
        .fn()
        .mockResolvedValue(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
      const reg = createBackendStatusRegistry({
        getWorkers: () => workers,
        probeDirect,
        probeProxy: vi.fn(),
        pollIntervalMs: 5_000,
        idlePollIntervalMs: 60_000,
        subscriberActiveWindowMs: 30_000,
        now: fixedClock(),
        monotonicNowMs: () => Date.now(),
      });

      reg.start();
      await vi.advanceTimersByTimeAsync(0);
      await Promise.resolve();
      const before = probeDirect.mock.calls.length;

      // We're in idle band (no subscribers). Confirm by checking nothing
      // ticked after 6s (well past active interval).
      await vi.advanceTimersByTimeAsync(6_000);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(before);

      // Note activity — the next tick should now be on the active band.
      reg.noteSubscriberActivity!();
      await vi.advanceTimersByTimeAsync(5_001);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(before + 1);

      await reg.stop();
    });

    it('falls back to idle cadence after the active window elapses without activity', async () => {
      vi.useFakeTimers();
      const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
      const probeDirect = vi
        .fn()
        .mockResolvedValue(makeStatus({ nodeId: 'w1', workerId: 'w1', source: 'direct' }));
      const reg = createBackendStatusRegistry({
        getWorkers: () => workers,
        probeDirect,
        probeProxy: vi.fn(),
        pollIntervalMs: 5_000,
        idlePollIntervalMs: 60_000,
        subscriberActiveWindowMs: 10_000,
        now: fixedClock(),
        monotonicNowMs: () => Date.now(),
      });

      reg.start();
      await vi.advanceTimersByTimeAsync(0);
      await Promise.resolve();
      reg.noteSubscriberActivity!();

      // First active tick at +5s (subscriber window still open).
      await vi.advanceTimersByTimeAsync(5_001);
      await Promise.resolve();
      const afterFirst = probeDirect.mock.calls.length;

      // Second active tick fires at +10s (lastSubscriberAt was at t=0;
      // when this tick was *scheduled* at t=5s the window was still
      // open, so it ran on active cadence). The cadence decision after
      // that tick must drop to idle because the window has now closed.
      await vi.advanceTimersByTimeAsync(5_001);
      await Promise.resolve();
      const afterSecond = probeDirect.mock.calls.length;
      expect(afterSecond).toBe(afterFirst + 1);

      // The next scheduled tick is on the idle band (60s). Advance well
      // past the active interval (10s) and verify no tick fired.
      await vi.advanceTimersByTimeAsync(10_000);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(afterSecond);

      // After the idle interval, the next tick fires.
      await vi.advanceTimersByTimeAsync(60_000);
      await Promise.resolve();
      expect(probeDirect.mock.calls.length).toBe(afterSecond + 1);

      await reg.stop();
    });
  });

  it('getByNodeId returns the matching status or null', async () => {
    const workers: WorkerDef[] = [{ id: 'w1', endpoint: 'http://w1' }];
    const probeDirect = vi.fn().mockResolvedValue(
      makeStatus({
        nodeId: 'w1',
        workerId: 'w1',
        source: 'direct',
      }),
    );
    const reg = createBackendStatusRegistry({
      getWorkers: () => workers,
      probeDirect,
      probeProxy: vi.fn(),
      pollIntervalMs: 60_000,
      now: fixedClock(),
    });

    reg.start();
    await reg.refresh();

    expect(reg.getByNodeId('w1')).not.toBeNull();
    expect(reg.getByNodeId('does-not-exist')).toBeNull();

    await reg.stop();
  });
});