// maestro/src/bridge/server.ts
// 2026-06-04 03:03:12 +00:00
//
// 1201 lines
// 52 KiB
// TypeScript

import express, { Request, Response, NextFunction, RequestHandler } from 'express';
import { existsSync } from 'fs';
import { join, resolve, dirname } from 'path';
import { fileURLToPath } from 'url';
import { Repository, BrowserSessionRepo } from '../db/repository.js';
import { logger } from '../logger.js';
import { loadConfig } from '../config.js';
import { ConfigManager } from '../config-manager.js';
import { mountConfigApi } from './config-api.js';
import { mountPiecesApi } from './pieces-api.js';
import { mountToolsApi } from './tools-api.js';
import { mountSkillsApi } from './skills-api.js';
import { mountNotificationsApi } from './notifications-api.js';
import { mountScheduledTasksApi } from './scheduled-tasks-api.js';
import { mountBrandingApi, resolveBranding } from './branding-api.js';
import { createBrowserApi } from './browser-api.js';
import { createBrowserSessionApi } from './browser-session-api.js';
import { createSubtaskActivityRouter } from './subtask-activity-api.js';
import { SessionManager } from '../engine/browser-session.js';
import { createNovncRouter, setupNovncWebSocketProxy } from './novnc-proxy.js';
import { setSessionManager } from '../engine/tools/browser.js';
import { setUserFolderToolDeps } from '../engine/tools/user-folder.js';
import { setSkillToolDeps } from '../engine/tools/skills.js';
import { setAppDocsDeps } from '../engine/tools/app-docs.js';
import { setupAuth, requireAuth, requireAdmin } from './auth.js';
import { canUserSeeTask } from './visibility.js';
import { mountAdminApi } from './admin-api.js';
import { createAdminGatewayApi } from './admin-gateway-api.js';
import { mountUsersApi } from './users-api.js';
import { mountShareApi } from './share-api.js';
import { mountLocalTasksApi } from './local-tasks-api.js';
import { findPieceFile } from './pieces-api.js';
import { mountLocalFilesApi } from './local-files-api.js';
import { mountSubtaskFilesApi } from './subtask-files-api.js';
import { createUserFolderApi } from './user-folder-api.js';
import { createMemoryApi } from './memory-api.js';
import { createReflectionApi } from './reflection-api.js';
import { createDashboardApi } from './dashboard-api.js';
import { registerShutdownHook, installSignalHandlers } from './shutdown.js';
import { createBackendStatusRegistry, type BackendStatusRegistry } from '../engine/backend-status-registry.js';
import { createWorkerRegistry } from '../metrics/registry.js';
import { createWorkerMetrics, type WorkerMetrics } from '../metrics/worker-metrics.js';
import { createMetricsHandler } from '../metrics/http-handler.js';
import { buildDirectProbe, buildProxyProbe } from '../engine/backend-probes.js';
import { startTrashCleanup } from '../user-folder/trash-cleanup.js';
import { userPiecesDir } from '../user-folder/paths.js';
import { startReflectionRetentionSweep } from '../engine/reflection/retention.js';
import type { AuthConfig } from '../config.js';
import { isKeyConfigured } from '../mcp/crypto.js';
import { createRegistry } from '../mcp/registry.js';
import { createTokenManager } from '../mcp/token-manager.js';
import { createToolCache } from '../mcp/tool-cache.js';
import { createAggregator } from '../mcp/aggregator.js';
import { createMcpClient } from '../mcp/client-factory.js';
import { executeMcpCall } from '../mcp/tool-executor.js';
import { refreshAccessToken } from '../mcp/discovery.js';
import { setMcpAggregator } from '../engine/tools/index.js';
import { setMcpToolLookup } from '../engine/tools/docs.js';
import { setDashboardRepo } from '../engine/tools/dashboard.js';
import { createMcpOauthRouter } from '../mcp/oauth-routes.js';
import { createAdminRouter as createMcpAdminRouter, createUserRouter as createMcpUserRouter, createUserServersRouter as createMcpUserServersRouter } from './mcp-api.js';
import { mergeMcpConfig } from '../mcp/config.js';
import { mergeSshConfig } from '../ssh/config.js';
import {
bootstrapSystemDek,
verifySystemDek,
encryptPrivateKey as sshEncryptPrivateKey,
decryptPrivateKey as sshDecryptPrivateKey,
computeKeyFingerprint as sshComputeKeyFingerprint,
formatPublicKey as sshFormatPublicKey,
generateKeypair as sshGenerateKeypair,
type GeneratedKeyType as SshGeneratedKeyType,
} from '../ssh/crypto.js';
import { createConnectionRepo } from '../ssh/connection-repo.js';
import { createGrantsRepo } from '../ssh/grants-repo.js';
import { createAuditRepo } from '../ssh/audit-repo.js';
import { createAbuseRepo } from '../ssh/abuse-repo.js';
import { createAccessResolver } from '../ssh/access.js';
import { maintenance as sshMaintenance } from '../ssh/maintenance.js';
import { createAdminRateLimiter, FORCE_UNLOCK_LIMIT } from '../ssh/admin-rate-limit.js';
import {
sshTest,
sshExec,
sshUpload,
sshDownload,
openShellChannel,
type ResolvedConnection as SshResolvedConnection,
} from '../ssh/session.js';
import { SessionRegistry } from '../ssh/console-registry.js';
import { createSshUserRouter, createSshAdminRouter, type SshApiDeps } from './ssh-api.js';
import { setSshSubsystem } from '../engine/tools/ssh.js';
import { __setActiveSessionLookup } from '../engine/agent-loop.js';
import {
attachConsoleWs,
createConsoleStatusRouter,
type SimpleTask,
type SimpleUser,
} from './console-ws-api.js';
import { createConsoleAdminRouter } from './console-admin-api.js';
import { NotesRepository } from '../notes/notes-repository.js';
import { NotesService } from '../notes/notes-service.js';
import { createNotesApi } from './notes-api.js';
import { mountGateway, type GatewayMountHandle } from './gateway-mount.js';
import { readGatewayConfig } from '../gateway/config.js';
import { createAdminGatewayStatusRouter } from './admin-gateway-status-api.js';
// ES modules do not get CommonJS `__filename` / `__dirname`, so derive
// equivalents from this module's URL. `__dirnameServer` anchors paths that
// must resolve relative to this file rather than process.cwd() (e.g. the
// bundled UI under ../../ui/dist).
const __filenameServer = fileURLToPath(new URL(import.meta.url));
const __dirnameServer = dirname(__filenameServer);
/**
 * Options accepted by `createCoreServer`.
 *
 * `repo` is the only required dependency. The remaining fields are optional
 * collaborators: when one is omitted, the API surface that depends on it is
 * either not mounted or falls back to a default.
 */
export interface CoreServerOptions {
  // ---- Persistence & workspace -------------------------------------------

