maestro/src/bridge/console-ws-api.ts
oss-sync 483464597a
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (a360d15)
2026-06-09 09:19:09 +00:00

460 lines
18 KiB
TypeScript

import type { IncomingMessage, Server as HttpServer } from 'node:http';
import type { Socket } from 'node:net';
import { WebSocketServer, type WebSocket } from 'ws';
import { Router, type Request, type Response } from 'express';
import { logger } from '../logger.js';
import type { SessionRegistry } from '../ssh/console-registry.js';
import type { ConsoleSession } from '../ssh/console-session.js';
import type { AttachMessage, ServerTextMessage } from '../ssh/console-protocol.js';
import { checkConsoleInput } from '../ssh/console-deny-check.js';
import type { OpenConsoleDeps, OpenConsoleResult } from '../engine/tools/ssh-console.js';
import { openConsoleSession } from '../engine/tools/ssh-console.js';
/**
 * Minimal user shape used by the Console access checks in this file.
 * `id` is compared against a task's `ownerId`, and `role === 'admin'` grants
 * write access regardless of ownership (see `decideAccess`).
 */
export interface SimpleUser { id: string; role: 'admin' | 'user' | string }
/**
 * Minimal task shape used by the Console endpoints. `ownerId` is matched
 * against the acting user's id to decide write access, and `pieceName` is
 * forwarded to `openConsoleSession` when a session is opened over REST.
 */
export interface SimpleTask { id: string; ownerId: string; visibility: string; pieceName: string }
/**
 * Work out which user a Console REST request acts as.
 *
 * An authenticated `req.user` always wins. When there is none and auth is
 * switched off (`authActive === false`, no-auth single-user mode), a fixed
 * `local` admin user is synthesized instead — the admin role is what lets the
 * no-auth task (owner_id NULL) pass `buildVisibilityWhere` in resolveTask.
 * This mirrors the WS upgrade path and notifications-api's synthetic user.
 *
 * @returns the request user, the synthetic local admin, or `null` when auth
 *   is active and the request carries no user.
 */
function resolveConsoleUser(req: Request, authActive: boolean): SimpleUser | null {
  const authenticated = req.user as SimpleUser | undefined;
  if (authenticated) {
    return authenticated;
  }
  if (authActive) {
    return null;
  }
  const localAdmin: SimpleUser & { orgIds: string[] } = { id: 'local', role: 'admin', orgIds: [] };
  return localAdmin;
}
/**
 * Result of `decideAccess`: either an allow that carries the `canWrite`
 * flag, or a rejection tagged with the reason it was refused.
 */
export type AccessDecision =
| { allowed: true; canWrite: boolean }
| { allowed: false; reason: 'unauthenticated' | 'task_not_visible' | 'no_session' | 'no_grant' };
/**
 * Decide, with no side effects, whether a user may attach to a task's SSH
 * Console over WebSocket.
 *
 * The checks run in this order and the first failure wins:
 *   1. no user            → `unauthenticated`
 *   2. no visible task    → `task_not_visible` ("not found"-like)
 *   3. no active session  → `no_session`
 *   4. SSH access refused → `no_grant`
 *
 * When all four pass the attach is allowed. `canWrite` is true only for the
 * task owner or an admin; anyone else who can see the task (e.g. an org
 * member on an org-visible task) attaches as a read-only viewer — scrollback
 * and live output, but no typing or resizing.
 */
export function decideAccess(args: {
  user: SimpleUser | null;
  task: SimpleTask | null;
  session: ConsoleSession | null;
  accessAllowed: boolean;
}): AccessDecision {
  const { user, task, session, accessAllowed } = args;
  if (!user) {
    return { allowed: false, reason: 'unauthenticated' };
  }
  if (!task) {
    return { allowed: false, reason: 'task_not_visible' };
  }
  if (!session) {
    return { allowed: false, reason: 'no_session' };
  }
  if (!accessAllowed) {
    return { allowed: false, reason: 'no_grant' };
  }
  const isOwner = task.ownerId === user.id;
  const isAdmin = user.role === 'admin';
  return { allowed: true, canWrite: isOwner || isAdmin };
}
/**
 * Source of the per-connection input filter patterns. `attachConsoleWs`
 * queries it once per accepted upgrade and hands the result to
 * `handleConsoleSocket`, which passes the lists to `checkConsoleInput`.
 */
