maestro/src/bridge/local-tasks-api.ts
oss-sync 483464597a
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (a360d15)
2026-06-09 09:19:09 +00:00

688 lines
29 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import express, { type Application, type Request, type Response } from 'express';
import { mkdirSync, writeFileSync } from 'fs';
import { join } from 'path';
import { Repository, localTaskRepoName } from '../db/repository.js';
import type { BrowserSessionRepo } from '../db/browser-session-repo.js';
import { logger } from '../logger.js';
import { resolveJobScheduling } from '../scheduling.js';
import { parseTaskId, validateCreateTaskBody, validateCommentBody, validateFeedbackBody } from './validation.js';
import { getLocalWorkspacePath, checkTaskOwnership, canViewTask } from './local-api-helpers.js';
import { jobEventBus, type JobStreamEvent } from './job-events.js';
/**
* Dependencies and optional hooks consumed by `mountLocalTasksApi`.
*/
export interface LocalTasksApiOptions {
/** Persistence layer used by every route for tasks, comments, jobs and audit logs. */
repo: Repository;
/** Base directory handed to `getLocalWorkspacePath` when a new task's workspace is created. */
worktreeDir?: string;
/**
* Optional title generator. Called with the trimmed request body when a task
* is created without a title; the create route gives up after 8 seconds or on
* rejection and falls back to the first 40 characters of the body.
*/
generateTitle?: (body: string) => Promise<string>;
/**
* Optional piece classifier. Called only when the requested piece is `auto`,
* with the trimmed body, the attachment file names and the requesting user's
* id (if any). On rejection the create route falls back to the `chat` piece.
*/
selectPiece?: (body: string, fileNames: string[], userId?: string) => Promise<string>;
/**
* Server-side validator for piece names accepted by the
* /continue endpoint. Returns true if the piece is loadable.
* When unset, /continue rejects all requests with 500 (misconfiguration).
*
* `ownerId` is the task owner's user id; when provided the implementation
* MUST also check the owner's per-user piece dir so that user-custom pieces
* are accepted (matching the resolution order the worker uses at run time).
*/
pieceExists?: (name: string, ownerId?: string) => boolean;
/**
* Optional. When set, accepting browserSessionProfileId on task create
* verifies the profile belongs to the requesting user. Without it, the
* field is silently dropped (legacy / no-auth deployments).
*/
sessRepo?: BrowserSessionRepo;
/**
* Optional. Returns the current upload size limit (MB) for task creation
* and comment posting. Called per request so config changes take effect
* without a server restart. Clamped to [1, 1000] MB. Default: 50.
*/
getMaxUploadMb?: () => number;
/**
* Whether the auth subsystem is wired. When `false` (no-auth single-user
* deployment) requests carry no `req.user`, so new tasks/jobs are registered
* under the stable `local` owner instead of NULL — keeping per-task
* reflection / visibility lookups (which key on owner_id) consistent with the
* rest of the no-auth path. Defaults to `true` (owner stays req.user.id).
*/
authActive?: boolean;
}
/**
* Registers the `/api/local/tasks` REST routes (list, create, detail, feedback,
* mission, comments, patch, delete, cancel, continue) and the per-task SSE
* stream on the given Express application.
*
* @param app - Express application the routes are attached to.
* @param opts - Repository handle plus optional hooks; see `LocalTasksApiOptions`.
*/
export function mountLocalTasksApi(app: Application, opts: LocalTasksApiOptions): void {
const { repo, worktreeDir, sessRepo } = opts;
// No-auth single-user mode: register new tasks/jobs under the 'local' owner
// (the same namespace pieces/memory/reflection use) rather than letting the
// missing req.user fall through to a NULL owner. In auth mode this is
// undefined, so `req.user.id ?? noAuthOwner` keeps the real owner.
const noAuthOwner: string | undefined = (opts.authActive ?? true) ? undefined : 'local';
// Computes the body-size limit string (e.g. "50mb") for the JSON parser.
// The configured value is read on every call, floored to a whole number of
// megabytes and clamped to [1, 1000]; a missing or non-finite value yields 50.
const resolveUploadLimit = (): string => {
  const fallbackMb = 50;
  const configured = opts.getMaxUploadMb?.() ?? fallbackMb;
  if (!Number.isFinite(configured)) {
    return `${fallbackMb}mb`;
  }
  const whole = Math.floor(configured);
  let clamped = whole;
  if (whole < 1) {
    clamped = 1;
  } else if (whole > 1000) {
    clamped = 1000;
  }
  return `${clamped}mb`;
};
// Middleware factory: the returned handler builds a fresh express.json()
// parser on every request so the current upload limit is always applied.
const dynamicJson = () => {
  return (req: Request, res: Response, next: express.NextFunction) => {
    const parseJson = express.json({ limit: resolveUploadLimit() });
    return parseJson(req, res, next);
  };
};
// GET /api/local/tasks — list local tasks, scoped to the requesting user when
// one is attached to the request.
app.get('/api/local/tasks', async (req: Request, res: Response) => {
  try {
    const currentUser = req.user as Express.User | undefined;
    const listOptions = currentUser ? { viewer: currentUser } : {};
    const tasks = await repo.listLocalTasks(listOptions);
    res.json({ tasks });
  } catch (failure) {
    logger.error(`Local tasks list API error: ${failure}`);
    res.status(500).json({ error: 'Failed to fetch local tasks' });
  }
});
// POST /api/local/tasks — validate the request, create the task row and its
// workspace directories, store attachments, and queue the first job.
// Responds 201 with `{ task, jobId }`, 400 on validation failures, 500 otherwise.
app.post('/api/local/tasks', dynamicJson(), async (req: Request, res: Response) => {
  try {
    const validation = validateCreateTaskBody(req.body);
    if (!validation.valid) {
      res.status(400).json({ error: validation.error });
      return;
    }
    const body = validation.data;
    const requester = req.user as Express.User | undefined;
    const requestText = body.body.trim();

    // Visibility extraction + validation
    const rawVisibility = req.body?.visibility ?? 'private';
    if (!['private', 'org', 'public'].includes(rawVisibility)) {
      res.status(400).json({ error: 'invalid visibility' });
      return;
    }
    const visibility = rawVisibility as 'private' | 'org' | 'public';
    const rawScopeOrgId = req.body?.visibilityScopeOrgId;
    const visibilityScopeOrgId: string | null =
      typeof rawScopeOrgId === 'string' && rawScopeOrgId.length > 0 ? rawScopeOrgId : null;
    if (visibility === 'org') {
      const orgIds = requester?.orgIds ?? [];
      if (!visibilityScopeOrgId || !orgIds.includes(visibilityScopeOrgId)) {
        res.status(400).json({ error: 'visibility_scope_org_id must be one of your orgs' });
        return;
      }
    }

    // Optional browser session profile binding. Owner-scoped check
    // (sessRepo.getProfileById enforces owner_id = req.user.id) prevents
    // user A from binding user B's profile to their task.
    let browserSessionProfileId: number | null = null;
    const rawProfileId = req.body?.browserSessionProfileId;
    if (rawProfileId !== undefined && rawProfileId !== null && rawProfileId !== '') {
      const n = Number(rawProfileId);
      if (!Number.isInteger(n) || n <= 0) {
        res.status(400).json({ error: 'browserSessionProfileId must be a positive integer' });
        return;
      }
      if (sessRepo) {
        const userId = requester?.id;
        if (!userId) {
          res.status(400).json({ error: 'browserSessionProfileId requires an authenticated user' });
          return;
        }
        const owned = sessRepo.getProfileById(n, userId);
        if (!owned) {
          res.status(400).json({ error: 'browser session profile not found or not owned by you' });
          return;
        }
      }
      browserSessionProfileId = n;
    }

    let taskTitle = (body.title ?? '').trim();
    const rawPiece = (body.piece ?? 'auto').trim();
    const attachmentNames = (body.attachments ?? []).map((a: { name?: string }) => a.name).filter(Boolean) as string[];

    // Title generation is raced against an 8 second timeout. The timer handle
    // is kept so it can be cleared once the race settles instead of staying
    // armed for the full 8 seconds after every request.
    let titleTimer: ReturnType<typeof setTimeout> | undefined;
    const titlePromise: Promise<string> = (!taskTitle && opts.generateTitle)
      ? Promise.race([
          opts.generateTitle(requestText),
          new Promise<string>((_, reject) => {
            titleTimer = setTimeout(() => reject(new Error('timeout')), 8000);
          }),
        ])
          .catch((e: unknown) => { logger.warn(`Title generation failed: ${e}`); return ''; })
          .finally(() => { if (titleTimer !== undefined) clearTimeout(titleTimer); })
      : Promise.resolve('');

    // Piece classification (only when 'auto'); the user id is passed so the
    // per-user catalog is used.
    const piecePromise: Promise<string> = (rawPiece === 'auto' && opts.selectPiece)
      ? opts.selectPiece(requestText, attachmentNames, requester?.id).catch((e: unknown) => { logger.warn(`Piece classification failed: ${e}`); return 'chat'; })
      : Promise.resolve(rawPiece);

    // Run title generation and piece classification in parallel.
    const [generatedTitle, autoSelectedPiece] = await Promise.all([titlePromise, piecePromise]);
    if (!taskTitle) {
      taskTitle = generatedTitle || requestText.slice(0, 40).replace(/\n/g, ' ');
    }
    const piece = autoSelectedPiece;
    const profile = body.profile ?? 'auto';
    const outputFormat = body.outputFormat ?? 'markdown';
    const askPolicy = body.askPolicy ?? 'low';
    const priority = body.priority ?? 'medium';
    const scheduling = resolveJobScheduling({
      role: profile,
      pieceName: piece,
      instruction: requestText,
    });

    // Per-task options (e.g. { mcpDisabled, skillsDisabled })
    const rawOptions = req.body?.options;
    const taskOptions: Record<string, unknown> =
      rawOptions && typeof rawOptions === 'object' && !Array.isArray(rawOptions)
        ? rawOptions as Record<string, unknown>
        : {};

    const task = await repo.createLocalTask({
      title: taskTitle,
      body: requestText,
      pieceName: piece,
      profile,
      outputFormat,
      askPolicy,
      priority,
      ownerId: requester?.id ?? noAuthOwner,
      visibility,
      visibilityScopeOrgId: visibility === 'org' ? visibilityScopeOrgId : null,
      browserSessionProfileId,
      options: taskOptions,
    });

    const workspacePath = getLocalWorkspacePath(worktreeDir, task.id);
    mkdirSync(join(workspacePath, 'input'), { recursive: true });
    mkdirSync(join(workspacePath, 'output'), { recursive: true });
    mkdirSync(join(workspacePath, 'logs'), { recursive: true });
    await repo.updateLocalTask(task.id, { workspacePath });

    for (const att of body.attachments ?? []) {
      if (!att.name || !att.contentBase64) continue;
      // Flatten path separators (and NUL, which fs rejects) so the file
      // always lands directly inside input/.
      const flattened = att.name.replace(/[\\/\0]/g, '_');
      // '.' and '..' would make join() resolve to a directory and the write
      // would throw after the task row already exists; prefix them instead.
      const safeName = flattened === '.' || flattened === '..' ? `_${flattened}` : flattened;
      writeFileSync(join(workspacePath, 'input', safeName), Buffer.from(att.contentBase64, 'base64'));
    }

    await repo.addLocalTaskComment(task.id, 'user', requestText, 'request');

    const metadataBlock = [
      '---',
      `ui_profile: ${scheduling.role}`,
      `ui_output_format: ${outputFormat}`,
      `ui_ask_policy: ${askPolicy}`,
      `ui_priority: ${priority}`,
      '---',
    ].join('\n');
    const instruction = `${taskTitle}\n\n${requestText}\n\n${metadataBlock}`.trim();

    // Merge task options into job payload so the worker can read them at runtime.
    const hasOptions = Object.keys(taskOptions).length > 0;
    const job = await repo.createJob({
      repo: localTaskRepoName(task.id),
      issueNumber: task.id,
      instruction,
      pieceName: piece,
      role: scheduling.role,
      ownerId: task.ownerId,
      visibility: task.visibility,
      visibilityScopeOrgId: task.visibilityScopeOrgId,
      browserSessionProfileId: task.browserSessionProfileId ?? null,
      payload: hasOptions ? JSON.stringify({ options: taskOptions }) : undefined,
    });
    await repo.addAuditLog(job.id, 'job_queued_local_create', 'local-ui', { taskId: task.id });
    if (rawPiece === 'auto') {
      await repo.addAuditLog(job.id, 'piece_auto_selected', 'piece-classifier', {
        selectedPiece: piece,
      });
    }

    const created = await repo.getLocalTask(task.id);
    res.status(201).json({ task: created, jobId: job.id });
  } catch (err) {
    logger.error(`Create local task API error: ${err}`);
    res.status(500).json({ error: 'Failed to create local task' });
  }
});
// GET /api/local/tasks/:taskId — return a single task. `canViewTask` is
// responsible for sending the response when it returns false.
app.get('/api/local/tasks/:taskId', async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const lookupOptions = currentUser ? { viewer: currentUser } : undefined;
    const task = await repo.getLocalTask(id, lookupOptions);
    if (canViewTask(req, res, task)) {
      res.json({ task });
    }
  } catch (failure) {
    logger.error(`Local task detail API error: ${failure}`);
    res.status(500).json({ error: 'Failed to fetch local task' });
  }
});
// PUT /api/local/tasks/:taskId/feedback — store feedback on a task.
// The body is validated before the task is loaded; only callers that pass
// `checkTaskOwnership` may write.
app.put('/api/local/tasks/:taskId/feedback', express.json(), async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const parsed = validateFeedbackBody(req.body);
    if (!parsed.valid) {
      res.status(400).json({ error: parsed.error });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const existing = await repo.getLocalTask(id, currentUser ? { viewer: currentUser } : undefined);
    const mayWrite = checkTaskOwnership(req, res, existing);
    if (!mayWrite) return;
    await repo.updateFeedback(id, parsed.data);
    // Re-read so the response reflects the stored state.
    const task = await repo.getLocalTask(id);
    res.json({ task });
  } catch (failure) {
    logger.error(`Local task feedback API error: ${failure}`);
    res.status(500).json({ error: 'Failed to update feedback' });
  }
});
// PUT /api/local/tasks/:taskId/mission — partially update the mission brief.
app.put('/api/local/tasks/:taskId/mission', express.json(), async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const existing = await repo.getLocalTask(id, currentUser ? { viewer: currentUser } : undefined);
    if (!checkTaskOwnership(req, res, existing)) return;

    // Partial replace: a field is written only when it arrives as a string.
    // null, undefined and non-string values mean "leave unchanged"; an empty
    // string clears the field.
    const payload = (req.body ?? {}) as Record<string, unknown>;
    const missionFields = ['goal', 'done', 'open', 'clarifications'] as const;
    const patch: Record<string, string> = {};
    missionFields.forEach((field) => {
      const value = payload[field];
      if (typeof value === 'string') {
        patch[field] = value;
      }
    });

    const nothingToWrite = Object.keys(patch).length === 0;
    if (nothingToWrite) {
      res.status(400).json({ error: 'No mission fields provided. Send goal, done, open, or clarifications as strings.' });
      return;
    }
    const missionBrief = await repo.updateMissionBrief(id, patch);
    res.json({ missionBrief });
  } catch (failure) {
    logger.error(`Local task mission API error: ${failure}`);
    res.status(500).json({ error: 'Failed to update mission brief' });
  }
});
// GET /api/local/tasks/:taskId/comments — list a task's comments for callers
// that pass `canViewTask`.
app.get('/api/local/tasks/:taskId/comments', async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const lookupOptions = currentUser ? { viewer: currentUser } : undefined;
    const parentTask = await repo.getLocalTask(id, lookupOptions);
    const visible = canViewTask(req, res, parentTask);
    if (!visible) return;
    const comments = await repo.listLocalTaskComments(id);
    res.json({ comments });
  } catch (failure) {
    logger.error(`Local task comments API error: ${failure}`);
    res.status(500).json({ error: 'Failed to fetch local task comments' });
  }
});
// POST /api/local/tasks/:taskId/comments — add a comment to a task.
// While the latest job is running / dispatching / waiting_subtasks the comment
// is stored as an interjection and no new job is queued; otherwise the comment
// is stored and a follow-up job is queued with the comment as its instruction.
app.post('/api/local/tasks/:taskId/comments', dynamicJson(), async (req: Request, res: Response) => {
  try {
    const taskId = parseTaskId(req.params.taskId);
    if (taskId === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const commentValidation = validateCommentBody(req.body);
    if (!commentValidation.valid) {
      res.status(400).json({ error: commentValidation.error });
      return;
    }
    const { body, author, attachments } = commentValidation;
    const viewer = req.user as Express.User | undefined;
    const task = await repo.getLocalTask(taskId, viewer ? { viewer } : undefined);
    if (!checkTaskOwnership(req, res, task)) return;

    // Save attachments to input/ and remember which names were really
    // written, so the job instruction never advertises a file that is not
    // on disk (e.g. when the task has no workspace path).
    const savedFileNames: string[] = [];
    if (attachments && attachments.length > 0 && task?.workspacePath) {
      const inputDir = join(task.workspacePath, 'input');
      mkdirSync(inputDir, { recursive: true });
      for (const att of attachments) {
        if (!att.name || !att.contentBase64) continue;
        // Flatten path separators (and NUL, which fs rejects).
        const flattened = att.name.replace(/[\\/\0]/g, '_');
        // '.' and '..' would resolve to a directory and make the write throw.
        const safeName = flattened === '.' || flattened === '..' ? `_${flattened}` : flattened;
        writeFileSync(join(inputDir, safeName), Buffer.from(att.contentBase64, 'base64'));
        savedFileNames.push(safeName);
      }
    }

    const prevJob = await repo.getLatestJobForIssue(localTaskRepoName(taskId), taskId);
    // While running / dispatching / waiting_subtasks: only save the comment
    // (the agent loop injects it).
    const activeJob =
      prevJob && (prevJob.status === 'running' || prevJob.status === 'dispatching' || prevJob.status === 'waiting_subtasks')
        ? prevJob
        : null;
    const commentKind = activeJob ? 'interjection' : 'comment';
    const comment = await repo.addLocalTaskComment(taskId, author, body, commentKind);
    if (activeJob) {
      logger.info(`[local-tasks-api] interjection: comment ${comment.id} saved for ${activeJob.status} job ${activeJob.id} on task ${taskId}`);
      res.status(201).json({ comment, jobId: activeJob.id, interjection: true });
      return;
    }

    // A job that was waiting on the human resumes where it stopped.
    const waitingOnHuman = prevJob?.status === 'waiting_human';
    const askCount = prevJob && waitingOnHuman ? prevJob.askCount : 0;
    const resumeMovement = prevJob && waitingOnHuman ? prevJob.resumeMovement : null;

    // Build instruction with attachment info
    const instruction = savedFileNames.length > 0
      ? `${body}\n\n添付ファイルinput/ に保存済み): ${savedFileNames.join(', ')}`
      : body;
    const job = await repo.createJob({
      repo: localTaskRepoName(taskId),
      issueNumber: taskId,
      instruction,
      pieceName: task!.pieceName,
      askCount,
      resumeMovement,
      role: prevJob?.requiredRole,
      ownerId: task!.ownerId,
      visibility: task!.visibility,
      visibilityScopeOrgId: task!.visibilityScopeOrgId,
      browserSessionProfileId: task!.browserSessionProfileId ?? null,
    });
    await repo.addAuditLog(job.id, 'job_queued_local_comment', author, { taskId });
    res.status(201).json({ comment, jobId: job.id });
  } catch (err) {
    logger.error(`Local task comment create API error: ${err}`);
    res.status(500).json({ error: 'Failed to post local task comment' });
  }
});
// PATCH /api/local/tasks/:taskId — change a task's visibility and/or its
// organisation scope, then propagate the result to the task's jobs.
//
// The scope is validated against the visibility the task will have AFTER the
// patch (new value if sent, otherwise the stored one):
//   - 'org'    → the scope must be one of the caller's orgs, even when only
//                `visibilityScopeOrgId` is sent for an already-'org' task;
//   - otherwise → the scope is always stored as null, as on task creation.
app.patch('/api/local/tasks/:taskId', express.json(), async (req: Request, res: Response) => {
  try {
    const taskId = parseTaskId(req.params.taskId);
    if (taskId === null) { res.status(400).json({ error: 'Invalid task ID' }); return; }
    const viewer = req.user as Express.User | undefined;
    const task = await repo.getLocalTask(taskId, { viewer });
    if (!checkTaskOwnership(req, res, task)) return;

    // The body may be absent when the request carried no JSON payload.
    const payload = (req.body ?? {}) as Record<string, unknown>;
    const isVisibility = (value: unknown): value is 'private' | 'org' | 'public' =>
      value === 'private' || value === 'org' || value === 'public';

    const updates: { visibility?: 'private' | 'org' | 'public'; visibilityScopeOrgId?: string | null } = {};
    if (payload.visibility !== undefined) {
      const requested = payload.visibility;
      if (!isVisibility(requested)) {
        res.status(400).json({ error: 'invalid visibility' }); return;
      }
      updates.visibility = requested;
    }
    if (payload.visibilityScopeOrgId !== undefined) {
      const rawScope = payload.visibilityScopeOrgId;
      if (rawScope !== null && typeof rawScope !== 'string') {
        res.status(400).json({ error: 'visibility_scope_org_id must be a string or null' }); return;
      }
      // Empty string is treated like null, matching the create route.
      updates.visibilityScopeOrgId = rawScope ? rawScope : null;
    }

    const touchesVisibility = updates.visibility !== undefined || updates.visibilityScopeOrgId !== undefined;
    if (touchesVisibility) {
      const effectiveVisibility = updates.visibility ?? task!.visibility ?? 'private';
      if (effectiveVisibility === 'org') {
        const orgIds = viewer?.orgIds ?? [];
        const scopeId = updates.visibilityScopeOrgId ?? task!.visibilityScopeOrgId ?? null;
        if (!scopeId || !orgIds.includes(scopeId)) {
          res.status(400).json({ error: 'visibility_scope_org_id must be one of your orgs' }); return;
        }
        updates.visibilityScopeOrgId = scopeId;
      } else {
        updates.visibilityScopeOrgId = null;
      }
    }

    await repo.updateLocalTask(taskId, updates);
    const refreshed = await repo.getLocalTask(taskId, { viewer });
    if (touchesVisibility && refreshed) {
      // Keep the jobs' visibility in step with the task's.
      await repo.updateJobsVisibilityForTask(taskId, {
        visibility: refreshed.visibility ?? 'private',
        visibilityScopeOrgId: refreshed.visibilityScopeOrgId ?? null,
      });
    }
    res.json({ task: refreshed });
  } catch (err) {
    logger.error(`Patch local task API error: ${err}`);
    res.status(500).json({ error: 'Failed to update task' });
  }
});
// DELETE /api/local/tasks/:taskId — delete a task for callers that pass
// `checkTaskOwnership`. A repository error whose message contains
// "has an active job" is reported as 409 instead of 500.
app.delete('/api/local/tasks/:taskId', async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const target = await repo.getLocalTask(id, { viewer: currentUser });
    const mayDelete = checkTaskOwnership(req, res, target);
    if (!mayDelete) return;
    await repo.deleteLocalTask(id);
    res.json({ ok: true });
  } catch (failure) {
    const reason = failure instanceof Error ? failure.message : String(failure);
    const blockedByActiveJob = reason.includes('has an active job');
    if (blockedByActiveJob) {
      res.status(409).json({ error: 'Cannot delete task with running jobs' });
    } else {
      logger.error(`Delete local task API error: ${failure}`);
      res.status(500).json({ error: 'Failed to delete local task' });
    }
  }
});
// POST /api/local/tasks/:taskId/cancel — request cancellation of the task's
// latest job. Only jobs in 'running' or 'dispatching' state qualify.
app.post('/api/local/tasks/:taskId/cancel', async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }
    const currentUser = req.user as Express.User | undefined;
    const target = await repo.getLocalTask(id, currentUser ? { viewer: currentUser } : undefined);
    if (!checkTaskOwnership(req, res, target)) return;

    const job = await repo.getLatestJobForIssue(localTaskRepoName(id), id);
    const cancellable = !!job && (job.status === 'running' || job.status === 'dispatching');
    if (!job || !cancellable) {
      res.status(404).json({ error: 'No running job found' });
      return;
    }

    // NOTE(review): unlike the other repo calls this result is not awaited;
    // that is only correct if requestJobCancel is synchronous — confirm.
    const accepted = repo.requestJobCancel(job.id);
    if (!accepted) {
      res.status(409).json({ error: 'Job is no longer running' });
      return;
    }
    await repo.addAuditLog(job.id, 'job_cancel_requested', 'local-ui', { taskId: id });
    logger.info(`Cancel requested for job ${job.id} (task ${id})`);
    res.json({ ok: true, jobId: job.id });
  } catch (failure) {
    logger.error(`Cancel local task API error: ${failure}`);
    res.status(500).json({ error: 'Failed to cancel task' });
  }
});
// POST /api/local/tasks/:taskId/continue — queue a follow-up job that
// continues the task with a (possibly different) piece. Requires a non-empty
// `piece` and `instruction`, a configured `pieceExists` validator, and a
// previous job that is no longer in progress.
app.post('/api/local/tasks/:taskId/continue', express.json(), async (req: Request, res: Response) => {
  try {
    const id = parseTaskId(req.params.taskId);
    if (id === null) {
      res.status(400).json({ error: 'Invalid task ID' });
      return;
    }

    const requestedPiece = typeof req.body?.piece === 'string' ? req.body.piece.trim() : '';
    const rawInstruction = typeof req.body?.instruction === 'string' ? req.body.instruction : '';
    if (requestedPiece.length === 0) {
      res.status(400).json({ error: 'piece_required' });
      return;
    }
    const trimmedInstruction = rawInstruction.trim();
    if (trimmedInstruction.length === 0) {
      res.status(400).json({ error: 'instruction_required' });
      return;
    }

    const currentUser = req.user as Express.User | undefined;
    const task = await repo.getLocalTask(id, currentUser ? { viewer: currentUser } : undefined);
    if (!checkTaskOwnership(req, res, task)) return;

    // Piece existence check (server-side; UI dropdown is best-effort).
    const pieceExists = opts.pieceExists;
    if (!pieceExists) {
      logger.error('[local-tasks-api] /continue invoked but pieceExists option not configured');
      res.status(500).json({ error: 'piece_validation_unavailable' });
      return;
    }
    const knownPiece = pieceExists(requestedPiece, task?.ownerId ?? undefined);
    if (!knownPiece) {
      res.status(400).json({ error: 'piece_not_found', piece: requestedPiece });
      return;
    }

    const previous = await repo.getLatestJobForIssue(localTaskRepoName(id), id);
    if (!previous) {
      res.status(409).json({ error: 'no_previous_job' });
      return;
    }

    // The jobs.status CHECK constraint has no 'aborted' value (the worker
    // folds abort results into 'failed'). 'waiting_subtasks' is an
    // intermediate state that waits on child jobs; switching to another piece
    // from there would orphan them, so it is excluded.
    const resumableStatuses = new Set<string>(['succeeded', 'failed', 'waiting_human', 'cancelled']);
    if (!resumableStatuses.has(previous.status)) {
      res.status(409).json({ error: 'job_in_progress', currentStatus: previous.status });
      return;
    }

    const queued = await repo.createJob({
      repo: localTaskRepoName(id),
      issueNumber: id,
      instruction: trimmedInstruction,
      pieceName: requestedPiece,
      continuedFromJobId: previous.id,
      ownerId: task!.ownerId,
      role: previous.requiredRole,
      visibility: task!.visibility,
      visibilityScopeOrgId: task!.visibilityScopeOrgId,
      browserSessionProfileId: task!.browserSessionProfileId ?? null,
    });
    await repo.updateLocalTask(id, { pieceName: requestedPiece });

    // Record the handoff as a system comment so the piece switch is visible
    // in the task timeline (to the user and to anything reading comments).
    const handoffNote = `🔄 Continued: piece="${previous.pieceName}" → piece="${requestedPiece}"`;
    await repo.addLocalTaskComment(id, 'system', handoffNote, 'handoff');
    await repo.addAuditLog(queued.id, 'job_queued_local_continue', 'local-ui', {
      taskId: id,
      fromPiece: previous.pieceName,
      toPiece: requestedPiece,
      prevJobId: previous.id,
    });
    res.status(201).json({ jobId: queued.id });
  } catch (failure) {
    logger.error(`Local task continue API error: ${failure}`);
    res.status(500).json({ error: 'Failed to continue task' });
  }
});
// ── SSE stream: real-time job events ──────────────────────────────────────
// GET /api/local/tasks/:taskId/stream — Server-Sent Events feed of the task's
// currently running job.
//   400 invalid id · 404 unknown task · 204 when no job is running/dispatching
//   otherwise a text/event-stream that ends on the job's 'done' event.
// Text deltas and tool-call argument snapshots are coalesced and flushed every
// 50 ms; every other event flushes the buffers first so ordering is preserved.
app.get('/api/local/tasks/:taskId/stream', async (req: Request, res: Response) => {
  const taskId = parseTaskId(req.params.taskId);
  if (taskId === null) { res.status(400).json({ error: 'invalid taskId' }); return; }
  try {
    const viewer = req.user as Express.User | undefined;
    const task = await repo.getLocalTask(taskId, viewer ? { viewer } : {});
    if (!task) { res.status(404).json({ error: 'task not found' }); return; }
    // Same visibility gate as the detail and comments endpoints; canViewTask
    // sends the response itself when it returns false.
    if (!canViewTask(req, res, task)) return;

    const runningJob = task.latestJob;
    if (!runningJob || (runningJob.status !== 'running' && runningJob.status !== 'dispatching')) {
      res.status(204).end();
      return;
    }
    const jobId = runningJob.id;

    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-store');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no');
    res.flushHeaders();

    // Never write once the response was ended or the connection is gone.
    const canWrite = (): boolean => !res.writableEnded && !res.destroyed;
    const send = (payload: unknown): void => {
      if (canWrite()) res.write(`data: ${JSON.stringify(payload)}\n\n`);
    };

    // Text delta batching (50ms flush)
    const TEXT_FLUSH_MS = 50;
    let textBuf = '';
    let flushTimer: ReturnType<typeof setTimeout> | null = null;
    // Tool-call argument delta batching, keyed by callId (50ms flush).
    const toolBuf = new Map<string, { name: string; chunk: string }>();
    let toolFlushTimer: ReturnType<typeof setTimeout> | null = null;

    const flushText = () => {
      // Disarm the timer even on a manual flush so a stale timer cannot
      // cut the next batching window short.
      if (flushTimer) { clearTimeout(flushTimer); flushTimer = null; }
      if (!textBuf) return;
      send({ type: 'text_delta', text: textBuf });
      textBuf = '';
    };
    const flushToolDeltas = () => {
      if (toolFlushTimer) { clearTimeout(toolFlushTimer); toolFlushTimer = null; }
      for (const [callId, { name, chunk }] of toolBuf) {
        send({ type: 'tool_use_delta', callId, name, chunk });
      }
      toolBuf.clear();
    };

    const handler = (event: JobStreamEvent) => {
      if (!canWrite()) return;
      if (event.type === 'text') {
        textBuf += event.text ?? '';
        if (!flushTimer) flushTimer = setTimeout(flushText, TEXT_FLUSH_MS);
        return;
      }
      if (event.type === 'tool_use_delta') {
        const callId = event.callId ?? '';
        // chunk is a full snapshot of args-so-far; keep the LATEST per
        // callId (replace, not append) so each flush sends the newest
        // complete prefix. Coalesces many snapshots into one per 50ms.
        toolBuf.set(callId, {
          name: event.name ?? toolBuf.get(callId)?.name ?? '',
          chunk: event.chunk ?? '',
        });
        if (!toolFlushTimer) toolFlushTimer = setTimeout(flushToolDeltas, TEXT_FLUSH_MS);
        return;
      }
      // Flush pending text + tool deltas before non-streaming events
      flushText();
      flushToolDeltas();
      if (event.type === 'prompt_progress') {
        // Progress is reported relative to the non-cached part of the prompt.
        const effective = (event.processed ?? 0) - (event.cache ?? 0);
        const effectiveTotal = (event.total ?? 0) - (event.cache ?? 0);
        const percent = effectiveTotal > 0 ? Math.round(effective / effectiveTotal * 100) : 0;
        send({ type: 'prompt_progress', percent, processed: event.processed, total: event.total, cache: event.cache, timeMs: event.timeMs });
      } else if (event.type === 'done') {
        send({ type: 'done' });
        cleanup();
        res.end();
      } else {
        send(event);
      }
    };

    // Heartbeat to keep connection alive
    const heartbeat = setInterval(() => {
      if (canWrite()) res.write(': heartbeat\n\n');
    }, 15_000);

    // Runs on 'done' and again when the request closes, so it must be idempotent.
    let cleanedUp = false;
    const cleanup = () => {
      if (cleanedUp) return;
      cleanedUp = true;
      jobEventBus.offJob(jobId, handler);
      clearInterval(heartbeat);
      flushText();
      flushToolDeltas();
    };

    jobEventBus.onJob(jobId, handler);
    req.on('close', cleanup);
  } catch (err) {
    logger.error(`Local task stream API error: ${err}`);
    if (!res.headersSent) res.status(500).json({ error: 'stream failed' });
  }
});
}