maestro/src/worker-manager.test.ts
oss-sync d95267c4b0
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (6be06e0)
2026-06-10 01:00:05 +00:00

173 lines
6.4 KiB
TypeScript

// src/worker-manager.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { AppConfig, WorkerDef } from './config.js';
// Registry of every MockWorker the manager builds. Tests read it to observe
// each worker's lifecycle: started / stopped flags, how many times its config
// was hot-swapped, and its simulated in-flight job count.
const instances: MockWorker[] = [];
class MockWorker {
  started = false;
  stopped = false;
  configUpdates = 0;
  // Simulated number of running jobs; tests assign this directly.
  inflight = 0;

  constructor(
    public id: string,
    public endpoint: string,
    public model: string | undefined,
    public repo: unknown,
    public config: AppConfig,
  ) {
    instances.push(this);
  }

  start(): void {
    this.started = true;
  }

  stop(): void {
    this.stopped = true;
  }

  // Replaces the config in place and counts how many times that happened.
  updateConfig(next: AppConfig): void {
    this.configUpdates += 1;
    this.config = next;
  }

  get inflightCount(): number {
    return this.inflight;
  }

  // Resolves immediately: true when nothing is in flight, false otherwise.
  // Any timeout argument a caller passes is ignored.
  async waitForCompletion(): Promise<boolean> {
    const idle = this.inflight === 0;
    return idle;
  }