export interface DenyPatternProvider {
/** Returns the {deny, allow} regex patterns for a given connection_id. */
getPatterns(connectionId: string): Promise<{ deny: string[]; allow: string[] }>;
}
/** Collaborators `attachConsoleWs` needs to authorize and serve an upgrade. */
export interface ConsoleWsDeps {
/** Live console sessions; looked up by task id via `registry.get(taskId)`. */
registry: SessionRegistry;
/** Resolves the acting user from the raw upgrade request; `null` is rejected as `unauthenticated`. */
resolveUserFromUpgrade: (req: IncomingMessage) => Promise<SimpleUser | null>;
/** Resolves the task for this user; `null` is rejected as `task_not_visible`. */
resolveTask: (taskId: string, user: SimpleUser) => Promise<SimpleTask | null>;
/** SSH access check, run only when user, task and session all exist; `false` is rejected as `no_grant`. */
resolveSshAccess: (user: SimpleUser, session: ConsoleSession, task: SimpleTask) => Promise<boolean>;
/** Queried with the session's `connectionId` once access has been granted. */
denyPatterns: DenyPatternProvider;
}
/** Upgrade paths served here; the capture group is the still-URL-encoded task id. */
const PATH_RE = /^\/+api\/local\/tasks\/([^/]+)\/console\/ws$/;
/**
 * Attach the SSH Console WebSocket upgrade handler to the given http.Server.
 *
 * Matches paths of the form /api/local/tasks/:taskId/console/ws and runs the
 * full auth + access pipeline (user → task → session → SSH access →
 * `decideAccess`). Rejected upgrades are silently destroyed (the client gets
 * a 1006 abnormal close) so we don't leak failure reasons over the upgrade
 * channel. The reason is always logged.
 *
 * Requests whose path does not match are returned from without touching the
 * socket.
 *
 * @param server http.Server whose `'upgrade'` event is listened to.
 * @param deps   auth/task/session/access collaborators (see `ConsoleWsDeps`).
 */
export function attachConsoleWs(server: HttpServer, deps: ConsoleWsDeps): void {
  const wss = new WebSocketServer({ noServer: true });
  server.on('upgrade', async (req, socket, head) => {
    const m = (req.url ?? '').match(PATH_RE);
    if (!m) return;
    // The pipeline below awaits several times before `handleUpgrade` takes
    // the socket over. Keep an 'error' listener attached for that window so a
    // socket error raised meanwhile is logged instead of being an unhandled
    // 'error' event.
    const onSocketError = (err: Error): void => {
      logger.warn(`[console-ws] socket error during upgrade: ${err.message}`);
    };
    socket.on('error', onSocketError);
    try {
      // Decoded inside the try: a malformed percent-escape throws URIError,
      // which must end in socket.destroy() rather than an unhandled rejection
      // of this async listener.
      const taskId = decodeURIComponent(m[1]!);
      const user = await deps.resolveUserFromUpgrade(req);
      const task = user ? await deps.resolveTask(taskId, user) : null;
      const session = deps.registry.get(taskId);
      const accessAllowed = user && task && session
        ? await deps.resolveSshAccess(user, session, task)
        : false;
      const decision = decideAccess({ user: user ?? null, task, session, accessAllowed });
      if (!decision.allowed) {
        logger.info(`[console-ws] reject taskId=${taskId} reason=${decision.reason}`);
        socket.destroy();
        return;
      }
      // decision.allowed implies user and session are non-null (decideAccess
      // rejects when either is missing), hence the assertions below.
      const patterns = await deps.denyPatterns.getPatterns(session!.connectionId);
      // Hand the socket over without our temporary listener attached.
      socket.removeListener('error', onSocketError);
      wss.handleUpgrade(req, socket as Socket, head, (ws) => {
        handleConsoleSocket(ws, session!, user!, decision.canWrite, patterns);
      });
    } catch (e) {
      const detail = e instanceof Error ? e.message : String(e);
      logger.warn(`[console-ws] upgrade error: ${detail}`);
      socket.destroy();
    }
  });
}
/**
 * Handle a single accepted Console WebSocket. The caller has already done
 * auth + access and fetched the deny/allow patterns; scrollback is read here.
 *
 * Wire protocol (see console-protocol.ts):
 * - server → client, in this order on attach: attach (JSON), ESC c (binary
 *   reset), replay_begin (JSON), replay bytes (binary, only when non-empty),
 *   replay_end (JSON); afterwards live binary output and JSON notices.
 * - client → server: binary input frames (forwarded to PTY) and JSON
 *   control frames (`resize`).
 *
 * Input policy:
 * - canWrite=false ⇒ all input is rejected with a 'warn' notice; resize
 *   frames are ignored.
 * - canWrite=true ⇒ input is forwarded as-is to the PTY, BUT any chunk
 *   containing a line terminator is first checked against the connection's
 *   deny/allow patterns. A rejected line drops the WHOLE chunk and emits
 *   an 'error' notice.
 * - `resize` is applied only when cols and rows are integers in 1..65535.
 *
 * NOTE(review): the deny check only sees the chunk that carries the CR/LF.
 * Text sent in earlier frames has already been forwarded, so a command typed
 * key-by-key and submitted with a lone Enter frame is never matched against
 * the patterns. Closing that gap needs per-socket line tracking (including
 * editing keys) — confirm the intended threat model before relying on this
 * check for interactive input.
 *
 * @param ws       accepted WebSocket for this viewer.
 * @param session  console session to mirror and (if allowed) write to.
 * @param user     acting user; its id is reported in the attach message.
 * @param canWrite whether input and resize from this socket are honored.
 * @param patterns deny/allow pattern lists passed to `checkConsoleInput`.
 */
