Files
ai-code-app/etc/servers/work-server/src/services/chat-runtime-service.ts
2026-04-21 03:33:23 +09:00

393 lines
11 KiB
TypeScript
Executable File

/** Mode label carried on every job. This file only stores and reports it; no logic here branches on it. */
type ChatRuntimeJobMode = 'queue' | 'direct';
/** Status of a job that has not finished yet: 'queued' is set by enqueueJob, 'running' by startJob. */
type ChatRuntimeLifecycleStatus = 'queued' | 'running';
/** Outcome recorded on a job when finishJob moves it into the archive. */
type ChatRuntimeTerminalStatus = 'completed' | 'failed' | 'cancelled' | 'removed';
/**
 * Optional callbacks registered per request id.
 * `cancel` is invoked by cancelJob and `remove` by removeQueuedJob; either may be
 * sync or async, and only a result of exactly `true` is reported as success.
 */
type RuntimeJobControl = {
cancel?: () => Promise<boolean> | boolean;
remove?: () => Promise<boolean> | boolean;
};
/** Internal job record: the public item plus bookkeeping that is not part of the running/queued snapshot lists. */
type RuntimeJobRecord = ChatRuntimeJobItem & {
// Log lines kept for the job; writers in this file cap the list at MAX_LOG_LINES.
logs: string[];
// ISO timestamp refreshed whenever the record is rewritten.
lastUpdatedAt: string;
// null while the job is queued or running; set by finishJob.
terminalStatus: ChatRuntimeTerminalStatus | null;
};
/** Public view of a single job as exposed in snapshots and job details. */
export type ChatRuntimeJobItem = {
requestId: string;
sessionId: string;
mode: ChatRuntimeJobMode;
status: ChatRuntimeLifecycleStatus;
// Whitespace-collapsed, length-limited form of the request text (see summarizeText).
summary: string;
// ISO timestamps; startedAt stays null until startJob runs.
enqueuedAt: string;
startedAt: string | null;
// Value passed to startJob/attachProcess, rounded to an integer; null when none was given.
pid: number | null;
};
/** Per-session aggregate computed by getSnapshot over running and queued jobs. */
export type ChatRuntimeSessionSummary = {
sessionId: string;
runningCount: number;
queuedCount: number;
// Request id / status of the last job visited for this session while building the snapshot.
latestRequestId: string | null;
latestStatus: ChatRuntimeLifecycleStatus | null;
};
/** Point-in-time view of the whole service, as delivered to subscribers. */
export type ChatRuntimeSnapshot = {
generatedAt: string;
runningCount: number;
queuedCount: number;
sessionCount: number;
running: ChatRuntimeJobItem[];
queued: ChatRuntimeJobItem[];
sessions: ChatRuntimeSessionSummary[];
// Archived (finished) jobs, most recently updated first; getSnapshot limits this list to 12 entries.
recent: Array<ChatRuntimeJobItem & { terminalStatus: ChatRuntimeTerminalStatus; lastUpdatedAt: string }>;
};
/** Result of getJobDetail; `item` is null (and the other fields empty/null/false) when the request id is unknown. */
export type ChatRuntimeJobDetail = {
item: ChatRuntimeJobItem | null;
logs: string[];
lastUpdatedAt: string | null;
terminalStatus: ChatRuntimeTerminalStatus | null;
// Which control actions are currently offered for this job.
availableActions: {
cancel: boolean;
remove: boolean;
};
};
/** Callback invoked with a fresh snapshot each time the service emits a change. */
type RuntimeSubscriber = (snapshot: ChatRuntimeSnapshot) => void;
/** Maximum number of log lines retained per job; older lines are dropped first. */
const MAX_LOG_LINES = 80;
/** Maximum number of finished jobs kept in the archive before the earliest-inserted ones are evicted. */
const MAX_ARCHIVED_JOBS = 40;
/**
 * Returns the current instant formatted as an ISO-8601 UTC string
 * (the format produced by `Date.prototype.toISOString`).
 */
function nowIso(): string {
  const current = new Date();
  return current.toISOString();
}
/**
 * Builds a one-line summary of free-form text.
 *
 * Every run of whitespace is collapsed to a single space and the result is
 * trimmed. Text of at most 120 UTF-16 code units is returned as-is; longer text
 * is cut to at most 117 code units, trailing whitespace is removed, and `...`
 * is appended.
 *
 * @param text Source text; `null`/`undefined` are treated as an empty string.
 * @returns The normalized, possibly truncated summary.
 */
