feat: update codex live chat workflow

This commit is contained in:
2026-04-22 20:00:38 +09:00
parent 9e4b70f1f1
commit b0b9980a6c
70 changed files with 5178 additions and 2401 deletions

View File

@@ -22,6 +22,7 @@ import {
updateChatConversationContext,
} from './chat-room-service.js';
import { chatRuntimeService, type ChatRuntimeJobDetail, type ChatRuntimeSnapshot } from './chat-runtime-service.js';
import { hasErrorLogViewAccessToken } from './error-log-service.js';
import { WEB_PUSH_SUBSCRIPTION_TABLE } from './notification-service.js';
import { createNotificationMessage } from './notification-message-service.js';
import {
@@ -162,6 +163,7 @@ type ChatSessionState = {
clientId: string | null;
socket: WebSocket | null;
lastSeenAt: number;
isDeleted: boolean;
context: ChatContext | null;
queue: Array<{
requestId: string;
@@ -188,12 +190,17 @@ type ActiveChatExecution = {
};
let activeRuntimeController: ChatRuntimeController | null = null;
let activeChatService: ChatService | null = null;
const activeChatProcessRegistry = new Map<string, ActiveChatExecution>();
export function getChatRuntimeController() {
return activeRuntimeController;
}
export function getActiveChatService() {
return activeChatService;
}
const SOCKET_PATH = '/ws/chat';
const KST_TIME_ZONE = 'Asia/Seoul';
const STREAM_CAPTURE_LIMIT = 256 * 1024;
@@ -275,8 +282,11 @@ function buildChatNotificationTargetUrl(context: ChatContext | null, sessionId:
try {
const targetUrl = new URL(pageUrl);
targetUrl.pathname = '/chat/live';
targetUrl.searchParams.set('topMenu', targetUrl.searchParams.get('topMenu') || 'chat');
targetUrl.searchParams.set('sessionId', sessionId);
targetUrl.searchParams.delete('chatView');
targetUrl.searchParams.delete('runtimeRequestId');
return targetUrl.toString();
} catch {
return fallbackUrl.toString();
@@ -397,6 +407,15 @@ function createRequestId() {
return `chat-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
}
function hasAuthorizedChatSocketAccess(request: IncomingMessage, url: URL) {
const queryToken = url.searchParams.get('accessToken')?.trim();
const headerToken = Array.isArray(request.headers['x-access-token'])
? String(request.headers['x-access-token'][0] ?? '').trim()
: String(request.headers['x-access-token'] ?? '').trim();
return hasErrorLogViewAccessToken(queryToken || headerToken);
}
function hashRequestId(value: string) {
let hash = 0;
@@ -454,13 +473,19 @@ function isSocketOpen(socket: WebSocket | null | undefined) {
return Boolean(socket && socket.readyState === SOCKET_READY_STATE_OPEN);
}
function closeSocketSafely(logger: FastifyBaseLogger, socket: WebSocket | null | undefined, message: string) {
function closeSocketSafely(
logger: FastifyBaseLogger,
socket: WebSocket | null | undefined,
message: string,
code = 1000,
reason = 'replaced',
) {
if (!socket) {
return;
}
try {
socket.close();
socket.close(code, reason);
} catch (error) {
logger.warn(error, message);
}
@@ -1293,15 +1318,8 @@ function appendDiffResourceLinks(output: string, diffUrls: string[]) {
return output;
}
const lines = ['diff 리소스 경로:'];
if (uniqueUrls.length === 1) {
lines.push(uniqueUrls[0]!);
} else {
lines.push(...uniqueUrls.map((url, index) => `${index + 1}. ${url}`));
}
return `${output}\n\n${lines.join('\n')}`;
const hiddenPreviewTags = uniqueUrls.map((url) => `[[preview:${url}]]`).join('\n');
return `${output}\n\n${hiddenPreviewTags}`;
}
export async function rewriteCodexOutputWithChatResources(output: string, repoPath: string, sessionId: string) {
@@ -1452,18 +1470,22 @@ function buildAgenticCodexPrompt(
'응답 규칙:',
'- 사실성보다 추측을 우선하지 마세요. 오늘/최신/건수 질문은 직접 확인하세요.',
'- 코드 수정이 필요 없는 질문이면 파일을 수정하지 마세요.',
'- 첨부파일 경로, 세션 리소스 경로, preview URL은 본문에 장황하게 나열하지 말고 꼭 필요한 설명만 남기세요.',
'- 코드 수정을 했다면 최종 답변에 변경 이력을 기본으로 ```diff 코드블록으로 포함하세요.',
'- 코드 수정이 있으면 마지막에 변경한 파일 경로를 짧게 적으세요.',
'- 한국어로 간결하게 답하세요.',
'',
'현재 화면 문맥:',
'채팅 유형 문맥(우선 적용):',
`- chatTypeLabel: ${context?.chatTypeLabel ?? '없음'}`,
`- chatTypeDescription: ${context?.chatTypeDescription ?? '없음'}`,
`- chatTypeIsTemplate: ${isTemplateRequest ? 'true' : 'false'}`,
'- 답변 스타일과 기본 문맥은 반드시 채팅 유형 정보만 기준으로 적용하세요.',
'',
'참고 화면 정보:',
`- pageTitle: ${context?.pageTitle ?? '없음'}`,
`- topMenu: ${context?.topMenu ?? '없음'}`,
`- focusedComponentId: ${context?.focusedComponentId ?? '없음'}`,
`- pageUrl: ${context?.pageUrl ?? '없음'}`,
`- chatTypeLabel: ${context?.chatTypeLabel ?? '없음'}`,
`- chatTypeDescription: ${context?.chatTypeDescription ?? '없음'}`,
`- chatTypeIsTemplate: ${isTemplateRequest ? 'true' : 'false'}`,
'',
isTemplateRequest ? '템플릿 요청 규칙:' : '최근 대화 문맥:',
...(isTemplateRequest
@@ -1627,7 +1649,7 @@ async function runAgenticCodexReply(
onProgress?: (text: string) => void,
onActivity?: (line: string) => void,
) {
const repoPath = env.PLAN_MAIN_PROJECT_REPO_PATH || env.PLAN_GIT_REPO_PATH;
const repoPath = env.SERVER_COMMAND_MAIN_PROJECT_ROOT || env.PLAN_MAIN_PROJECT_REPO_PATH || env.PLAN_GIT_REPO_PATH;
await validateAgenticCodexRuntime(repoPath, env.PLAN_CODEX_BIN);
const appConfig = await getAppConfigSnapshot();
const recentHistory =
@@ -2109,60 +2131,7 @@ async function buildCodexReply(
onProgress?: (text: string) => void,
onActivity?: (line: string) => void,
) {
const normalized = input.toLowerCase();
if (isAutomationRegistrationCountRequest(input)) {
return buildAutomationRegistrationCountReply();
}
if (isAutomationRegistrationDefinitionRequest(input)) {
return buildAutomationRegistrationDefinitionReply();
}
if (shouldUseAgenticCodexReply(input)) {
return runAgenticCodexReply(context, input, sessionId, requestId, onProgress, onActivity);
}
const requestsPlanContext =
isWorklogRequest(input) ||
isPlanDetailRequest(input) ||
normalized.includes('preview') ||
normalized.includes('링크') ||
normalized.includes('url') ||
input.includes('변경') ||
input.includes('스크린샷');
const parsedPlanContext = parsePlanContext(context, input);
let snapshot = parsedPlanContext.planId ? await loadPlanSnapshot(parsedPlanContext.planId) : null;
if (!snapshot && parsedPlanContext.workId) {
const planItem = await findPlanItemByWorkId(parsedPlanContext.workId);
snapshot = planItem?.id ? await loadPlanSnapshot(Number(planItem.id)) : null;
}
if (!snapshot && parsedPlanContext.previewUrl) {
const planItem = await findPlanItemByPreviewUrl(parsedPlanContext.previewUrl);
snapshot = planItem?.id ? await loadPlanSnapshot(Number(planItem.id)) : null;
}
if (!snapshot && requestsPlanContext) {
const latestPlanItem = await findLatestPlanItem();
snapshot = latestPlanItem?.id ? await loadPlanSnapshot(Number(latestPlanItem.id)) : null;
}
const isPlanPage =
context?.topMenu === 'plans' ||
context?.pageId?.startsWith('plans:') ||
isPlanDetailRequest(input);
if (isPlanPage && snapshot) {
return buildPlanReply(context, input, snapshot);
}
if (!shouldUseTemplateMacroReply(context, input)) {
return runAgenticCodexReply(context, input, sessionId, requestId, onProgress, onActivity);
}
return buildGenericReply(context, input, snapshot);
return runAgenticCodexReply(context, input, sessionId, requestId, onProgress, onActivity);
}
export class ChatService {
@@ -2174,6 +2143,7 @@ export class ChatService {
private readonly unsubscribeRuntimeBroadcast: () => void;
constructor(private readonly logger: FastifyBaseLogger) {
activeChatService = this;
activeRuntimeController = {
getJobDetail: (requestId) => chatRuntimeService.getJobDetail(requestId),
cancelJob: (requestId) => this.cancelRuntimeJob(requestId),
@@ -2215,6 +2185,9 @@ export class ChatService {
close() {
activeRuntimeController = null;
if (activeChatService === this) {
activeChatService = null;
}
this.unsubscribeRuntimeBroadcast();
for (const execution of activeChatProcessRegistry.values()) {
@@ -2248,6 +2221,7 @@ export class ChatService {
clientId: clientId?.trim() || null,
socket: null,
lastSeenAt: Date.now(),
isDeleted: false,
context: null,
queue: [],
activeRequestCount: 0,
@@ -2282,6 +2256,10 @@ export class ChatService {
}
private persistConversationMessage(session: ChatSessionState, message: ChatMessage) {
if (session.isDeleted) {
return Promise.resolve();
}
const nextPersistence = session.messagePersistenceTail
.catch(() => undefined)
.then(() =>
@@ -2319,6 +2297,10 @@ export class ChatService {
skipOfflineNotification?: boolean;
},
) {
if (session.isDeleted) {
return this.createSessionEnvelope(session, message);
}
const envelope = this.createSessionEnvelope(session, message);
this.retainEnvelopeForReplay(session, envelope);
@@ -2739,7 +2721,7 @@ export class ChatService {
}
private replaySessionHistory(session: ChatSessionState, lastEventId: number) {
if (!Number.isFinite(lastEventId) || lastEventId <= 0 || session.eventHistory.length === 0) {
if (session.isDeleted || !Number.isFinite(lastEventId) || lastEventId <= 0 || session.eventHistory.length === 0) {
return;
}
@@ -2751,6 +2733,10 @@ export class ChatService {
}
private async initializeSession(session: ChatSessionState) {
if (session.isDeleted) {
return;
}
await session.messagePersistenceTail.catch(() => undefined);
const messages = await listChatConversationMessages(session.sessionId, { limit: 500 });
@@ -2782,6 +2768,12 @@ export class ChatService {
private async handleConnection(socket: WebSocket, request: IncomingMessage) {
const origin = request.headers.host ? `http://${request.headers.host}` : 'http://localhost';
const url = new URL(request.url ?? '/', origin);
if (!hasAuthorizedChatSocketAccess(request, url)) {
closeSocketSafely(this.logger, socket, 'failed to close unauthorized chat websocket session', 1008, 'unauthorized');
return;
}
const requestedSessionId = url.searchParams.get('sessionId')?.trim() || createRequestId();
const clientId = url.searchParams.get('clientId')?.trim() || null;
const lastEventIdRaw = Number(url.searchParams.get('lastEventId') ?? 0);
@@ -2819,6 +2811,62 @@ export class ChatService {
this.replaySessionHistory(session, lastEventId);
}
async forgetSession(sessionId: string) {
const normalizedSessionId = sessionId.trim();
if (!normalizedSessionId) {
return;
}
const session = this.sessions.get(normalizedSessionId);
const runtimeSnapshot = chatRuntimeService.getSnapshot();
const runtimeRequestIds = new Set(
[...runtimeSnapshot.running, ...runtimeSnapshot.queued, ...runtimeSnapshot.recent]
.filter((item) => item.sessionId === normalizedSessionId)
.map((item) => item.requestId),
);
if (session) {
session.isDeleted = true;
session.queue = [];
session.eventHistory = [];
session.pendingQueueReleaseEventId = null;
session.watchedRuntimeRequestId = null;
session.activeRequestCount = 0;
if (session.socket) {
this.clientStates.delete(session.socket);
closeSocketSafely(this.logger, session.socket, 'failed to close deleted chat websocket session');
session.socket = null;
}
this.sessions.delete(normalizedSessionId);
}
for (const requestId of runtimeRequestIds) {
const detail = chatRuntimeService.getJobDetail(requestId);
if (detail.availableActions.cancel) {
try {
await this.cancelRuntimeJob(requestId);
} catch {
// ignore and hard-clear runtime state below
}
} else if (detail.availableActions.remove) {
try {
await this.removeQueuedRuntimeJob(requestId);
} catch {
// ignore and hard-clear runtime state below
}
}
activeChatProcessRegistry.delete(requestId);
this.cancelledRequestIds.delete(requestId);
}
chatRuntimeService.clearSession(normalizedSessionId);
}
private handleMessage(socket: WebSocket, raw: RawData) {
try {
const message = JSON.parse(raw.toString()) as ChatInboundMessage;