export function handleConsoleSocket(
  ws: WebSocket,
  session: ConsoleSession,
  user: SimpleUser,
  canWrite: boolean,
  patterns: { deny: string[]; allow: string[] },
): void {
  const describeError = (e: unknown): string => (e instanceof Error ? e.message : String(e));
  // Normalize a received frame payload to a single Buffer without an
  // unchecked cast (handles Buffer, Buffer[] fragments and ArrayBuffer).
  const toBuffer = (raw: Buffer | ArrayBuffer | Buffer[]): Buffer => {
    if (Buffer.isBuffer(raw)) return raw;
    return Array.isArray(raw) ? Buffer.concat(raw) : Buffer.from(raw);
  };
  // Client-supplied terminal sizes are untrusted: `typeof === 'number'` alone
  // would also admit NaN, Infinity, negatives and fractions.
  const isTerminalDimension = (v: unknown): v is number =>
    typeof v === 'number' && Number.isInteger(v) && v >= 1 && v <= 0xffff;
  const sendText = (msg: ServerTextMessage) => {
    if (ws.readyState === ws.OPEN) ws.send(JSON.stringify(msg));
  };
  const sendBinary = (buf: Buffer) => {
    if (ws.readyState === ws.OPEN) ws.send(buf, { binary: true });
  };
  // Without an 'error' listener, an error emitted by the socket (protocol
  // violation, oversized frame, transport failure) is an unhandled 'error'
  // event. Log it; the socket's 'close' handler below performs the cleanup.
  ws.on('error', (err: Error) => {
    logger.warn(`[console-ws] socket error for task=${session.localTaskId}: ${describeError(err)}`);
  });
  const attachMsg: AttachMessage = {
    type: 'attach',
    acting_user_id: user.id,
    can_write: canWrite,
    connection_id: session.connectionId,
    cols: session.cols,
    rows: session.rows,
  };
  sendText(attachMsg);
  // Replay scrollback: ESC c reset, then the raw bytes bracketed by the
  // replay_begin / replay_end markers. The ESC c ensures the client terminal
  // starts fresh even if it was already attached to another session.
  const scroll = session.scrollbackBytes();
  sendBinary(Buffer.from([0x1b, 0x63])); // ESC c — full reset
  sendText({ type: 'replay_begin', bytes: scroll.length });
  if (scroll.length > 0) sendBinary(scroll);
  sendText({ type: 'replay_end' });
  const unsub = session.onOutput((b) => sendBinary(b));
  // Register this WS as a viewer so the registry can selectively kick it
  // (e.g. when its access grant is revoked) without tearing down the whole
  // SSH session. unsubViewer must fire on `close` along with unsub.
  const unsubViewer = session.addViewer({
    userId: user.id,
    close: (reason) => {
      try {
        sendText({ type: 'close', reason });
      } catch { /* socket gone */ }
      try {
        // 1008 = Policy Violation — appropriate for authorization revocation.
        ws.close(1008, reason);
      } catch { /* already closed */ }
    },
  });
  // Heartbeat — without this, a half-dead WS (TCP alive but the peer
  // can't respond, e.g. laptop suspended or NAT/proxy dropped state)
  // never fires `close` and the UI just silently swallows user input.
  // Send ping every 30s; if no pong within the next ping cycle, treat
  // as dead and terminate so the client switches to `disconnected`
  // and the user knows to refresh / reconnect.
  let alive = true;
  const heartbeatTimer = setInterval(() => {
    if (!alive) {
      logger.warn(`[console-ws] heartbeat timeout for task=${session.localTaskId} — terminating`);
      // Stop ticking right away so a slow teardown cannot log/terminate twice.
      clearInterval(heartbeatTimer);
      try { ws.terminate(); } catch { /* already dead */ }
      return;
    }
    alive = false;
    try {
      ws.ping();
    } catch (e) {
      logger.warn(`[console-ws] ping failed: ${describeError(e)}`);
    }
  }, 30_000);
  ws.on('pong', () => { alive = true; });
  ws.on('message', (data, isBinary) => {
    if (isBinary) {
      if (!canWrite) {
        sendText({ type: 'notice', severity: 'warn', msg: 'read-only viewer; input ignored.' });
        return;
      }
      // An exception from the pattern check or the PTY write must not escape
      // this event listener. Any failure drops the chunk (fail closed).
      try {
        const buf = toBuffer(data);
        const text = buf.toString('utf8');
        if (/[\r\n]/.test(text)) {
          const denyResult = checkConsoleInput(
            text,
            patterns.deny.length ? patterns.deny : null,
            patterns.allow.length ? patterns.allow : null,
          );
          if (!denyResult.ok) {
            sendText({
              type: 'notice',
              severity: 'error',
              msg: `command rejected: ${denyResult.reason} (${denyResult.matched ?? 'n/a'})`,
            });
            return;
          }
        }
        session.write(buf, 'human');
      } catch (e) {
        logger.warn(`[console-ws] input failed for task=${session.localTaskId}: ${describeError(e)}`);
        sendText({ type: 'notice', severity: 'error', msg: 'input could not be processed.' });
      }
      return;
    }
    try {
      const msg = JSON.parse(toBuffer(data).toString('utf8')) as
        | { type?: unknown; cols?: unknown; rows?: unknown }
        | null;
      if (msg && msg.type === 'resize' && isTerminalDimension(msg.cols) && isTerminalDimension(msg.rows)) {
        if (canWrite) session.resize(msg.cols, msg.rows);
      }
    } catch (e) {
      logger.warn(`[console-ws] bad text frame: ${describeError(e)}`);
    }
  });
  ws.on('close', () => {
    clearInterval(heartbeatTimer);
    unsub();
    unsubViewer();
  });
}
/**
 * REST router exposing GET /local/tasks/:taskId/console/status.
 *
 * Used by the UI to know whether a Console tab should render an attach
 * button (active=true) or a "no live session" empty state.
 *
 * Responses:
 * - 401 `{ error: 'unauthenticated' }` when no user can be resolved.
 * - 200 `{ active: false }` when the task is missing/not visible or has no
 *   session in the registry.
 * - 200 `{ active: true, connection_id, started_at, last_activity_at, cols,
 *   rows }` when a session exists.
 *
 * Any exception raised while handling the request is passed to `next(err)`
 * so the application's error middleware answers it, instead of leaving the
 * async handler's promise rejected and unhandled.
 */