function summarizeText(text: string): string {
  const normalized = String(text ?? '').replace(/\s+/g, ' ').trim();
  if (normalized.length <= 120) {
    return normalized;
  }
  let cut = 117;
  // If the cut would land between the two halves of a surrogate pair, back off
  // by one unit so the summary never ends in a lone high surrogate.
  const lastKeptUnit = normalized.charCodeAt(cut - 1);
  if (lastKeptUnit >= 0xd800 && lastKeptUnit <= 0xdbff) {
    cut -= 1;
  }
  return `${normalized.slice(0, cut).trimEnd()}...`;
}
/**
 * Cleans a raw log line for storage: removes every carriage return and strips
 * trailing whitespace. Leading whitespace is preserved.
 *
 * @param line Raw line; `null`/`undefined` are treated as an empty string.
 * @returns The cleaned line (possibly empty).
 */
function normalizeLogLine(line: string): string {
  const raw = String(line ?? '');
  const withoutCarriageReturns = raw.split('\r').join('');
  return withoutCarriageReturns.trimEnd();
}
/**
 * In-memory registry of chat runtime jobs.
 *
 * Jobs live in exactly one of three maps keyed by request id: queued, running,
 * or archived (finished). Every state change produces a fresh snapshot that is
 * pushed synchronously to all subscribers. Records are replaced rather than
 * mutated, so a record handed out earlier is not changed by later updates.
 */
class ChatRuntimeService {
  private readonly queuedJobs = new Map<string, RuntimeJobRecord>();
  private readonly runningJobs = new Map<string, RuntimeJobRecord>();
  private readonly archivedJobs = new Map<string, RuntimeJobRecord>();
  private readonly controls = new Map<string, RuntimeJobControl>();
  private readonly subscribers = new Set<RuntimeSubscriber>();

  /**
   * Registers a listener that receives a snapshot on every change.
   *
   * @returns A function that unregisters the listener.
   */
  subscribe(listener: RuntimeSubscriber): () => void {
    this.subscribers.add(listener);
    return () => {
      this.subscribers.delete(listener);
    };
  }

  /**
   * Builds a point-in-time view of the service.
   *
   * Running jobs are ordered by start time (falling back to enqueue time),
   * queued jobs by enqueue time, sessions by total load (descending) then id,
   * and `recent` holds the 12 most recently updated archived jobs.
   */
  getSnapshot(): ChatRuntimeSnapshot {
    const running = [...this.runningJobs.values()].sort((left, right) =>
      (left.startedAt ?? left.enqueuedAt).localeCompare(right.startedAt ?? right.enqueuedAt),
    );
    const queued = [...this.queuedJobs.values()].sort((left, right) =>
      left.enqueuedAt.localeCompare(right.enqueuedAt),
    );

    const sessionMap = new Map<string, ChatRuntimeSessionSummary>();
    for (const item of [...running, ...queued]) {
      const current = sessionMap.get(item.sessionId) ?? {
        sessionId: item.sessionId,
        runningCount: 0,
        queuedCount: 0,
        latestRequestId: null,
        latestStatus: null,
      };
      if (item.status === 'running') {
        current.runningCount += 1;
      } else {
        current.queuedCount += 1;
      }
      // "Latest" is simply the last job visited for the session: running jobs
      // are visited first, then queued ones, each in their sorted order.
      current.latestRequestId = item.requestId;
      current.latestStatus = item.status;
      sessionMap.set(item.sessionId, current);
    }

    const sessions = [...sessionMap.values()].sort((left, right) => {
      const loadDiff = right.runningCount + right.queuedCount - (left.runningCount + left.queuedCount);
      return loadDiff !== 0 ? loadDiff : left.sessionId.localeCompare(right.sessionId);
    });

    return {
      generatedAt: nowIso(),
      runningCount: running.length,
      queuedCount: queued.length,
      sessionCount: sessions.length,
      running: running.map((record) => this.toPublicItem(record)),
      queued: queued.map((record) => this.toPublicItem(record)),
      sessions,
      recent: [...this.archivedJobs.values()]
        .sort((left, right) => right.lastUpdatedAt.localeCompare(left.lastUpdatedAt))
        .slice(0, 12)
        .map((record) => ({
          ...this.toPublicItem(record),
          lastUpdatedAt: record.lastUpdatedAt,
          // Archived records normally carry a terminal status; default defensively.
          terminalStatus: record.terminalStatus ?? 'completed',
        })),
    };
  }

  /**
   * Looks up a job in the running, queued, then archived maps.
   *
   * `availableActions` reports an action only when the job is in the matching
   * state AND the registered control actually provides that callback, so it
   * agrees with what cancelJob / removeQueuedJob will accept.
   *
   * @returns Job details; for an unknown id, `item` is null, `logs` is empty
   *          and both actions are false.
   */
  getJobDetail(requestId: string): ChatRuntimeJobDetail {
    const current =
      this.runningJobs.get(requestId) ??
      this.queuedJobs.get(requestId) ??
      this.archivedJobs.get(requestId) ??
      null;
    const control = this.controls.get(requestId);
    return {
      item: current ? this.toPublicItem(current) : null,
      logs: current?.logs ?? [],
      lastUpdatedAt: current?.lastUpdatedAt ?? null,
      terminalStatus: current?.terminalStatus ?? null,
      availableActions: {
        // A control left over from the queued phase may have no `cancel`
        // (and vice versa), so check the specific callback, not mere presence.
        cancel: this.runningJobs.has(requestId) && typeof control?.cancel === 'function',
        remove: this.queuedJobs.has(requestId) && typeof control?.remove === 'function',
      },
    };
  }