  /** Primary data repository shared by every mounted API. */
  repo: Repository;
  /** Worktree root forwarded to the local tasks API. */
  worktreeDir?: string;
  /**
   * Statically configured repositories. Merged (deduplicated, sorted) with
   * the repos discovered from jobs when serving `GET /api/repos`.
   */
  configuredRepos?: string[];

  // ---- Task-creation helpers ---------------------------------------------

  /** Derives a task title from its body; forwarded to the local tasks API. */
  generateTitle?: (body: string) => Promise<string>;
  /**
   * Chooses a piece name for a task from its body, attached file names and
   * (optionally) the owning user id; forwarded to the local tasks API.
   */
  selectPiece?: (body: string, fileNames: string[], userId?: string) => Promise<string>;

  // ---- Configuration & pieces --------------------------------------------

  /**
   * Live configuration source. When present, the config API is mounted and
   * settings such as metrics and upload limits are read through it.
   */
  configManager?: ConfigManager;
  /** Built-in pieces directory. Enables the pieces API and piece-existence checks. */
  piecesDir?: string;
  /** Global custom pieces directory, searched alongside `piecesDir`. */
  customPiecesDir?: string;
  /** Directory for user-uploaded branding assets (logos, favicons). Must be gitignored. */
  brandingDir?: string;

  // ---- Runtime services --------------------------------------------------

  /** Task scheduler; the scheduled-tasks API is wired up only when supplied. */
  scheduler?: import('../scheduler.js').Scheduler;
  /** Worker manager; receives MCP dependencies once the MCP subsystem starts. */
  workerManager?: import('../worker-manager.js').WorkerManager;
  /** SkillCatalog instance for the /api/skills CRUD endpoints. */
  skillCatalog?: import('../engine/skills.js').SkillCatalog;

  // ---- Auth --------------------------------------------------------------

  /**
   * Authentication settings. Auth is active only when `providers` is set;
   * `secureCookie` additionally makes Express trust one reverse-proxy hop.
   */
  authConfig?: AuthConfig;

  // ---- Notifications V2 (Web Push) ---------------------------------------

  /** Notifications V2 — PushService instance. null disables push API endpoints
   * (they return 503), but the routes are still mounted so the UI can
   * distinguish "feature off" from "endpoint missing". */
  pushService?: import('../push-service.js').PushService | null;
  /** Notifications V2 — VAPID key store. Required when pushService is set. */
  vapidStore?: import('../vapid-store.js').VapidKeyStore | null;

  // ---- Networking --------------------------------------------------------

  /**
   * TCP port the bridge will listen on. Forwarded to admin endpoints
   * (e.g. /api/admin/gateway/status) so the UI can label the gateway
   * mount with the correct port. Threaded through from
   * `startCoreServer(opts, port)` so a non-default port doesn't show
   * up as the PORT env-var guess (which can be stale or unset).
   */
  listenPort?: number;
}
/**
 * Dependencies produced while initialising the SSH subsystem and handed to
 * `startCoreServer`, which attaches the SSH console WebSocket upgrade
 * handler to the HTTP server it creates later.
 */
export interface SshConsoleDeps {
  /** Registry of live SSH console sessions (looked up by task id). */
  registry: SessionRegistry;

  /** Supplies the per-connection command deny/allow pattern lists. */
  denyPatterns: import('./console-ws-api.js').DenyPatternProvider;

  /**
   * Authenticates the user behind a WebSocket upgrade request. Resolves to
   * `null` when no authenticated user can be established.
   */
  resolveUserFromUpgrade: (req: import('http').IncomingMessage) => Promise<SimpleUser | null>;

  /**
   * Loads the task identified by `taskId` as seen by `user`. Resolves to
   * `null` when the task does not exist or is not visible to that user.
   */
  resolveTask: (taskId: string, user: SimpleUser) => Promise<SimpleTask | null>;