export function createConsoleStatusRouter(deps: {
  registry: SessionRegistry;
  requireAuth: any;
  resolveTask: (taskId: string, user: SimpleUser) => Promise<SimpleTask | null>;
  /** No-auth single-user mode: synthesize a local admin user so the Console
   * status poll works without login (admin role makes null-owner no-auth
   * tasks visible via buildVisibilityWhere). Defaults to true. */
  authActive?: boolean;
}): Router {
  const r = Router();
  r.get(
    '/local/tasks/:taskId/console/status',
    deps.requireAuth,
    async (req: Request, res: Response, next: (err?: unknown) => void) => {
      try {
        const taskId = req.params.taskId!;
        const user = resolveConsoleUser(req, deps.authActive ?? true);
        if (!user) {
          res.status(401).json({ error: 'unauthenticated' });
          return;
        }
        const task = await deps.resolveTask(taskId, user);
        if (!task) {
          // Task is missing or not visible to this user. Return 200
          // active=false rather than 404: the UI's App.tsx polls this
          // endpoint every 5 seconds for the currently-selected local
          // task, and a 404 logs an unsuppressible network error in
          // the browser DevTools console on every tick (reported as
          // issue #347 during dogfooding). The poll only needs to
          // know whether a SSH attach button should be rendered, and
          // "no, you can't attach" is the same answer whether the
          // task doesn't exist or just doesn't expose a session —
          // collapsing both into 200 active=false matches the rest of
          // the route's fallback shape without leaking task existence
          // either way.
          res.json({ active: false });
          return;
        }
        const s = deps.registry.get(taskId);
        if (!s) {
          res.json({ active: false });
          return;
        }
        res.json({
          active: true,
          connection_id: s.connectionId,
          started_at: new Date(s.startedAt).toISOString(),
          last_activity_at: new Date(s.lastActivityAt).toISOString(),
          cols: s.cols,
          rows: s.rows,
        });
      } catch (err) {
        // e.g. resolveTask rejecting, or toISOString() on an invalid date.
        next(err);
      }
    },
  );
  return r;
}
/**
 * HTTP status for each `OpenConsoleResult.error` code that does not map to
 * the default 400: 404 for an unknown connection, 403 for a missing grant,
 * 409 for host-key / replace / lock / disabled conflicts, and 500 for the
 * internal failure codes.
 */