  /** Stores the control for a request id, replacing any previously registered one. */
  registerQueuedControl(requestId: string, control: RuntimeJobControl): void {
    this.controls.set(requestId, control);
  }

  /** Stores the control for a request id, replacing any previously registered one. */
  registerRunningControl(requestId: string, control: RuntimeJobControl): void {
    this.controls.set(requestId, control);
  }

  /**
   * Records a job as queued and notifies subscribers.
   *
   * If the request id is already running, nothing changes and the running
   * record is returned. Otherwise any archived or queued record with the same
   * id is replaced by a fresh queued record.
   *
   * @returns The record now tracked for the request id.
   */
  enqueueJob(args: {
    sessionId: string;
    requestId: string;
    mode: ChatRuntimeJobMode;
    text: string;
  }): RuntimeJobRecord {
    const existingRunning = this.runningJobs.get(args.requestId);
    if (existingRunning) {
      return existingRunning;
    }
    // One timestamp for the whole event so enqueuedAt and lastUpdatedAt agree.
    const timestamp = nowIso();
    const item: RuntimeJobRecord = {
      requestId: args.requestId,
      sessionId: args.sessionId,
      mode: args.mode,
      status: 'queued',
      summary: summarizeText(args.text),
      enqueuedAt: timestamp,
      startedAt: null,
      pid: null,
      logs: ['큐에 등록되었습니다.'],
      lastUpdatedAt: timestamp,
      terminalStatus: null,
    };
    this.archivedJobs.delete(args.requestId);
    this.queuedJobs.set(args.requestId, item);
    this.emit();
    return item;
  }

  /**
   * Records a job as running and notifies subscribers.
   *
   * When the job was queued, its enqueue time and logs carry over; otherwise
   * the job is treated as enqueued now with an empty log. Any queued or
   * archived record with the same id is removed.
   *
   * @returns The new running record.
   */
  startJob(args: {
    sessionId: string;
    requestId: string;
    mode: ChatRuntimeJobMode;
    text: string;
    pid?: number | null;
  }): RuntimeJobRecord {
    const queuedItem = this.queuedJobs.get(args.requestId);
    // One timestamp for the whole event so startedAt and lastUpdatedAt agree.
    const timestamp = nowIso();
    const runningItem: RuntimeJobRecord = {
      requestId: args.requestId,
      sessionId: args.sessionId,
      mode: args.mode,
      status: 'running',
      summary: summarizeText(args.text),
      enqueuedAt: queuedItem?.enqueuedAt ?? timestamp,
      startedAt: timestamp,
      pid: args.pid == null ? null : Math.round(args.pid),
      logs: [...(queuedItem?.logs ?? []), '실행이 시작되었습니다.'].slice(-MAX_LOG_LINES),
      lastUpdatedAt: timestamp,
      terminalStatus: null,
    };
    this.queuedJobs.delete(args.requestId);
    this.archivedJobs.delete(args.requestId);
    this.runningJobs.set(args.requestId, runningItem);
    this.emit();
    return runningItem;
  }

  /**
   * Attaches a pid to a running job, logs it, and notifies subscribers.
   *
   * @returns The updated record; the unchanged running record when `pid` is
   *          null/undefined; or null when the job is not running.
   */
  attachProcess(requestId: string, pid?: number | null): RuntimeJobRecord | null {
    const current = this.runningJobs.get(requestId);
    if (!current || pid == null) {
      return current ?? null;
    }
    const roundedPid = Math.round(pid);
    const next: RuntimeJobRecord = {
      ...current,
      pid: roundedPid,
      lastUpdatedAt: nowIso(),
      logs: [...current.logs, `프로세스가 연결되었습니다. pid=${roundedPid}`].slice(-MAX_LOG_LINES),
    };
    this.runningJobs.set(requestId, next);
    this.emit();
    return next;
  }