  /**
   * Decides whether `user` may use the SSH connection behind `session`
   * in the context of `task`.
   */
  resolveSshAccess: (
    user: SimpleUser,
    session: import('../ssh/console-session.js').ConsoleSession,
    task: SimpleTask,
  ) => Promise<boolean>;
}
export function createCoreServer(opts: CoreServerOptions): {
app: express.Application;
browserSessionManager: SessionManager | null;
authenticateUpgrade?: import('./auth.js').UpgradeAuthChecker;
authorizeNovncSession: import('./novnc-proxy.js').NovncSessionAuthorizer;
sshConsole: SshConsoleDeps | null;
/** Singleton NodeStatus cache used by the Side Info Panel's node-status widget. */
backendStatusRegistry: BackendStatusRegistry;
/** Phase 3b: Prometheus metrics handle (null when provider.metrics.enabled=false). */
workerMetrics: WorkerMetrics | null;
/**
* Phase 3c — same-process AAO Gateway mount handle. Null when no
* ConfigManager was supplied (no hot-reload available). Use this to
* read current state for the admin status endpoint.
*/
gatewayMount: GatewayMountHandle | null;
} {
const { repo, worktreeDir } = opts;
const app = express();
// リバースプロキシ背後で secure cookie / X-Forwarded-Proto を正しく処理
if (opts.authConfig?.secureCookie) {
app.set('trust proxy', 1);
}
// Phase 3b: Prometheus /metrics endpoint. Mounted BEFORE any auth
// middleware so the standard Prometheus scrape job works without a
// session cookie. Restrict scrape access at the reverse proxy /
// firewall layer (see help doc). Disabled by
// provider.metrics.enabled=false in config.yaml.
const appConfig = (() => {
try { return opts.configManager?.getConfig() ?? null; } catch { return null; }
})();
const metricsCfg = appConfig?.provider?.metrics;
let workerMetrics: WorkerMetrics | null = null;
// Phase 3c: hoist the promRegistry handle out of the metrics-enabled
// block so the same-process gateway mount below can register its
// gateway_* counters into the same /metrics endpoint (one scrape
// serves both worker and gateway). Null when metrics are disabled.
let sharedPromRegistry: import('prom-client').Registry | null = null;
if (metricsCfg?.enabled !== false) {
const prefix = metricsCfg?.prefix ?? 'aao_worker';
sharedPromRegistry = createWorkerRegistry(prefix);
workerMetrics = createWorkerMetrics(sharedPromRegistry, prefix);
// Phase 3b post-review: gate /metrics on (1) bearer token if set,
// (2) otherwise client-IP allowlist (default localhost). Labels
// like `worker_id` / `backend_id` would leak otherwise.
const metricsAuth = {
bearerToken: metricsCfg?.bearerToken,
allowedHosts: metricsCfg?.allowedHosts,
};
app.get('/metrics', createMetricsHandler(sharedPromRegistry, metricsAuth));
const authMode = metricsAuth.bearerToken
? 'bearer'
: `ip-allowlist=${(metricsAuth.allowedHosts ?? ['127.0.0.1', '::1', 'localhost']).join(',')}`;
logger.info(`[bridge] worker metrics enabled prefix=${prefix} auth=${authMode}`);
} else {
logger.info('[bridge] worker metrics disabled (provider.metrics.enabled=false)');
}
// JSON body parser for Config/Pieces/Memory/Reflection API endpoints
app.use('/api/config', express.json());
app.use('/api/pieces', express.json());
app.use('/api/local/memory', express.json());
app.use('/api/local/reflection', express.json());
// === Auth setup ===
const authActive = !!(opts.authConfig?.providers);
let authenticateUpgrade: import('./auth.js').UpgradeAuthChecker | undefined;
if (authActive) {
const auth = setupAuth(
repo,
opts.authConfig!,
() => {
const b = resolveBranding(opts.configManager);
return { appName: b.appName, loginPageTitle: b.loginPageTitle };
},
);
const { sessionMiddleware, passportInit, passportSession, authRouter } = auth;
authenticateUpgrade = auth.authenticateUpgrade;
// Global session + passport middleware (BEFORE all routes)
app.use(sessionMiddleware);
app.use(passportInit);
app.use(passportSession);
// Auth routes (unauthenticated access allowed)
app.use('/auth', authRouter);
// /api/auth/me endpoint
app.get('/api/auth/me', requireAuth, (req: Request, res: Response) => {
res.json(req.user);
});
// Protect all API routes (except /api/version and /health)
app.use('/api/local', requireAuth);
app.use('/api/repos', requireAuth);
// /api/workers exposes endpoint URLs + proxy backend probing. Without
// auth, unauthenticated callers could (a) enumerate worker endpoints
// and (b) trigger upstream `/v1/models` fetches against any
// attacker-influenced endpoint (SSRF amplifier). Gate behind requireAuth
// — admin-only is unnecessary because the responses already strip
// sensitive fields (apiKey), but anonymous access must be blocked.
app.use('/api/workers', requireAuth);
// Admin-only routes
app.use('/api/config', requireAdmin);
// /api/pieces: any authenticated user can GET (visibility-filtered);
// per-piece write authz (built-in/global-custom → admin, user-custom → owner)
// is enforced inside pieces-api.ts handlers.
app.use('/api/pieces', requireAuth);
// Scheduled tasks: any authenticated user can create/list (visibility-filtered).
// PATCH/DELETE owner-or-admin enforcement lives in the handlers (Task 14).
app.use('/api/scheduled-tasks', requireAuth);
}
// Admin user management API (always mounted; protected by requireAdmin when auth is active)
app.use('/api/admin', express.json());
mountAdminApi(app, repo, authActive);
// AAO Gateway Phase 2a: admin-only CRUD over gateway_virtual_keys.
// Enabled regardless of gateway.enabled so an admin can prep keys
// before flipping the gateway on.
//
// SECURITY: this endpoint mints `sk-aao-*` bearer tokens that grant
// access to the LLM gateway. Unlike the generic admin-api (which
// exposes user-management read paths that are safe to surface in
// auth-disabled local dev), key issuance has direct production
// impact. If we mounted it without `requireAdmin` when auth is off,
// any caller reaching the server could POST to /api/admin/gateway/keys
// and walk away with a valid gateway bearer. Refuse to mount instead
// — operators who want key management MUST configure auth.providers.
if (authActive) {
app.use(
'/api/admin/gateway/keys',
express.json({ limit: '4kb' }),
requireAdmin,
createAdminGatewayApi({
repo,
// Already gated above; pass a passthrough so the router doesn't
// try to double-guard (which would just be a no-op anyway).
requireAdmin: (_req, _res, next) => next(),
getUserId: (req) => {
const u = (req as import('express').Request & {
user?: { id?: string };
}).user;
return u?.id ?? null;
},
// Phase 3a F4: when the admin server runs in the same process
// as the gateway, gateway/bootstrap.ts hangs the shared cache
// off the Repository so mutations invalidate live cache state.
// Cross-process setups omit this; the cache's 5s TTL bounds
// the worst-case stale window.
keyCache: (repo as unknown as { __gatewayKeyCache?: import('../gateway/key-cache.js').KeyCache })
.__gatewayKeyCache,
// Phase 3b post-review: same-process gateway hangs its metrics
// handle on the Repository so admin mutations can drop the
// per-key gauge labels (revoke/rotate/delete). No-op in
// cross-process deploys (the gateway process owns its own
// registry there).
gatewayMetrics: (repo as unknown as { __gatewayMetrics?: import('../metrics/gateway-metrics.js').GatewayMetrics })
.__gatewayMetrics,
}),
);
} else {
logger.warn(
'[admin-gateway] /api/admin/gateway/keys NOT mounted (auth disabled). ' +
'Configure auth.providers in config.yaml to enable virtual key management.',
);
}
// /api/users/me/* routes — current viewer info (gated by requireAuth when auth is active)
mountUsersApi(app, repo, authActive);
// --- Share API (public + authenticated routes) ---
mountShareApi(app, repo);
// Redirect root to UI
app.get('/', (_req: Request, res: Response) => {
res.redirect('/ui');
});
const uiDistPath = resolve(join(__dirnameServer, '../../ui/dist'));
if (existsSync(uiDistPath)) {
app.use('/ui', express.static(uiDistPath));
app.get('/ui/*', (_req, res) => {
res.sendFile(join(uiDistPath, 'index.html'));
});
}
// Version endpoint
app.get('/api/version', async (_req: Request, res: Response) => {
let version = 'dev';
try {
const mod = await import('../generated/version.js');
version = mod.APP_VERSION;
} catch {
try {
const { execSync } = await import('child_process');
version = execSync("TZ=UTC git log -1 --format=%cd --date=format:'%Y%m%d.%H%M%S'", { encoding: 'utf-8' }).trim();
} catch {
// keep 'dev'
}
}
res.json({ version });
});
app.get('/api/repos', (_req: Request, res: Response) => {
try {
const reposFromJobs = repo.getDistinctRepos();
const reposFromConfig = (opts.configuredRepos ?? []).filter(Boolean);
const repos = Array.from(new Set([...reposFromConfig, ...reposFromJobs])).sort();
res.json({ repos });
} catch {
res.status(500).json({ error: 'Failed to fetch repos' });
}
});
// Per-user browser session profile repo (envelope-encrypted storageState).
// Created up-front so APIs that bind a profile to a task (local + scheduled)
// can run an owner-scoped check before persisting.
const sessRepo = new BrowserSessionRepo(repo.getDb());
// Surfaces per-user MCP servers into /api/tools so the Piece allowed_tools
// editor can include them. Populated by the MCP block below when the
// subsystem initialises; remains null otherwise.
let mcpCatalogDeps: import('./tools-api.js').McpCatalogDeps | null = null;
// MCP subsystem (gated on MCP_ENCRYPTION_KEY)
{
if (isKeyConfigured()) {
const mcpConfig = mergeMcpConfig(loadConfig().mcp);
const mcpRegistry = createRegistry(repo.getDb());
const mcpTokenManager = createTokenManager(repo.getDb(), {
doRefresh: async (serverId: string, refreshToken: string) => {
const server = mcpRegistry.getDecrypted(serverId);
if (!server || !server.tokenEndpoint) {
throw new Error(`server or token endpoint missing for ${serverId}`);
}
return refreshAccessToken({
tokenEndpoint: server.tokenEndpoint,
clientId: server.oauthClientId,
clientSecret: server.oauthClientSecret,
refreshToken,
});
},
});
const mcpToolCache = createToolCache(repo.getDb(), mcpConfig.toolCacheTtlSeconds);
const mcpAggregator = createAggregator({
registry: mcpRegistry,
tokenManager: mcpTokenManager,
toolCache: mcpToolCache,
executeCall: async (args) => {
const server = mcpRegistry.getDecrypted(args.serverId);
if (!server) return { output: `未登録の MCP サーバー: ${args.serverId}`, isError: true };
const { client, close } = await createMcpClient(server, args.accessToken, {
callTimeoutMs: mcpConfig.callTimeoutSeconds * 1000,
allowPrivateAddresses: mcpConfig.allowPrivateAddresses,
});
try {
return await executeMcpCall({
client,
serverId: args.serverId,
toolName: args.toolName,
input: args.input,
ctx: args.ctx,
});
} finally {
await close();
}
},
});
setMcpAggregator(mcpAggregator);
setMcpToolLookup((serverId, toolName) => mcpToolCache.get(serverId, toolName));
setDashboardRepo(repo);
opts.workerManager?.setMcpDeps({ tokenManager: mcpTokenManager });
// Expose MCP enumeration to /api/tools so the Piece allowed_tools editor
// can list per-user MCP tools alongside builtin ones. Methods on the
// registry/tokenManager/toolCache surfaces are already DB-backed and
// safe to call per-request.
mcpCatalogDeps = {
registry: { listEnabledForUser: (uid) => mcpRegistry.listEnabledForUser(uid) },
tokenManager: { hasToken: (uid, sid) => mcpTokenManager.hasToken(uid, sid) },
toolCache: { getAllForServers: (ids) => mcpToolCache.getAllForServers(ids) },
};
const callbackBaseUrl = deriveCallbackBaseUrl(opts.authConfig);
app.use(
'/auth/mcp',
createMcpOauthRouter({
db: repo.getDb(),
registry: mcpRegistry,
tokenManager: mcpTokenManager,
pendingTtlMinutes: mcpConfig.oauthPendingTtlMinutes,
getCallbackBaseUrl: () => callbackBaseUrl,
getAuthenticatedUserId: (req) => (req.user as { id?: string } | undefined)?.id ?? null,
resumeWaitingJobs: (uid, sid) => {
repo.resumeMcpWaitingJobs(uid, sid);
},
listToolsAfterAuth: async (serverId: string, accessToken: string) => {
const server = mcpRegistry.getDecrypted(serverId);
if (!server) return;
const { client, close } = await createMcpClient(server, accessToken, {
callTimeoutMs: mcpConfig.callTimeoutSeconds * 1000,
allowPrivateAddresses: mcpConfig.allowPrivateAddresses,
});
try {
const list = (await client.listTools()) as {
tools: Array<{ name: string; description?: string; inputSchema?: unknown }>;
};
mcpToolCache.replaceForServer(serverId, list.tools);
logger.info(
`[mcp] auto list_tools after OAuth server=${serverId} count=${list.tools.length}`,
);
} finally {
await close();
}
},
}),
);
app.use('/api/mcp/servers', express.json(), createMcpAdminRouter({
db: repo.getDb(),
registry: mcpRegistry,
tokenManager: mcpTokenManager,
toolCache: mcpToolCache,
requireAdmin: authActive ? requireAdmin : (_req, _res, next) => next(),
requireAuth: authActive ? requireAuth : (_req, _res, next) => next(),
getUserId: (req) => (req.user as { id?: string } | undefined)?.id ?? null,
allowPrivateAddresses: mcpConfig.allowPrivateAddresses,
}));
app.use('/api/mcp/connections', express.json(), createMcpUserRouter({
db: repo.getDb(),
registry: mcpRegistry,
tokenManager: mcpTokenManager,
toolCache: mcpToolCache,
requireAdmin: authActive ? requireAdmin : (_req, _res, next) => next(),
requireAuth: authActive ? requireAuth : (_req, _res, next) => next(),
getUserId: (req) => (req.user as { id?: string } | undefined)?.id ?? null,
allowPrivateAddresses: mcpConfig.allowPrivateAddresses,
}));
app.use('/api/mcp/user-servers', express.json(), createMcpUserServersRouter({
db: repo.getDb(),
registry: mcpRegistry,
tokenManager: mcpTokenManager,
toolCache: mcpToolCache,
requireAdmin: authActive ? requireAdmin : (_req, _res, next) => next(),
requireAuth: authActive ? requireAuth : (_req, _res, next) => next(),
getUserId: (req) => (req.user as { id?: string } | undefined)?.id ?? null,
insecureLocalTestMode: false,
allowPrivateAddresses: mcpConfig.allowPrivateAddresses,
}));
logger.info('[mcp] subsystem initialised');
} else {
logger.warn('[mcp] MCP_ENCRYPTION_KEY not configured — MCP features disabled');
}
}
// SSH subsystem (gated on ssh.enabled AND MCP_ENCRYPTION_KEY)
// Phase 5 (SSH Console): sshConsole is captured here so that
// startCoreServer() can wire the WS upgrade hook to the http.Server
// it eventually creates. null when SSH is disabled / failed init.
let sshConsole: SshConsoleDeps | null = null;
{
const sshConfig = mergeSshConfig(loadConfig().ssh);
if (!sshConfig.enabled) {
setSshSubsystem(null);
__setActiveSessionLookup(null);
} else if (!isKeyConfigured()) {
logger.warn('[ssh] MCP_ENCRYPTION_KEY not configured — SSH features disabled');
setSshSubsystem(null);
__setActiveSessionLookup(null);
} else {
try {
bootstrapSystemDek(repo.getDb());
verifySystemDek(repo.getDb());
const connectionRepo = createConnectionRepo(repo.getDb());
const grantsRepo = createGrantsRepo(repo.getDb());
const auditRepo = createAuditRepo(repo.getDb());
const abuseRepo = createAbuseRepo(repo.getDb(), {
windowMinutes: sshConfig.abuseWindowMinutes,
failureThreshold: sshConfig.abuseFailureThreshold,
lockMinutes: sshConfig.abuseLockMinutes,
});
const accessResolver = createAccessResolver(grantsRepo, {
adminBypassesGrants: sshConfig.adminBypassesGrants,
});
const forceUnlockLimiter = createAdminRateLimiter(FORCE_UNLOCK_LIMIT);
// Hoisted above sshDeps so the grant-revocation hook can call
// sessionRegistry.revokeAccessFor (introduced for Phase 5 hardening:
// kick active WS viewers when their grant is deleted).
const sessionRegistry = new SessionRegistry({
idleTimeoutMs: sshConfig.console.idleTimeoutSeconds * 1000,
maxSessionDurationMs: sshConfig.console.maxSessionDurationSeconds * 1000,
maxSessionsPerConnection: sshConfig.console.maxSessionsPerConnection,
});
const sshDeps: SshApiDeps = {
db: repo.getDb(),
requireAuth: authActive ? requireAuth : (_req, _res, next) => next(),
requireAdmin: authActive ? requireAdmin : (_req, _res, next) => next(),
getUserId: (req) => (req.user as { id?: string } | undefined)?.id ?? null,
isAdmin: (req) => (req.user as { role?: string } | undefined)?.role === 'admin',
getOrgIds: (req) => ((req.user as { orgIds?: string[] } | undefined)?.orgIds ?? []),
connectionRepo,
grantsRepo,
auditRepo,
abuseRepo,
accessResolver,
maintenance: sshMaintenance,
forceUnlockLimiter,
encryptKeyMaterial: (ownerId, pem, passphrase) => {
const { blob, keyVersion } = sshEncryptPrivateKey(repo.getDb(), ownerId, pem);
const passphraseBlob = passphrase
? sshEncryptPrivateKey(repo.getDb(), ownerId, passphrase).blob
: null;
const fingerprint = sshComputeKeyFingerprint(pem, passphrase);
const publicKey = sshFormatPublicKey(pem, passphrase);
return { blob, passphraseBlob, keyVersion, fingerprint, publicKey };
},
decryptKeyMaterial: (ownerId, blob) => sshDecryptPrivateKey(repo.getDb(), ownerId, blob),
decryptPassphrase: (ownerId, blob) =>
blob ? sshDecryptPrivateKey(repo.getDb(), ownerId, blob) : null,
generateKeypair: (keyType: SshGeneratedKeyType) => sshGenerateKeypair(keyType),
derivePublicKey: (ownerId, blob, passphraseBlob) => {
const pem = sshDecryptPrivateKey(repo.getDb(), ownerId, blob);
const pass = passphraseBlob
? sshDecryptPrivateKey(repo.getDb(), ownerId, passphraseBlob)
: null;
try {
return sshFormatPublicKey(pem, pass);
} finally {
pem.fill(0);
if (pass) pass.fill(0);
}
},
sshTester: {
async test({ connection, decryptedKey, passphrase, timeoutMs }) {
const conn: SshResolvedConnection = {
id: connection.id,
ownerId: connection.ownerId,
host: connection.host,
port: connection.port,
username: connection.username,
privateKeyPem: decryptedKey,
passphrase: passphrase ?? undefined,
hostKeyB64: connection.hostKeyB64,
hostKeyVerified: connection.hostKeyVerifiedAt !== null,
allowPrivate:
sshConfig.allowPrivateAddresses || connection.allowPrivateAddresses,
};
return sshTest({ connection: conn, timeoutMs });
},
},
connectionTestTimeoutMs: sshConfig.callTimeoutSeconds * 1000,
onAccessRevoked: ({ connectionId, userId }) =>
sessionRegistry.revokeAccessFor({
connectionId,
userId,
reason: 'access_revoked',
}),
};
app.use('/api/ssh/admin', express.json(), createSshAdminRouter(sshDeps));
app.use('/api/ssh', express.json(), createSshUserRouter(sshDeps));
// Phase 7: register the SSH tool subsystem so SshExec / SshUpload /
// SshDownload tools can access the same repos / session primitives /
// crypto wrappers that the HTTP layer uses. sessionRegistry is
// constructed above (hoisted so sshDeps.onAccessRevoked can use it).
setSshSubsystem({
connectionRepo,
auditRepo,
abuseRepo,
accessResolver,
decryptKeyMaterial: (ownerId, blob) =>
sshDecryptPrivateKey(repo.getDb(), ownerId, blob),
decryptPassphrase: (ownerId, blob) =>
blob ? sshDecryptPrivateKey(repo.getDb(), ownerId, blob) : null,
getUserAccess: (userId) => {
const user = repo.getUserById(userId);
const isAdmin = user?.role === 'admin';
const orgIds = repo.listUserGiteaOrgs(userId).map((o) => o.orgId);
return { isAdmin, orgIds };
},
sshExec,
sshUpload,
sshDownload,
maintenance: sshMaintenance,
config: sshConfig,
sessionRegistry,
openShellChannel,
});
// Phase 4 (SSH Console): wire the registry into agent-loop so
// buildSystemPrompt can auto-inject the live screen tail into
// the LLM system prompt for movements that allow SshConsole*.
__setActiveSessionLookup((taskId) => sessionRegistry.get(taskId));
// Phase 5 (SSH Console): start the periodic sweep so idle /
// duration-cap sessions actually get closed. Without this, the
// registry just holds sessions until shutdown.
sessionRegistry.startSweepTimer(60_000);
// Phase 5 (SSH Console): when SSH maintenance mode activates
// (master-key rotation), close all live console sessions. They
// would otherwise hold a decrypted DEK reference past the
// rewrap window. The reason 'maintenance' is surfaced to the
// WS client as the close cause.
sshMaintenance.onEnter(async () => {
const all = sessionRegistry.listAll();
for (const s of all) {
await sessionRegistry.closeForTask(s.localTaskId, 'maintenance');
}
});
// Capture WS / status deps for startCoreServer to wire up.
const consoleDeps: SshConsoleDeps = {
registry: sessionRegistry,
resolveUserFromUpgrade: async (req) => {
if (!authenticateUpgrade) {
// Auth disabled: no user available — reject WS attaches.
// (In auth-off mode the WS layer simply isn't usable.)
return null;
}
const u = await authenticateUpgrade(req);
return u ? { id: u.id, role: u.role } : null;
},
resolveTask: async (taskId, user) => {
const idNum = Number(taskId);
if (!Number.isFinite(idNum)) return null;
const viewer: Express.User = {
id: user.id,
email: '',
name: null,
avatarUrl: null,
role: (user.role === 'admin' ? 'admin' : 'user'),
status: 'active',
orgIds: repo.listUserGiteaOrgs(user.id).map((o) => o.orgId),
defaultVisibility: 'private',
defaultVisibilityOrgId: null,
};
const task = await repo.getLocalTask(idNum, { viewer });
if (!task) return null;
return {
id: String(task.id),
ownerId: task.ownerId ?? '',
visibility: task.visibility,
pieceName: task.pieceName,
};
},
resolveSshAccess: async (user, session, task) => {
const connection = connectionRepo.resolveConnection(session.connectionId);
if (!connection) return false;
const orgIds = repo.listUserGiteaOrgs(user.id).map((o) => o.orgId);
const decision = accessResolver.resolveAccess({
connection,
userId: user.id,
isAdmin: user.role === 'admin',
// Use the task's actual piece name so piece-specific grants in
// ssh_connection_grants match (applies_to_all_pieces=0 case).
// Bug pre-fix: hardcoded '' silently failed every piece-scoped grant.
pieceName: task.pieceName,
orgIds,
});
return decision.allowed;
},
denyPatterns: {
async getPatterns(connectionId: string) {
const c = connectionRepo.resolveConnection(connectionId);
if (!c) return { deny: [], allow: [] };
const split = (s: string | null): string[] =>
s ? s.split('\n').map((x) => x.trim()).filter((x) => x.length > 0) : [];
return {
deny: split(c.commandDenyPatterns),
allow: split(c.commandAllowPatterns),
};
},
},
};
sshConsole = consoleDeps;
// REST status endpoint: /api/local/tasks/:taskId/console/status
app.use(
'/api',
createConsoleStatusRouter({
registry: sessionRegistry,
requireAuth: authActive ? requireAuth : (_req: Request, _res: Response, next: NextFunction) => next(),
resolveTask: consoleDeps.resolveTask,
}),
);
// Phase 6 (SSH Console): admin list + kill endpoints. The
// `/api/admin` prefix already has `express.json()` mounted above
// (see Admin user management API), so POST bodies parse correctly.
app.use(
'/api/admin',
createConsoleAdminRouter({
registry: sessionRegistry,
requireAdmin: authActive ? requireAdmin : (_req: Request, _res: Response, next: NextFunction) => next(),
}),
);
logger.info('[ssh] subsystem initialised');
} catch (e) {
logger.error(`[ssh] init failed err=${String(e)}`);
setSshSubsystem(null);
__setActiveSessionLookup(null);
}
}
}
// --- Local tasks API ---
mountLocalTasksApi(app, {
repo,
worktreeDir,
generateTitle: opts.generateTitle,
selectPiece: opts.selectPiece,
pieceExists: opts.piecesDir
? (name: string, ownerId?: string) => {
// Mirror the worker's per-user → global-custom → builtin resolution order.
// 1. Owner's per-user dir (matches what worker uses at job run time).
// No-auth tasks (ownerId null) fall back to 'local', mirroring the worker.
const ufl = loadConfig().userFolderRoot ?? './data/users';
const ownerForPieces = ownerId ?? 'local';
if (existsSync(join(userPiecesDir(ufl, ownerForPieces), `${name}.yaml`))) return true;
// 2. Global-custom + builtin via existing helper.
return findPieceFile(name, opts.piecesDir!, opts.customPiecesDir) !== null;
}
: undefined,
sessRepo,
getMaxUploadMb: opts.configManager
? () => opts.configManager!.getConfig().tools?.taskUploadMaxSizeMb ?? 50
: () => loadConfig().tools?.taskUploadMaxSizeMb ?? 50,
});
// --- Local files API ---
mountLocalFilesApi(app, repo);
// --- Subtask activity API ---
app.use('/api/local/tasks', createSubtaskActivityRouter(repo));
// --- Subtask files API (listing MUST come before wildcard) ---
mountSubtaskFilesApi(app, repo);
// --- Job detail API ---
// Gate on viewer: getJob returns null for jobs the caller cannot see (Task 10 + 16).
// When auth is inactive, req.user is undefined → repository falls back to 1=1 (no filter).
const jobDetailHandlers: express.RequestHandler[] = [];
if (authActive) jobDetailHandlers.push(requireAuth);
jobDetailHandlers.push(async (req: Request, res: Response) => {
try {
const viewer = (req.user as Express.User | undefined) ?? undefined;
const job = await repo.getJob(req.params.jobId, viewer ? { viewer } : undefined);
if (!job) {
res.status(404).json({ error: 'Job not found' });
return;
}
res.json(job);
} catch {
res.status(500).json({ error: 'Failed to fetch job' });
}
});
app.get('/api/jobs/:jobId', ...jobDetailHandlers);
// NOTE: bridge `/health` (`{status:'ok'}`) is intentionally registered
// LATER — see the "bridge /health fallback" block below the gateway
// mount. Express matches handlers in registration order, so the
// gateway gate middleware must register before this handler to be
// able to dispatch `/health` to the gateway sub-app when running
// (LiteLLM-shape JSON — CRITICAL-3 fix).
if (opts.configManager) {
mountConfigApi(app, opts.configManager);
}
// Branding 公開 API (GET は認証不要)。アップロード系は admin のみ。
// 保存先は data/branding/ 配下で .gitignore 済み → git pull 時にユーザーのカスタム資産が失われない。
const brandingDir = opts.brandingDir ?? join(process.cwd(), 'data', 'branding');
const brandingGuard: RequestHandler = authActive ? requireAdmin : (_req, _res, next) => next();
mountBrandingApi(app, opts.configManager, { brandingDir, adminGuard: brandingGuard });
if (opts.piecesDir) {
mountPiecesApi(app, {
piecesDir: opts.piecesDir,
customPiecesDir: opts.customPiecesDir,
userPiecesRootDir: loadConfig().userFolderRoot ?? './data/users',
});
}
mountToolsApi(app, {
authActive,
requireAuth,
mcp: mcpCatalogDeps,
});
if (opts.skillCatalog) {
mountSkillsApi(app, {
skillCatalog: opts.skillCatalog,
requireAuth,
requireAdmin,
authActive,
auditLog: async (jobId, action, actor, detail) => {
try { await repo.addAuditLog(jobId, action, actor, detail); } catch {}
},
});
}
// Notifications V2 (Web Push). Always mounted — the route returns 503
// when pushService is null so the UI can render a clear status.
// Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md.
mountNotificationsApi(app, {
repo,
pushService: opts.pushService ?? null,
vapidStore: opts.vapidStore ?? null,
requireAuth: authActive
? requireAuth
: (_req, _res, next) => {
// Auth disabled (local mode): synthesize a 'local' user so
// the per-user state machine still functions.
(_req as unknown as { user: { id: string; role: string } }).user = {
id: 'local',
role: 'admin',
};
next();
},
});
if (opts.scheduler) {
app.use('/api/scheduled-tasks', express.json());
mountScheduledTasksApi(app, repo, opts.scheduler, { sessRepo });
}
// Browser session API
const browserSessionManager = SessionManager.isAvailable()
? new SessionManager(loadConfig().browser ?? {})
: null;
setSessionManager(browserSessionManager);
app.use('/api/local/browser/sessions', express.json(), createBrowserApi(browserSessionManager, repo));
// Per-user browser session profile CRUD (envelope-encrypted storageState).
// Distinct from /api/local/browser/sessions (which manages live noVNC sessions).
const masterKeyPath = loadConfig().secrets?.masterKeyPath ?? './data/secrets/master.key';
app.use(
'/api/browser-sessions',
express.json(),
createBrowserSessionApi({ sessRepo, sessionManager: browserSessionManager, masterKeyPath, authActive }),
);
// Per-user folder REST API: list / read / write / delete with trash.
const userFolderRoot = loadConfig().userFolderRoot ?? './data/users';
// Shared NotesService: one instance for the entire app lifetime.
// getUserOrgIds uses the same repo.listUserGiteaOrgs pattern as SSH / MCP auth.
const notesRepo = new NotesRepository(repo.getDb());
const notesService = new NotesService({
db: repo.getDb(),
repo: notesRepo,
userFolderRoot,
getUserOrgIds: (userId) => repo.listUserGiteaOrgs(userId).map((o) => o.orgId),
audit: (action, actor, target) => {
try {
repo.addAuditLog(null, action, actor, { target });
} catch (err) {
logger.warn(`[notes-audit] failed: ${(err as Error).message}`);
}
},
});
app.use('/api/users/me', createUserFolderApi({ userFolderRoot, sessRepo, masterKeyPath, authActive, notesService }));
// Notes knowledge-sharing REST API.
app.use('/api/notes', createNotesApi({ service: notesService, authActive }));
// Per-user memory entries REST API (GET/PUT/DELETE).
app.use('/api/local/memory', createMemoryApi({ dataDir: userFolderRoot, authActive }));
// Per-user reflection history REST API (GET/POST).
app.use('/api/local/reflection', createReflectionApi({ dataDir: userFolderRoot, repo, authActive }));
// BackendStatusRegistry singleton — probes every configured worker
// (direct llama-server or proxy LiteLLM) at a fixed cadence so the
// Side Info Panel's node-status widget can paint without hammering
// the upstream per page-render. We pull workers from loadConfig() on
// each tick so YAML edits propagate without needing a ConfigManager
// event subscription here.
const backendStatusRegistry = createBackendStatusRegistry({
getWorkers: () => loadConfig().provider.workers ?? [],
probeDirect: buildDirectProbe(),
probeProxy: buildProxyProbe(),
});
backendStatusRegistry.start();
// Dashboard widget + worker status REST API.
app.use('/api/local/dashboard', express.json(), createDashboardApi({
repo,
getWorkers: () => loadConfig().provider.workers,
authActive,
backendStatusRegistry,
}));
// Phase 3c — same-process AAO Gateway mount. Always installs the
// dynamic 404 gate; the gateway sub-app only comes alive when
// `gateway.enabled: true` in config.yaml (or after an admin flips it
// ON via Settings → Gateway Server). Pure no-op when no ConfigManager
// is available (tests / scripts run without one).
let gatewayMount: GatewayMountHandle | null = null;
if (opts.configManager) {
gatewayMount = mountGateway({
app,
configManager: opts.configManager,
repo,
// Per CRITICAL-2: the gateway now owns its own BackendStatusRegistry
// over gateway.backends[] rather than reusing the worker registry
// (which probes provider.workers[] — different IDs). Same-host
// double-probe is intentional and cheap.
// Reuse the worker's prom-client registry so gateway counters
// appear on the same /metrics endpoint (no port collision).
promRegistry: sharedPromRegistry,
metricsPrefix: 'aao_gateway',
});
// Apply the boot-time config so a server starting with
// `gateway.enabled: true` brings the gateway up immediately —
// no need to re-save config from the UI.
const bootGateway = readGatewayConfig(opts.configManager.getConfig());
gatewayMount.applyConfig(bootGateway).catch((e) => {
logger.warn(`[bridge-gateway] initial applyConfig threw: ${e instanceof Error ? e.message : String(e)}`);
});
// Drain in-flight gateway streams on process shutdown so SIGTERM
// doesn't strand SSE clients with a half-written response.
registerShutdownHook('bridge-gateway', async () => {
try { await gatewayMount!.stop(); } catch { /* noop */ }
});
} else {
logger.info('[bridge-gateway] not mounted (no ConfigManager — hot reload unavailable)');
}
// CRITICAL-3 fix: bridge `/health` fallback. Registered AFTER the
// gateway gate so when gateway.enabled=true the gate dispatches
// `/health` into the gateway sub-app (LiteLLM-compat
// `healthy_endpoints` / `unhealthy_endpoints` JSON shape). When the
// gateway is off, the gate falls through to here and the bridge
// answers with the legacy `{status:'ok'}` shape ops scripts rely on.
app.get('/health', (_req: Request, res: Response) => {
res.json({ status: 'ok' });
});
// Admin status endpoint for the Gateway Server UI. Read-only — the
// form drives state changes through PUT /api/config (the
// config-changed listener inside mountGateway picks them up).
// Requires admin when auth is active; in auth-off dev mode the
// endpoint is open like the rest of /api/admin/* health/status reads.
{
// Prefer the explicit listenPort threaded through CoreServerOptions
// (startCoreServer always sets it) over the PORT env-var guess so
// the status endpoint matches what the bridge actually bound.
// env-var fallback is kept for callers that bypass startCoreServer.
const envPortRaw = Number(process.env['PORT']);
const envPort = Number.isFinite(envPortRaw) && envPortRaw > 0 ? envPortRaw : 9876;
const actualPort = opts.listenPort ?? envPort;
const statusRouter = createAdminGatewayStatusRouter({
mount: gatewayMount,
configManager: opts.configManager ?? null,
workerPort: actualPort,
});
if (authActive) {
app.use('/api/admin/gateway/status', requireAdmin, statusRouter);
} else {
app.use('/api/admin/gateway/status', statusRouter);
}
}
// Wire user-folder tool deps so RunUserScript / ListUserAssets can decrypt sessions.
setUserFolderToolDeps({
sessRepo,
masterKeyPath,
userFolderRoot,
auditLog: (action, detail, jobId) => {
// Best-effort: never let an audit failure surface to the tool caller.
repo.addAuditLog(jobId ?? null, action, 'tool', detail).catch(err => {
logger.warn(`[user-folder-tool] audit log failed action=${action} err=${err}`);
});
},
});
// Wire skill tool deps so InstallSkill / InstallSkillFromDir can audit-log.
setSkillToolDeps({
auditLog: (action, detail, jobId) => {
repo.addAuditLog(jobId ?? null, action, 'tool', detail).catch(err => {
logger.warn(`[skill-tool] audit log failed action=${action} err=${err}`);
});
},
userFolderRoot,
});
// Wire app-docs tool deps so GetMyOrchestratorState can introspect DB.
setAppDocsDeps({ db: repo.getDb(), userFolderRoot });
// noVNC static files
app.use('/novnc', createNovncRouter());
// CAPTCHA Pool は admin、Task Session はタスク visibility で認可。
// novnc-proxy は Repository を直接知らずに済むよう、判定ロジックは
// ここでクロージャに包んで渡す。
const authorizeNovncSession: import('./novnc-proxy.js').NovncSessionAuthorizer = async (session, user) => {
if (session.kind === 'pool') {
return user.role === 'admin';
}
if (session.kind === 'task' && session.taskId) {
const taskIdNum = Number(session.taskId);
if (!Number.isFinite(taskIdNum)) return false;
const task = await repo.getLocalTask(taskIdNum);
if (!task) return false;
return canUserSeeTask(user, task);
}
// legacy: kind 未設定 / taskId なし → 旧来の owner-or-admin
const isOwner = session.userId === user.id;
return isOwner || user.role === 'admin';
};
return { app, browserSessionManager, authenticateUpgrade, authorizeNovncSession, sshConsole, backendStatusRegistry, workerMetrics, gatewayMount };
}
/**
 * Installs the terminal middleware on `app`: a JSON 404 for any request that
 * no earlier handler answered, followed by the last-resort error handler.
 *
 * Must be called after every route/router has been registered, because
 * Express runs middleware in registration order.
 *
 * @param app - The fully-configured Express application.
 * @returns The same `app`, for chaining.
 */
