maestro/src/ssh/console-session.ts
2026-06-03 05:08:00 +00:00

343 lines
11 KiB
TypeScript

import { createRequire } from 'node:module';
import type { Terminal as HeadlessTerminalType } from '@xterm/headless';
import type { ClientChannel } from 'ssh2';
import { ByteRingBuffer } from './ring-buffer.js';
/**
 * Origin of an input chunk written to a console session.
 *
 * Within this module it labels back-pressure log lines, and 'ai' input gets
 * LF→CR normalization before reaching the channel. Presumably it also feeds
 * the audit `source` field elsewhere — confirm at the call sites.
 */
export type InputSource = 'ai' | 'human';
import type { SessionCloseReason } from './console-protocol.js';
import type { SshAuditRepo } from './audit-repo.js';
import { logger } from '../logger.js';
// @xterm/headless ships as CommonJS only. A named ESM import type-checks but
// fails at runtime with "Named export 'Terminal' not found", so the runtime
// constructor is pulled in through createRequire (the same approach this repo
// uses for the ssh2 / crypto modules). The type-only import above still
// supplies the typings.
const cjsRequire = createRequire(import.meta.url);
const headlessModule = cjsRequire('@xterm/headless') as {
  Terminal: typeof HeadlessTerminalType;
};
const HeadlessTerminal = headlessModule.Terminal;
/** Instance type matching the runtime `HeadlessTerminal` constructor above. */
type HeadlessTerminal = HeadlessTerminalType;
/** Construction-time inputs for a ConsoleSession. */
export interface ConsoleSessionArgs {
  /** Local task the console belongs to; appears in back-pressure log lines. */
  localTaskId: string;
  /** SSH connection the channel was opened on; recorded in the close audit entry. */
  connectionId: string;
  /** Owner recorded in the close audit entry; `null` when there is none. */
  ownerId: string | null;
  /** User who opened the console; recorded as the acting user when it closes. */
  startedByUserId: string;
  /** Initial terminal width, in columns. */
  cols: number;
  /** Initial terminal height, in rows. */
  rows: number;
  /** Capacity passed to the scrollback ByteRingBuffer (presumably bytes — confirm in ring-buffer.ts). */
  scrollbackCap: number;
  /** Open ssh2 channel whose input and output the session drives. */
  channel: ClientChannel;
  /** Repository that receives the `ssh.console.close` audit record. */
  auditRepo: SshAuditRepo;
}
/**
 * Rendered contents of the visible terminal viewport, as returned by
 * ConsoleSession.snapshotScreen().
 */
export interface ScreenSnapshot {
  /** Viewport width in columns. */
  cols: number;
  /** Viewport height in rows. */
  rows: number;
  /** One entry per viewport row (trailing whitespace trimmed), joined with '\n'. */
  text: string;
  /** Cursor position taken from the headless buffer's cursorX / cursorY. */
  cursor: {
    x: number;
    y: number;
  };
}
/**
 * Handle for one WebSocket viewer attached to a ConsoleSession.
 *
 * Keeping one handle per socket lets the registry disconnect individual
 * viewers (for example after a grant is revoked) while the SSH session itself
 * stays up for the remaining viewers and the agent.
 */
export interface ViewerHandle {
  /** User the WebSocket authenticated as (req.user.id at upgrade time). */
  userId: string;
  /**
   * Ends the WebSocket with a structured close message. Safe to call on a
   * socket that is already closed.
   */
  close: (reason: SessionCloseReason) => void;
}
/** Plain-text (ANSI-stripped) view of a session's raw output history. */
export interface ScrollbackSnapshot {
  /** Stripped scrollback; only its newest part when `truncated` is true. */
  text: string;
  /** Size of the full stripped scrollback before any truncation. */
  byteCount: number;
  /** True when `text` was cut down to fit the caller's limit. */
  truncated: boolean;
}
/**
 * Rewrite line terminators in AI-originated input so that each one is a single
 * CR (0x0d), the byte browser xterm sends for Enter. This keeps AI keystrokes
 * byte-compatible with human ones once they reach the remote PTY.
 *
 * - A lone LF (0x0a) becomes CR.
 * - A CRLF pair collapses to one CR, so "cmd\r\n" submits the line once
 *   instead of sending Enter twice.
 *
 * Pairs are only recognized within a single buffer. A CR ending one call
 * followed by an LF starting the next still yields two CRs.
 *
 * Never mutates `buf`. When `buf` contains no LF, the same reference is
 * returned (fast path for control characters and partial input). Otherwise a
 * newly allocated Buffer is returned.
 */