const OPEN_ERROR_HTTP_STATUS: ReadonlyMap<string, number> = new Map([
  ['connection_not_found', 404],
  ['no_grant', 403],
  ['host_key_not_verified', 409],
  ['host_key_mismatch', 409],
  ['connection_change_requires_force', 409],
  ['abuse_locked', 409],
  ['connection_disabled', 409],
  ['decrypt_failed', 500],
  ['open_shell_failed', 500],
]);
/**
 * Translate an `OpenConsoleResult.error` code into the HTTP status the REST
 * endpoint answers with. The same code string is still sent in the JSON body
 * so the UI can map it to a localized message.
 *
 * Codes absent from the table — missing_connection_id, missing_task_context,
 * no_user_context, piece_not_configured, piece_not_allowed, preflight_denied,
 * any unexpected code, and `undefined` — resolve to 400: the request was
 * malformed or denied at a layer that maps cleanly onto a bad request.
 */
function statusForOpenError(error: string | undefined): number {
  if (error === undefined) {
    return 400;
  }
  return OPEN_ERROR_HTTP_STATUS.get(error) ?? 400;
}
/**
 * Narrow a generic `preflight_denied` result into a specific error code.
 *
 * The preflight helper (`preflight` from engine/tools/ssh.ts) reports several
 * distinct denials (no grant, abuse lock, disabled connection, connection not
 * found) under the single code `preflight_denied`. The REST contract wants
 * the specific codes, so they are re-derived here from the human-readable
 * text that `openConsoleSession` mirrors into `result.message`.
 *
 * This is a best-effort refinement layered ON TOP of the real gate: the gate
 * inside openConsoleSession/preflight stays authoritative and unchanged, and
 * nothing here widens access — it only turns a generic 400 into a more
 * specific 4xx for the UI.
 *
 * @returns `result.error` unchanged when it is anything other than
 *   `preflight_denied` (`'unknown'` when absent); otherwise the refined code,
 *   or `preflight_denied` itself when no known phrase is found.
 */