export function finalizeServer(app: express.Application): express.Application {
  // Catch-all 404: only reached when no earlier handler sent a response.
  app.use((_req: Request, res: Response) => {
    res.status(404).json({ error: 'Not found' });
  });
  // Express identifies error handlers by their four-parameter signature, so
  // all four parameters must stay in the list.
  app.use((err: unknown, _req: Request, res: Response, next: NextFunction) => {
    // Anything can be thrown / passed to next(), not only Error instances.
    const message = err instanceof Error ? err.message : String(err);
    logger.error(`Unhandled error: ${message}`);
    // If a handler already started writing the response (e.g. a stream),
    // status/headers can no longer be set; writing them would throw. Hand the
    // error to Express's default handler, which closes the connection.
    if (res.headersSent) {
      next(err);
      return;
    }
    res.status(500).json({ error: 'Internal server error' });
  });
  return app;
}
/**
 * Derives the public base URL (scheme + host + port, no trailing slash) used
 * to build OAuth callback links.
 *
 * Resolution order:
 *   1. The origin of the first configured provider whose `callbackUrl` parses
 *      as an absolute URL with a real (non-opaque) origin.
 *   2. `http://localhost:<PORT>`, where PORT comes from the environment and
 *      falls back to 9876 when unset or not a valid TCP port (integer 1..65535).
 *
 * @param authConfig - Auth configuration; may be undefined when auth is off.
 * @returns The derived origin string.
 */