function normalizeLfToCr(buf: Buffer): Buffer {
  if (buf.indexOf(0x0a) === -1) return buf;
  // Output can only shrink (CRLF -> CR), so the input length is an upper bound.
  const out = Buffer.alloc(buf.length);
  let written = 0;
  let prev = -1;
  for (const byte of buf) {
    if (byte !== 0x0a) {
      out[written++] = byte;
    } else if (prev !== 0x0d) {
      out[written++] = 0x0d;
    }
    // Otherwise this LF completes a CRLF whose CR was already copied: drop it.
    prev = byte;
  }
  return out.subarray(0, written);
}
/**
 * Strip common ANSI/ECMA-48 escape sequences so the AI can read raw
 * scrollback as plain text. This is deliberately permissive rather than a full
 * xterm parser. The headless terminal already renders the screen for
 * screen snapshots; this helper serves only the longer raw byte history.
 *
 * Removed, in order:
 *  1. CSI sequences: ESC '[' params intermediates final (SGR colors, cursor moves).
 *  2. OSC strings: ESC ']' ... terminated by BEL or ESC '\' (window titles).
 *  3. Other escapes: ESC, optional intermediate bytes (0x20-0x2F), then one
 *     final byte (0x30-0x7E, excluding '[' and ']', which belong to the rules
 *     above). This covers charset designations such as `ESC ( B` (emitted by
 *     `tput sgr0` on xterm-style terminfo), keypad modes `ESC =` / `ESC >`,
 *     cursor save/restore `ESC 7` / `ESC 8`, `ESC c`, and `ESC M`.
 *  4. Remaining C0 control bytes and DEL, keeping TAB, LF and CR.
 */
function stripAnsi(s: string): string {
  return (
    s
      // CSI: ESC '[' parameters intermediate final-byte
      .replace(/\x1b\[[0-?]*[ -/]*[@-~]/g, '')
      // OSC: ESC ']' ... BEL or ESC ']' ... ESC \
      .replace(/\x1b\][\s\S]*?(?:\x07|\x1b\\)/g, '')
      // nF / Fp / Fe / Fs escapes: ESC [intermediates] final, except ESC '[' / ESC ']'.
      .replace(/\x1b[ -/]*[0-Z\\^-~]/g, '')
      // Lone control bytes (BEL, BS, VT, FF, SO, SI, stray ESC etc.) — keep TAB/LF/CR.
      .replace(/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/g, '')
  );
}
/**
 * A live SSH console bound to one local task.
 *
 * Wraps an ssh2 channel and:
 * - forwards human / AI input to it (`write`) and terminal size changes (`resize`);
 * - mirrors every output chunk into a headless xterm (rendered screen, see
 *   `snapshotScreen`) and a byte ring buffer (raw history, see
 *   `snapshotScrollback` / `scrollbackBytes`);
 * - fans output out to `onOutput` listeners and tracks attached viewers;
 * - writes a single `ssh.console.close` audit record when `close` runs,
 *   including when the remote side closes the channel first.
 */
export class ConsoleSession {
  readonly localTaskId: string;
  readonly connectionId: string;
  readonly ownerId: string | null;
  readonly startedByUserId: string;
  /** Epoch ms at construction; the audited session duration is measured from here. */
  readonly startedAt: number;
  cols: number;
  rows: number;
  private readonly channel: ClientChannel;
  private readonly headless: HeadlessTerminal;
  private readonly scrollback: ByteRingBuffer;
  private readonly auditRepo: SshAuditRepo;
  private _lastActivityAt: number;
  private _totalInputBytes = 0;
  private _totalOutputBytes = 0;
  /** Set by the first close() call so teardown runs at most once. */
  private closing = false;
  /** Set together with `closing`; gates input, resize and output handling. */
  private closed = false;
  private outputListeners: Set<(chunk: Buffer) => void> = new Set();
  private viewers: Set<ViewerHandle> = new Set();

  constructor(args: ConsoleSessionArgs) {
    this.localTaskId = args.localTaskId;
    this.connectionId = args.connectionId;
    this.ownerId = args.ownerId;
    this.startedByUserId = args.startedByUserId;
    this.startedAt = Date.now();
    this._lastActivityAt = this.startedAt;
    this.cols = args.cols;
    this.rows = args.rows;
    this.channel = args.channel;
    this.scrollback = new ByteRingBuffer(args.scrollbackCap);
    this.auditRepo = args.auditRepo;
    this.headless = new HeadlessTerminal({
      cols: args.cols,
      rows: args.rows,
      allowProposedApi: true,
      // We use writeSync (still supported, listed as "deprecated" by xterm)
      // to keep snapshotScreen() callable synchronously. Silence the
      // one-shot deprecation warning that would otherwise spam logs.
      logLevel: 'off',
    });
    this.channel.on('data', (data: Buffer) => this.handleOutput(data));
    // Remote-initiated close: run the same teardown (and audit) as a local close.
    this.channel.on('close', () => {
      if (!this.closing) {
        this.close('host_disconnect').catch((e) =>
          logger.warn(`[console-session] close error: ${(e as Error).message}`),
        );
      }
    });
  }