function refineErrorCode(result: OpenConsoleResult): string {
  const { error, message } = result;
  if (error == null) {
    return 'unknown';
  }
  if (error !== 'preflight_denied') {
    return error;
  }
  // Phrases taken from the messages `preflight` produces, matched
  // case-insensitively. Order matters: the first hit wins, and the generic
  // 'access denied' comes last so a disabled or locked connection is never
  // mislabeled as no_grant.
  const refinements: ReadonlyArray<readonly [string, string]> = [
    ['does not exist', 'connection_not_found'],
    ['is disabled', 'connection_disabled'],
    ['temporarily locked', 'abuse_locked'],
    ['access denied', 'no_grant'],
  ];
  const haystack = (message ?? '').toLowerCase();
  const hit = refinements.find(([needle]) => haystack.includes(needle));
  return hit ? hit[1] : 'preflight_denied';
}
/**
 * REST router exposing POST /local/tasks/:taskId/console/session.
 *
 * Lets a user open an SSH console PTY session themselves from a task's
 * Console tab. The session is keyed by localTaskId, so the WS/xterm and the
 * AI console tools share it automatically. The handler runs the SAME gate as
 * the agent-facing SshConsoleEnsure tool: it calls the shared
 * `openConsoleSession` core, which runs the full preflight (piece membership
 * / access decision / enabled / abuse / host-key) against the task's piece
 * name. `allowedConnections: ['*']` is passed because the per-piece
 * allowed-list is an agent-prompt concept; the authoritative gate is the
 * access resolver against `task.pieceName`, which still runs inside the core.
 *
 * Request body: `{ connection_id: string, cols?: number, rows?: number,
 * force_replace?: boolean }`. `cols` / `rows` are forwarded only when they
 * are integers in 1..65535; any other value is treated as absent, exactly
 * like a non-numeric value.
 *
 * Responses:
 * - 401 `unauthenticated`, 404 `task_not_found`, 400 `missing_connection_id`.
 * - 200 `{ ok: true, connection_id, cols, rows, already_active? }` on success.
 * - otherwise `{ error: <code> }` with the status from `statusForOpenError`.
 *
 * Any exception raised while handling the request is passed to `next(err)`
 * so the application's error middleware answers it, instead of leaving the
 * async handler's promise rejected and unhandled.
 */
export function createConsoleSessionRouter(deps: {
  sub: OpenConsoleDeps['sub'];
  preflight: OpenConsoleDeps['preflight'];
  requireAuth: any;
  resolveTask: (taskId: string, user: SimpleUser) => Promise<SimpleTask | null>;
  /** No-auth single-user mode: synthesize a local admin user so users can
   * open Console sessions without login. Defaults to true. */
  authActive?: boolean;
}): Router {
  // The body is untrusted JSON: a bare `typeof === 'number'` would also let
  // negatives, fractions and non-finite values through to the session core.
  const toDimension = (v: unknown): number | undefined =>
    typeof v === 'number' && Number.isInteger(v) && v >= 1 && v <= 0xffff ? v : undefined;
  const r = Router();
  r.post(
    '/local/tasks/:taskId/console/session',
    deps.requireAuth,
    async (req: Request, res: Response, next: (err?: unknown) => void) => {
      try {
        const taskId = req.params.taskId!;
        const user = resolveConsoleUser(req, deps.authActive ?? true);
        if (!user) {
          res.status(401).json({ error: 'unauthenticated' });
          return;
        }
        const task = await deps.resolveTask(taskId, user);
        if (!task) {
          // Missing OR not visible to this user → opaque 404 (don't leak
          // task existence across the visibility boundary).
          res.status(404).json({ error: 'task_not_found' });
          return;
        }
        const body = (req.body ?? {}) as {
          connection_id?: unknown;
          cols?: unknown;
          rows?: unknown;
          force_replace?: unknown;
        };
        const connectionId = typeof body.connection_id === 'string' ? body.connection_id : '';
        if (!connectionId) {
          res.status(400).json({ error: 'missing_connection_id' });
          return;
        }
        const cols = toDimension(body.cols);
        const rows = toDimension(body.rows);
        const result = await openConsoleSession(
          { sub: deps.sub, preflight: deps.preflight },
          {
            taskId,
            connectionId,
            // An empty/absent owner falls back to 'local'.
            ownerId: task.ownerId || 'local',
            userId: user.id,
            pieceName: task.pieceName,
            // Per-piece allowed-list is an agent-prompt concept; the real gate
            // is the access resolver against task.pieceName inside the core.
            allowedConnections: ['*'],
            cols,
            rows,
            forceReplace: body.force_replace === true,
            initiator: 'user',
          },
        );
        if (result.ok) {
          res.status(200).json({
            ok: true,
            connection_id: result.connectionId,
            cols: result.cols,
            rows: result.rows,
            ...(result.alreadyActive ? { already_active: true } : {}),
          });
          return;
        }
        const code = refineErrorCode(result);
        res.status(statusForOpenError(code)).json({ error: code });
      } catch (err) {
        // e.g. resolveTask or openConsoleSession rejecting.
        next(err);
      }
    },
  );
  return r;
}