function deriveCallbackBaseUrl(authConfig: AuthConfig | undefined): string {
  const providers = authConfig?.providers;
  if (providers) {
    for (const p of Object.values(providers) as Array<{ callbackUrl?: string } | undefined>) {
      const cb = p?.callbackUrl;
      if (!cb) continue;
      let origin: string;
      try {
        origin = new URL(cb).origin;
      } catch {
        // Malformed URL — try the next provider.
        continue;
      }
      // Non-special schemes (e.g. `myapp://`) and `file:` have an opaque
      // origin, which the WHATWG URL API serializes as the string "null".
      // That is not a usable base URL, so keep looking.
      if (origin !== 'null') return origin;
    }
  }
  const defaultPort = 9876;
  // Number(undefined) is NaN and Number('') is 0; both fail the check below,
  // so an unset / empty / garbage PORT falls back to the default instead of
  // producing `localhost:NaN` or `localhost:0`.
  const envPort = Number(process.env['PORT']);
  const port = Number.isInteger(envPort) && envPort > 0 && envPort <= 65535 ? envPort : defaultPort;
  return `http://localhost:${port}`;
}
/**
 * Builds the core app, binds it to `port`, and wires the subsystems that need
 * the live `http.Server` (WebSocket upgrade handlers, periodic sweeps,
 * shutdown hooks).
 *
 * Order is significant and preserved: `listen()` is called before the
 * WebSocket upgrade handlers are attached, and `installSignalHandlers()` is
 * called last, after every shutdown hook in this function is registered.
 *
 * @param opts - Core server options forwarded to `createCoreServer`.
 * @param port - TCP port to listen on (default 9876). Also forwarded to
 *   `createCoreServer` as `listenPort` so the admin gateway status endpoint
 *   reports the real bind port instead of guessing from the PORT env var.
 */