  /** Epoch ms of the last input or output on this session. */
  get lastActivityAt(): number {
    return this._lastActivityAt;
  }

  /** Bytes actually written to the channel (after any AI normalization). */
  get totalInputBytes(): number {
    return this._totalInputBytes;
  }

  /** Bytes received from the channel. */
  get totalOutputBytes(): number {
    return this._totalOutputBytes;
  }

  get isClosed(): boolean {
    return this.closed;
  }

  /** Subscribe to raw channel output. Returns an unsubscribe function. */
  onOutput(listener: (chunk: Buffer) => void): () => void {
    this.outputListeners.add(listener);
    return () => {
      this.outputListeners.delete(listener);
    };
  }

  /**
   * Register a WebSocket viewer attached to this session. Each viewer carries
   * the acting userId and a close() closure that gracefully ends the WS.
   *
   * Returns an unsubscribe function the caller must invoke on ws 'close'.
   * Used by `SessionRegistry.revokeAccessFor` to kick specific viewers when
   * their grant is revoked, without tearing down the whole session.
   */
  addViewer(handle: ViewerHandle): () => void {
    this.viewers.add(handle);
    return () => {
      this.viewers.delete(handle);
    };
  }

  /** Snapshot copy of the currently registered viewers. */
  listViewers(): ViewerHandle[] {
    return [...this.viewers];
  }

  /** Raw (un-stripped) bytes currently held in the scrollback ring buffer. */
  scrollbackBytes(): Buffer {
    return this.scrollback.concat();
  }

  /**
   * Forward input to the remote shell. Ignored after close.
   *
   * @param buf    Bytes to send. Never mutated.
   * @param source 'ai' input has its line terminators rewritten to CR;
   *               'human' input is sent untouched. Also used as the log label.
   */
  write(buf: Buffer, source: InputSource): void {
    if (this.closed) return;
    this._lastActivityAt = Date.now();
    // Both human and AI inputs are forwarded byte-for-byte to the PTY,
    // so the shell's local echo drives what appears on screen — the same
    // path xterm.js takes for human keystrokes. AI input additionally goes
    // through normalizeLfToCr so its line terminators arrive as CR, which is
    // what browser xterm sends for Enter. Without it, a command like
    // "ls -la\n" was not executed by the shell.
    //
    // Deny-list enforcement on full lines happens upstream:
    //   - human input → checkConsoleInput in console-ws-api at line term
    //   - AI input → checkConsoleInput in sendInput before reaching here
    // Partial input (no newline) is forwarded so the shell can echo each
    // character back, matching the live terminal experience the user
    // expects in either role.
    const out = source === 'ai' ? normalizeLfToCr(buf) : buf;
    this._totalInputBytes += out.length;
    const ok = this.channel.write(out);
    if (!ok) {
      // ssh2 channel signals back-pressure when the SSH send window
      // shrinks below the chunk size. The write is still buffered
      // internally and will be flushed on 'drain', but the input echo
      // from the shell will be delayed. Log so we can correlate user
      // reports of "freeze" with actual flow-control events.
      logger.warn(
        `[console-session] ${source} channel.write back-pressure task=${this.localTaskId} bytes=${out.length}`,
      );
    }
  }

  /**
   * Resize the remote PTY (ssh2 setWindow: rows, cols, pixel height, pixel
   * width) and the headless mirror to match. Ignored after close.
   */
  resize(cols: number, rows: number): void {
    if (this.closed) return;
    this.cols = cols;
    this.rows = rows;
    this.channel.setWindow(rows, cols, 0, 0);
    this.headless.resize(cols, rows);
  }

  /**
   * Render the visible viewport of the headless terminal as text: one line
   * per row (trailing whitespace trimmed) plus the cursor position.
   */
  snapshotScreen(): ScreenSnapshot {
    const buf = this.headless.buffer.active;
    const lines: string[] = [];
    for (let y = 0; y < this.rows; y++) {
      const line = buf.getLine(buf.viewportY + y);
      lines.push(line ? line.translateToString(true) : '');
    }
    return {
      cols: this.cols,
      rows: this.rows,
      text: lines.join('\n'),
      cursor: { x: buf.cursorX, y: buf.cursorY },
    };
  }