  /**
   * Appends a log line to a job in any state (running, queued, or archived)
   * and notifies subscribers. Lines that are empty after normalization and
   * unknown request ids are ignored.
   */
  appendLog(requestId: string, line: string): void {
    const normalizedLine = normalizeLogLine(line);
    if (!normalizedLine) {
      return;
    }
    // Same lookup precedence as getJobDetail; the record is written back to
    // whichever map it came from.
    const owner = [this.runningJobs, this.queuedJobs, this.archivedJobs].find((jobs) => jobs.has(requestId));
    const current = owner?.get(requestId);
    if (!owner || !current) {
      return;
    }
    owner.set(requestId, {
      ...current,
      logs: [...current.logs, normalizedLine].slice(-MAX_LOG_LINES),
      lastUpdatedAt: nowIso(),
    });
    this.emit();
  }

  /**
   * Invokes the registered `cancel` callback for a running job.
   *
   * This method does not change job state itself; the job stays in the running
   * map until finishJob is called for it.
   *
   * @returns true only when the job is running, a cancel callback exists, and
   *          it resolved to exactly `true`.
   */
  async cancelJob(requestId: string): Promise<boolean> {
    const control = this.controls.get(requestId);
    if (!this.runningJobs.has(requestId) || !control?.cancel) {
      return false;
    }
    const result = await control.cancel();
    return result === true;
  }

  /**
   * Invokes the registered `remove` callback for a queued job.
   *
   * This method does not change job state itself; the job stays in the queued
   * map until finishJob is called for it.
   *
   * @returns true only when the job is queued, a remove callback exists, and
   *          it resolved to exactly `true`.
   */
  async removeQueuedJob(requestId: string): Promise<boolean> {
    const control = this.controls.get(requestId);
    if (!this.queuedJobs.has(requestId) || !control?.remove) {
      return false;
    }
    const result = await control.remove();
    return result === true;
  }

  /**
   * Moves a running or queued job into the archive with the given terminal
   * status, drops its control, and notifies subscribers.
   *
   * For an unknown request id, only the control (if any) is dropped and no
   * snapshot is emitted.
   */
  finishJob(requestId: string, terminalStatus: ChatRuntimeTerminalStatus = 'completed'): void {
    const removed = this.runningJobs.get(requestId) ?? this.queuedJobs.get(requestId) ?? null;
    this.runningJobs.delete(requestId);
    this.queuedJobs.delete(requestId);
    this.controls.delete(requestId);
    if (!removed) {
      return;
    }
    const archived: RuntimeJobRecord = {
      ...removed,
      lastUpdatedAt: nowIso(),
      terminalStatus,
      logs: [...removed.logs, this.buildTerminalLog(terminalStatus)].slice(-MAX_LOG_LINES),
    };
    // Delete before set so the entry moves to the end of the Map's insertion
    // order; trimArchivedJobs evicts from the front.
    this.archivedJobs.delete(requestId);
    this.archivedJobs.set(requestId, archived);
    this.trimArchivedJobs();
    this.emit();
  }

  /** Drops every job and control; emits a snapshot only if something was actually stored. */
  clearAll(): void {
    const stores = [this.runningJobs, this.queuedJobs, this.archivedJobs, this.controls];
    if (stores.every((store) => store.size === 0)) {
      return;
    }
    for (const store of stores) {
      store.clear();
    }
    this.emit();
  }

  /** Projects an internal record onto the public job-item shape (no logs or archive bookkeeping). */
  private toPublicItem(record: RuntimeJobRecord): ChatRuntimeJobItem {
    return {
      requestId: record.requestId,
      sessionId: record.sessionId,
      mode: record.mode,
      status: record.status,
      summary: record.summary,
      enqueuedAt: record.enqueuedAt,
      startedAt: record.startedAt,
      pid: record.pid,
    };
  }

  /** Returns the final log line written for a terminal status. */
  private buildTerminalLog(status: ChatRuntimeTerminalStatus): string {
    switch (status) {
      case 'completed':
        return '실행이 완료되었습니다.';
      case 'failed':
        return '실행이 실패로 종료되었습니다.';
      case 'cancelled':
        return '실행이 강제 취소되었습니다.';
      default:
        return '대기열에서 제거되었습니다.';
    }
  }

  /** Evicts the earliest-inserted archived jobs until at most MAX_ARCHIVED_JOBS remain. */
  private trimArchivedJobs(): void {
    while (this.archivedJobs.size > MAX_ARCHIVED_JOBS) {
      const oldest = this.archivedJobs.keys().next();
      // Test iterator exhaustion rather than key truthiness: an empty-string
      // request id is a valid key and must still be evictable.
      if (oldest.done) {
        return;
      }
      this.archivedJobs.delete(oldest.value);
    }
  }

  /** Builds one snapshot and delivers it synchronously to every subscriber. */
  private emit(): void {
    const snapshot = this.getSnapshot();
    for (const listener of this.subscribers) {
      listener(snapshot);
    }
  }
}
/**
 * Shared module-level instance of ChatRuntimeService. In the visible source the
 * class itself is not exported, so this instance is the way other modules reach it.
 */
export const chatRuntimeService = new ChatRuntimeService();