export function startCoreServer(opts: CoreServerOptions, port: number = 9876): void {
  const core = createCoreServer({ ...opts, listenPort: port });
  const {
    app,
    browserSessionManager,
    authenticateUpgrade,
    authorizeNovncSession,
    sshConsole,
    backendStatusRegistry,
    workerMetrics,
    gatewayMount,
  } = core;

  // Phase 3c: the gateway handle is kept for a future status endpoint or
  // probe. It is unused at this level — the bridge gateway-status admin
  // endpoint reads it via createCoreServer's return value.
  void gatewayMount;

  // Phase 3b: give every Worker the metrics handle so job lifecycle / LLM /
  // tool counters fire. WorkerManager.setWorkerMetrics is the documented hook
  // and also re-applies it on the next rebuild, so config changes don't
  // leave new workers without metrics.
  const { workerManager } = opts;
  if (workerMetrics && workerManager) {
    workerManager.setWorkerMetrics(workerMetrics);
  }

  const host = process.env['HOST'] ?? '0.0.0.0';
  const server = finalizeServer(app).listen(port, host, () => {
    logger.info(`Core server listening on ${host}:${port}`);
  });

  // Start the CAPTCHA Pool idle GC at boot (task sessions are collected after
  // ~5 minutes idle according to the previous note — confirm in SessionManager).
  browserSessionManager?.startIdleGc();

  // Periodic housekeeping: sweep user-folder trash (30-day retention unless
  // configured), then the reflection snapshot retention sweep (daily, same
  // schedule as the trash sweep).
  const sweepConfig = loadConfig();
  const sweepRoot = sweepConfig.userFolderRoot ?? './data/users';
  startTrashCleanup({
    userFolderRoot: sweepRoot,
    retentionDays: sweepConfig.tools?.trashRetentionDays ?? 30,
  });
  startReflectionRetentionSweep({
    dataDir: sweepRoot,
    config: {
      snapshotRetentionDays: sweepConfig.reflection.snapshotRetentionDays,
      snapshotMaxBytesPerUser: sweepConfig.reflection.snapshotMaxBytesPerUser,
    },
  });

  // The noVNC WebSocket proxy receives the session manager through a getter.
  const getBrowserSessionManager = () => browserSessionManager;
  setupNovncWebSocketProxy(server, getBrowserSessionManager, authenticateUpgrade, authorizeNovncSession);

  // Phase 5 (SSH Console): attachConsoleWs only handles upgrades matching
  // /api/local/tasks/:id/console/ws; every other upgrade falls through to
  // noVNC (or is dropped).
  if (sshConsole) {
    attachConsoleWs(server, sshConsole);
    // On shutdown, close all live console sessions so channels / DEK
    // references don't leak; the registry also tears down its sweep timer.
    // The shutdown driver (./shutdown.ts) catches hook errors so this never
    // blocks the signal.
    registerShutdownHook('ssh-console', async () => {
      await sshConsole.registry.shutdown();
    });
  }

  // Stop the BackendStatusRegistry poller on shutdown. stop() aborts the
  // in-flight probe cycle and awaits its settlement, so pending fetches don't
  // outlive the signal (otherwise SIGTERM could stall ~3s per probe) and
  // tests / restart loops don't leak timer handles.
  registerShutdownHook('backend-status-registry', async () => {
    await backendStatusRegistry.stop();
  });

  // Install the SIGTERM / SIGINT listeners exactly once. Every subsystem's
  // cleanup goes through the single hook registry, which keeps us under
  // Node's default MaxListeners cap (10) as more cleanup paths are added.
  installSignalHandlers();
}