  /**
   * Return the scrollback as ANSI-stripped text, capped at `opts.maxBytes`.
   *
   * Both `maxBytes` and the returned `byteCount` are measured in UTF-8 bytes
   * of the stripped text. When the text exceeds the cap, the newest bytes are
   * kept and the cut is moved forward to a character boundary. The returned
   * text may therefore be up to 3 bytes shorter than `maxBytes`, but never
   * contains a split code point.
   */
  snapshotScrollback(opts: { maxBytes: number }): ScrollbackSnapshot {
    const raw = this.scrollback.concat();
    // Once the ring buffer has wrapped, its oldest byte may sit in the middle
    // of a multi-byte character; start decoding at the next character start
    // so the text does not open with U+FFFD.
    const decoded = raw.subarray(ConsoleSession.skipUtf8Continuation(raw, 0)).toString('utf8');
    const text = stripAnsi(decoded);
    const encoded = Buffer.from(text, 'utf8');
    const byteCount = encoded.length;
    const limit = Math.max(0, Math.floor(opts.maxBytes));
    if (byteCount <= limit) {
      return { text, byteCount, truncated: false };
    }
    const start = ConsoleSession.skipUtf8Continuation(encoded, byteCount - limit);
    return {
      text: encoded.subarray(start).toString('utf8'),
      byteCount,
      truncated: true,
    };
  }

  /**
   * Tear the session down once: end the channel, dispose the headless
   * terminal, record the `ssh.console.close` audit entry, then drop all output
   * listeners and viewers. Later calls are no-ops.
   *
   * Viewers are forgotten here, not closed. NOTE(review): presumably the
   * caller closes their WebSockets — confirm in the registry.
   */
  async close(reason: SessionCloseReason): Promise<void> {
    if (this.closing) return;
    this.closing = true;
    this.closed = true;
    try {
      try {
        this.channel.end();
      } catch {
        /* already gone */
      }
      try {
        this.headless.dispose();
      } catch {
        /* idempotent */
      }
      this.auditRepo.beginAndComplete(
        {
          action: 'ssh.console.close',
          connectionId: this.connectionId,
          ownerId: this.ownerId,
          actingUserId: this.startedByUserId,
          detail: {
            reason,
            duration_ms: Date.now() - this.startedAt,
            total_input_bytes: this._totalInputBytes,
            total_output_bytes: this._totalOutputBytes,
          },
        },
        'success',
      );
    } finally {
      this.outputListeners.clear();
      this.viewers.clear();
    }
  }

  /**
   * Channel 'data' handler: record the chunk in the counters, scrollback and
   * headless terminal, then notify listeners. A throwing listener is logged
   * and does not stop delivery to the others.
   */
  private handleOutput(data: Buffer): void {
    // channel.end() only half-closes the SSH channel, so 'data' can still
    // arrive after close(). By then the headless terminal is disposed and the
    // audit totals are written; drop the chunk instead of touching either
    // (a throw here would escape the EventEmitter handler uncaught).
    if (this.closed) return;
    this._totalOutputBytes += data.length;
    this._lastActivityAt = Date.now();
    this.scrollback.append(data);
    this.writeToHeadlessSync(data);
    for (const l of this.outputListeners) {
      try {
        l(data);
      } catch (e) {
        logger.warn(`[console-session] listener error: ${(e as Error).message}`);
      }
    }
  }

  /**
   * Advance `index` past at most three UTF-8 continuation bytes (0b10xxxxxx)
   * so that it points at the first byte of a character. Three is the most a
   * valid UTF-8 sequence contains, which also bounds the skip on invalid input.
   */
  private static skipUtf8Continuation(buf: Buffer, index: number): number {
    const stop = Math.min(buf.length, index + 3);
    let i = index;
    while (i < stop && (buf.readUInt8(i) & 0xc0) === 0x80) i++;
    return i;
  }

  /**
   * Write data to the headless xterm so that buffer reads in the same tick
   * see it. The public Terminal.write() is async (parser runs via a
   * scheduler). We use _core._writeBuffer.writeSync() — an internal path
   * marked "deprecated" but in fact still the documented escape hatch for
   * server-side rendering. If a future xterm release removes it, the unit
   * test will fail and we'll need to make snapshotScreen() async. Until then,
   * the async public write() is the fallback when the internal path is absent.
   */
  private writeToHeadlessSync(data: Buffer): void {
    interface HeadlessInternals {
      _core?: {
        _writeBuffer?: {
          writeSync?: (data: Uint8Array | string) => void;
        };
      };
    }
    const internals = this.headless as unknown as HeadlessInternals;
    const ws = internals._core?._writeBuffer?.writeSync;
    if (typeof ws === 'function') {
      ws.call(internals._core!._writeBuffer, data);
    } else {
      this.headless.write(data);
    }
  }
}