  // No-op stubs — presumably setters WorkerManager invokes on real Workers;
  // these tests do not depend on them.
  setMcpTokenManager(): void {}
  setWorkerMetrics(): void {}
  setSkillCatalog(): void {}
  setPushService(): void {}
  setSiblingsAccessor(): void {}
}
// Swap the real Worker for MockWorker in './worker.js'. The tests below rely on
// WorkerManager picking this up: its workers show up in `instances`.
// Vitest hoists vi.mock() above the file's imports, but the factory only runs
// when the mocked module is first loaded. That happens at the dynamic import
// below, by which point MockWorker has been initialized.
vi.mock('./worker.js', () => ({ Worker: MockWorker }));
// Imported after the mock is registered. A dynamic import (not a static one,
// which would be evaluated before MockWorker exists) defers loading
// worker-manager.js, and therefore the mock factory, until this line.
const { WorkerManager } = await import('./worker-manager.js');
// Minimal worker definition fixture: an `auto`-role worker with concurrency 1,
// reachable at a host named after its id. Fields in `extra` override the defaults.
function def(id: string, extra: Partial<WorkerDef> = {}): WorkerDef {
  const endpoint = `http://${id}:8080/v1`;
  return {
    id,
    endpoint,
    roles: ['auto'],
    maxConcurrency: 1,
    ...extra,
  } as WorkerDef;
}
// Builds a partial AppConfig: `provider.workers` set to `workers`, plus any
// extra top-level fields (e.g. `safety`). Keys in `extra` win on collision.
function cfgWith(workers: WorkerDef[], extra: Record<string, unknown> = {}): AppConfig {
  const provider = { workers };
  return { provider, ...extra } as unknown as AppConfig;
}
// Builds a WorkerManager wired to minimal fakes:
// - a repo whose only member, `requeueRunningJobs`, is a vi.fn() spy;
// - a config manager that serves `initial` until `setConfig` replaces it.
//   Its `onConfigChanged` is a no-op, so tests drive reconfiguration
//   explicitly through `wm.rebuild(...)`.
// Returns the manager, the requeue spy, and the fake config manager.
function makeManager(initial: AppConfig) {
  let activeConfig = initial;
  const requeueSpy = vi.fn();
  const configManager = {
    getConfig: () => activeConfig,
    onConfigChanged: () => {},
    setConfig: (next: AppConfig) => {
      activeConfig = next;
    },
  } as any;
  const repo = { requeueRunningJobs: requeueSpy } as any;
  const wm = new WorkerManager(repo, configManager);
  return { wm, requeueRunningJobs: requeueSpy, configManager };
}
// Start every test with an empty MockWorker registry (same array, emptied in place).
beforeEach(() => {
  instances.splice(0);
});
// Differential rebuild contract:
// - workers whose def is unchanged keep their live instance, with the config
//   hot-swapped in place;
// - changed or removed defs are retired (stopped, left to finish in-flight
//   work);
// - no rebuild ever requeues running jobs.
// Instance identity is asserted with `toBe`/`toContain` (reference checks),
// never `toEqual`. `toEqual` compares fields structurally, so a replacement
// whose fields happened to match would slip through.
describe('WorkerManager differential rebuild', () => {
  it('keeps a busy worker alive (no requeue) when its def is unchanged', async () => {
    const cfg1 = cfgWith([def('A'), def('B')], { safety: { maxIterations: 200 } });
    const { wm, requeueRunningJobs } = makeManager(cfg1);
    wm.start();
    const [a, b] = wm.getWorkers() as unknown as MockWorker[];
    a.inflight = 1; // A is mid-job

    // Only a *global* field changed; worker defs are byte-identical.
    const cfg2 = cfgWith(
      [structuredClone(def('A')), structuredClone(def('B'))],
      { safety: { maxIterations: 50 } },
    );
    await wm.rebuild(cfg2);

    // The busy worker must NOT be torn down and its job must NOT be requeued.
    expect(requeueRunningJobs).not.toHaveBeenCalled();
    expect(a.stopped).toBe(false);
    expect(b.stopped).toBe(false);
    expect(a.configUpdates).toBe(1); // config hot-swapped in place
    expect(a.config).toBe(cfg2);

    // Same two instances kept, in order — no fresh workers created.
    const active = wm.getWorkers() as unknown as MockWorker[];
    expect(active).toHaveLength(2);
    expect(active[0]).toBe(a);
    expect(active[1]).toBe(b);
    expect(instances).toHaveLength(2);
  });

  it('retires a changed-def worker without requeue; finishing its in-flight job', async () => {
    const cfg1 = cfgWith([def('A'), def('B', { maxConcurrency: 4 })]);
    const { wm, requeueRunningJobs } = makeManager(cfg1);
    wm.start();
    const [a, b] = wm.getWorkers() as unknown as MockWorker[];
    b.inflight = 1; // B busy

    // B's def changes (concurrency 4 -> 2); A unchanged.
    const cfg2 = cfgWith([structuredClone(def('A')), def('B', { maxConcurrency: 2 })]);
    await wm.rebuild(cfg2);

    // Rebuild NEVER requeues — that was the double-execution bug.
    expect(requeueRunningJobs).not.toHaveBeenCalled();

    // Old B stopped polling (retiring) and is no longer in the active set...
    expect(b.stopped).toBe(true);
    const active = wm.getWorkers() as unknown as MockWorker[];
    expect(active).not.toContain(b);
    expect(active.map(w => w.id).sort()).toEqual(['A', 'B']);

    // ...the unchanged A is the same, still-running instance...
    expect(active).toContain(a);
    expect(a.stopped).toBe(false);

    // ...and the active B is a fresh instance (3rd constructed), started.
    expect(instances).toHaveLength(3);
    const freshB = active.find(w => w.id === 'B');
    expect(freshB).toBeDefined();
    expect(freshB).not.toBe(b);
    expect(freshB?.started).toBe(true);
  });

  it('removing a worker retires it without requeue', async () => {
    const cfg1 = cfgWith([def('A'), def('B')]);
    const { wm, requeueRunningJobs } = makeManager(cfg1);
    wm.start();
    const [a, b] = wm.getWorkers() as unknown as MockWorker[];
    b.inflight = 1;

    const cfg2 = cfgWith([structuredClone(def('A'))]); // B removed
    await wm.rebuild(cfg2);

    expect(requeueRunningJobs).not.toHaveBeenCalled();
    expect(b.stopped).toBe(true);
    const active = wm.getWorkers() as unknown as MockWorker[];
    expect(active.map(w => w.id)).toEqual(['A']);
    // The surviving A is the original instance; nothing new was constructed.
    expect(active[0]).toBe(a);
    expect(instances).toHaveLength(2);
  });

  it('prunes a retired worker once its in-flight job finishes (on next rebuild)', async () => {
    const cfg1 = cfgWith([def('A', { maxConcurrency: 4 })]);
    const { wm } = makeManager(cfg1);
    wm.start();
    const [a] = wm.getWorkers() as unknown as MockWorker[];
    a.inflight = 1;

    // Change A's def -> A retired (still draining, inflight=1).
    const cfg2 = cfgWith([def('A', { maxConcurrency: 2 })]);
    await wm.rebuild(cfg2);
    expect(a.stopped).toBe(true);

    // A's job finishes; a no-op rebuild should prune the retired instance.
    a.inflight = 0;
    const cfg3 = cfgWith([structuredClone(def('A', { maxConcurrency: 2 }))]);
    await wm.rebuild(cfg3);

    // Active set has exactly one A: the replacement built for cfg2 (2nd
    // constructed), not the drained original. The no-op rebuild built nothing.
    const activeA = (wm.getWorkers() as unknown as MockWorker[]).filter(w => w.id === 'A');
    expect(activeA).toHaveLength(1);
    expect(activeA[0]).not.toBe(a);
    expect(activeA[0]).toBe(instances[1]);
    expect(instances).toHaveLength(2);
    // NOTE(review): retired workers never appear in getWorkers() (see the
    // tests above), so these checks cannot observe the pruning itself.
    // Asserting it needs an accessor for the retiring set — TODO confirm what
    // WorkerManager exposes.
  });
});
// Shutdown is the one path where running jobs may be requeued, and only when
// a worker fails to drain. A clean drain must leave the repo untouched.
describe('WorkerManager shutdown', () => {
  it('requeues running jobs only on shutdown when a worker fails to drain', async () => {
    const { wm, requeueRunningJobs } = makeManager(cfgWith([def('A')]));
    wm.start();
    const [stuck] = wm.getWorkers() as unknown as MockWorker[];
    // Any in-flight job makes MockWorker.waitForCompletion resolve false,
    // i.e. the worker never drains.
    stuck.inflight = 1;

    const result = await wm.stop(0);

    expect(result.requeued).toContain('A');
    expect(requeueRunningJobs).toHaveBeenCalledTimes(1);
  });

  it('does not requeue on a clean shutdown', async () => {
    const { wm, requeueRunningJobs } = makeManager(cfgWith([def('A')]));
    wm.start();

    const result = await wm.stop(0);

    expect(result.drained).toContain('A');
    expect(requeueRunningJobs).not.toHaveBeenCalled();
  });
});