maestro/src/db/repository.ts
oss-sync 641fe0177d
Some checks failed
CI / build-and-test (push) Has been cancelled
sync: update from private repo (bfcd4d5)
2026-06-11 15:12:40 +00:00

4080 lines
150 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Database from 'better-sqlite3';
import { readFileSync, rmSync, existsSync } from 'fs';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import { randomUUID, scryptSync, randomBytes, timingSafeEqual } from 'crypto';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '../logger.js';
import { buildVisibilityWhere } from '../bridge/visibility.js';
import { buildTitleFromGoal } from '../title-generation.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
/**
* Shared SQL fragments for LocalTask read queries.
*
* getLocalTask / listLocalTasks / getLocalTaskByShareToken all need to expose
* the owner's display name and the org display name of `visibility_scope_org_id`.
* These constants keep the three queries in sync.
*
* Usage: splice into the SELECT list and the FROM-clause joins, e.g.
*
* SELECT lt.*, ${LOCAL_TASK_DISPLAY_SELECT}
* FROM local_tasks lt
* ${LOCAL_TASK_DISPLAY_JOIN}
* WHERE ...
*
* A correlated subquery (`MIN(org_name)`) is used instead of a JOIN because
* user_gitea_orgs is keyed per-user, and we only need any one display name
* for the org id — this avoids row-multiplication across the join.
*/
const LOCAL_TASK_DISPLAY_SELECT = `
u.name AS owner_name,
COALESCE(
(SELECT MIN(org_name) FROM user_gitea_orgs WHERE org_id = lt.visibility_scope_org_id),
(SELECT name FROM local_orgs WHERE id = lt.visibility_scope_org_id)
) AS visibility_scope_org_name
`.trim();
const LOCAL_TASK_DISPLAY_JOIN = `LEFT JOIN users u ON u.id = lt.owner_id`;
const SCHEDULED_TASK_DISPLAY_SELECT = `
u.name AS owner_name,
COALESCE(
(SELECT MIN(org_name) FROM user_gitea_orgs WHERE org_id = st.visibility_scope_org_id),
(SELECT name FROM local_orgs WHERE id = st.visibility_scope_org_id)
) AS visibility_scope_org_name
`.trim();
const SCHEDULED_TASK_DISPLAY_JOIN = `LEFT JOIN users u ON u.id = st.owner_id`;
export type JobStatus =
| 'queued'
| 'dispatching'
| 'running'
| 'succeeded'
| 'failed'
| 'retry'
| 'cancelled'
| 'waiting_human'
| 'waiting_subtasks';
export type JobRole = 'auto' | 'fast' | 'quality' | 'reflection';
/** @deprecated Use JobRole instead */
export type JobProfile = JobRole;
/** @deprecated Removed — taskClass is no longer a separate concept */
export type TaskClass = 'auto' | 'low_level' | 'high_level';
export interface Job {
id: string;
repo: string;
issueNumber: number;
prNumber: number | null;
status: JobStatus;
pieceName: string;
currentMovement: string | null;
currentActivity: string | null;
instruction: string;
branchName: string | null;
worktreePath: string | null;
attempt: number;
maxAttempts: number;
nextRetryAt: string | null;
errorSummary: string | null;
abortReason: string | null;
resumeMovement: string | null;
waitReason: string | null;
askCount: number;
workerId: string | null;
/**
* Physical backend id (e.g. LiteLLM deployment name) that handled
* this job's LLM calls when running through a proxy worker. Set on
* the first LLM call and never overwritten — sticky-backend policy
* per design Open Question #3. NULL for direct workers and for jobs
* that haven't issued any proxied LLM call yet.
*/
lastBackendId: string | null;
parentJobId: string | null;
continuedFromJobId: string | null;
subtaskDepth: number;
requiredRole: JobRole;
/** @deprecated Use requiredRole */
requiredProfile: JobRole;
ownerId: string | null;
visibility: 'private' | 'org' | 'public';
visibilityScopeOrgId: string | null;
contextPromptTokens: number | null;
contextLimitTokens: number | null;
contextUpdatedAt: string | null;
browserSessionProfileId?: number | null;
taskKind: 'agent' | 'reflection';
payload: string | null;
createdAt: string;
updatedAt: string;
}
export interface CreateJobParams {
repo: string;
issueNumber: number;
instruction: string;
pieceName?: string;
maxAttempts?: number;
resumeMovement?: string | null;
askCount?: number;
role?: JobRole;
/** @deprecated Use role instead */
profile?: JobRole;
parentJobId?: string | null;
continuedFromJobId?: string | null;
subtaskDepth?: number;
ownerId?: string | null;
visibility?: 'private' | 'org' | 'public';
visibilityScopeOrgId?: string | null;
browserSessionProfileId?: number | null;
taskKind?: 'agent' | 'reflection';
payload?: string;
}
export interface SubtaskInfo {
id: string;
issueNumber: number;
status: JobStatus;
instruction: string;
worktreePath: string | null;
createdAt: string;
updatedAt: string;
children?: SubtaskInfo[];
childCount?: number;
childCompleted?: number;
}
export type TitleSource = 'auto' | 'agent' | 'user';
export interface LocalTask {
id: number;
title: string;
/** Provenance of `title`. 'user' is never auto-overwritten by the agent. */
titleSource: TitleSource;
body: string;
pieceName: string;
profile: 'auto' | 'fast' | 'quality' | string;
outputFormat: 'text' | 'markdown' | 'json' | string;
askPolicy: 'low' | 'high' | string;
priority: 'low' | 'medium' | 'high' | string;
state: 'open' | 'closed' | string;
workspacePath: string | null;
ownerId: string | null;
ownerName?: string | null;
visibility: 'private' | 'org' | 'public';
visibilityScopeOrgId: string | null;
visibilityScopeOrgName?: string | null;
createdAt: string;
updatedAt: string;
feedbackRating: 'good' | 'bad' | null;
feedbackTags: string[] | null;
feedbackComment: string | null;
feedbackAt: string | null;
shareToken: string | null;
sharedAt: string | null;
browserSessionProfileId?: number | null;
/**
* Mission Brief: per-task pinned memo. Carries goal / done / open /
* clarifications. Always rendered at the top of every movement's
* system prompt. The LLM updates it via mission_update; the user
* edits it from the Overview tab.
*/
missionBrief: MissionBrief | null;
/** Per-task runtime options (e.g. { mcpDisabled, skillsDisabled }). */
options: Record<string, unknown>;
latestJob?: Job | null;
subtasks?: SubtaskInfo[];
subtaskCount?: number;
subtaskCompleted?: number;
}
/**
* Allowed widget kinds. 'markdown' is the original Side Info Panel widget
* (PR #308). 'node-status' was added in Phase B (2026-05) and ignores
* markdown_content — it renders BackendStatusRegistry data live. The union
* intentionally lives here so the API, tools and UI all share a single
* source of truth.
*/
export type DashboardWidgetKind = 'markdown' | 'node-status';
export const DASHBOARD_WIDGET_KINDS: readonly DashboardWidgetKind[] = ['markdown', 'node-status'];
export function isDashboardWidgetKind(value: unknown): value is DashboardWidgetKind {
return typeof value === 'string' && (DASHBOARD_WIDGET_KINDS as readonly string[]).includes(value);
}
export interface DashboardWidget {
id: number;
userId: string;
slug: string;
title: string;
kind: DashboardWidgetKind;
markdownContent: string;
sortOrder: number;
createdAt: string;
updatedAt: string;
}
interface DashboardWidgetRow {
id: number;
user_id: string;
slug: string;
title: string;
kind: string | null;
markdown_content: string;
sort_order: number;
created_at: string;
updated_at: string;
}
function rowToDashboardWidget(row: DashboardWidgetRow): DashboardWidget {
// Defensive default: schema sets kind to 'markdown' but rows persisted
// before the column was added can briefly show up as NULL while the
// migration is racing with a read. Coerce to the default safely.
const kind: DashboardWidgetKind = isDashboardWidgetKind(row.kind) ? row.kind : 'markdown';
return {
id: row.id,
userId: row.user_id,
slug: row.slug,
title: row.title,
kind,
markdownContent: row.markdown_content,
sortOrder: row.sort_order,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
export interface MissionBrief {
goal: string;
done: string;
open: string;
clarifications: string;
}
// ── AAO Gateway Phase 2a: virtual keys ─────────────────────────────────
// Stored shape returned by all gateway-key repository methods. The raw
// bearer is NEVER persisted or returned — only `keyHash` (sha256) lives
// in the DB, and `keyPrefix` is the human-readable head used in admin UI
// lists (`sk-aao-XXXXXX`). The raw key surfaces exactly once, from the
// admin API on issue / rotate; see src/bridge/admin-gateway-api.ts.
export type GatewayVirtualKeySource = 'admin' | 'config-import';
export interface GatewayVirtualKey {
id: string;
keyHash: string;
keyPrefix: string;
team: string;
/** Null = no per-key allowlist (any backend.model is accepted). */
allowedModels: string[] | null;
source: GatewayVirtualKeySource;
createdAt: string;
createdBy: string | null;
/** ISO timestamp if revoked; null while active. */
revokedAt: string | null;
revokedBy: string | null;
lastUsedAt: string | null;
/**
* Phase 2b: monthly tokens budget. NULL = unlimited. When set, the
* gateway rejects requests with 402 once the current UTC month
* `tokens_in + tokens_out` reaches this number (post-hoc enforcement —
* the offending request that pushes the counter over the limit still
* completes; the next one is rejected).
*/
tokensBudget: number | null;
/**
* Phase 2b: per-key requests-per-minute cap. NULL = unlimited. Enforced
* as a sliding 60-second window in-process; multi-instance setups are
* intentionally NOT synchronized (Phase 3 if needed).
*/
rateLimitRpm: number | null;
}
interface GatewayVirtualKeyRow {
id: string;
key_hash: string;
key_prefix: string;
team: string;
allowed_models: string | null;
source: string;
created_at: string;
created_by: string | null;
revoked_at: string | null;
revoked_by: string | null;
last_used_at: string | null;
tokens_budget: number | null;
rate_limit_rpm: number | null;
}
// Phase 2b: monthly usage counter per virtual key.
export interface GatewayKeyUsage {
keyId: string;
/** UTC month bucket as 'YYYY-MM' — see src/gateway/period.ts. */
periodStart: string;
tokensIn: number;
tokensOut: number;
requests: number;
lastUpdatedAt: string;
}
interface GatewayKeyUsageRow {
key_id: string;
period_start: string;
tokens_in: number;
tokens_out: number;
requests: number;
last_updated_at: string;
}
function rowToGatewayKeyUsage(row: GatewayKeyUsageRow): GatewayKeyUsage {
return {
keyId: row.key_id,
periodStart: row.period_start,
tokensIn: row.tokens_in,
tokensOut: row.tokens_out,
requests: row.requests,
lastUpdatedAt: row.last_updated_at,
};
}
/** Per-call delta for the per-user daily LLM usage ledger. */
export interface LlmUsageIncrement {
/** UTC day bucket 'YYYY-MM-DD'. Defaults to today (UTC) when omitted. */
day?: string;
/** Owner id, or 'local' (no-auth) / 'system' (ownerless) sentinel. */
userId: string;
source: 'gateway' | 'direct';
/** Real model name (chunk.model), routing-key fallback, or 'unknown'. */
model: string;
/** Backend server name (gateway backendId / direct host), or 'unknown'. */
route: string;
tokensIn?: number;
tokensOut?: number;
requests?: number;
at?: string;
}
/** Daily-grouped aggregate row (model/route collapsed) for the usage API. */
export interface LlmUsageDailyAgg {
day: string;
userId: string;
source: string;
tokensIn: number;
tokensOut: number;
requests: number;
}
/** Hour-grain UPSERT input for the v2 usage ledger. */
export interface LlmUsageHourlyIncrement {
/** UTC hour bucket 'YYYY-MM-DDTHH'. Defaults to the current hour (UTC). */
hour?: string;
/** Owner id, or 'local' (no-auth) / 'system' (ownerless) sentinel. */
userId: string;
source: 'gateway' | 'direct';
/** Real model name (chunk.model), routing-key fallback, or 'unknown'. */
model: string;
/** Backend server name (gateway backendId / direct host), or 'unknown'. */
route: string;
tokensIn?: number;
tokensOut?: number;
requests?: number;
/** ISO timestamp for last_updated_at + hour default. Defaults to now. */
at?: string;
}
/**
* Raw hour-grain ledger row (no axis collapsed) for the usage API. The API
* re-buckets `hour` into the viewer's local calendar period and groups by
* whichever of source/model/route/user/org the request asked for.
*/
export interface LlmUsageHourlyRow {
hour: string;
userId: string;
source: string;
model: string;
route: string;
tokensIn: number;
tokensOut: number;
requests: number;
}
/**
* Coerce an optional limit (tokens_budget / rate_limit_rpm) to either
* a positive integer or null. Anything else (undefined, null, 0,
* negative, NaN, non-number) collapses to null = "no limit" so callers
* can't accidentally persist a value that would silently block all
* traffic.
*/
function normalizeOptionalPositiveInt(v: unknown): number | null {
if (v === undefined || v === null) return null;
if (typeof v !== 'number' || !Number.isFinite(v) || v <= 0) return null;
return Math.floor(v);
}
function rowToGatewayVirtualKey(row: GatewayVirtualKeyRow): GatewayVirtualKey {
let allowedModels: string[] | null = null;
if (row.allowed_models !== null && row.allowed_models !== '') {
try {
const parsed: unknown = JSON.parse(row.allowed_models);
if (Array.isArray(parsed) && parsed.every(x => typeof x === 'string')) {
allowedModels = parsed;
}
} catch {
// Corrupt JSON: treat as "no allowlist" (safer than fail-open
// because routing has its own backend.model gate; this is just
// the per-key filter on top).
allowedModels = null;
}
}
const source: GatewayVirtualKeySource =
row.source === 'config-import' ? 'config-import' : 'admin';
// tokens_budget / rate_limit_rpm may legitimately arrive as null (no
// limit). Coerce non-positive integers to null defensively because the
// gateway middleware treats null as "unlimited" — a corrupt `0` would
// otherwise silently block every request.
const tokensBudget =
typeof row.tokens_budget === 'number' && Number.isFinite(row.tokens_budget) && row.tokens_budget > 0
? Math.floor(row.tokens_budget)
: null;
const rateLimitRpm =
typeof row.rate_limit_rpm === 'number' && Number.isFinite(row.rate_limit_rpm) && row.rate_limit_rpm > 0
? Math.floor(row.rate_limit_rpm)
: null;
return {
id: row.id,
keyHash: row.key_hash,
keyPrefix: row.key_prefix,
team: row.team,
allowedModels,
source,
createdAt: row.created_at,
createdBy: row.created_by,
revokedAt: row.revoked_at,
revokedBy: row.revoked_by,
lastUsedAt: row.last_used_at,
tokensBudget,
rateLimitRpm,
};
}
export interface LocalTaskComment {
id: number;
taskId: number;
author: string;
kind: string;
body: string;
createdAt: string;
injectedAt: string | null;
}
export interface WorkerNode {
workerId: string;
endpoint: string;
enabled: boolean;
healthy: boolean;
roles: string[];
availableModels: string[];
inflightJobs: number;
maxConcurrency: number;
lastError: string | null;
lastSeenAt: string;
updatedAt: string;
}
export interface UpsertWorkerNodeParams {
workerId: string;
endpoint: string;
enabled: boolean;
healthy: boolean;
roles: string[];
availableModels?: string[];
inflightJobs?: number;
maxConcurrency?: number;
lastError?: string | null;
}
export interface CreateLocalTaskParams {
title: string;
/** Defaults to 'auto'. Pass 'user' when the caller supplied an explicit title. */
titleSource?: TitleSource;
body: string;
pieceName?: string;
profile?: 'auto' | 'fast' | 'quality';
outputFormat?: 'text' | 'markdown' | 'json';
askPolicy?: 'low' | 'high';
priority?: 'low' | 'medium' | 'high';
workspacePath?: string | null;
ownerId?: string | null;
visibility?: 'private' | 'org' | 'public';
visibilityScopeOrgId?: string | null;
browserSessionProfileId?: number | null;
/** Per-task runtime options (e.g. { mcpDisabled, skillsDisabled }). Stored as JSON. */
options?: Record<string, unknown>;
}
// ── Browser Notifications V2 (Web Push) ──────────────────────────────
// Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md
export type NotifyEventType = 'running' | 'succeeded' | 'failed' | 'waiting_human';
export interface PushSubscriptionRecord {
id: string;
userId: string;
endpoint: string;
p256dh: string;
auth: string;
userAgent: string | null;
vapidKeyId: string;
createdAt: string;
lastSuccessAt: string | null;
lastFailureAt: string | null;
failureCount: number;
}
export interface UpsertPushSubscriptionInput {
userId: string;
endpoint: string;
p256dh: string;
auth: string;
userAgent?: string | null;
vapidKeyId: string;
}
export interface NotificationPrefs {
userId: string;
enabled: boolean;
events: Record<NotifyEventType, boolean>;
includeDetails: boolean;
v1Migrated: boolean;
updatedAt: string;
}
export type NotificationPrefsUpdate = Partial<Omit<NotificationPrefs, 'userId' | 'updatedAt'>>;
export type ScheduledTaskKind = 'agent' | 'script';
export interface ScheduledTask {
id: number;
title: string | null;
body: string;
pieceName: string;
profile: string;
outputFormat: string;
cronExpression: string;
nextRunAt: string;
lastRunAt: string | null;
lastJobId: string | null;
isActive: boolean;
ownerId: string | null;
ownerName?: string | null;
visibility: 'private' | 'org' | 'public';
visibilityScopeOrgId: string | null;
visibilityScopeOrgName?: string | null;
browserSessionProfileId?: number | null;
taskKind: ScheduledTaskKind;
scriptName: string | null;
scriptParams: string | null; // JSON-encoded object or null
createdAt: string;
updatedAt: string;
}
export interface CreateScheduledTaskParams {
title?: string | null;
body: string;
pieceName?: string;
profile?: string;
outputFormat?: string;
cronExpression: string;
nextRunAt: string;
ownerId?: string | null;
visibility?: 'private' | 'org' | 'public';
visibilityScopeOrgId?: string | null;
browserSessionProfileId?: number | null;
taskKind?: ScheduledTaskKind;
scriptName?: string | null;
scriptParams?: string | null;
}
export interface UpdateScheduledTaskParams {
title?: string;
body?: string;
pieceName?: string;
profile?: string;
outputFormat?: string;
cronExpression?: string;
nextRunAt?: string;
lastRunAt?: string;
lastJobId?: string;
isActive?: boolean;
visibility?: 'private' | 'org' | 'public';
visibilityScopeOrgId?: string | null;
browserSessionProfileId?: number | null;
taskKind?: ScheduledTaskKind;
scriptName?: string | null;
scriptParams?: string | null;
}
export interface User {
id: string;
email: string;
name: string | null;
avatarUrl: string | null;
role: 'admin' | 'user';
status: 'active' | 'pending' | 'disabled';
defaultVisibility: 'private' | 'org' | 'public';
defaultVisibilityOrgId: string | null;
createdAt: string;
updatedAt: string;
}
export interface CreateUserParams {
email: string;
name: string;
role: 'admin' | 'user';
status: 'active' | 'pending' | 'disabled';
avatarUrl?: string;
}
export interface FindOrCreateByOAuthParams {
provider: string;
providerId: string;
email: string;
name: string;
avatarUrl?: string;
}
export interface CreateLocalUserParams {
email: string;
password: string;
role: 'admin' | 'user';
status: 'active' | 'pending' | 'disabled';
name?: string;
}
export interface LocalOrg {
id: string;
name: string;
createdBy: string | null;
createdAt: string;
}
export interface LocalOrgMember {
userId: string;
role: string;
}
export interface GiteaOrgInput {
orgId: string;
orgName: string;
}
export interface GiteaOrg extends GiteaOrgInput {
fetchedAt: string;
}
interface JobRow {
id: string;
repo: string;
issue_number: number;
pr_number: number | null;
status: string;
piece_name: string;
current_movement: string | null;
current_activity: string | null;
instruction: string;
branch_name: string | null;
worktree_path: string | null;
attempt: number;
max_attempts: number;
next_retry_at: string | null;
error_summary: string | null;
abort_reason: string | null;
resume_movement: string | null;
wait_reason: string | null;
ask_count: number;
worker_id: string | null;
last_backend_id: string | null;
parent_job_id: string | null;
continued_from_job_id: string | null;
subtask_depth: number;
required_profile: string;
task_class: string;
owner_id: string | null;
visibility: string | null;
visibility_scope_org_id: string | null;
context_prompt_tokens: number | null;
context_limit_tokens: number | null;
context_updated_at: string | null;
browser_session_profile_id: number | null;
task_kind: string;
payload: string | null;
created_at: string;
updated_at: string;
}
interface LocalTaskRow {
id: number;
title: string;
title_source: string | null;
body: string;
piece_name: string;
profile: string;
output_format: string;
ask_policy: string;
priority: string;
state: string;
workspace_path: string | null;
owner_id: string | null;
owner_name?: string | null;
visibility: string | null;
visibility_scope_org_id: string | null;
visibility_scope_org_name?: string | null;
created_at: string;
updated_at: string;
feedback_rating: string | null;
feedback_comment: string | null;
feedback_tags: string | null;
feedback_at: string | null;
share_token: string | null;
shared_at: string | null;
mission_brief: string | null;
browser_session_profile_id: number | null;
options: string | null;
}
interface LocalTaskCommentRow {
id: number;
task_id: number;
author: string;
kind: string;
body: string;
created_at: string;
injected_at: string | null;
}
interface UserRow {
id: string;
email: string;
name: string | null;
avatar_url: string | null;
role: string;
status: string;
default_visibility: string | null;
default_visibility_org_id: string | null;
created_at: string;
updated_at: string;
}
interface WorkerNodeRow {
worker_id: string;
endpoint: string;
enabled: number;
healthy: number;
profile_tags: string;
task_class_tags: string;
available_models: string | null;
inflight_jobs: number;
max_concurrency: number;
last_error: string | null;
last_seen_at: string;
updated_at: string;
}
function isJobRole(value: string): value is JobRole {
return value === 'auto' || value === 'fast' || value === 'quality' || value === 'reflection';
}
function normalizeJobRole(value: string | undefined): JobRole {
return value && isJobRole(value) ? value : 'auto';
}
function encodeTags(values: string[]): string {
const unique = Array.from(new Set(values.filter(Boolean)));
return `,${unique.join(',')},`;
}
function decodeTags(raw: string | null): string[] {
if (!raw) return [];
return raw.split(',').map((value) => value.trim()).filter(Boolean);
}
function decodeAvailableModels(raw: string | null): string[] {
if (!raw) return [];
try {
const parsed = JSON.parse(raw) as unknown;
return Array.isArray(parsed) ? parsed.filter((value): value is string => typeof value === 'string') : [];
} catch {
return [];
}
}
function deriveJobRole(instruction: string, explicitRole?: JobRole): JobRole {
if (explicitRole) return explicitRole;
const match = /ui_profile:\s*(auto|fast|quality)/i.exec(instruction);
return normalizeJobRole(match?.[1]?.toLowerCase());
}
/** SQLite datetime('now') は UTC だがタイムゾーン情報なし。'Z' を付加して ISO 8601 UTC として明示する */
function utc(dt: string | null): string {
if (!dt) return '';
// 既に Z や +/- オフセットが付いていれば何もしない
if (/[Zz]$/.test(dt) || /[+-]\d{2}:\d{2}$/.test(dt)) return dt;
return dt.replace(' ', 'T') + 'Z';
}
function rowToJob(row: JobRow): Job {
return {
id: row.id,
repo: row.repo,
issueNumber: row.issue_number,
prNumber: row.pr_number,
status: row.status as JobStatus,
pieceName: row.piece_name,
currentMovement: row.current_movement,
currentActivity: row.current_activity,
instruction: row.instruction,
branchName: row.branch_name,
worktreePath: row.worktree_path,
attempt: row.attempt,
maxAttempts: row.max_attempts,
nextRetryAt: utc(row.next_retry_at),
errorSummary: row.error_summary,
abortReason: row.abort_reason ?? null,
resumeMovement: row.resume_movement,
waitReason: row.wait_reason ?? null,
askCount: row.ask_count,
workerId: row.worker_id,
lastBackendId: row.last_backend_id ?? null,
parentJobId: row.parent_job_id,
continuedFromJobId: row.continued_from_job_id ?? null,
subtaskDepth: row.subtask_depth ?? 0,
requiredRole: normalizeJobRole(row.required_profile),
requiredProfile: normalizeJobRole(row.required_profile),
ownerId: row.owner_id ?? null,
visibility: (row.visibility === 'org' || row.visibility === 'public' ? row.visibility : 'private'),
visibilityScopeOrgId: row.visibility_scope_org_id ?? null,
contextPromptTokens: row.context_prompt_tokens,
contextLimitTokens: row.context_limit_tokens,
contextUpdatedAt: row.context_updated_at ? utc(row.context_updated_at) : null,
browserSessionProfileId: row.browser_session_profile_id ?? null,
taskKind: row.task_kind === 'reflection' ? 'reflection' : 'agent',
payload: row.payload,
createdAt: utc(row.created_at),
updatedAt: utc(row.updated_at),
};
}
function rowToLocalTask(row: LocalTaskRow): LocalTask {
return {
id: row.id,
title: row.title,
titleSource: (row.title_source as TitleSource | null) ?? 'auto',
body: row.body,
pieceName: row.piece_name,
profile: row.profile,
outputFormat: row.output_format,
askPolicy: row.ask_policy,
priority: row.priority,
state: row.state,
workspacePath: row.workspace_path,
ownerId: row.owner_id ?? null,
ownerName: row.owner_name ?? null,
visibility: (row.visibility ?? 'private') as LocalTask['visibility'],
visibilityScopeOrgId: row.visibility_scope_org_id ?? null,
visibilityScopeOrgName: row.visibility_scope_org_name ?? null,
createdAt: utc(row.created_at),
updatedAt: utc(row.updated_at),
feedbackRating: (row.feedback_rating as 'good' | 'bad' | null) ?? null,
feedbackTags: row.feedback_tags ? JSON.parse(row.feedback_tags) : null,
feedbackComment: row.feedback_comment ?? null,
feedbackAt: row.feedback_at ? utc(row.feedback_at) : null,
shareToken: row.share_token ?? null,
sharedAt: row.shared_at ? utc(row.shared_at) : null,
browserSessionProfileId: row.browser_session_profile_id ?? null,
missionBrief: parseMissionBrief(row.mission_brief),
options: parseTaskOptions(row.options),
};
}
function parseMissionBrief(raw: string | null | undefined): MissionBrief | null {
if (!raw) return null;
try {
const parsed = JSON.parse(raw);
if (!parsed || typeof parsed !== 'object') return null;
const goal = typeof parsed.goal === 'string' ? parsed.goal : '';
const done = typeof parsed.done === 'string' ? parsed.done : '';
const open = typeof parsed.open === 'string' ? parsed.open : '';
const clarifications = typeof parsed.clarifications === 'string' ? parsed.clarifications : '';
if (!goal && !done && !open && !clarifications) return null;
return { goal, done, open, clarifications };
} catch {
return null;
}
}
function parseTaskOptions(raw: string | null | undefined): Record<string, unknown> {
if (!raw) return {};
try {
const parsed = JSON.parse(raw);
if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
return parsed as Record<string, unknown>;
}
return {};
} catch {
return {};
}
}
function rowToLocalTaskComment(row: LocalTaskCommentRow): LocalTaskComment {
return {
id: row.id,
taskId: row.task_id,
author: row.author,
kind: row.kind,
body: row.body,
createdAt: utc(row.created_at),
injectedAt: row.injected_at ? utc(row.injected_at) : null,
};
}
function rowToWorkerNode(row: WorkerNodeRow): WorkerNode {
return {
workerId: row.worker_id,
endpoint: row.endpoint,
enabled: row.enabled === 1,
healthy: row.healthy === 1,
roles: decodeTags(row.profile_tags),
availableModels: decodeAvailableModels(row.available_models),
inflightJobs: row.inflight_jobs,
maxConcurrency: row.max_concurrency,
lastError: row.last_error,
lastSeenAt: utc(row.last_seen_at),
updatedAt: utc(row.updated_at),
};
}
function rowToUser(row: UserRow): User {
return {
id: row.id,
email: row.email,
name: row.name,
avatarUrl: row.avatar_url,
role: row.role as 'admin' | 'user',
status: row.status as 'active' | 'pending' | 'disabled',
defaultVisibility: (row.default_visibility ?? 'private') as User['defaultVisibility'],
defaultVisibilityOrgId: row.default_visibility_org_id,
createdAt: utc(row.created_at),
updatedAt: utc(row.updated_at),
};
}
export function localTaskRepoName(taskId: number): string {
return `local/task-${taskId}`;
}
export class Repository {
private readonly db: Database.Database;
constructor(dbPath: string) {
this.db = new Database(dbPath);
this.db.pragma('journal_mode = WAL');
this.db.pragma('foreign_keys = ON');
this.db.pragma('busy_timeout = 5000');
this.initSchema();
logger.info(`Repository: initialized at ${dbPath}`);
}
private initSchema(): void {
const schemaPath = join(__dirname, 'schema.sql');
const schema = readFileSync(schemaPath, 'utf-8');
this.db.exec(schema);
this.ensureColumn('jobs', 'required_profile', "TEXT NOT NULL DEFAULT 'auto'");
this.ensureColumn('jobs', 'task_class', "TEXT NOT NULL DEFAULT 'auto'");
this.ensureColumn('worker_nodes', 'profile_tags', "TEXT NOT NULL DEFAULT ',auto,'");
this.ensureColumn('worker_nodes', 'task_class_tags', "TEXT NOT NULL DEFAULT ',auto,'");
this.ensureColumn('worker_nodes', 'available_models', 'TEXT');
this.ensureColumn('worker_nodes', 'max_concurrency', 'INTEGER NOT NULL DEFAULT 1');
this.ensureColumn('worker_nodes', 'last_error', 'TEXT');
this.db.exec("CREATE INDEX IF NOT EXISTS idx_jobs_profile_task_class ON jobs (status, required_profile, task_class)");
this.db.exec("CREATE INDEX IF NOT EXISTS idx_worker_nodes_health ON worker_nodes (enabled, healthy, last_seen_at)");
this.ensureColumn('jobs', 'parent_job_id', 'TEXT');
this.ensureColumn('jobs', 'subtask_depth', 'INTEGER NOT NULL DEFAULT 0');
this.ensureColumn('jobs', 'wait_reason', 'TEXT');
this.ensureColumn('jobs', 'continued_from_job_id', 'TEXT');
// Phase A (multi-team GPU pool): physical backend id assigned when
// the worker is a proxy. Sticky once set; never overwritten mid-job.
this.ensureColumn('jobs', 'last_backend_id', 'TEXT');
this.db.exec("CREATE INDEX IF NOT EXISTS idx_jobs_parent_job_id ON jobs (parent_job_id)");
this.ensureColumn('local_tasks', 'feedback_rating', 'TEXT');
this.ensureColumn('local_tasks', 'feedback_comment', 'TEXT');
this.ensureColumn('local_tasks', 'feedback_tags', 'TEXT');
this.ensureColumn('local_tasks', 'feedback_at', 'TEXT');
this.migrateWaitingSubtasksStatus();
// Auth migrations: owner_id columns
this.ensureColumn('jobs', 'owner_id', 'TEXT');
this.ensureColumn('local_tasks', 'owner_id', 'TEXT');
// #142: 実行中アクティビティ表示
this.ensureColumn('jobs', 'current_activity', 'TEXT');
// abortReason 細分化: agent-loop / piece-runner が出す構造化 abort code を保持
this.ensureColumn('jobs', 'abort_reason', 'TEXT');
// #134: 共有機能
this.ensureColumn('local_tasks', 'share_token', 'TEXT');
this.ensureColumn('local_tasks', 'shared_at', 'TEXT');
this.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS idx_local_tasks_share_token ON local_tasks (share_token)");
// Ownership and visibility columns (3 tables)
for (const table of ['local_tasks', 'scheduled_tasks', 'jobs']) {
this.ensureColumn(table, 'owner_id', 'TEXT');
this.ensureColumn(table, 'visibility', "TEXT NOT NULL DEFAULT 'private'");
this.ensureColumn(table, 'visibility_scope_org_id', 'TEXT');
}
// User preferences
this.ensureColumn('users', 'default_visibility', "TEXT NOT NULL DEFAULT 'private'");
this.ensureColumn('users', 'default_visibility_org_id', 'TEXT');
// Indexes and user_gitea_orgs table
this.db.exec(`
CREATE INDEX IF NOT EXISTS idx_local_tasks_owner_id ON local_tasks(owner_id);
CREATE INDEX IF NOT EXISTS idx_local_tasks_visibility ON local_tasks(visibility, visibility_scope_org_id);
CREATE INDEX IF NOT EXISTS idx_scheduled_tasks_owner_id ON scheduled_tasks(owner_id);
CREATE INDEX IF NOT EXISTS idx_jobs_owner_id ON jobs(owner_id);
CREATE INDEX IF NOT EXISTS idx_jobs_visibility ON jobs(visibility, visibility_scope_org_id);
CREATE TABLE IF NOT EXISTS user_gitea_orgs (
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
org_id TEXT NOT NULL,
org_name TEXT NOT NULL,
fetched_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (user_id, org_id)
);
CREATE INDEX IF NOT EXISTS idx_user_gitea_orgs_org_id ON user_gitea_orgs(org_id);
`);
// Browser session persistence (2026-05) — keep in sync with schema.sql
this.db.exec(`
CREATE TABLE IF NOT EXISTS user_deks (
user_id TEXT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
encrypted_dek BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS browser_session_profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
owner_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
label TEXT NOT NULL,
start_url TEXT NOT NULL,
match_patterns TEXT NOT NULL DEFAULT '[]',
storage_origins TEXT NOT NULL DEFAULT '[]',
logged_in_selector TEXT,
login_url_patterns TEXT NOT NULL DEFAULT '[]',
encrypted_state_blob BLOB,
state_version INTEGER NOT NULL DEFAULT 0,
playwright_version TEXT,
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending','active','expired','revoked','error')),
last_saved_at TEXT,
last_used_at TEXT,
last_validated_at TEXT,
last_error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_bsp_owner ON browser_session_profiles(owner_id);
-- audit log: intentionally no FK — must survive deletion of referenced rows
CREATE TABLE IF NOT EXISTS browser_session_audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL DEFAULT (datetime('now')),
actor_user_id TEXT,
profile_id INTEGER,
owner_id TEXT,
action TEXT NOT NULL CHECK (action IN ('create','save','decrypt','use','delete','expire','revoke','test','login_start','login_cancel')),
task_id INTEGER,
job_id TEXT,
result TEXT NOT NULL CHECK (result IN ('success','error')),
reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_bsa_profile ON browser_session_audit(profile_id);
CREATE INDEX IF NOT EXISTS idx_bsa_actor ON browser_session_audit(actor_user_id);
`);
this.ensureColumn('local_tasks', 'browser_session_profile_id', 'INTEGER REFERENCES browser_session_profiles(id) ON DELETE SET NULL');
this.ensureColumn('scheduled_tasks', 'browser_session_profile_id', 'INTEGER REFERENCES browser_session_profiles(id) ON DELETE SET NULL');
this.ensureColumn('jobs', 'browser_session_profile_id', 'INTEGER REFERENCES browser_session_profiles(id) ON DELETE SET NULL');
// E: scheduled_tasks can now run a user script directly (without going
// through the agent / LLM loop). task_kind='agent' (default) keeps the
// pre-existing behavior; task_kind='script' uses script_name + script_params.
this.ensureColumn('scheduled_tasks', 'task_kind', "TEXT NOT NULL DEFAULT 'agent' CHECK (task_kind IN ('agent','script'))");
this.ensureColumn('scheduled_tasks', 'script_name', 'TEXT');
this.ensureColumn('scheduled_tasks', 'script_params', 'TEXT'); // JSON-encoded object or NULL
// F: reflection jobs — task_kind distinguishes agent jobs from reflection
// jobs that run the self-improving-memory pipeline (no LLM task loop).
// payload carries JSON inputs (scope, trigger metadata, etc.).
this.ensureColumn('jobs', 'task_kind', "TEXT NOT NULL DEFAULT 'agent'");
this.ensureColumn('jobs', 'payload', 'TEXT');
// G: reflection piece-edit cooldown tracking.
// reflection_piece_edits records each time the reflection pipeline writes
// a user's custom piece. The cooldown gate in piece-writer.ts queries
// countRecentPieceEdits to rate-limit piece rewrites.
this.db.exec(`
CREATE TABLE IF NOT EXISTS reflection_piece_edits (
user_id TEXT NOT NULL,
piece_name TEXT NOT NULL,
snapshot_id TEXT NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (user_id, piece_name, created_at)
);
CREATE INDEX IF NOT EXISTS idx_rpe_user_piece_time
ON reflection_piece_edits (user_id, piece_name, created_at DESC);
`);
// Phase B (node-status widget, 2026-05): per-widget kind discriminator.
// 'markdown' (default) preserves the original Markdown-only behavior;
// 'node-status' renders BackendStatusRegistry data live. Existing rows
// default to 'markdown' so direct-mode deployments stay unchanged.
this.ensureColumn('user_dashboard_widgets', 'kind', "TEXT NOT NULL DEFAULT 'markdown'");
// H: reflection_metrics — one row per reflection job, records outcome,
// token usage, memory changes, and whether a piece edit was applied.
// Used by the /api/reflection/metrics endpoint (Phase 7.2) and future
// per-user budget enforcement (Phase 8.2).
this.db.exec(`
CREATE TABLE IF NOT EXISTS reflection_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
reflection_job_id TEXT NOT NULL,
original_job_id TEXT,
user_id TEXT NOT NULL,
piece_name TEXT,
outcome TEXT NOT NULL,
memory_changes INTEGER NOT NULL DEFAULT 0,
piece_edited INTEGER NOT NULL DEFAULT 0,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_rm_user_time
ON reflection_metrics (user_id, created_at DESC);
`);
// I: AAO Gateway Phase 2a — DB-backed virtual keys.
// Mirrors src/db/schema.sql and migrateGatewayVirtualKeys in migrate.ts;
// all three paths must stay in sync (project_db_migration_dual_path).
this.db.exec(`
CREATE TABLE IF NOT EXISTS gateway_virtual_keys (
id TEXT PRIMARY KEY,
key_hash TEXT NOT NULL UNIQUE,
key_prefix TEXT NOT NULL,
team TEXT NOT NULL,
allowed_models TEXT,
source TEXT NOT NULL DEFAULT 'admin' CHECK (source IN ('admin','config-import')),
created_at TEXT NOT NULL,
created_by TEXT,
revoked_at TEXT,
revoked_by TEXT,
last_used_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_gateway_keys_hash_active
ON gateway_virtual_keys (key_hash)
WHERE revoked_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_gateway_keys_team
ON gateway_virtual_keys (team);
`);
// I.b: AAO Gateway Phase 2b — budget / rate limit columns +
// gateway_key_usage table. Mirrors schema.sql + migrate.ts and uses
// the same PRAGMA-based idempotency pattern.
this.ensureColumn('gateway_virtual_keys', 'tokens_budget', 'INTEGER');
this.ensureColumn('gateway_virtual_keys', 'rate_limit_rpm', 'INTEGER');
this.db.exec(`
CREATE TABLE IF NOT EXISTS gateway_key_usage (
key_id TEXT NOT NULL REFERENCES gateway_virtual_keys(id) ON DELETE CASCADE,
period_start TEXT NOT NULL,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
requests INTEGER NOT NULL DEFAULT 0,
last_updated_at TEXT NOT NULL,
PRIMARY KEY (key_id, period_start)
);
CREATE INDEX IF NOT EXISTS idx_gateway_usage_key
ON gateway_key_usage (key_id);
`);
// Per-user daily LLM usage (gateway + direct). Mirrors schema.sql +
// migrate.ts (dual-path rule). Separate lens from gateway_key_usage.
// Spec: docs/superpowers/specs/2026-06-11-llm-usage-aggregation-design.md
this.db.exec(`
CREATE TABLE IF NOT EXISTS llm_usage_daily (
day TEXT NOT NULL,
user_id TEXT NOT NULL,
source TEXT NOT NULL,
model TEXT NOT NULL,
route TEXT NOT NULL,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
requests INTEGER NOT NULL DEFAULT 0,
last_updated_at TEXT NOT NULL,
PRIMARY KEY (day, user_id, source, model, route)
);
CREATE INDEX IF NOT EXISTS idx_llm_usage_daily_user_day
ON llm_usage_daily (user_id, day);
`);
// Usage dashboard v2: hour-grain ledger (supersedes llm_usage_daily as the
// write target; daily kept as frozen archive). Mirrors schema.sql +
// migrate.ts (dual-path). Backfill of the daily archive lives in migrate.ts
// so it runs once on upgrade, not on every fresh initSchema.
// Spec: docs/superpowers/specs/2026-06-11-usage-dashboard-v2-design.md
this.db.exec(`
CREATE TABLE IF NOT EXISTS llm_usage_hourly (
hour TEXT NOT NULL,
user_id TEXT NOT NULL,
source TEXT NOT NULL,
model TEXT NOT NULL,
route TEXT NOT NULL,
tokens_in INTEGER NOT NULL DEFAULT 0,
tokens_out INTEGER NOT NULL DEFAULT 0,
requests INTEGER NOT NULL DEFAULT 0,
last_updated_at TEXT NOT NULL,
PRIMARY KEY (hour, user_id, source, model, route)
);
CREATE INDEX IF NOT EXISTS idx_llm_usage_hourly_user_hour
ON llm_usage_hourly (user_id, hour);
`);
}
private ensureColumn(tableName: string, columnName: string, definition: string): void {
const columns = this.db.prepare(`PRAGMA table_info(${tableName})`).all() as Array<{ name: string }>;
if (columns.some((column) => column.name === columnName)) {
return;
}
this.db.prepare(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${definition}`).run();
}
private migrateWaitingSubtasksStatus(): void {
// Check if jobs table already has waiting_subtasks in its CHECK constraint
const tableInfo = this.db.prepare(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='jobs'"
).get() as { sql: string } | undefined;
if (!tableInfo || tableInfo.sql.includes('waiting_subtasks')) return;
// Recreate jobs table with updated CHECK constraint
logger.info('Repository: migrating jobs table to support waiting_subtasks status...');
this.db.transaction(() => {
this.db.exec(`
CREATE TABLE IF NOT EXISTS jobs_v2 (
id TEXT PRIMARY KEY,
repo TEXT NOT NULL,
issue_number INTEGER NOT NULL,
pr_number INTEGER,
status TEXT NOT NULL DEFAULT 'queued'
CHECK (status IN ('queued','dispatching','running','succeeded','failed','retry','cancelled','waiting_human','waiting_subtasks')),
piece_name TEXT NOT NULL DEFAULT 'general',
required_profile TEXT NOT NULL DEFAULT 'auto',
task_class TEXT NOT NULL DEFAULT 'auto',
current_movement TEXT,
instruction TEXT NOT NULL DEFAULT '',
branch_name TEXT,
worktree_path TEXT,
attempt INTEGER NOT NULL DEFAULT 1,
max_attempts INTEGER NOT NULL DEFAULT 3,
next_retry_at TEXT,
error_summary TEXT,
resume_movement TEXT,
ask_count INTEGER NOT NULL DEFAULT 0,
worker_id TEXT,
parent_job_id TEXT,
subtask_depth INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO jobs_v2
SELECT id, repo, issue_number, pr_number, status, piece_name, required_profile, task_class,
current_movement, instruction, branch_name, worktree_path, attempt, max_attempts,
next_retry_at, error_summary, resume_movement, ask_count, worker_id,
NULL AS parent_job_id, 0 AS subtask_depth, created_at, updated_at
FROM jobs;
DROP TABLE jobs;
ALTER TABLE jobs_v2 RENAME TO jobs;
`);
})();
logger.info('Repository: jobs table migration complete');
}
async createJob(params: CreateJobParams): Promise<Job> {
const id = randomUUID();
const now = new Date().toISOString();
const pieceName = params.pieceName ?? 'chat';
const maxAttempts = params.maxAttempts ?? 3;
const resumeMovement = params.resumeMovement ?? null;
const askCount = params.askCount ?? 0;
const requiredRole = deriveJobRole(params.instruction, params.role ?? params.profile);
this.db
.prepare(
`INSERT INTO jobs (id, repo, issue_number, status, piece_name, required_profile, task_class, instruction, attempt, max_attempts, resume_movement, ask_count, worker_id, parent_job_id, continued_from_job_id, subtask_depth, owner_id, visibility, visibility_scope_org_id, browser_session_profile_id, task_kind, payload, created_at, updated_at)
VALUES (@id, @repo, @issueNumber, 'queued', @pieceName, @requiredRole, 'auto', @instruction, 1, @maxAttempts, @resumeMovement, @askCount, NULL, @parentJobId, @continuedFromJobId, @subtaskDepth, @ownerId, @visibility, @visibilityScopeOrgId, @browserSessionProfileId, @taskKind, @payload, @now, @now)`
)
.run({
id,
repo: params.repo,
issueNumber: params.issueNumber,
pieceName,
instruction: params.instruction,
maxAttempts,
resumeMovement,
askCount,
requiredRole,
parentJobId: params.parentJobId ?? null,
continuedFromJobId: params.continuedFromJobId ?? null,
subtaskDepth: params.subtaskDepth ?? 0,
ownerId: params.ownerId ?? null,
visibility: params.visibility ?? 'private',
visibilityScopeOrgId: params.visibilityScopeOrgId ?? null,
browserSessionProfileId: params.browserSessionProfileId ?? null,
taskKind: params.taskKind ?? 'agent',
payload: params.payload ?? null,
now,
});
const job = this.getJobSync(id);
if (!job) throw new Error(`createJob: failed to retrieve created job ${id}`);
return job;
}
async getJob(id: string, opts?: { viewer?: Express.User }): Promise<Job | null> {
const viewerClause = opts?.viewer
? buildVisibilityWhere(opts.viewer, 'j')
: { clause: '1=1', params: [] as unknown[] };
const row = this.db
.prepare(`SELECT j.* FROM jobs j WHERE j.id = ? AND ${viewerClause.clause}`)
.get(id, ...viewerClause.params) as JobRow | undefined;
return row ? rowToJob(row) : null;
}
async createLocalTask(params: CreateLocalTaskParams): Promise<LocalTask> {
const result = this.db
.prepare(
`INSERT INTO local_tasks (title, title_source, body, piece_name, profile, output_format, ask_policy, priority, workspace_path, owner_id, visibility, visibility_scope_org_id, browser_session_profile_id, options)
VALUES (@title, @titleSource, @body, @pieceName, @profile, @outputFormat, @askPolicy, @priority, @workspacePath, @ownerId, @visibility, @visibilityScopeOrgId, @browserSessionProfileId, @options)`
)
.run({
title: params.title,
titleSource: params.titleSource ?? 'auto',
body: params.body,
pieceName: params.pieceName ?? 'chat',
profile: params.profile ?? 'auto',
outputFormat: params.outputFormat ?? 'markdown',
askPolicy: params.askPolicy ?? 'low',
priority: params.priority ?? 'medium',
workspacePath: params.workspacePath ?? null,
ownerId: params.ownerId ?? null,
visibility: params.visibility ?? 'private',
visibilityScopeOrgId: params.visibilityScopeOrgId ?? null,
browserSessionProfileId: params.browserSessionProfileId ?? null,
options: JSON.stringify(params.options ?? {}),
});
const task = await this.getLocalTask(Number(result.lastInsertRowid));
if (!task) throw new Error('createLocalTask: failed to load inserted task');
return task;
}
/** サブジョブ一覧を SubtaskInfo[] に変換。waiting_subtasks の子は再帰的に children を取得する */
private async buildSubtaskInfos(subJobs: Job[], maxDepth: number = 3): Promise<SubtaskInfo[]> {
return Promise.all(subJobs.map(async (j): Promise<SubtaskInfo> => {
const info: SubtaskInfo = {
id: j.id,
issueNumber: j.issueNumber,
status: j.status,
instruction: j.instruction,
worktreePath: j.worktreePath,
createdAt: j.createdAt,
updatedAt: j.updatedAt,
};
// 再帰: waiting_subtasks のサブタスクは孫タスク情報も取得
if (j.status === 'waiting_subtasks' && maxDepth > 0) {
const grandChildren = await this.getSubJobs(j.id);
if (grandChildren.length > 0) {
info.children = await this.buildSubtaskInfos(grandChildren, maxDepth - 1);
info.childCount = grandChildren.length;
info.childCompleted = grandChildren.filter(g =>
['succeeded', 'failed', 'cancelled'].includes(g.status)
).length;
}
}
return info;
}));
}
async getLocalTask(taskId: number, opts?: { viewer?: Express.User }): Promise<LocalTask | null> {
const viewerClause = opts?.viewer
? buildVisibilityWhere(opts.viewer, 'lt')
: { clause: '1=1', params: [] as unknown[] };
const row = this.db
.prepare(`
SELECT lt.*,
${LOCAL_TASK_DISPLAY_SELECT}
FROM local_tasks lt
${LOCAL_TASK_DISPLAY_JOIN}
WHERE lt.id = ? AND ${viewerClause.clause}
`)
.get(taskId, ...viewerClause.params) as LocalTaskRow | undefined;
if (!row) return null;
const task = rowToLocalTask(row);
task.latestJob = await this.getLatestJobForIssue(localTaskRepoName(taskId), taskId);
// サブタスク情報を付与
if (task.latestJob) {
const subJobs = await this.getSubJobs(task.latestJob.id);
if (subJobs.length > 0) {
task.subtasks = await this.buildSubtaskInfos(subJobs);
task.subtaskCount = subJobs.length;
task.subtaskCompleted = subJobs.filter(j =>
['succeeded', 'failed', 'cancelled'].includes(j.status)
).length;
}
}
return task;
}
async shareLocalTask(taskId: number): Promise<string> {
const existing = await this.getLocalTask(taskId);
if (!existing) throw new Error(`Task ${taskId} not found`);
if (existing.shareToken) return existing.shareToken;
const token = randomUUID();
this.db.prepare(
`UPDATE local_tasks SET share_token = ?, shared_at = datetime('now'), updated_at = datetime('now') WHERE id = ?`
).run(token, taskId);
return token;
}
async unshareLocalTask(taskId: number): Promise<void> {
this.db.prepare(
`UPDATE local_tasks SET share_token = NULL, shared_at = NULL, updated_at = datetime('now') WHERE id = ?`
).run(taskId);
}
/**
* Mission Brief: partial-replace update. Only fields explicitly provided
* are written; undefined leaves the field untouched. Passing all-empty
* strings is treated as "clear the brief" (NULL in storage).
*
* Returns the merged brief so callers can echo it back to the client
* without an extra read.
*/
async updateMissionBrief(
taskId: number,
patch: Partial<MissionBrief>,
): Promise<MissionBrief | null> {
return this.updateMissionBriefSync(taskId, patch);
}
/** Sync variant used by the engine's MissionBriefIO so it can be called
* from sync paths (e.g. buildSystemPrompt). better-sqlite3 is sync
* underneath anyway. */
updateMissionBriefSync(taskId: number, patch: Partial<MissionBrief>): MissionBrief | null {
const row = this.db
.prepare(`SELECT mission_brief, title_source FROM local_tasks WHERE id = ?`)
.get(taskId) as { mission_brief: string | null; title_source: string | null } | undefined;
const existing = parseMissionBrief(row?.mission_brief ?? null);
const next: MissionBrief = {
goal: patch.goal !== undefined ? patch.goal : existing?.goal ?? '',
done: patch.done !== undefined ? patch.done : existing?.done ?? '',
open: patch.open !== undefined ? patch.open : existing?.open ?? '',
clarifications: patch.clarifications !== undefined ? patch.clarifications : existing?.clarifications ?? '',
};
const allEmpty = !next.goal && !next.done && !next.open && !next.clarifications;
const stored = allEmpty ? null : JSON.stringify(next);
// Derive the task title from the agent's goal (no LLM call). Only when the
// goal value actually changed (agents re-send an unchanged brief across
// iterations — re-deriving every time would churn updated_at and flicker
// the title) and the user hasn't taken manual control (a user edit pins
// title_source='user' and is never overwritten).
const goalChanged = patch.goal !== undefined && patch.goal !== (existing?.goal ?? '');
const derivedTitle = (goalChanged && (row?.title_source ?? 'auto') !== 'user')
? buildTitleFromGoal(next.goal)
: '';
// Atomic: persist the brief and the derived title as one unit so a crash
// between them can't leave the title out of sync with the goal.
this.db.transaction(() => {
this.db.prepare(
`UPDATE local_tasks SET mission_brief = ?, updated_at = datetime('now') WHERE id = ?`
).run(stored, taskId);
if (derivedTitle) {
this.db.prepare(
`UPDATE local_tasks SET title = ?, title_source = 'agent' WHERE id = ?`
).run(derivedTitle, taskId);
}
})();
return allEmpty ? null : next;
}
/** Sync read of the mission brief column. Used by MissionBriefIO.read() */
getMissionBriefSync(taskId: number): MissionBrief | null {
const row = this.db.prepare(`SELECT mission_brief FROM local_tasks WHERE id = ?`).get(taskId) as { mission_brief: string | null } | undefined;
return parseMissionBrief(row?.mission_brief ?? null);
}
/**
* Construct a MissionBriefIO bound to a specific local task. The engine
* uses this to thread mission brief read/write into the ToolContext
* without leaking the repository instance into tool code.
*/
makeMissionBriefIO(taskId: number): import('../engine/tools/core.js').MissionBriefIO {
return {
read: () => this.getMissionBriefSync(taskId),
update: (patch) => this.updateMissionBriefSync(taskId, patch),
};
}
async getLocalTaskByShareToken(token: string): Promise<LocalTask | null> {
const row = this.db
.prepare(`
SELECT lt.*,
${LOCAL_TASK_DISPLAY_SELECT}
FROM local_tasks lt
${LOCAL_TASK_DISPLAY_JOIN}
WHERE lt.share_token = ?
`)
.get(token) as LocalTaskRow | undefined;
if (!row) return null;
const task = rowToLocalTask(row);
task.latestJob = await this.getLatestJobForIssue(localTaskRepoName(task.id), task.id);
if (task.latestJob) {
const subJobs = await this.getSubJobs(task.latestJob.id);
if (subJobs.length > 0) {
task.subtasks = await this.buildSubtaskInfos(subJobs);
task.subtaskCount = subJobs.length;
task.subtaskCompleted = subJobs.filter(j =>
['succeeded', 'failed', 'cancelled'].includes(j.status)
).length;
}
}
return task;
}
async listLocalTasks(filter?: { ownerId?: string; viewer?: Express.User }): Promise<LocalTask[]> {
// 1. Single JOIN query: local_tasks LEFT JOIN jobs (latest per task via correlated subquery)
const conditions: string[] = [];
const queryParams: unknown[] = [];
if (filter?.ownerId) {
conditions.push('lt.owner_id = ?');
queryParams.push(filter.ownerId);
}
if (filter?.viewer) {
const w = buildVisibilityWhere(filter.viewer, 'lt');
conditions.push(w.clause);
queryParams.push(...w.params);
}
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
const joinedRows = this.db
.prepare(`
SELECT
lt.*,
${LOCAL_TASK_DISPLAY_SELECT},
j.id AS job_id,
j.repo AS job_repo,
j.issue_number AS job_issue_number,
j.pr_number AS job_pr_number,
j.status AS job_status,
j.piece_name AS job_piece_name,
j.required_profile AS job_required_profile,
j.task_class AS job_task_class,
j.current_movement AS job_current_movement,
j.current_activity AS job_current_activity,
j.instruction AS job_instruction,
j.branch_name AS job_branch_name,
j.worktree_path AS job_worktree_path,
j.attempt AS job_attempt,
j.max_attempts AS job_max_attempts,
j.next_retry_at AS job_next_retry_at,
j.error_summary AS job_error_summary,
j.abort_reason AS job_abort_reason,
j.resume_movement AS job_resume_movement,
j.wait_reason AS job_wait_reason,
j.ask_count AS job_ask_count,
j.worker_id AS job_worker_id,
j.last_backend_id AS job_last_backend_id,
j.parent_job_id AS job_parent_job_id,
j.continued_from_job_id AS job_continued_from_job_id,
j.subtask_depth AS job_subtask_depth,
j.owner_id AS job_owner_id,
j.visibility AS job_visibility,
j.visibility_scope_org_id AS job_visibility_scope_org_id,
j.created_at AS job_created_at,
j.updated_at AS job_updated_at,
j.context_prompt_tokens AS job_context_prompt_tokens,
j.context_limit_tokens AS job_context_limit_tokens,
j.context_updated_at AS job_context_updated_at,
j.browser_session_profile_id AS job_browser_session_profile_id,
j.task_kind AS job_task_kind,
j.payload AS job_payload
FROM local_tasks lt
${LOCAL_TASK_DISPLAY_JOIN}
LEFT JOIN jobs j ON j.id = (
SELECT j2.id FROM jobs j2
WHERE j2.repo = 'local/task-' || lt.id
AND j2.issue_number = lt.id
ORDER BY j2.created_at DESC, j2.rowid DESC
LIMIT 1
)
${whereClause}
ORDER BY lt.updated_at DESC, lt.id DESC
`)
.all(...queryParams) as Array<LocalTaskRow & {
job_id: string | null;
job_repo: string | null;
job_issue_number: number | null;
job_pr_number: number | null;
job_status: string | null;
job_piece_name: string | null;
job_required_profile: string | null;
job_task_class: string | null;
job_current_movement: string | null;
job_current_activity: string | null;
job_instruction: string | null;
job_branch_name: string | null;
job_worktree_path: string | null;
job_attempt: number | null;
job_max_attempts: number | null;
job_next_retry_at: string | null;
job_error_summary: string | null;
job_abort_reason: string | null;
job_resume_movement: string | null;
job_wait_reason: string | null;
job_ask_count: number | null;
job_worker_id: string | null;
job_last_backend_id: string | null;
job_parent_job_id: string | null;
job_continued_from_job_id: string | null;
job_subtask_depth: number | null;
job_owner_id: string | null;
job_visibility: string | null;
job_visibility_scope_org_id: string | null;
job_created_at: string | null;
job_updated_at: string | null;
job_context_prompt_tokens: number | null;
job_context_limit_tokens: number | null;
job_context_updated_at: string | null;
job_browser_session_profile_id: number | null;
job_task_kind: string | null;
job_payload: string | null;
}>;
// Build tasks with latestJob from joined data
const tasks: LocalTask[] = [];
const jobIds: string[] = [];
for (const row of joinedRows) {
const task = rowToLocalTask(row);
if (row.job_id) {
task.latestJob = rowToJob({
id: row.job_id,
repo: row.job_repo!,
issue_number: row.job_issue_number!,
pr_number: row.job_pr_number ?? null,
status: row.job_status!,
piece_name: row.job_piece_name!,
current_movement: row.job_current_movement ?? null,
current_activity: row.job_current_activity ?? null,
instruction: row.job_instruction!,
branch_name: row.job_branch_name ?? null,
worktree_path: row.job_worktree_path ?? null,
attempt: row.job_attempt!,
max_attempts: row.job_max_attempts!,
next_retry_at: row.job_next_retry_at ?? null,
error_summary: row.job_error_summary ?? null,
abort_reason: row.job_abort_reason ?? null,
resume_movement: row.job_resume_movement ?? null,
wait_reason: row.job_wait_reason ?? null,
ask_count: row.job_ask_count!,
worker_id: row.job_worker_id ?? null,
last_backend_id: row.job_last_backend_id ?? null,
parent_job_id: row.job_parent_job_id ?? null,
continued_from_job_id: row.job_continued_from_job_id ?? null,
subtask_depth: row.job_subtask_depth ?? 0,
required_profile: row.job_required_profile!,
task_class: row.job_task_class!,
owner_id: row.job_owner_id ?? null,
visibility: row.job_visibility ?? null,
visibility_scope_org_id: row.job_visibility_scope_org_id ?? null,
context_prompt_tokens: row.job_context_prompt_tokens ?? null,
context_limit_tokens: row.job_context_limit_tokens ?? null,
context_updated_at: row.job_context_updated_at ?? null,
browser_session_profile_id: row.job_browser_session_profile_id ?? null,
task_kind: row.job_task_kind ?? 'agent',
payload: row.job_payload ?? null,
created_at: row.job_created_at!,
updated_at: row.job_updated_at!,
});
jobIds.push(row.job_id);
} else {
task.latestJob = null;
}
tasks.push(task);
}
// 2. Single query for all sub-jobs
if (jobIds.length > 0) {
const placeholders = jobIds.map(() => '?').join(', ');
const subJobRows = this.db
.prepare(`
SELECT * FROM (
SELECT j.*, ROW_NUMBER() OVER (
PARTITION BY j.parent_job_id, j.issue_number
ORDER BY j.created_at DESC, j.rowid DESC
) AS rn
FROM jobs j
WHERE j.parent_job_id IN (${placeholders})
) WHERE rn = 1
ORDER BY parent_job_id, issue_number ASC
`)
.all(...jobIds) as JobRow[];
// Group sub-jobs by parent_job_id
const subJobsByParent = new Map<string, Job[]>();
for (const row of subJobRows) {
const parentId = row.parent_job_id!;
if (!subJobsByParent.has(parentId)) {
subJobsByParent.set(parentId, []);
}
subJobsByParent.get(parentId)!.push(rowToJob(row));
}
// 3. Attach subtask info to tasks
for (const task of tasks) {
if (task.latestJob && subJobsByParent.has(task.latestJob.id)) {
const subJobs = subJobsByParent.get(task.latestJob.id)!;
task.subtasks = await this.buildSubtaskInfos(subJobs);
task.subtaskCount = subJobs.length;
task.subtaskCompleted = subJobs.filter(j =>
['succeeded', 'failed', 'cancelled'].includes(j.status)
).length;
}
}
}
return tasks;
}
async updateLocalTask(taskId: number, updates: Partial<Omit<LocalTask, 'id' | 'createdAt' | 'latestJob'>>): Promise<void> {
const setClauses: string[] = ["updated_at = datetime('now')"];
const params: Record<string, unknown> = { taskId };
const fieldMap: Record<string, string> = {
title: 'title',
titleSource: 'title_source',
body: 'body',
pieceName: 'piece_name',
profile: 'profile',
outputFormat: 'output_format',
askPolicy: 'ask_policy',
priority: 'priority',
state: 'state',
workspacePath: 'workspace_path',
visibility: 'visibility',
visibilityScopeOrgId: 'visibility_scope_org_id',
};
for (const [jsKey, dbCol] of Object.entries(fieldMap)) {
const val = (updates as Record<string, unknown>)[jsKey];
if (val !== undefined) {
setClauses.push(`${dbCol} = @${jsKey}`);
params[jsKey] = val;
}
}
if (setClauses.length === 1) return;
this.db
.prepare(`UPDATE local_tasks SET ${setClauses.join(', ')} WHERE id = @taskId`)
.run(params);
}
async updateFeedback(taskId: number, feedback: {
rating: 'good' | 'bad';
tags: string[];
comment: string | null;
}): Promise<void> {
this.db
.prepare(`
UPDATE local_tasks
SET feedback_rating = @rating,
feedback_tags = @tags,
feedback_comment = @comment,
feedback_at = datetime('now'),
updated_at = datetime('now')
WHERE id = @taskId
`)
.run({
taskId,
rating: feedback.rating,
tags: JSON.stringify(feedback.tags),
comment: feedback.comment,
});
}
async addLocalTaskComment(taskId: number, author: string, body: string, kind: string = 'comment'): Promise<LocalTaskComment> {
const result = this.db
.prepare('INSERT INTO local_task_comments (task_id, author, kind, body) VALUES (?, ?, ?, ?)')
.run(taskId, author, kind, body);
this.db
.prepare("UPDATE local_tasks SET updated_at = datetime('now') WHERE id = ?")
.run(taskId);
const row = this.db
.prepare('SELECT * FROM local_task_comments WHERE id = ?')
.get(Number(result.lastInsertRowid)) as LocalTaskCommentRow | undefined;
if (!row) throw new Error('addLocalTaskComment: failed to load inserted comment');
return rowToLocalTaskComment(row);
}
async listLocalTaskComments(taskId: number): Promise<LocalTaskComment[]> {
const rows = this.db
.prepare('SELECT * FROM local_task_comments WHERE task_id = ? ORDER BY created_at ASC, id ASC')
.all(taskId) as LocalTaskCommentRow[];
return rows.map(rowToLocalTaskComment);
}
async getUninjectedComments(taskId: number, sinceId: number = 0): Promise<LocalTaskComment[]> {
const rows = this.db
.prepare(
`SELECT * FROM local_task_comments
WHERE task_id = ? AND id > ? AND author = 'user' AND injected_at IS NULL
ORDER BY id ASC`
)
.all(taskId, sinceId) as LocalTaskCommentRow[];
return rows.map(rowToLocalTaskComment);
}
markCommentsInjected(commentIds: number[]): void {
if (commentIds.length === 0) return;
const placeholders = commentIds.map(() => '?').join(',');
this.db
.prepare(`UPDATE local_task_comments SET injected_at = datetime('now') WHERE id IN (${placeholders})`)
.run(...commentIds);
}
/**
* Latest agent-authored "result" or "ask" comment for a task. Used by the
* piece handoff feature to surface the previous job's terminal output as
* context to a continuation job's LLM. Returns null when none exist
* (e.g., task has not yet completed any job).
*/
async getLatestResultComment(taskId: number): Promise<{ body: string; kind: string; createdAt: string } | null> {
const row = this.db
.prepare(
`SELECT body, kind, created_at
FROM local_task_comments
WHERE task_id = ? AND author = 'agent' AND kind IN ('result', 'ask')
ORDER BY created_at DESC
LIMIT 1`
)
.get(taskId) as { body: string; kind: string; created_at: string } | undefined;
return row ? { body: row.body, kind: row.kind, createdAt: row.created_at } : null;
}
private getJobSync(id: string): Job | null {
const row = this.db
.prepare('SELECT * FROM jobs WHERE id = ?')
.get(id) as JobRow | undefined;
return row ? rowToJob(row) : null;
}
/**
* ジョブの現在のステータスを同期的に取得する。
* キャンセルチェックなど、非同期が使えない箇所で利用する。
*/
getJobStatusSync(id: string): JobStatus | null {
const row = this.db
.prepare('SELECT status FROM jobs WHERE id = ?')
.get(id) as { status: string } | undefined;
return row ? (row.status as JobStatus) : null;
}
/**
* Returns true if `workerId` currently has at least one job with status='running'.
* Used by the Side Info Panel's worker status endpoint. Boolean-only on purpose:
* never expose the job id/title/owner to other users in the shared panel.
*/
isWorkerBusy(workerId: string): boolean {
const row = this.db
.prepare(`SELECT 1 AS hit FROM jobs WHERE worker_id = ? AND status = 'running' LIMIT 1`)
.get(workerId) as { hit: number } | undefined;
return !!row;
}
async updateJob(id: string, updates: Partial<Omit<Job, 'id' | 'createdAt'>>): Promise<void> {
const setClauses: string[] = ["updated_at = datetime('now')"];
const params: Record<string, unknown> = { id };
const fieldMap: Record<string, string> = {
status: 'status',
pieceName: 'piece_name',
currentMovement: 'current_movement',
currentActivity: 'current_activity',
instruction: 'instruction',
branchName: 'branch_name',
worktreePath: 'worktree_path',
prNumber: 'pr_number',
attempt: 'attempt',
maxAttempts: 'max_attempts',
nextRetryAt: 'next_retry_at',
errorSummary: 'error_summary',
abortReason: 'abort_reason',
resumeMovement: 'resume_movement',
waitReason: 'wait_reason',
askCount: 'ask_count',
workerId: 'worker_id',
lastBackendId: 'last_backend_id',
parentJobId: 'parent_job_id',
subtaskDepth: 'subtask_depth',
requiredRole: 'required_profile',
requiredProfile: 'required_profile',
};
for (const [jsKey, dbCol] of Object.entries(fieldMap)) {
const val = (updates as Record<string, unknown>)[jsKey];
if (val !== undefined) {
setClauses.push(`${dbCol} = @${jsKey}`);
params[jsKey] = val;
}
}
if (setClauses.length === 1) return; // updated_at のみ = 実質変更なし
this.db
.prepare(`UPDATE jobs SET ${setClauses.join(', ')} WHERE id = @id`)
.run(params);
}
/** ジョブの updated_at のみを更新ハートビート用。updateJob は変更フィールドなしだと早期リターンするため別メソッド */
touchJobUpdatedAt(id: string): void {
this.db.prepare("UPDATE jobs SET updated_at = datetime('now') WHERE id = ?").run(id);
}
/**
* Re-queue jobs parked with wait_reason='mcp_auth_required' for the given owner.
* Worker re-evaluates required_mcp on next pickup and will pause again if other servers
* are still unauthorized. _serverId is accepted for API symmetry but not used at SQL time
* (filtering by piece's required_mcp happens at the worker side).
*
* Returns the number of jobs actually re-queued.
*/
resumeMcpWaitingJobs(ownerId: string, _serverId: string): number {
const result = this.db
.prepare(
`UPDATE jobs
SET status='queued', wait_reason=NULL, updated_at=datetime('now')
WHERE status='waiting_human' AND wait_reason='mcp_auth_required'
AND owner_id = ?`,
)
.run(ownerId);
return result.changes;
}
async lockIssue(repo: string, issueNumber: number, jobId: string): Promise<boolean> {
try {
this.db
.prepare('INSERT INTO issue_locks (repo, issue_number, job_id) VALUES (?, ?, ?)')
.run(repo, issueNumber, jobId);
return true;
} catch {
return false;
}
}
async unlockIssue(repo: string, issueNumber: number): Promise<void> {
this.db
.prepare('DELETE FROM issue_locks WHERE repo = ? AND issue_number = ?')
.run(repo, issueNumber);
}
async deleteJobsForIssue(repo: string, issueNumber: number): Promise<number> {
const result = this.db
.prepare('DELETE FROM jobs WHERE repo = ? AND issue_number = ?')
.run(repo, issueNumber);
this.db
.prepare('DELETE FROM issue_locks WHERE repo = ? AND issue_number = ?')
.run(repo, issueNumber);
return result.changes;
}
async addAuditLog(jobId: string | null, action: string, actor: string, detail: object): Promise<void> {
this.db
.prepare('INSERT INTO audit_log (job_id, action, actor, detail) VALUES (?, ?, ?, ?)')
.run(jobId, action, actor, JSON.stringify(detail));
}
async upsertWorkerNode(params: UpsertWorkerNodeParams): Promise<void> {
const now = new Date().toISOString();
this.db.prepare(`
INSERT INTO worker_nodes (
worker_id, endpoint, enabled, healthy, profile_tags, task_class_tags, available_models,
inflight_jobs, max_concurrency, last_error, last_seen_at, updated_at
) VALUES (
@workerId, @endpoint, @enabled, @healthy, @roleTags, @roleTags, @availableModels,
@inflightJobs, @maxConcurrency, @lastError, @now, @now
)
ON CONFLICT(worker_id) DO UPDATE SET
endpoint = excluded.endpoint,
enabled = excluded.enabled,
healthy = excluded.healthy,
profile_tags = excluded.profile_tags,
task_class_tags = excluded.task_class_tags,
available_models = excluded.available_models,
inflight_jobs = excluded.inflight_jobs,
max_concurrency = excluded.max_concurrency,
last_error = excluded.last_error,
last_seen_at = excluded.last_seen_at,
updated_at = excluded.updated_at
`).run({
workerId: params.workerId,
endpoint: params.endpoint,
enabled: params.enabled ? 1 : 0,
healthy: params.healthy ? 1 : 0,
roleTags: encodeTags(params.roles),
availableModels: JSON.stringify(params.availableModels ?? []),
inflightJobs: params.inflightJobs ?? 0,
maxConcurrency: params.maxConcurrency ?? 1,
lastError: params.lastError ?? null,
now,
});
}
async updateWorkerNodeHealth(
workerId: string,
updates: { healthy: boolean; lastError?: string | null; inflightJobs?: number; availableModels?: string[]; enabled?: boolean },
): Promise<void> {
const setClauses = [
'healthy = @healthy',
'last_error = @lastError',
"last_seen_at = @now",
"updated_at = @now",
];
const params: Record<string, unknown> = {
workerId,
healthy: updates.healthy ? 1 : 0,
lastError: updates.lastError ?? null,
now: new Date().toISOString(),
};
if (updates.inflightJobs !== undefined) {
setClauses.push('inflight_jobs = @inflightJobs');
params['inflightJobs'] = updates.inflightJobs;
}
if (updates.availableModels !== undefined) {
setClauses.push('available_models = @availableModels');
params['availableModels'] = JSON.stringify(updates.availableModels);
}
if (updates.enabled !== undefined) {
setClauses.push('enabled = @enabled');
params['enabled'] = updates.enabled ? 1 : 0;
}
this.db.prepare(`UPDATE worker_nodes SET ${setClauses.join(', ')} WHERE worker_id = @workerId`).run(params);
}
async getWorkerNode(workerId: string): Promise<WorkerNode | null> {
const row = this.db
.prepare('SELECT * FROM worker_nodes WHERE worker_id = ?')
.get(workerId) as WorkerNodeRow | undefined;
return row ? rowToWorkerNode(row) : null;
}
async claimNextJob(workerId: string): Promise<Job | null> {
const row = this.db.prepare(`
UPDATE jobs
SET status = 'running', worker_id = ?, updated_at = datetime('now')
WHERE id = (
SELECT j.id
FROM jobs j
JOIN worker_nodes w ON w.worker_id = ?
WHERE j.status = 'queued'
AND w.enabled = 1
AND w.healthy = 1
AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0
AND NOT EXISTS (
SELECT 1 FROM issue_locks il
WHERE il.repo = j.repo AND il.issue_number = j.issue_number
)
ORDER BY j.created_at ASC
LIMIT 1
)
RETURNING *
`).get(workerId, workerId) as JobRow | undefined;
return row ? rowToJob(row) : null;
}
/**
* リトライ待ちジョブの中から next_retry_at を過ぎたものを1件取得して running に遷移
*/
async claimNextRetryJob(workerId: string): Promise<Job | null> {
const row = this.db.prepare(`
UPDATE jobs
SET status = 'running', worker_id = ?, updated_at = datetime('now')
WHERE id = (
SELECT j.id
FROM jobs j
JOIN worker_nodes w ON w.worker_id = ?
WHERE j.status = 'retry'
AND replace(j.next_retry_at, 'T', ' ') <= datetime('now')
AND w.enabled = 1
AND w.healthy = 1
AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0
AND NOT EXISTS (
SELECT 1 FROM issue_locks il
WHERE il.repo = j.repo AND il.issue_number = j.issue_number
)
ORDER BY j.next_retry_at ASC
LIMIT 1
)
RETURNING *
`).get(workerId, workerId) as JobRow | undefined;
return row ? rowToJob(row) : null;
}
/**
* Read-only peek at the next job this worker WOULD claim (retry-priority,
* then oldest queued), without claiming it. Used by the idle-preferring
* claim gate to learn the next job's role before deciding whether to defer
* to an idler sibling. Mirrors the claimNext*Job WHERE clauses exactly.
*/
async peekNextClaimable(workerId: string): Promise<Job | null> {
const retry = this.db.prepare(`
SELECT j.*
FROM jobs j
JOIN worker_nodes w ON w.worker_id = ?
WHERE j.status = 'retry'
AND replace(j.next_retry_at, 'T', ' ') <= datetime('now')
AND w.enabled = 1
AND w.healthy = 1
AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0
AND NOT EXISTS (
SELECT 1 FROM issue_locks il
WHERE il.repo = j.repo AND il.issue_number = j.issue_number
)
ORDER BY j.next_retry_at ASC
LIMIT 1
`).get(workerId) as JobRow | undefined;
if (retry) return rowToJob(retry);
const queued = this.db.prepare(`
SELECT j.*
FROM jobs j
JOIN worker_nodes w ON w.worker_id = ?
WHERE j.status = 'queued'
AND w.enabled = 1
AND w.healthy = 1
AND instr(w.profile_tags, ',' || j.required_profile || ',') > 0
AND NOT EXISTS (
SELECT 1 FROM issue_locks il
WHERE il.repo = j.repo AND il.issue_number = j.issue_number
)
ORDER BY j.created_at ASC
LIMIT 1
`).get(workerId) as JobRow | undefined;
return queued ? rowToJob(queued) : null;
}
async getJobsByStatus(status: JobStatus): Promise<Job[]> {
const rows = this.db
.prepare('SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC')
.all(status) as JobRow[];
return rows.map(rowToJob);
}
async getLatestJobForIssue(repo: string, issueNumber: number): Promise<Job | null> {
const row = this.db
.prepare('SELECT * FROM jobs WHERE repo = ? AND issue_number = ? ORDER BY created_at DESC LIMIT 1')
.get(repo, issueNumber) as JobRow | undefined;
return row ? rowToJob(row) : null;
}
async updateJobContext(
jobId: string,
payload: { promptTokens: number; limitTokens: number }
): Promise<void> {
const updatedAt = new Date().toISOString();
this.db
.prepare(
`UPDATE jobs
SET context_prompt_tokens = ?,
context_limit_tokens = ?,
context_updated_at = ?
WHERE id = ?`
)
.run(payload.promptTokens, payload.limitTokens, updatedAt, jobId);
}
/** 起動時に孤立ジョブを回復 */
async recoverOrphanedJobs(): Promise<number> {
const result = this.db
.prepare("UPDATE jobs SET status = 'queued', worker_id = NULL, updated_at = datetime('now') WHERE status IN ('running', 'dispatching')")
.run();
if (result.changes > 0) {
this.db.prepare('DELETE FROM issue_locks').run();
logger.warn(`Repository: recovered ${result.changes} orphaned jobs, cleared issue locks`);
}
// waiting_subtasks のジョブで全サブジョブが完了済みのものを再キュー
// 同一 issue_number に複数ジョブがある場合、最新のみで判定する
const subtaskRecovery = this.db.prepare(`
UPDATE jobs
SET status = 'queued', updated_at = datetime('now')
WHERE status = 'waiting_subtasks'
AND (
SELECT COUNT(*) FROM (
SELECT s.status, ROW_NUMBER() OVER (
PARTITION BY s.issue_number
ORDER BY s.created_at DESC, s.rowid DESC
) AS rn
FROM jobs s
WHERE s.parent_job_id = jobs.id
) WHERE rn = 1
AND status NOT IN ('succeeded','failed','cancelled')
) = 0
`).run();
if (subtaskRecovery.changes > 0) {
logger.warn(`Repository: recovered ${subtaskRecovery.changes} waiting_subtasks jobs`);
}
return result.changes;
}
/**
* running/dispatching 状態のまま staleMinutes 以上 updated_at が更新されていないジョブを
* queued に戻す(ランタイム watchdog
*/
recoverStuckRunningJobs(staleMinutes: number): number {
const rows = this.db.prepare(`
UPDATE jobs
SET status = 'queued',
worker_id = NULL,
error_summary = 'Recovered: stuck in running for over ' || ? || ' minutes',
updated_at = datetime('now')
WHERE status IN ('running', 'dispatching')
AND updated_at < datetime('now', '-' || ? || ' minutes')
RETURNING id, repo
`).all(staleMinutes, staleMinutes) as Array<{ id: string; repo: string }>;
if (rows.length > 0) {
// issue ロックも解除
for (const row of rows) {
this.db.prepare('DELETE FROM issue_locks WHERE job_id = ?').run(row.id);
}
logger.warn(`Repository: recovered ${rows.length} stuck jobs (stale > ${staleMinutes}min): ${rows.map(r => r.repo).join(', ')}`);
}
return rows.length;
}
/** running/dispatching 状態のジョブを全て queued に戻すgraceful shutdown 用) */
requeueRunningJobs(): number {
const result = this.db
.prepare("UPDATE jobs SET status = 'queued', worker_id = NULL, updated_at = datetime('now') WHERE status IN ('running', 'dispatching')")
.run();
if (result.changes > 0) {
this.db.prepare('DELETE FROM issue_locks').run();
logger.warn(`Repository: requeued ${result.changes} running jobs, cleared issue locks`);
}
return result.changes;
}
getDistinctRepos(): string[] {
const rows = this.db.prepare('SELECT DISTINCT repo FROM jobs ORDER BY repo').all() as { repo: string }[];
return rows.map(r => r.repo);
}
getJobsByRepo(repoName: string): Job[] {
const rows = this.db.prepare('SELECT * FROM jobs WHERE repo = ? ORDER BY created_at DESC').all(repoName) as JobRow[];
return rows.map(rowToJob);
}
/** Issue ごとに最新の Job だけを返すカンバンUI用 */
getLatestJobsPerIssue(repoName: string): Job[] {
const rows = this.db.prepare(`
SELECT j.*
FROM jobs j
WHERE j.repo = ?
AND j.id = (
SELECT j2.id
FROM jobs j2
WHERE j2.repo = j.repo
AND j2.issue_number = j.issue_number
ORDER BY j2.updated_at DESC, j2.created_at DESC, j2.rowid DESC
LIMIT 1
)
ORDER BY j.updated_at DESC
`).all(repoName) as JobRow[];
return rows.map(rowToJob);
}
// Cascade a local_task visibility change to all jobs spawned for that task
// and their recursive subtask descendants (repo='subtask/<parentJobId>').
// Returns the number of job rows updated.
async updateJobsVisibilityForTask(
taskId: number,
updates: { visibility: 'private' | 'org' | 'public'; visibilityScopeOrgId: string | null },
): Promise<number> {
const repoName = localTaskRepoName(taskId);
const now = new Date().toISOString();
const result = this.db
.prepare(`
WITH RECURSIVE job_tree(id) AS (
SELECT id FROM jobs WHERE repo = ?
UNION ALL
SELECT j.id FROM jobs j JOIN job_tree jt ON j.parent_job_id = jt.id
)
UPDATE jobs
SET visibility = ?,
visibility_scope_org_id = ?,
updated_at = ?
WHERE id IN (SELECT id FROM job_tree)
`)
.run(repoName, updates.visibility, updates.visibilityScopeOrgId, now);
return result.changes;
}
async getSubJobs(parentJobId: string): Promise<Job[]> {
// 同一 issue_number に複数ジョブがある場合ASK再投入等、最新のみ返す
// ROW_NUMBER() + rowid で同一 created_at でも一意に決定する
const rows = this.db
.prepare(`
SELECT * FROM (
SELECT j.*, ROW_NUMBER() OVER (
PARTITION BY j.issue_number
ORDER BY j.created_at DESC, j.rowid DESC
) AS rn
FROM jobs j
WHERE j.parent_job_id = ?
) WHERE rn = 1
ORDER BY issue_number ASC
`)
.all(parentJobId) as JobRow[];
return rows.map(rowToJob);
}
/**
* 全サブジョブが終端状態なら親ジョブを再キューに戻す。
* 再キューできた場合 true を返す。
*/
async requeueParentJobIfAllSubtasksDone(parentJobId: string): Promise<boolean> {
// 同一 issue_number に複数ジョブがある場合、最新のもの(ROW_NUMBER=1)のみで判定する
const result = this.db.prepare(`
UPDATE jobs
SET status = 'queued',
updated_at = datetime('now')
WHERE id = ?
AND status = 'waiting_subtasks'
AND (
SELECT COUNT(*) FROM (
SELECT status, ROW_NUMBER() OVER (
PARTITION BY issue_number
ORDER BY created_at DESC, rowid DESC
) AS rn
FROM jobs
WHERE parent_job_id = ?
) WHERE rn = 1
AND status NOT IN ('succeeded', 'failed', 'cancelled')
) = 0
`).run(parentJobId, parentJobId);
return result.changes > 0;
}
async deleteLocalTask(taskId: number): Promise<void> {
const repoName = localTaskRepoName(taskId);
// タスク存在確認 & workspace_path を取得(削除前に必要)
const taskRow = this.db
.prepare('SELECT workspace_path FROM local_tasks WHERE id = ?')
.get(taskId) as { workspace_path: string | null } | undefined;
if (!taskRow) {
throw new Error(`deleteLocalTask: task ${taskId} not found`);
}
// running/dispatching なジョブがある場合は削除を拒否
const activeJob = this.db
.prepare("SELECT id FROM jobs WHERE repo = ? AND status IN ('running', 'dispatching') LIMIT 1")
.get(repoName) as { id: string } | undefined;
if (activeJob) {
throw new Error(`deleteLocalTask: task ${taskId} has an active job (${activeJob.id})`);
}
// DB 操作をトランザクションで実行
const deleteTransaction = this.db.transaction(() => {
this.db.prepare('DELETE FROM issue_locks WHERE repo = ?').run(repoName);
this.db.prepare('DELETE FROM jobs WHERE repo = ?').run(repoName);
this.db.prepare('DELETE FROM local_tasks WHERE id = ?').run(taskId);
});
deleteTransaction();
// ワークスペースディレクトリを削除DB トランザクション外 — ロールバック不可のため)
if (taskRow.workspace_path && existsSync(taskRow.workspace_path)) {
rmSync(taskRow.workspace_path, { recursive: true, force: true });
}
logger.info(`Repository: deleted local task ${taskId}`);
}
/**
* 実行中のジョブをキャンセル状態に変更する。
* running または dispatching 状態のジョブのみ対象。
* 戻り値: キャンセル対象ジョブが見つかったら true、見つからなかったら false。
*/
requestJobCancel(jobId: string): boolean {
const result = this.db.prepare(`
UPDATE jobs
SET status = 'cancelled', updated_at = datetime('now')
WHERE id = ? AND status IN ('running', 'dispatching')
`).run(jobId);
return result.changes > 0;
}
// ── Scheduled Tasks ──────────────────────────────────────────
private mapScheduledTask(row: any): ScheduledTask {
const rawVisibility = row.visibility;
const visibility: ScheduledTask['visibility'] =
rawVisibility === 'org' || rawVisibility === 'public' ? rawVisibility : 'private';
return {
id: row.id,
title: row.title,
body: row.body,
pieceName: row.piece_name,
profile: row.profile,
outputFormat: row.output_format,
cronExpression: row.cron_expression,
nextRunAt: utc(row.next_run_at),
lastRunAt: utc(row.last_run_at),
lastJobId: row.last_job_id,
isActive: row.is_active === 1,
ownerId: row.owner_id ?? null,
ownerName: row.owner_name ?? null,
visibility,
visibilityScopeOrgId: row.visibility_scope_org_id ?? null,
visibilityScopeOrgName: row.visibility_scope_org_name ?? null,
browserSessionProfileId: row.browser_session_profile_id ?? null,
taskKind: row.task_kind === 'script' ? 'script' : 'agent',
scriptName: row.script_name ?? null,
scriptParams: row.script_params ?? null,
createdAt: utc(row.created_at),
updatedAt: utc(row.updated_at),
};
}
async createScheduledTask(params: CreateScheduledTaskParams): Promise<ScheduledTask> {
const result = this.db
.prepare(
`INSERT INTO scheduled_tasks (title, body, piece_name, profile, output_format, cron_expression, next_run_at, owner_id, visibility, visibility_scope_org_id, browser_session_profile_id, task_kind, script_name, script_params)
VALUES (@title, @body, @pieceName, @profile, @outputFormat, @cronExpression, @nextRunAt, @ownerId, @visibility, @visibilityScopeOrgId, @browserSessionProfileId, @taskKind, @scriptName, @scriptParams)`
)
.run({
title: params.title ?? null,
body: params.body,
pieceName: params.pieceName ?? 'auto',
profile: params.profile ?? 'auto',
outputFormat: params.outputFormat ?? 'markdown',
cronExpression: params.cronExpression,
nextRunAt: params.nextRunAt,
ownerId: params.ownerId ?? null,
visibility: params.visibility ?? 'private',
visibilityScopeOrgId: params.visibilityScopeOrgId ?? null,
browserSessionProfileId: params.browserSessionProfileId ?? null,
taskKind: params.taskKind ?? 'agent',
scriptName: params.scriptName ?? null,
scriptParams: params.scriptParams ?? null,
});
const task = this.getScheduledTaskSync(Number(result.lastInsertRowid));
if (!task) throw new Error('createScheduledTask: failed to load inserted task');
return task;
}
private getScheduledTaskSync(id: number): ScheduledTask | null {
const row = this.db
.prepare(`
SELECT st.*,
${SCHEDULED_TASK_DISPLAY_SELECT}
FROM scheduled_tasks st
${SCHEDULED_TASK_DISPLAY_JOIN}
WHERE st.id = ?
`)
.get(id) as any;
return row ? this.mapScheduledTask(row) : null;
}
async getScheduledTask(id: number, opts?: { viewer?: Express.User }): Promise<ScheduledTask | null> {
const viewerClause = opts?.viewer
? buildVisibilityWhere(opts.viewer, 'st')
: { clause: '1=1', params: [] as unknown[] };
const row = this.db
.prepare(`
SELECT st.*,
${SCHEDULED_TASK_DISPLAY_SELECT}
FROM scheduled_tasks st
${SCHEDULED_TASK_DISPLAY_JOIN}
WHERE st.id = ? AND ${viewerClause.clause}
`)
.get(id, ...viewerClause.params) as any;
return row ? this.mapScheduledTask(row) : null;
}
async listScheduledTasks(filter?: { viewer?: Express.User }): Promise<ScheduledTask[]> {
const conditions: string[] = [];
const queryParams: unknown[] = [];
if (filter?.viewer) {
const w = buildVisibilityWhere(filter.viewer, 'st');
conditions.push(w.clause);
queryParams.push(...w.params);
}
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
const rows = this.db
.prepare(`
SELECT st.*,
${SCHEDULED_TASK_DISPLAY_SELECT}
FROM scheduled_tasks st
${SCHEDULED_TASK_DISPLAY_JOIN}
${whereClause}
ORDER BY st.created_at DESC
`)
.all(...queryParams) as any[];
return rows.map(r => this.mapScheduledTask(r));
}
/**
* due なスケジュールタスクをアトミックに claim して返す。
* BEGIN IMMEDIATE で書き込みロックを即座に取得し、
* 他のスケジューラーインスタンスとの重複実行を防止する。
* claim されたタスクは next_run_at が遠い未来に設定されるため、
* 他インスタンスに再取得されない。
* 呼び出し側が実行後に正しい next_run_at を再設定する。
*/
async getScheduledTasksDue(): Promise<ScheduledTask[]> {
// 十分遠い未来claim マーカー)
const claimMarker = '9999-12-31 23:59:59';
// BEGIN IMMEDIATE: トランザクション開始時に RESERVED ロックを取得し、
// 他の書き込みトランザクションとの競合を防ぐ
const txn = this.db.transaction(() => {
const rows = this.db
.prepare(
`SELECT * FROM scheduled_tasks
WHERE is_active = 1 AND next_run_at <= datetime('now')
ORDER BY next_run_at ASC`
)
.all() as any[];
if (rows.length === 0) return [];
// claim: next_run_at を遠い未来に設定して他インスタンスからの重複取得を防止
const ids = rows.map((r: any) => r.id);
this.db
.prepare(
`UPDATE scheduled_tasks
SET next_run_at = ?, updated_at = datetime('now')
WHERE id IN (${ids.map(() => '?').join(',')})`
)
.run(claimMarker, ...ids);
return rows;
});
const rows = txn.immediate();
return rows.map((r: any) => this.mapScheduledTask(r));
}
async updateScheduledTask(id: number, params: UpdateScheduledTaskParams): Promise<ScheduledTask | null> {
const sets: string[] = [];
const values: Record<string, any> = { id };
if (params.title !== undefined) { sets.push('title = @title'); values.title = params.title; }
if (params.body !== undefined) { sets.push('body = @body'); values.body = params.body; }
if (params.pieceName !== undefined) { sets.push('piece_name = @pieceName'); values.pieceName = params.pieceName; }
if (params.profile !== undefined) { sets.push('profile = @profile'); values.profile = params.profile; }
if (params.outputFormat !== undefined) { sets.push('output_format = @outputFormat'); values.outputFormat = params.outputFormat; }
if (params.cronExpression !== undefined) { sets.push('cron_expression = @cronExpression'); values.cronExpression = params.cronExpression; }
if (params.nextRunAt !== undefined) { sets.push('next_run_at = @nextRunAt'); values.nextRunAt = params.nextRunAt; }
if (params.lastRunAt !== undefined) { sets.push('last_run_at = @lastRunAt'); values.lastRunAt = params.lastRunAt; }
if (params.lastJobId !== undefined) { sets.push('last_job_id = @lastJobId'); values.lastJobId = params.lastJobId; }
if (params.isActive !== undefined) { sets.push('is_active = @isActive'); values.isActive = params.isActive ? 1 : 0; }
if (params.visibility !== undefined) { sets.push('visibility = @visibility'); values.visibility = params.visibility; }
if (params.visibilityScopeOrgId !== undefined) {
sets.push('visibility_scope_org_id = @visibilityScopeOrgId');
values.visibilityScopeOrgId = params.visibilityScopeOrgId;
}
if (params.browserSessionProfileId !== undefined) {
sets.push('browser_session_profile_id = @browserSessionProfileId');
values.browserSessionProfileId = params.browserSessionProfileId;
}
if (params.taskKind !== undefined) {
sets.push('task_kind = @taskKind');
values.taskKind = params.taskKind;
}
if (params.scriptName !== undefined) {
sets.push('script_name = @scriptName');
values.scriptName = params.scriptName;
}
if (params.scriptParams !== undefined) {
sets.push('script_params = @scriptParams');
values.scriptParams = params.scriptParams;
}
if (sets.length === 0) return this.getScheduledTaskSync(id);
sets.push("updated_at = datetime('now')");
this.db.prepare(`UPDATE scheduled_tasks SET ${sets.join(', ')} WHERE id = @id`).run(values);
return this.getScheduledTaskSync(id);
}
async deleteScheduledTask(id: number): Promise<boolean> {
const result = this.db.prepare('DELETE FROM scheduled_tasks WHERE id = ?').run(id);
return result.changes > 0;
}
// ── User CRUD ────────────────────────────────────────────────────
private rowToUser(row: UserRow): User {
return rowToUser(row);
}
createUser(params: CreateUserParams): User {
const id = uuidv4();
const now = new Date().toISOString();
this.db
.prepare(
`INSERT INTO users (id, email, name, avatar_url, role, status, created_at, updated_at)
VALUES (@id, @email, @name, @avatarUrl, @role, @status, @now, @now)`
)
.run({
id,
email: params.email,
name: params.name,
avatarUrl: params.avatarUrl ?? null,
role: params.role,
status: params.status,
now,
});
const user = this.getUserById(id);
if (!user) throw new Error(`createUser: failed to retrieve created user ${id}`);
return user;
}
/**
* Ensure the synthetic 'local' user row exists. No-auth single-user
* deployments own per-user rows under the id 'local' (tasks, jobs, SSH
* connections, DEKs, …). Many of those tables FK to users(id) with
* foreign_keys ON, so the row must exist or the inserts fail — e.g.
* ssh_user_deks → SSH connection creation returned create_failed.
* Idempotent (INSERT OR IGNORE), so it is safe to call on every startup.
* role='admin' mirrors the synthetic 'local' user the HTTP layer injects
* for task-visibility routes in no-auth mode.
*/
ensureLocalUser(): void {
const now = new Date().toISOString();
this.db
.prepare(
`INSERT OR IGNORE INTO users (id, email, name, avatar_url, role, status, created_at, updated_at)
VALUES ('local', 'local@localhost', 'local', NULL, 'admin', 'active', @now, @now)`
)
.run({ now });
}
// ── Local auth (email + password) ─────────────────────────────────────
/** scrypt hash with a fresh per-user salt. Overwrites any existing credential. */
setLocalPassword(userId: string, plainPassword: string): void {
const salt = randomBytes(16).toString('hex');
const hash = scryptSync(plainPassword, salt, 64).toString('hex');
const now = new Date().toISOString();
this.db
.prepare(
`INSERT INTO local_credentials (user_id, password_hash, salt, updated_at)
VALUES (@userId, @hash, @salt, @now)
ON CONFLICT(user_id) DO UPDATE SET password_hash=@hash, salt=@salt, updated_at=@now`,
)
.run({ userId, hash, salt, now });
}
/** Constant-time verify. False when the user has no local credential. */
verifyLocalPassword(userId: string, plainPassword: string): boolean {
const row = this.db
.prepare('SELECT password_hash, salt FROM local_credentials WHERE user_id = ?')
.get(userId) as { password_hash: string; salt: string } | undefined;
if (!row) return false;
const expected = Buffer.from(row.password_hash, 'hex');
const actual = scryptSync(plainPassword, row.salt, expected.length);
return expected.length === actual.length && timingSafeEqual(expected, actual);
}
hasLocalCredential(userId: string): boolean {
return !!this.db.prepare('SELECT 1 FROM local_credentials WHERE user_id = ?').get(userId);
}
/**
* Create a brand-new local account (self-signup or admin-created). The email
* MUST be unused: attaching a password to an existing account would be an
* account-takeover vector, so we reject instead of linking. Linking a local
* credential to an existing OAuth account is a separate, authenticated action
* (not v1 signup).
*/
createLocalUser(params: CreateLocalUserParams): User {
if (this.getUserByEmail(params.email)) {
throw new Error(`createLocalUser: a user with email ${params.email} already exists`);
}
const user = this.createUser({
email: params.email,
name: params.name ?? params.email,
role: params.role,
status: params.status,
});
this.db
.prepare(
`INSERT OR IGNORE INTO oauth_accounts (id, user_id, provider, provider_id, created_at)
VALUES (@id, @userId, 'local', @providerId, @now)`,
)
.run({ id: uuidv4(), userId: user.id, providerId: params.email, now: new Date().toISOString() });
this.setLocalPassword(user.id, params.password);
return user;
}
/**
* Idempotently seed the shared system admin under the fixed id `local` — the
* same owner the no-auth path synthesizes. This makes all pre-existing
* `local`-owned data belong to the logged-in admin once local auth is turned
* on, and lets an existing no-auth deployment gain a login mid-stream.
* Re-running updates the password and keeps role=admin/status=active.
*/
upsertLocalSystemAdmin(params: { email: string; password: string; name?: string }): User {
const LOCAL_ID = 'local';
const now = new Date().toISOString();
const existing = this.getUserById(LOCAL_ID);
if (!existing) {
this.db
.prepare(
`INSERT INTO users (id, email, name, avatar_url, role, status, created_at, updated_at)
VALUES (@id, @email, @name, NULL, 'admin', 'active', @now, @now)`,
)
.run({ id: LOCAL_ID, email: params.email, name: params.name ?? 'Local Admin', now });
} else {
this.db
.prepare(`UPDATE users SET email=@email, role='admin', status='active', updated_at=@now WHERE id=@id`)
.run({ id: LOCAL_ID, email: params.email, now });
}
this.db
.prepare(
`INSERT OR IGNORE INTO oauth_accounts (id, user_id, provider, provider_id, created_at)
VALUES (@id, @userId, 'local', @providerId, @now)`,
)
.run({ id: uuidv4(), userId: LOCAL_ID, providerId: params.email, now });
this.setLocalPassword(LOCAL_ID, params.password);
const user = this.getUserById(LOCAL_ID);
if (!user) throw new Error('upsertLocalSystemAdmin: failed to retrieve local admin');
return user;
}
// ── Local organizations ───────────────────────────────────────────────
private rowToLocalOrg(r: { id: string; name: string; created_by: string | null; created_at: string }): LocalOrg {
return { id: r.id, name: r.name, createdBy: r.created_by, createdAt: r.created_at };
}
/** Create a local org. id is prefixed `lorg:` so it never collides with a
* Gitea numeric org id (both live in visibility_scope_org_id). */
createLocalOrg(name: string, createdBy: string | null): LocalOrg {
const id = `lorg:${uuidv4()}`;
const now = new Date().toISOString();
this.db
.prepare(`INSERT INTO local_orgs (id, name, created_by, created_at) VALUES (@id, @name, @createdBy, @now)`)
.run({ id, name, createdBy, now });
return { id, name, createdBy, createdAt: now };
}
getLocalOrg(id: string): LocalOrg | null {
const r = this.db.prepare('SELECT id, name, created_by, created_at FROM local_orgs WHERE id = ?').get(id) as
| { id: string; name: string; created_by: string | null; created_at: string }
| undefined;
return r ? this.rowToLocalOrg(r) : null;
}
listLocalOrgs(): LocalOrg[] {
const rows = this.db.prepare('SELECT id, name, created_by, created_at FROM local_orgs ORDER BY name COLLATE NOCASE').all() as Array<{ id: string; name: string; created_by: string | null; created_at: string }>;
return rows.map(r => this.rowToLocalOrg(r));
}
renameLocalOrg(id: string, name: string): void {
this.db.prepare('UPDATE local_orgs SET name = ? WHERE id = ?').run(name, id);
}
/** Tables carrying `visibility_scope_org_id` (org-scoped resources). */
private static readonly ORG_SCOPED_TABLES = ['local_tasks', 'scheduled_tasks', 'jobs', 'note_index'];
/**
* Delete a local org. Members cascade via FK. Resources scoped to this org
* (visibility='org', visibility_scope_org_id=id) would become invisible to
* everyone once the org is gone — so first downgrade them to 'private'
* (owner + admin can still see them; no data loss). Atomic.
*/
deleteLocalOrg(id: string): void {
const tx = this.db.transaction((orgId: string) => {
for (const table of Repository.ORG_SCOPED_TABLES) {
this.db
.prepare(`UPDATE ${table} SET visibility = 'private', visibility_scope_org_id = NULL WHERE visibility_scope_org_id = ?`)
.run(orgId);
}
this.db.prepare('DELETE FROM local_orgs WHERE id = ?').run(orgId);
});
tx(id);
}
/** Add or update a member (idempotent — re-add updates the role). */
addOrgMember(orgId: string, userId: string, role: string = 'member'): void {
const now = new Date().toISOString();
this.db
.prepare(
`INSERT INTO local_org_members (org_id, user_id, role, added_at)
VALUES (@orgId, @userId, @role, @now)
ON CONFLICT(org_id, user_id) DO UPDATE SET role=@role`,
)
.run({ orgId, userId, role, now });
}
removeOrgMember(orgId: string, userId: string): void {
this.db.prepare('DELETE FROM local_org_members WHERE org_id = ? AND user_id = ?').run(orgId, userId);
}
listOrgMembers(orgId: string): LocalOrgMember[] {
const rows = this.db
.prepare('SELECT user_id, role FROM local_org_members WHERE org_id = ? ORDER BY added_at')
.all(orgId) as Array<{ user_id: string; role: string }>;
return rows.map(r => ({ userId: r.user_id, role: r.role }));
}
/** Orgs a user belongs to — merged into session.orgIds so the existing
* provider-agnostic 'org' visibility (buildVisibilityWhere) covers them. */
listUserLocalOrgs(userId: string): Array<{ orgId: string; name: string }> {
const rows = this.db
.prepare(
`SELECT o.id AS org_id, o.name AS name
FROM local_org_members m JOIN local_orgs o ON o.id = m.org_id
WHERE m.user_id = ? ORDER BY o.name COLLATE NOCASE`,
)
.all(userId) as Array<{ org_id: string; name: string }>;
return rows.map(r => ({ orgId: r.org_id, name: r.name }));
}
getUserById(id: string): User | null {
const row = this.db
.prepare('SELECT * FROM users WHERE id = ?')
.get(id) as UserRow | undefined;
return row ? this.rowToUser(row) : null;
}
getUserByEmail(email: string): User | null {
const row = this.db
.prepare('SELECT * FROM users WHERE email = ?')
.get(email) as UserRow | undefined;
return row ? this.rowToUser(row) : null;
}
findOrCreateUserByOAuth(params: FindOrCreateByOAuthParams): User {
// 1. Check if oauth_account already exists
const existing = this.db
.prepare('SELECT user_id FROM oauth_accounts WHERE provider = ? AND provider_id = ?')
.get(params.provider, params.providerId) as { user_id: string } | undefined;
if (existing) {
const user = this.getUserById(existing.user_id);
if (!user) throw new Error(`findOrCreateUserByOAuth: user ${existing.user_id} not found`);
// Sync mutable profile fields from the provider on every re-login so
// existing users whose name was missing on first login pick it up once
// their Gitea profile is populated. Email upgrade only applies when the
// dummy @gitea.local placeholder is being replaced.
const patch: { email?: string; name?: string; avatarUrl?: string | null } = {};
if (user.email.endsWith('@gitea.local') && !params.email.endsWith('@gitea.local')) {
patch.email = params.email;
}
if (params.name && params.name !== user.name) patch.name = params.name;
if (params.avatarUrl !== undefined && params.avatarUrl !== user.avatarUrl) {
patch.avatarUrl = params.avatarUrl;
}
if (Object.keys(patch).length > 0) {
this.updateUser(user.id, patch);
const refreshed = this.getUserById(user.id);
if (refreshed) return refreshed;
}
return user;
}
// 2. Check if user exists by email
let user = this.getUserByEmail(params.email);
if (!user) {
// 3. Create new user with status=pending
user = this.createUser({
email: params.email,
name: params.name,
role: 'user',
status: 'pending',
avatarUrl: params.avatarUrl,
});
}
// 4. Link oauth_account to user
const oauthId = uuidv4();
const now = new Date().toISOString();
this.db
.prepare(
`INSERT OR IGNORE INTO oauth_accounts (id, user_id, provider, provider_id, created_at)
VALUES (@id, @userId, @provider, @providerId, @now)`
)
.run({
id: oauthId,
userId: user.id,
provider: params.provider,
providerId: params.providerId,
now,
});
return user;
}
listUsers(): User[] {
const rows = this.db
.prepare('SELECT * FROM users ORDER BY created_at ASC')
.all() as UserRow[];
return rows.map(row => this.rowToUser(row));
}
updateUser(id: string, updates: {
status?: 'active' | 'pending' | 'disabled';
role?: 'admin' | 'user';
email?: string;
name?: string;
avatarUrl?: string | null;
defaultVisibility?: 'private' | 'org' | 'public';
defaultVisibilityOrgId?: string | null;
}): void {
const setClauses: string[] = ["updated_at = datetime('now')"];
const params: Record<string, unknown> = { id };
if (updates.status !== undefined) {
setClauses.push('status = @status');
params['status'] = updates.status;
}
if (updates.role !== undefined) {
setClauses.push('role = @role');
params['role'] = updates.role;
}
if (updates.email !== undefined) {
setClauses.push('email = @email');
params['email'] = updates.email;
}
if (updates.name !== undefined) {
setClauses.push('name = @name');
params['name'] = updates.name;
}
if (updates.avatarUrl !== undefined) {
setClauses.push('avatar_url = @avatar_url');
params['avatar_url'] = updates.avatarUrl;
}
if (updates.defaultVisibility !== undefined) {
setClauses.push('default_visibility = @default_visibility');
params['default_visibility'] = updates.defaultVisibility;
}
if (updates.defaultVisibilityOrgId !== undefined) {
setClauses.push('default_visibility_org_id = @default_visibility_org_id');
params['default_visibility_org_id'] = updates.defaultVisibilityOrgId;
}
if (setClauses.length === 1) return;
this.db
.prepare(`UPDATE users SET ${setClauses.join(', ')} WHERE id = @id`)
.run(params);
}
deleteUser(id: string): void {
// Never delete the shared `local` system/admin user: it is the no-auth
// fallback owner and owns all single-user-mode data. Deleting it would
// break no-auth mode and orphan every `local`-owned task/job/folder.
if (id === 'local') {
throw new Error('cannot delete the local/system user');
}
this.db.prepare('DELETE FROM users WHERE id = ?').run(id);
}
deleteSessionsByUserId(userId: string): void {
// Sessions store passport user info as JSON in sess column
// Delete sessions where sess contains the user id
const rows = this.db
.prepare('SELECT sid, sess FROM sessions')
.all() as Array<{ sid: string; sess: string }>;
const toDelete: string[] = [];
for (const row of rows) {
try {
const sess = JSON.parse(row.sess) as Record<string, unknown>;
const passport = sess['passport'] as Record<string, unknown> | undefined;
if (passport && passport['user'] === userId) {
toDelete.push(row.sid);
}
} catch {
// ignore parse errors
}
}
if (toDelete.length > 0) {
const placeholders = toDelete.map(() => '?').join(', ');
this.db.prepare(`DELETE FROM sessions WHERE sid IN (${placeholders})`).run(...toDelete);
}
}
replaceUserGiteaOrgs(userId: string, orgs: GiteaOrgInput[]): void {
const tx = this.db.transaction((uid: string, items: GiteaOrgInput[]) => {
this.db.prepare('DELETE FROM user_gitea_orgs WHERE user_id = ?').run(uid);
const insert = this.db.prepare(
'INSERT INTO user_gitea_orgs (user_id, org_id, org_name) VALUES (?, ?, ?)'
);
for (const o of items) insert.run(uid, o.orgId, o.orgName);
});
tx(userId, orgs);
}
listUserGiteaOrgs(userId: string): GiteaOrg[] {
const rows = this.db
.prepare('SELECT org_id, org_name, fetched_at FROM user_gitea_orgs WHERE user_id = ? ORDER BY org_name ASC')
.all(userId) as Array<{ org_id: string; org_name: string; fetched_at: string }>;
return rows.map(r => ({ orgId: r.org_id, orgName: r.org_name, fetchedAt: r.fetched_at }));
}
// ── Reflection piece-edit cooldown ──────────────────────────────────────────
/**
* Records that the reflection pipeline wrote a new version of pieceName for
* userId. snapshotId ties the edit back to the snapshot that triggered it.
*/
recordPieceEdit(userId: string, pieceName: string, snapshotId: string): void {
this.db.prepare(
`INSERT INTO reflection_piece_edits (user_id, piece_name, snapshot_id, created_at)
VALUES (?, ?, ?, ?)`
).run(userId, pieceName, snapshotId, Date.now());
}
/**
* Returns the number of piece edits for (userId, pieceName) that occurred
* within the last sinceMs milliseconds. Used by the cooldown gate in
* piece-writer.ts to prevent over-editing the same piece.
*/
countRecentPieceEdits(userId: string, pieceName: string, sinceMs: number): number {
return (this.db.prepare(
`SELECT COUNT(*) AS c FROM reflection_piece_edits
WHERE user_id = ? AND piece_name = ? AND created_at > ?`
).get(userId, pieceName, Date.now() - sinceMs) as { c: number }).c;
}
// ── Reflection metrics ───────────────────────────────────────────────────────
/**
* Insert one row into reflection_metrics, optionally bundling a
* reflection_piece_edits row in the same transaction.
*
* When pieceEdit is supplied the two inserts are wrapped in a single
* db.transaction() so the tables stay consistent even if the process
* crashes between them.
*/
recordReflectionRun(
metric: {
reflection_job_id: string;
original_job_id: string | null;
user_id: string;
piece_name: string | null;
outcome: 'applied' | 'partial' | 'abstained' | 'rejected' | 'failed';
memory_changes: number;
piece_edited: 0 | 1;
tokens_in: number;
tokens_out: number;
duration_ms: number;
},
pieceEdit?: { pieceName: string; snapshotId: string },
): void {
const now = Date.now();
const insertMetric = this.db.prepare(`
INSERT INTO reflection_metrics
(reflection_job_id, original_job_id, user_id, piece_name, outcome,
memory_changes, piece_edited, tokens_in, tokens_out, duration_ms, created_at)
VALUES
(@reflection_job_id, @original_job_id, @user_id, @piece_name, @outcome,
@memory_changes, @piece_edited, @tokens_in, @tokens_out, @duration_ms, @created_at)
`);
if (pieceEdit) {
const insertEdit = this.db.prepare(`
INSERT INTO reflection_piece_edits (user_id, piece_name, snapshot_id, created_at)
VALUES (?, ?, ?, ?)
`);
this.db.transaction(() => {
insertMetric.run({ ...metric, created_at: now });
insertEdit.run(metric.user_id, pieceEdit.pieceName, pieceEdit.snapshotId, now);
})();
} else {
insertMetric.run({ ...metric, created_at: now });
}
}
/**
* Convenience alias for callers that don't need the bundled pieceEdit path.
*/
recordReflectionMetric(
row: {
reflection_job_id: string;
original_job_id: string | null;
user_id: string;
piece_name: string | null;
outcome: 'applied' | 'partial' | 'abstained' | 'rejected' | 'failed';
memory_changes: number;
piece_edited: 0 | 1;
tokens_in: number;
tokens_out: number;
duration_ms: number;
},
): void {
this.recordReflectionRun(row);
}
/**
* Aggregate reflection metrics for a user since sinceMs (epoch ms).
* Returns counts per outcome and totals for tokens + piece edits.
*/
aggregateReflectionMetrics(
userId: string,
sinceMs: number,
): {
applied: number;
partial: number;
abstained: number;
rejected: number;
failed: number;
tokensIn: number;
tokensOut: number;
pieceEdits: number;
totalRuns: number;
} {
const rows = this.db
.prepare(
`SELECT outcome, COUNT(*) AS cnt,
SUM(tokens_in) AS ti, SUM(tokens_out) AS to_,
SUM(piece_edited) AS pe
FROM reflection_metrics
WHERE user_id = ? AND created_at >= ?
GROUP BY outcome`,
)
.all(userId, sinceMs) as Array<{
outcome: string;
cnt: number;
ti: number;
to_: number;
pe: number;
}>;
const result = {
applied: 0, partial: 0, abstained: 0, rejected: 0, failed: 0,
tokensIn: 0, tokensOut: 0, pieceEdits: 0, totalRuns: 0,
};
for (const r of rows) {
const o = r.outcome as keyof Pick<typeof result, 'applied' | 'partial' | 'abstained' | 'rejected' | 'failed'>;
if (o in result) (result as Record<string, number>)[o] = r.cnt;
result.tokensIn += r.ti ?? 0;
result.tokensOut += r.to_ ?? 0;
result.pieceEdits += r.pe ?? 0;
result.totalRuns += r.cnt;
}
return result;
}
async createDashboardWidget(params: {
userId: string;
slug: string;
title: string;
content?: string;
kind?: DashboardWidgetKind;
}): Promise<DashboardWidget> {
const max = this.db
.prepare(`SELECT COALESCE(MAX(sort_order), -1) AS m FROM user_dashboard_widgets WHERE user_id = ?`)
.get(params.userId) as { m: number };
const kind: DashboardWidgetKind = params.kind ?? 'markdown';
const result = this.db
.prepare(
`INSERT INTO user_dashboard_widgets (user_id, slug, title, kind, markdown_content, sort_order)
VALUES (?, ?, ?, ?, ?, ?)`
)
.run(params.userId, params.slug, params.title, kind, params.content ?? '', max.m + 1);
const row = this.db
.prepare(`SELECT * FROM user_dashboard_widgets WHERE id = ?`)
.get(Number(result.lastInsertRowid)) as DashboardWidgetRow;
return rowToDashboardWidget(row);
}
async listDashboardWidgets(userId: string): Promise<DashboardWidget[]> {
const rows = this.db
.prepare(`SELECT * FROM user_dashboard_widgets WHERE user_id = ? ORDER BY sort_order ASC, id ASC`)
.all(userId) as DashboardWidgetRow[];
return rows.map(rowToDashboardWidget);
}
async getDashboardWidget(id: number, userId: string): Promise<DashboardWidget | null> {
const row = this.db
.prepare(`SELECT * FROM user_dashboard_widgets WHERE id = ? AND user_id = ?`)
.get(id, userId) as DashboardWidgetRow | undefined;
return row ? rowToDashboardWidget(row) : null;
}
async updateDashboardWidget(
id: number,
userId: string,
patch: { title?: string; content?: string },
): Promise<DashboardWidget> {
const sets: string[] = [];
const args: unknown[] = [];
if (patch.title !== undefined) {
sets.push('title = ?');
args.push(patch.title);
}
if (patch.content !== undefined) {
sets.push('markdown_content = ?');
args.push(patch.content);
}
sets.push(`updated_at = datetime('now')`);
args.push(id, userId);
this.db.prepare(`UPDATE user_dashboard_widgets SET ${sets.join(', ')} WHERE id = ? AND user_id = ?`).run(...args);
const row = this.db
.prepare(`SELECT * FROM user_dashboard_widgets WHERE id = ? AND user_id = ?`)
.get(id, userId) as DashboardWidgetRow | undefined;
if (!row) throw new Error(`updateDashboardWidget: widget ${id} not found for user ${userId}`);
return rowToDashboardWidget(row);
}
async upsertDashboardWidgetBySlug(params: {
userId: string;
slug: string;
title?: string;
content: string;
mode?: 'replace' | 'append';
}): Promise<DashboardWidget> {
const existing = this.db
.prepare(`SELECT * FROM user_dashboard_widgets WHERE user_id = ? AND slug = ?`)
.get(params.userId, params.slug) as DashboardWidgetRow | undefined;
if (existing) {
const newContent =
params.mode === 'append'
? (existing.markdown_content ? `${existing.markdown_content}\n\n${params.content}` : params.content)
: params.content;
return this.updateDashboardWidget(existing.id, params.userId, { content: newContent });
}
if (!params.title) {
throw new Error('upsertDashboardWidgetBySlug: title is required when creating a new widget');
}
return this.createDashboardWidget({
userId: params.userId,
slug: params.slug,
title: params.title,
content: params.content,
});
}
async deleteDashboardWidget(id: number, userId: string): Promise<void> {
this.db
.prepare(`DELETE FROM user_dashboard_widgets WHERE id = ? AND user_id = ?`)
.run(id, userId);
}
async reorderDashboardWidgets(userId: string, ids: number[]): Promise<void> {
const owned = this.db
.prepare(`SELECT id FROM user_dashboard_widgets WHERE user_id = ?`)
.all(userId) as Array<{ id: number }>;
const ownedSet = new Set(owned.map(r => r.id));
const filtered = ids.filter(id => ownedSet.has(id));
const update = this.db.prepare(`UPDATE user_dashboard_widgets SET sort_order = ? WHERE id = ? AND user_id = ?`);
const tx = this.db.transaction((arr: number[]) => {
arr.forEach((id, idx) => update.run(idx, id, userId));
});
tx(filtered);
}
// ── AAO Gateway Phase 2a: virtual keys ───────────────────────────────
//
// The gateway auth middleware reads `findGatewayVirtualKeyByHash` on
// every request, so it MUST stay an indexed point lookup. The partial
// index `idx_gateway_keys_hash_active` covers that path. Admin-side
// methods (list/get/revoke/rotate/delete) are not hot.
/**
* Insert a new virtual key row. Throws on UNIQUE(key_hash) violation —
* callers must hash the raw key first (via src/gateway/key-format.ts)
* and pass the hash here. The raw key is never accepted by the
* Repository on purpose: there is no path that could log it.
*
* `allowedModels` is JSON-encoded when present and stored as NULL when
* omitted — distinct from `[]` which means "lock to zero models".
*/
createGatewayVirtualKey(params: {
id?: string;
keyHash: string;
keyPrefix: string;
team: string;
allowedModels?: string[] | null;
source?: GatewayVirtualKeySource;
createdBy?: string | null;
createdAt?: string;
/** Phase 2b: optional monthly tokens budget. null/undefined = unlimited. */
tokensBudget?: number | null;
/** Phase 2b: optional requests-per-minute cap. null/undefined = unlimited. */
rateLimitRpm?: number | null;
}): GatewayVirtualKey {
const id = params.id ?? randomUUID();
const allowedJson =
params.allowedModels === null || params.allowedModels === undefined
? null
: JSON.stringify(params.allowedModels);
const source: GatewayVirtualKeySource = params.source ?? 'admin';
const createdAt = params.createdAt ?? new Date().toISOString();
const tokensBudget = normalizeOptionalPositiveInt(params.tokensBudget);
const rateLimitRpm = normalizeOptionalPositiveInt(params.rateLimitRpm);
this.db
.prepare(
`INSERT INTO gateway_virtual_keys
(id, key_hash, key_prefix, team, allowed_models, source, created_at, created_by, tokens_budget, rate_limit_rpm)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)
.run(
id,
params.keyHash,
params.keyPrefix,
params.team,
allowedJson,
source,
createdAt,
params.createdBy ?? null,
tokensBudget,
rateLimitRpm,
);
const row = this.db
.prepare(`SELECT * FROM gateway_virtual_keys WHERE id = ?`)
.get(id) as GatewayVirtualKeyRow;
return rowToGatewayVirtualKey(row);
}
/**
* Phase 2b: partial update of a virtual key's policy fields. The
* bearer hash, team, source, and creation metadata are immutable here
* (use rotate to change the bearer). Each field is opt-in — undefined
* means "leave alone"; explicit null clears the limit (= unlimited).
*
* Returns the refreshed row. Throws when the id doesn't exist (caller
* is expected to 404 before calling).
*/
updateGatewayVirtualKey(
id: string,
patch: {
/**
* Phase 3a follow-up: team is now patchable so the config-migration
* importer can propagate a YAML-side team rename to the DB. Admin
* PATCH never sends this field (the team is intentionally immutable
* via the public API to avoid an admin accidentally rewriting the
* owner of a key); the only caller is importConfigKeysToDb.
*/
team?: string;
tokensBudget?: number | null;
rateLimitRpm?: number | null;
allowedModels?: string[] | null;
},
): GatewayVirtualKey {
const sets: string[] = [];
const args: unknown[] = [];
if (Object.prototype.hasOwnProperty.call(patch, 'team')) {
sets.push('team = ?');
args.push(patch.team);
}
if (Object.prototype.hasOwnProperty.call(patch, 'tokensBudget')) {
sets.push('tokens_budget = ?');
args.push(normalizeOptionalPositiveInt(patch.tokensBudget));
}
if (Object.prototype.hasOwnProperty.call(patch, 'rateLimitRpm')) {
sets.push('rate_limit_rpm = ?');
args.push(normalizeOptionalPositiveInt(patch.rateLimitRpm));
}
if (Object.prototype.hasOwnProperty.call(patch, 'allowedModels')) {
sets.push('allowed_models = ?');
args.push(
patch.allowedModels === null || patch.allowedModels === undefined
? null
: JSON.stringify(patch.allowedModels),
);
}
if (sets.length > 0) {
args.push(id);
this.db
.prepare(`UPDATE gateway_virtual_keys SET ${sets.join(', ')} WHERE id = ?`)
.run(...args);
}
const refreshed = this.findGatewayVirtualKeyById(id);
if (!refreshed) {
throw new Error(`updateGatewayVirtualKey: id not found (${id})`);
}
return refreshed;
}
/**
* Auth hot path: look up an active (non-revoked) key by SHA-256 hash.
* The partial index covers this query so the planner uses it directly.
* Returns null on miss; never throws.
*/
findGatewayVirtualKeyByHash(keyHash: string): GatewayVirtualKey | null {
const row = this.db
.prepare(
`SELECT * FROM gateway_virtual_keys WHERE key_hash = ? AND revoked_at IS NULL`,
)
.get(keyHash) as GatewayVirtualKeyRow | undefined;
return row ? rowToGatewayVirtualKey(row) : null;
}
/**
* Admin lookup by row id. Includes revoked keys (the admin list/detail
* view shows them so an admin can audit a recent revoke).
*/
findGatewayVirtualKeyById(id: string): GatewayVirtualKey | null {
const row = this.db
.prepare(`SELECT * FROM gateway_virtual_keys WHERE id = ?`)
.get(id) as GatewayVirtualKeyRow | undefined;
return row ? rowToGatewayVirtualKey(row) : null;
}
/**
* Admin list. `activeOnly` filters out revoked rows; `team` narrows by
* team string (exact match). Ordering is `created_at DESC, id DESC` so
* the freshest issuance is first regardless of system clock skew.
*/
listGatewayVirtualKeys(opts?: { team?: string; activeOnly?: boolean }): GatewayVirtualKey[] {
const where: string[] = [];
const args: unknown[] = [];
if (opts?.team !== undefined) {
where.push('team = ?');
args.push(opts.team);
}
if (opts?.activeOnly) {
where.push('revoked_at IS NULL');
}
const sql =
`SELECT * FROM gateway_virtual_keys` +
(where.length > 0 ? ` WHERE ${where.join(' AND ')}` : '') +
` ORDER BY created_at DESC, id DESC`;
const rows = this.db.prepare(sql).all(...args) as GatewayVirtualKeyRow[];
return rows.map(rowToGatewayVirtualKey);
}
/**
* Mark a key as revoked. Returns true if the row was active (so the
* caller can return a clean 200) and false if it was already revoked
* or doesn't exist (so the caller can return 404 / 409). Idempotent
* second calls return false.
*/
revokeGatewayVirtualKey(id: string, revokedBy: string, at?: string): boolean {
const ts = at ?? new Date().toISOString();
const info = this.db
.prepare(
`UPDATE gateway_virtual_keys
SET revoked_at = ?, revoked_by = ?
WHERE id = ? AND revoked_at IS NULL`,
)
.run(ts, revokedBy, id);
return info.changes > 0;
}
/**
* Hard delete. The admin API guards `source='config-import'` and
* returns 400 before calling this — but the Repository itself doesn't
* enforce that (tests need to be able to clean up). Returns true if a
* row was deleted.
*/
/**
* Hard-delete a virtual key row.
*
* Defense-in-depth: refuses to delete rows with `source='config-import'`
* by throwing. The admin REST API also rejects this case (returning a
* 400 with a human-readable message), but a future internal caller
* could easily forget — and a hard delete of a config-import row
* would simply be replayed on the next gateway boot when
* importConfigKeysToDb re-imports the entry from config.yaml. That
* recreates the row with a different id, which silently breaks any
* audit history that referenced the previous id and is generally
* confusing operator behavior. Force callers to use `revoke` (soft
* delete) or to remove the entry from config.yaml first.
*
* Returns true when a row was deleted, false when the id didn't
* exist. Throws when the row exists but is config-import.
*/
deleteGatewayVirtualKey(id: string): boolean {
const row = this.findGatewayVirtualKeyById(id);
if (!row) return false;
if (row.source === 'config-import') {
throw new Error(
`cannot delete config-import virtual key (id=${id}); ` +
"remove the entry from config.yaml's gateway.virtual_keys instead, " +
'or use revoke for a soft delete',
);
}
const info = this.db
.prepare(`DELETE FROM gateway_virtual_keys WHERE id = ?`)
.run(id);
return info.changes > 0;
}
/**
* Bump `last_used_at` for an active key. Called from the gateway auth
* middleware on successful match. Per-request volume can be high, so
* callers typically dedup with a 30-second in-memory bucket (see
* src/gateway/auth.ts) before touching the DB. Best-effort: failures
* are swallowed by the caller so a temporary write-lock contention
* never blocks auth.
*/
touchGatewayVirtualKeyLastUsed(id: string, at?: string): void {
const ts = at ?? new Date().toISOString();
this.db
.prepare(`UPDATE gateway_virtual_keys SET last_used_at = ? WHERE id = ?`)
.run(ts, id);
}
// ── AAO Gateway Phase 2b: usage tracking ─────────────────────────────
//
// Read path is on the budget enforcement hot loop, so it stays a
// single point lookup over the composite PRIMARY KEY. Write path is
// an UPSERT (`ON CONFLICT … DO UPDATE`) so the gateway can fire-and-
// forget after every chat completion without a pre-read.
/**
* Point-lookup over the (key_id, period_start) PRIMARY KEY. Returns
* null when there's no row yet (= "no usage in this period"), which
* the caller treats as zero counters.
*/
getGatewayKeyUsage(keyId: string, periodStart: string): GatewayKeyUsage | null {
const row = this.db
.prepare(
`SELECT * FROM gateway_key_usage WHERE key_id = ? AND period_start = ?`,
)
.get(keyId, periodStart) as GatewayKeyUsageRow | undefined;
return row ? rowToGatewayKeyUsage(row) : null;
}
/**
* UPSERT: bump the per-(key, period) counters by the supplied deltas.
* All three deltas are clamped at zero so a buggy caller can never
* decrement a counter. `last_updated_at` always reflects the call
* time (or the explicit `at` override) so a downstream sweeper can
* tell when activity stopped.
*
* Called from two places on the gateway hot path:
* 1. stream-proxy's finally block (token deltas from upstream usage)
* 2. rate-limiter's 30-second batch flush (request count only)
*
* The second caller passes `tokensIn=0 tokensOut=0` so the UPSERT
* still creates a row even when no token usage was extracted.
*/
incrementGatewayKeyUsage(params: {
keyId: string;
period: string;
tokensIn?: number;
tokensOut?: number;
requests?: number;
at?: string;
}): void {
const tIn = Math.max(0, Math.floor(params.tokensIn ?? 0));
const tOut = Math.max(0, Math.floor(params.tokensOut ?? 0));
const reqs = Math.max(0, Math.floor(params.requests ?? 0));
const ts = params.at ?? new Date().toISOString();
this.db
.prepare(
`INSERT INTO gateway_key_usage
(key_id, period_start, tokens_in, tokens_out, requests, last_updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (key_id, period_start) DO UPDATE SET
tokens_in = tokens_in + excluded.tokens_in,
tokens_out = tokens_out + excluded.tokens_out,
requests = requests + excluded.requests,
last_updated_at = excluded.last_updated_at`,
)
.run(params.keyId, params.period, tIn, tOut, reqs, ts);
}
/**
* Admin view: history of usage rows for a key, freshest period first.
* Default limit 12 covers a full year of monthly buckets — enough for
* the in-UI bar chart and for `GET /:id/usage` to embed history without
* a follow-up call.
*/
listGatewayKeyUsagesByKey(keyId: string, opts?: { limit?: number }): GatewayKeyUsage[] {
const limit = Math.max(1, Math.min(120, Math.floor(opts?.limit ?? 12)));
const rows = this.db
.prepare(
`SELECT * FROM gateway_key_usage
WHERE key_id = ?
ORDER BY period_start DESC
LIMIT ?`,
)
.all(keyId, limit) as GatewayKeyUsageRow[];
return rows.map(rowToGatewayKeyUsage);
}
// ── Per-user daily LLM usage (gateway + direct) ──────────────────────
//
// Recorded at the OpenAICompatClient completion boundary for every
// successful chat completion. UPSERT on the (day, user_id, source,
// model, route) grain. Separate lens from gateway_key_usage — never
// summed across the two tables. Spec:
// docs/superpowers/specs/2026-06-11-llm-usage-aggregation-design.md
/**
* UPSERT: bump per-(day, user, source, model, route) counters. Deltas
* are clamped at zero. `day` defaults to the UTC day of `at` (or now).
* Called once per successful stream completion; `usage`-less completions
* still bump `requests` (tokens 0) so a 0-token request is distinct from
* a failed/aborted one (which is never recorded).
*/
incrementLlmUsage(params: LlmUsageIncrement): void {
const tIn = Math.max(0, Math.floor(params.tokensIn ?? 0));
const tOut = Math.max(0, Math.floor(params.tokensOut ?? 0));
const reqs = Math.max(0, Math.floor(params.requests ?? 1));
const ts = params.at ?? new Date().toISOString();
const day = params.day ?? ts.slice(0, 10);
this.db
.prepare(
`INSERT INTO llm_usage_daily
(day, user_id, source, model, route, tokens_in, tokens_out, requests, last_updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (day, user_id, source, model, route) DO UPDATE SET
tokens_in = tokens_in + excluded.tokens_in,
tokens_out = tokens_out + excluded.tokens_out,
requests = requests + excluded.requests,
last_updated_at = excluded.last_updated_at`,
)
.run(day, params.userId, params.source, params.model, params.route, tIn, tOut, reqs, ts);
}
/**
* Daily time series for the usage dashboard, grouped by (day, user_id,
* source) with model/route collapsed. `userId` filter scopes a non-admin
* to their own rows; omit it for the admin all-users view (callers can
* collapse user_id afterwards). Inclusive `from`/`to` are 'YYYY-MM-DD'.
*/
queryLlmUsageDaily(opts: { from: string; to: string; userId?: string }): LlmUsageDailyAgg[] {
const where = ['day >= ?', 'day <= ?'];
const args: unknown[] = [opts.from, opts.to];
if (opts.userId !== undefined) {
where.push('user_id = ?');
args.push(opts.userId);
}
const rows = this.db
.prepare(
`SELECT day, user_id, source,
SUM(tokens_in) AS tokens_in,
SUM(tokens_out) AS tokens_out,
SUM(requests) AS requests
FROM llm_usage_daily
WHERE ${where.join(' AND ')}
GROUP BY day, user_id, source
ORDER BY day ASC`,
)
.all(...args) as Array<{
day: string;
user_id: string;
source: string;
tokens_in: number;
tokens_out: number;
requests: number;
}>;
return rows.map((r) => ({
day: r.day,
userId: r.user_id,
source: r.source,
tokensIn: r.tokens_in,
tokensOut: r.tokens_out,
requests: r.requests,
}));
}
/**
* v2 write path: UPSERT per-(hour, user, source, model, route) counters.
* `hour` defaults to the current UTC hour 'YYYY-MM-DDTHH'. Same contract as
* incrementLlmUsage (deltas clamped at zero; usage-less completions still
* bump `requests`). Supersedes incrementLlmUsage as the recorder target.
*/
incrementLlmUsageHourly(params: LlmUsageHourlyIncrement): void {
const tIn = Math.max(0, Math.floor(params.tokensIn ?? 0));
const tOut = Math.max(0, Math.floor(params.tokensOut ?? 0));
const reqs = Math.max(0, Math.floor(params.requests ?? 1));
const ts = params.at ?? new Date().toISOString();
const hour = params.hour ?? ts.slice(0, 13);
this.db
.prepare(
`INSERT INTO llm_usage_hourly
(hour, user_id, source, model, route, tokens_in, tokens_out, requests, last_updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (hour, user_id, source, model, route) DO UPDATE SET
tokens_in = tokens_in + excluded.tokens_in,
tokens_out = tokens_out + excluded.tokens_out,
requests = requests + excluded.requests,
last_updated_at = excluded.last_updated_at`,
)
.run(hour, params.userId, params.source, params.model, params.route, tIn, tOut, reqs, ts);
}
/**
* Raw hour-grain rows for the usage dashboard v2, no axis collapsed so the
* API can group by any of source/model/route/user/org and re-bucket into the
* viewer's local timezone. `userId` scopes a non-admin to their own rows.
* Inclusive `fromHour`/`toHour` are 'YYYY-MM-DDTHH' (UTC) — callers widen the
* UTC window by ±1 day before filtering precisely against local days.
*/
queryLlmUsageHourly(opts: {
fromHour: string;
toHour: string;
userId?: string;
}): LlmUsageHourlyRow[] {
const where = ['hour >= ?', 'hour <= ?'];
const args: unknown[] = [opts.fromHour, opts.toHour];
if (opts.userId !== undefined) {
where.push('user_id = ?');
args.push(opts.userId);
}
const rows = this.db
.prepare(
`SELECT hour, user_id, source, model, route, tokens_in, tokens_out, requests
FROM llm_usage_hourly
WHERE ${where.join(' AND ')}
ORDER BY hour ASC`,
)
.all(...args) as Array<{
hour: string;
user_id: string;
source: string;
model: string;
route: string;
tokens_in: number;
tokens_out: number;
requests: number;
}>;
return rows.map((r) => ({
hour: r.hour,
userId: r.user_id,
source: r.source,
model: r.model,
route: r.route,
tokensIn: r.tokens_in,
tokensOut: r.tokens_out,
requests: r.requests,
}));
}
/**
* Map every user to a single org label for the usage dashboard's "by org"
* breakdown. Unions Gitea org cache (user_gitea_orgs) and local orgs
* (local_org_members → local_orgs.name); a multi-org user collapses to the
* alphabetically-first org name (MIN) so the breakdown is deterministic.
* Users with no org (and the 'local'/'system' sentinels) are simply absent —
* the API buckets them under 'no-org'.
*/
getUsageOrgMap(): Map<string, string> {
const rows = this.db
.prepare(
`SELECT user_id, MIN(org_name) AS org_name FROM (
SELECT user_id, org_name FROM user_gitea_orgs
UNION ALL
SELECT m.user_id AS user_id, o.name AS org_name
FROM local_org_members m
JOIN local_orgs o ON o.id = m.org_id
)
GROUP BY user_id`,
)
.all() as Array<{ user_id: string; org_name: string }>;
const map = new Map<string, string>();
for (const r of rows) map.set(r.user_id, r.org_name);
return map;
}
/** Return the underlying Database instance (needed by migrate.ts and session store) */
getDb(): Database.Database {
return this.db;
}
// ── Browser Notifications V2: push_subscriptions ────────────────────
// Spec: docs/superpowers/specs/2026-05-28-browser-notifications-v2-webpush.md
/**
* Insert or, on endpoint collision, transfer ownership to the new user.
* endpoint is globally UNIQUE: the same browser logging in as a different
* user re-uses the same push service URL, so we move it rather than fail.
*/
upsertPushSubscription(input: UpsertPushSubscriptionInput): { id: string } {
const existing = this.db
.prepare('SELECT id FROM push_subscriptions WHERE endpoint = ?')
.get(input.endpoint) as { id: string } | undefined;
if (existing) {
this.db
.prepare(
`UPDATE push_subscriptions
SET user_id = ?, p256dh = ?, auth = ?, user_agent = ?,
vapid_key_id = ?, last_success_at = NULL,
last_failure_at = NULL, failure_count = 0
WHERE id = ?`,
)
.run(
input.userId,
input.p256dh,
input.auth,
input.userAgent ?? null,
input.vapidKeyId,
existing.id,
);
return { id: existing.id };
}
const id = randomUUID();
this.db
.prepare(
`INSERT INTO push_subscriptions
(id, user_id, endpoint, p256dh, auth, user_agent, vapid_key_id)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
)
.run(
id,
input.userId,
input.endpoint,
input.p256dh,
input.auth,
input.userAgent ?? null,
input.vapidKeyId,
);
return { id };
}
listPushSubscriptionsForUser(userId: string): PushSubscriptionRecord[] {
const rows = this.db
.prepare(
`SELECT id, user_id, endpoint, p256dh, auth, user_agent, vapid_key_id,
created_at, last_success_at, last_failure_at, failure_count
FROM push_subscriptions
WHERE user_id = ?
ORDER BY created_at ASC`,
)
.all(userId) as Array<{
id: string;
user_id: string;
endpoint: string;
p256dh: string;
auth: string;
user_agent: string | null;
vapid_key_id: string;
created_at: string;
last_success_at: string | null;
last_failure_at: string | null;
failure_count: number;
}>;
return rows.map(rowToPushSubscription);
}
getPushSubscriptionById(id: string): PushSubscriptionRecord | null {
const row = this.db
.prepare(
`SELECT id, user_id, endpoint, p256dh, auth, user_agent, vapid_key_id,
created_at, last_success_at, last_failure_at, failure_count
FROM push_subscriptions WHERE id = ?`,
)
.get(id) as Parameters<typeof rowToPushSubscription>[0] | undefined;
return row ? rowToPushSubscription(row) : null;
}
deletePushSubscription(id: string): void {
this.db.prepare('DELETE FROM push_subscriptions WHERE id = ?').run(id);
}
markPushSubscriptionSuccess(id: string): void {
this.db
.prepare(
`UPDATE push_subscriptions
SET last_success_at = datetime('now'), failure_count = 0
WHERE id = ?`,
)
.run(id);
}
markPushSubscriptionFailure(id: string): void {
this.db
.prepare(
`UPDATE push_subscriptions
SET last_failure_at = datetime('now'),
failure_count = failure_count + 1
WHERE id = ?`,
)
.run(id);
}
// ── Browser Notifications V2: user_notification_prefs ────────────────
/**
* Read per-user prefs; create row with defaults if none exists.
* Defaults: enabled=true, all events on, include_details=false, v1_migrated=false.
*/
getUserNotificationPrefs(userId: string): NotificationPrefs {
const row = this.db
.prepare(
`SELECT user_id, enabled, event_running, event_succeeded,
event_failed, event_waiting_human, include_details,
v1_migrated, updated_at
FROM user_notification_prefs WHERE user_id = ?`,
)
.get(userId) as
| {
user_id: string;
enabled: number;
event_running: number;
event_succeeded: number;
event_failed: number;
event_waiting_human: number;
include_details: number;
v1_migrated: number;
updated_at: string;
}
| undefined;
if (!row) {
// Lazily create the default row so subsequent reads/updates are
// a simple UPDATE rather than a conditional insert.
this.db
.prepare(
`INSERT INTO user_notification_prefs (user_id) VALUES (?)
ON CONFLICT(user_id) DO NOTHING`,
)
.run(userId);
return {
userId,
enabled: true,
events: { running: true, succeeded: true, failed: true, waiting_human: true },
includeDetails: false,
v1Migrated: false,
updatedAt: new Date().toISOString(),
};
}
return {
userId: row.user_id,
enabled: row.enabled !== 0,
events: {
running: row.event_running !== 0,
succeeded: row.event_succeeded !== 0,
failed: row.event_failed !== 0,
waiting_human: row.event_waiting_human !== 0,
},
includeDetails: row.include_details !== 0,
v1Migrated: row.v1_migrated !== 0,
updatedAt: row.updated_at,
};
}
upsertUserNotificationPrefs(userId: string, update: NotificationPrefsUpdate): void {
// Ensure a row exists first (lazy default creation matches getUserNotificationPrefs).
this.db
.prepare(`INSERT OR IGNORE INTO user_notification_prefs (user_id) VALUES (?)`)
.run(userId);
const sets: string[] = [];
const params: Array<string | number> = [];
if (update.enabled !== undefined) {
sets.push('enabled = ?');
params.push(update.enabled ? 1 : 0);
}
if (update.events) {
if (update.events.running !== undefined) {
sets.push('event_running = ?');
params.push(update.events.running ? 1 : 0);
}
if (update.events.succeeded !== undefined) {
sets.push('event_succeeded = ?');
params.push(update.events.succeeded ? 1 : 0);
}
if (update.events.failed !== undefined) {
sets.push('event_failed = ?');
params.push(update.events.failed ? 1 : 0);
}
if (update.events.waiting_human !== undefined) {
sets.push('event_waiting_human = ?');
params.push(update.events.waiting_human ? 1 : 0);
}
}
if (update.includeDetails !== undefined) {
sets.push('include_details = ?');
params.push(update.includeDetails ? 1 : 0);
}
if (update.v1Migrated !== undefined) {
sets.push('v1_migrated = ?');
params.push(update.v1Migrated ? 1 : 0);
}
if (sets.length === 0) return;
sets.push("updated_at = datetime('now')");
params.push(userId);
this.db
.prepare(`UPDATE user_notification_prefs SET ${sets.join(', ')} WHERE user_id = ?`)
.run(...params);
}
/**
* One-time V1 (localStorage) → V2 (server) preferences migration marker.
* Returns true if this call performed the migration mark (caller should
* then apply the localStorage values via upsertUserNotificationPrefs).
* Returns false if already migrated (caller should treat as 409 conflict).
*/
markV1MigrationComplete(userId: string): boolean {
this.db
.prepare(`INSERT OR IGNORE INTO user_notification_prefs (user_id) VALUES (?)`)
.run(userId);
const result = this.db
.prepare(
`UPDATE user_notification_prefs
SET v1_migrated = 1, updated_at = datetime('now')
WHERE user_id = ? AND v1_migrated = 0`,
)
.run(userId);
return result.changes > 0;
}
close(): void {
this.db.close();
}
}
function rowToPushSubscription(row: {
id: string;
user_id: string;
endpoint: string;
p256dh: string;
auth: string;
user_agent: string | null;
vapid_key_id: string;
created_at: string;
last_success_at: string | null;
last_failure_at: string | null;
failure_count: number;
}): PushSubscriptionRecord {
return {
id: row.id,
userId: row.user_id,
endpoint: row.endpoint,
p256dh: row.p256dh,
auth: row.auth,
userAgent: row.user_agent,
vapidKeyId: row.vapid_key_id,
createdAt: row.created_at,
lastSuccessAt: row.last_success_at,
lastFailureAt: row.last_failure_at,
failureCount: row.failure_count,
};
}
export { BrowserSessionRepo } from './browser-session-repo.js';
export type { BrowserSessionProfile, CreateProfileInput, AuditInput } from './browser-session-repo.js';