Files
ai-code-app/etc/servers/work-server/src/services/chat-room-service.ts

3713 lines
127 KiB
TypeScript

import { z } from 'zod';
import { db } from '../db/client.js';
import { chatRuntimeService } from './chat-runtime-service.js';
import { parseChatMessageParts, stringifyChatMessageParts, type ChatMessagePart } from './chat-message-parts.js';
import { NOTIFICATION_TOKEN_TABLE, WEB_PUSH_SUBSCRIPTION_TABLE } from './notification-service.js';
import { cleanupNotificationMessagesForStaleTargetClients } from './notification-message-service.js';
export const CHAT_CONVERSATION_TABLE = 'chat_conversations';
export const CHAT_CONVERSATION_MESSAGE_TABLE = 'chat_conversation_messages';
export const CHAT_CONVERSATION_CLIENT_TABLE = 'chat_conversation_clients';
export const CHAT_CONVERSATION_REQUEST_TABLE = 'chat_conversation_requests';
export const CHAT_CONVERSATION_ACTIVITY_TABLE = 'chat_conversation_request_activities';
export const CHAT_CONVERSATION_SOURCE_CHANGE_TABLE = 'chat_conversation_source_changes';
const CHAT_ACTIVITY_MESSAGE_PREFIX = '[[activity-log]]';
export const CHAT_CONTEXT_DESCRIPTION_MAX_LENGTH = 10000;
const STALE_CHAT_REQUEST_TIMEOUT_MS = 2 * 60 * 1000;
const CURRENT_SOURCE_PREFIXES = ['src/', 'docs/', 'public/', 'scripts/', 'etc/'] as const;
const conversationPayloadSchema = z.object({
sessionId: z.string().trim().min(1).max(120),
clientId: z.string().trim().max(120).nullable().optional(),
title: z.string().trim().max(200).nullable().optional(),
requestBadgeLabel: z.string().trim().max(120).nullable().optional(),
chatTypeId: z.string().trim().max(120).nullable().optional(),
lastChatTypeId: z.string().trim().max(120).nullable().optional(),
generalSectionName: z.string().trim().max(120).nullable().optional(),
contextLabel: z.string().trim().max(200).nullable().optional(),
contextDescription: z.string().trim().max(CHAT_CONTEXT_DESCRIPTION_MAX_LENGTH).nullable().optional(),
notifyOffline: z.boolean().optional(),
});
const conversationMessagePayloadSchema = z.object({
sessionId: z.string().trim().min(1).max(120),
messageId: z.number().int().positive(),
author: z.enum(['codex', 'system', 'user']),
text: z.string().max(200000),
timestamp: z.string().trim().max(40),
clientRequestId: z.string().trim().max(120).nullable().optional(),
parts: z.array(z.custom<ChatMessagePart>()).optional(),
});
export type ChatConversationItem = {
sessionId: string;
clientId: string | null;
title: string;
requestBadgeLabel: string | null;
chatTypeId: string | null;
lastChatTypeId: string | null;
generalSectionName: string | null;
contextLabel: string | null;
contextDescription: string | null;
notifyOffline: boolean;
hasUnreadResponse: boolean;
currentRequestId: string | null;
currentJobStatus: 'queued' | 'started' | 'completed' | 'failed' | null;
currentJobMessage: string | null;
currentQueueSize: number;
currentStatusUpdatedAt: string | null;
isPendingWork: boolean;
pendingWorkReason: 'prompt' | 'analysis' | 'design' | null;
lastRequestPreview: string;
lastMessagePreview: string;
lastResponsePreview: string;
createdAt: string;
updatedAt: string;
lastMessageAt: string | null;
};
export type StoredChatMessage = {
id: number;
author: 'codex' | 'system' | 'user';
text: string;
timestamp: string;
clientRequestId?: string | null;
parts?: ChatMessagePart[];
};
export type ChatConversationRequestStatus =
| 'accepted'
| 'queued'
| 'started'
| 'completed'
| 'failed'
| 'cancelled'
| 'removed';
export type ChatConversationRequestItem = {
sessionId: string;
requestId: string;
requesterClientId: string | null;
status: ChatConversationRequestStatus;
statusMessage: string | null;
userMessageId: number | null;
userText: string;
responseMessageId: number | null;
responseText: string;
hasResponse: boolean;
canDelete: boolean;
createdAt: string;
updatedAt: string;
answeredAt: string | null;
terminalAt: string | null;
};
export type ChatConversationActivityLogItem = {
sessionId: string;
requestId: string;
lines: string[];
updatedAt: string | null;
};
export type ChatSourceChangeSnapshotItem = {
id: string;
sessionId: string;
clientId: string | null;
conversationTitle: string;
chatTypeId: string | null;
chatTypeLabel: string;
requestId: string;
requestTitle: string;
questionText: string;
answerText: string;
status: ChatConversationRequestStatus;
sourceChangedAt: string;
updatedAt: string;
featureTags: string[];
changedFiles: string[];
currentSourceFiles: string[];
diffBlocks: string[];
hasSourceChanges: boolean;
reviewStatus: 'reviewed' | 'not-reviewed';
sourceChangeKind: 'request' | 'verification-group';
sourceEntryIds: string[];
conversationDeletedAt: string | null;
};
export type RecoverableChatConversationRequestItem = {
sessionId: string;
clientId: string | null;
chatTypeId: string | null;
lastChatTypeId: string | null;
generalSectionName: string | null;
contextLabel: string | null;
contextDescription: string | null;
currentRequestId: string | null;
currentJobStatus: ChatConversationItem['currentJobStatus'];
requestId: string;
status: ChatConversationRequestStatus;
userText: string;
createdAt: string;
};
export type ChatConversationDetailPage = {
messages: StoredChatMessage[];
requests: ChatConversationRequestItem[];
activityLogs: ChatConversationActivityLogItem[];
oldestLoadedMessageId: number | null;
hasOlderMessages: boolean;
};
type ChatConversationRequestStatusPatch = {
requestId: string;
status?: ChatConversationRequestStatus;
userMessageId?: number | null;
userText?: string | null;
responseMessageId?: number | null;
responseText?: string | null;
};
type ChatConversationResponseCandidate = {
id: number;
messageId: number;
author: StoredChatMessage['author'];
text: string;
clientRequestId: string | null;
createdAt: string | null;
};
type ChatConversationClientPreference = {
sessionId: string;
clientId: string;
notifyOffline: boolean;
lastReadResponseMessageId: number | null;
};
type ChatConversationOfflineNotificationClientRow = {
clientId: string;
notifyOffline: boolean;
hasActivePushRegistration: boolean;
};
function normalizeDateTimeValue(value: unknown) {
if (value == null) {
return null;
}
if (value instanceof Date) {
return value.toISOString();
}
const normalized = String(value).trim();
if (!normalized) {
return null;
}
const parsed = new Date(normalized);
return Number.isNaN(parsed.getTime()) ? normalized : parsed.toISOString();
}
function parseStringArray(value: unknown) {
if (typeof value !== 'string') {
return [] as string[];
}
try {
const parsed = JSON.parse(value) as unknown;
if (!Array.isArray(parsed)) {
return [] as string[];
}
return parsed
.map((item) => String(item ?? '').trim())
.filter(Boolean);
} catch {
return [] as string[];
}
}
function stringifyStringArray(value: string[]) {
return JSON.stringify(
Array.from(
new Set(
value.map((item) => String(item ?? '').trim()).filter(Boolean),
),
),
);
}
function createPreview(text: string) {
const normalized = String(text ?? '').replace(/\s+/g, ' ').trim();
return normalized.length > 140 ? `${normalized.slice(0, 137).trimEnd()}...` : normalized;
}
function createCompactText(value: string | null | undefined, limit = 88) {
const normalized = String(value ?? '').replace(/\s+/g, ' ').trim();
if (!normalized) {
return '';
}
return normalized.length > limit ? `${normalized.slice(0, limit - 1)}` : normalized;
}
function normalizeClientIdSet(clientIds: Iterable<string | null | undefined>) {
return new Set(
Array.from(clientIds, (item) => String(item ?? '').trim()).filter(Boolean),
);
}
export function selectStaleOfflineNotificationClientIds(
rows: ChatConversationOfflineNotificationClientRow[],
options?: {
keepClientIds?: Iterable<string | null | undefined>;
},
) {
const keepClientIds = normalizeClientIdSet(options?.keepClientIds ?? []);
return rows
.filter((row) => {
if (!row.notifyOffline) {
return false;
}
if (!row.clientId) {
return false;
}
if (keepClientIds.has(row.clientId)) {
return false;
}
return row.hasActivePushRegistration !== true;
})
.map((row) => row.clientId);
}
const SOURCE_CHANGE_VERIFICATION_PATTERN = /^\s*\[\[source-change-verification:(.+?)\]\]\s*$/iu;
const PROMPT_PART_PATTERN = /^\s*\[\[prompt:(.+?)\]\]\s*$/iu;
type SourceChangeVerificationFeature = {
key: string;
label: string;
entryRefs: Array<{
sessionId: string;
requestId: string;
}>;
};
type SourceChangeVerificationMetadata = {
version: number;
features: SourceChangeVerificationFeature[];
};
function parseJsonRecord(value: string) {
try {
const parsed = JSON.parse(value) as unknown;
return parsed && typeof parsed === 'object' && !Array.isArray(parsed)
? parsed as Record<string, unknown>
: null;
} catch {
return null;
}
}
function parseSourceChangeVerificationMetadata(text: string) {
const lines = String(text ?? '').split('\n');
for (const line of lines) {
const matched = line.match(SOURCE_CHANGE_VERIFICATION_PATTERN);
if (!matched?.[1]) {
continue;
}
const record = parseJsonRecord(matched[1]);
if (!record) {
continue;
}
const features = Array.isArray(record.features)
? record.features.flatMap((feature) => {
if (!feature || typeof feature !== 'object' || Array.isArray(feature)) {
return [];
}
const featureRecord = feature as Record<string, unknown>;
const key = String(featureRecord.key ?? '').trim();
const label = String(featureRecord.label ?? '').trim();
const entryRefs = Array.isArray(featureRecord.entryRefs)
? featureRecord.entryRefs.flatMap((entryRef) => {
if (!entryRef || typeof entryRef !== 'object' || Array.isArray(entryRef)) {
return [];
}
const entryRefRecord = entryRef as Record<string, unknown>;
const sessionId = String(entryRefRecord.sessionId ?? '').trim();
const requestId = String(entryRefRecord.requestId ?? '').trim();
return sessionId && requestId ? [{ sessionId, requestId }] : [];
})
: [];
return key && label && entryRefs.length > 0 ? [{ key, label, entryRefs }] : [];
})
: [];
if (features.length === 0) {
continue;
}
return {
version: Number(record.version ?? 1) || 1,
features,
} satisfies SourceChangeVerificationMetadata;
}
return null;
}
function parseSelectedPromptValues(text: string) {
const selectedValues = new Set<string>();
const lines = String(text ?? '').split('\n');
for (const line of lines) {
const matched = line.match(PROMPT_PART_PATTERN);
if (!matched?.[1]) {
continue;
}
const record = parseJsonRecord(matched[1]);
if (!record) {
continue;
}
(Array.isArray(record.selectedValues) ? record.selectedValues : []).forEach((value) => {
const normalized = String(value ?? '').trim();
if (normalized) {
selectedValues.add(normalized);
}
});
}
return selectedValues;
}
function sanitizeSourceChangeGroupKey(value: string, fallback: string) {
const normalized = String(value ?? '')
.trim()
.toLowerCase()
.replace(/[^a-z0-9]+/g, '-')
.replace(/^-+|-+$/g, '')
.slice(0, 24);
return normalized || fallback;
}
function buildVerificationGroupRequestId(requestId: string, featureKey: string, index: number) {
const normalizedRequestId = String(requestId ?? '').trim() || 'verification';
const safeKey = sanitizeSourceChangeGroupKey(featureKey, `group-${index + 1}`);
const suffix = `::vf:${index + 1}:${safeKey}`;
const baseLimit = Math.max(1, 120 - suffix.length);
return `${normalizedRequestId.slice(0, baseLimit)}${suffix}`;
}
function createRequestTitle(userText: string, fallback: string) {
const compact = createCompactText(userText, 72);
return compact || fallback;
}
export function hasMeaningfulChatSourceArtifacts(snapshot: {
changedFiles?: string[] | null;
currentSourceFiles?: string[] | null;
diffBlocks?: string[] | null;
}) {
return [snapshot.changedFiles, snapshot.currentSourceFiles, snapshot.diffBlocks].some((items) =>
Array.isArray(items) && items.some((item) => typeof item === 'string' && item.trim().length > 0),
);
}
function extractDiffBlocks(text: string) {
return Array.from(text.matchAll(/```diff[^\n]*\n([\s\S]*?)```/g))
.map((match) => match[1]?.trim() ?? '')
.filter(Boolean);
}
function normalizeWorkspaceFilePath(value: string) {
const normalized = String(value ?? '')
.trim()
.replace(/\\/g, '/')
.replace(/^file:\/\//, '')
.replace(/[)>.,]+$/, '')
.replace(/:\d+(?::\d+)?$/, '');
if (!normalized) {
return '';
}
const resourceMarker = '/resource/';
const resourceIndex = normalized.lastIndexOf(resourceMarker);
if (resourceIndex >= 0) {
const innerPath = normalized.slice(resourceIndex + resourceMarker.length).replace(/^\/+/, '');
return innerPath;
}
const apiResourceMarker = '/api/chat/resources/';
const apiResourceIndex = normalized.lastIndexOf(apiResourceMarker);
if (apiResourceIndex >= 0) {
return normalized.slice(apiResourceIndex + apiResourceMarker.length).replace(/^\/+/, '');
}
const legacyWorkspaceMarker = '/workspace/main-project/';
const legacyWorkspaceIndex = normalized.lastIndexOf(legacyWorkspaceMarker);
if (legacyWorkspaceIndex >= 0) {
return normalized.slice(legacyWorkspaceIndex + legacyWorkspaceMarker.length);
}
for (const prefix of CURRENT_SOURCE_PREFIXES) {
const marker = `/${prefix}`;
const markerIndex = normalized.lastIndexOf(marker);
if (markerIndex >= 0) {
return normalized.slice(markerIndex + 1);
}
if (normalized.startsWith(prefix)) {
return normalized;
}
}
return normalized.replace(/^\/+/, '').replace(/^\.\//, '');
}
function isCurrentSourcePath(path: string) {
return CURRENT_SOURCE_PREFIXES.some((prefix) => path.startsWith(prefix));
}
function extractChangedFiles(text: string) {
const matches = Array.from(
text.matchAll(/^(?:diff --git a\/[^\s]+ b\/([^\s]+)|\+\+\+ b\/([^\s]+)|--- a\/([^\s]+))$/gm),
)
.flatMap((match) => [match[1], match[2], match[3]])
.filter((value): value is string => Boolean(value));
return Array.from(
new Set(
matches
.map((item) => normalizeWorkspaceFilePath(item))
.filter(Boolean),
),
).slice(0, 60);
}
function extractCurrentSourceFiles(text: string) {
const textWithoutChatResourcePaths = text
.replace(/\/api\/chat\/resources\/[^\s)`]+/g, ' ')
.replace(/\/?(?:public\/)?\.codex_chat\/[^\s)`]+\/resource\/[^\s)`]+/g, ' ');
const diffPathMatches = Array.from(
text.matchAll(/^(?:diff --git a\/[^\s]+ b\/([^\s]+)|\+\+\+ b\/([^\s]+)|--- a\/([^\s]+))$/gm),
)
.flatMap((match) => [match[1], match[2], match[3]])
.filter((value): value is string => Boolean(value))
.map((item) => normalizeWorkspaceFilePath(item))
.filter((path) => path && isCurrentSourcePath(path));
const workspacePathMatches = [
...(text.match(/\[[^\]]*]\((\/[^)\s]+)\)/g) ?? []).map((item) => item.replace(/^[^\(]*\(/, '').replace(/\)$/, '')),
...(text.match(/\/(?:[^/\s)]+\/)*(?:src|docs|etc|public|scripts)\/[^\s)`]+/g) ?? []),
]
.map((item) => normalizeWorkspaceFilePath(item))
.filter((path) => path && isCurrentSourcePath(path));
const directRelativeMatches = (textWithoutChatResourcePaths.match(/\b(?:src|docs|etc|public|scripts)\/[A-Za-z0-9._\-\/]+(?:\.[A-Za-z0-9]+)?\b/g) ?? [])
.map((item) => normalizeWorkspaceFilePath(item))
.filter((path) => path && isCurrentSourcePath(path));
return Array.from(new Set([...diffPathMatches, ...workspacePathMatches, ...directRelativeMatches])).slice(0, 60);
}
function deriveFeatureTags(files: string[]) {
const tags = new Set<string>();
files.forEach((file) => {
const segments = file.split('/').filter(Boolean);
if (segments[0] === 'src' && segments[1] === 'features' && segments[2]) {
tags.add(`feature:${segments[2]}`);
return;
}
if (segments[0] === 'src' && segments[1] === 'components' && segments[2]) {
tags.add(`component:${segments[2]}`);
return;
}
if (segments[0] === 'src' && segments[1] === 'widgets' && segments[2]) {
tags.add(`widget:${segments[2]}`);
return;
}
if (segments[0] === 'docs' && segments[1]) {
tags.add(`docs:${segments[1]}`);
return;
}
if (segments[0]) {
tags.add(segments[0]);
}
});
return Array.from(tags);
}
const PENDING_WORK_ANALYSIS_PATTERNS = [
//u,
//u,
//u,
//u,
//u,
/\banalysis\b/i,
/\binvestigat(?:e|ion)\b/i,
] as const;
const PENDING_WORK_DESIGN_PATTERNS = [
//u,
//u,
//u,
//u,
//u,
//u,
//u,
//u,
/\bdesign\b/i,
/\barchitecture\b/i,
] as const;
const PENDING_WORK_IMPLEMENTATION_PATTERNS = [
//u,
//u,
//u,
//u,
//u,
//u,
//u,
//u,
/.*/u,
/.*/u,
//u,
/preview/iu,
/ /u,
/diff/u,
/\bimplement(?:ed|ation)?\b/i,
/\bfix(?:ed)?\b/i,
/\bverified?\b/i,
/\btested?\b/i,
] as const;
const PENDING_WORK_RESPONSE_HOLD_PATTERNS = [
//u,
//u,
/(?:|)/u,
/ /u,
//u,
//u,
//u,
/\bif you want\b/i,
/\bnext step\b/i,
] as const;
function normalizePendingWorkText(text: string | null | undefined) {
return String(text ?? '').replace(/\s+/g, ' ').trim();
}
function hasPendingWorkPattern(text: string, patterns: readonly RegExp[]) {
return patterns.some((pattern) => pattern.test(text));
}
function resolvePendingWorkReasonFromText(text: string) {
if (!text) {
return null;
}
if (hasPendingWorkPattern(text, PENDING_WORK_DESIGN_PATTERNS)) {
return 'design' as const;
}
if (hasPendingWorkPattern(text, PENDING_WORK_ANALYSIS_PATTERNS)) {
return 'analysis' as const;
}
return null;
}
function hasOpenPromptParts(parts: ChatMessagePart[] | undefined) {
return (parts ?? []).some((part) => {
if (part.type !== 'prompt' || part.readOnly === true) {
return false;
}
if ((part.selectedValues?.length ?? 0) > 0) {
return false;
}
if ((part.resultText?.trim() ?? '').length > 0) {
return false;
}
if ((part.resolvedAt?.trim() ?? '').length > 0 || part.resolvedBy != null) {
return false;
}
return true;
});
}
function resolvePendingWorkState(args: {
requestText?: string | null;
responseText?: string | null;
latestCodexParts?: ChatMessagePart[] | undefined;
}) {
if (hasOpenPromptParts(args.latestCodexParts)) {
return {
isPendingWork: true,
pendingWorkReason: 'prompt' as const,
};
}
const requestText = normalizePendingWorkText(args.requestText);
const responseText = normalizePendingWorkText(args.responseText);
const requestReason = resolvePendingWorkReasonFromText(requestText);
if (!requestReason) {
return {
isPendingWork: false,
pendingWorkReason: null,
};
}
if (hasPendingWorkPattern(responseText, PENDING_WORK_IMPLEMENTATION_PATTERNS)) {
return {
isPendingWork: false,
pendingWorkReason: null,
};
}
if (!responseText) {
return {
isPendingWork: true,
pendingWorkReason: requestReason,
};
}
const responseReason = resolvePendingWorkReasonFromText(responseText);
if (responseReason || hasPendingWorkPattern(responseText, PENDING_WORK_RESPONSE_HOLD_PATTERNS)) {
return {
isPendingWork: true,
pendingWorkReason: responseReason ?? requestReason,
};
}
return {
isPendingWork: false,
pendingWorkReason: null,
};
}
const CONTEXT_DEPENDENT_REQUEST_PATTERNS = [
/\s*(||)/u,
/\s*/u,
/\s*/u,
//u,
/\s*/u,
/\s*\s*(|badge|)/iu,
/(?:|||||||)/u,
/^(?:||||||||||||||)/u,
/\b(?:also|continue|continued|follow[\s-]?up|same|again)\b/i,
] as const;
function normalizeRequestPreviewText(text: string) {
return String(text ?? '').replace(/\s+/g, ' ').trim();
}
function isContextDependentRequestPreview(text: string) {
const normalized = normalizeRequestPreviewText(text);
if (!normalized) {
return false;
}
if (CONTEXT_DEPENDENT_REQUEST_PATTERNS.some((pattern) => pattern.test(normalized))) {
return true;
}
if (normalized.length <= 16) {
return true;
}
return false;
}
function buildLatestRequestPreview(
requests: Array<{ text: string; createdAt: string | null }>,
): { text: string; createdAt: string | null } | null {
const normalizedRequests = requests
.map((request) => ({
text: normalizeRequestPreviewText(request.text),
createdAt: request.createdAt,
}))
.filter((request) => Boolean(request.text));
const latestRequest = normalizedRequests[0];
if (!latestRequest) {
return null;
}
if (!isContextDependentRequestPreview(latestRequest.text)) {
return latestRequest;
}
const previousRequest =
normalizedRequests.slice(1).find((request) => !isContextDependentRequestPreview(request.text)) ??
normalizedRequests[1] ??
null;
if (!previousRequest) {
return latestRequest;
}
return {
text: `${previousRequest.text} ${latestRequest.text}`.trim(),
createdAt: latestRequest.createdAt,
};
}
function isPreviewableConversationMessage(row: { author?: unknown; text?: unknown }) {
const author = String(row.author ?? '');
const text = String(row.text ?? '').trim();
if (!text) {
return false;
}
if (author === 'system' && text.startsWith(`${CHAT_ACTIVITY_MESSAGE_PREFIX}\n`)) {
return false;
}
return author === 'user' || author === 'codex';
}
function buildConversationTitle(text: string) {
const preview = createPreview(text);
return preview || '새 대화';
}
function mapConversationRow(row: Record<string, unknown>): ChatConversationItem {
return {
sessionId: String(row.session_id ?? ''),
clientId: row.client_id == null ? null : String(row.client_id),
title: String(row.title ?? '새 대화'),
requestBadgeLabel: row.request_badge_label == null ? null : String(row.request_badge_label),
chatTypeId: row.chat_type_id == null ? null : String(row.chat_type_id),
lastChatTypeId: row.last_chat_type_id == null ? null : String(row.last_chat_type_id),
generalSectionName: row.general_section_name == null ? null : String(row.general_section_name),
contextLabel: row.context_label == null ? null : String(row.context_label),
contextDescription: row.context_description == null ? null : String(row.context_description),
notifyOffline: Boolean(row.notify_offline),
hasUnreadResponse: Boolean(row.has_unread_response),
currentRequestId: row.current_request_id == null ? null : String(row.current_request_id),
currentJobStatus: row.current_job_status == null ? null : String(row.current_job_status) as ChatConversationItem['currentJobStatus'],
currentJobMessage: row.current_job_message == null ? null : String(row.current_job_message),
currentQueueSize: Number(row.current_queue_size ?? 0),
currentStatusUpdatedAt: normalizeDateTimeValue(row.current_status_updated_at),
isPendingWork: false,
pendingWorkReason: null,
lastRequestPreview: '',
lastMessagePreview: String(row.last_message_preview ?? ''),
lastResponsePreview: '',
createdAt: normalizeDateTimeValue(row.created_at) ?? '',
updatedAt: normalizeDateTimeValue(row.updated_at) ?? '',
lastMessageAt: normalizeDateTimeValue(row.last_message_at),
};
}
function mapMessageRow(row: Record<string, unknown>): StoredChatMessage {
return {
id: Number(row.message_id ?? row.id ?? 0),
author: String(row.author ?? 'codex') as StoredChatMessage['author'],
text: String(row.text ?? ''),
timestamp: String(row.display_timestamp ?? ''),
clientRequestId: row.client_request_id == null ? null : String(row.client_request_id),
parts: parseChatMessageParts(row.parts_json),
};
}
export function isVisibleConversationMessage(message: StoredChatMessage) {
if (message.author !== 'system') {
return true;
}
return message.text.startsWith(`${CHAT_ACTIVITY_MESSAGE_PREFIX}\n`);
}
function applyVisibleConversationMessageCondition(builder: any) {
builder.whereNot('author', 'system').orWhere((nestedBuilder: any) => {
nestedBuilder.where('author', '=', 'system').andWhere('text', 'like', `${CHAT_ACTIVITY_MESSAGE_PREFIX}\n%`);
});
}
function mapClientPreferenceRow(row: Record<string, unknown>): ChatConversationClientPreference {
return {
sessionId: String(row.session_id ?? ''),
clientId: String(row.client_id ?? ''),
notifyOffline: Boolean(row.notify_offline),
lastReadResponseMessageId:
row.last_read_response_message_id == null ? null : Number(row.last_read_response_message_id),
};
}
function mapRequestRow(row: Record<string, unknown>): ChatConversationRequestItem {
const status = String(row.status ?? 'accepted') as ChatConversationRequestStatus;
const hasResponse = row.response_message_id != null || String(row.response_text ?? '').trim().length > 0;
const canDelete = !hasResponse && !['queued', 'started', 'completed'].includes(status);
return {
sessionId: String(row.session_id ?? ''),
requestId: String(row.request_id ?? ''),
requesterClientId: row.requester_client_id == null ? null : String(row.requester_client_id),
status,
statusMessage: row.status_message == null ? null : String(row.status_message),
userMessageId: row.user_message_id == null ? null : Number(row.user_message_id),
userText: String(row.user_text ?? ''),
responseMessageId: row.response_message_id == null ? null : Number(row.response_message_id),
responseText: String(row.response_text ?? ''),
hasResponse,
canDelete,
createdAt: normalizeDateTimeValue(row.created_at) ?? '',
updatedAt: normalizeDateTimeValue(row.updated_at) ?? '',
answeredAt: normalizeDateTimeValue(row.answered_at),
terminalAt: normalizeDateTimeValue(row.terminal_at),
};
}
function mapSourceChangeSnapshotRow(row: Record<string, unknown>): ChatSourceChangeSnapshotItem {
const sessionId = String(row.session_id ?? '').trim();
const requestId = String(row.request_id ?? '').trim();
return {
id: `${sessionId}:${requestId}`,
sessionId,
clientId: row.client_id == null ? null : String(row.client_id),
conversationTitle: String(row.conversation_title ?? '새 대화'),
chatTypeId: row.chat_type_id == null ? null : String(row.chat_type_id),
chatTypeLabel: row.chat_type_label == null ? '' : String(row.chat_type_label),
requestId,
requestTitle: String(row.request_title ?? requestId),
questionText: String(row.question_text ?? ''),
answerText: String(row.answer_text ?? ''),
status: String(row.status ?? 'completed') as ChatConversationRequestStatus,
sourceChangedAt: normalizeDateTimeValue(row.source_changed_at) ?? normalizeDateTimeValue(row.answered_at) ?? '',
updatedAt: normalizeDateTimeValue(row.updated_at) ?? '',
featureTags: parseStringArray(row.feature_tags_json),
changedFiles: parseStringArray(row.changed_files_json),
currentSourceFiles: parseStringArray(row.current_source_files_json),
diffBlocks: parseStringArray(row.diff_blocks_json),
hasSourceChanges: Boolean(row.has_source_changes),
reviewStatus: String(row.review_status ?? 'not-reviewed') === 'reviewed' ? 'reviewed' : 'not-reviewed',
sourceChangeKind: String(row.source_change_kind ?? 'request') === 'verification-group' ? 'verification-group' : 'request',
sourceEntryIds: parseStringArray(row.source_entry_ids_json),
conversationDeletedAt: normalizeDateTimeValue(row.conversation_deleted_at),
};
}
function buildSourceChangeSnapshotPayload(args: {
sessionId: string;
clientId?: string | null;
conversationTitle?: string | null;
chatTypeId?: string | null;
chatTypeLabel?: string | null;
requestId: string;
status: ChatConversationRequestStatus;
questionText: string;
answerText: string;
answeredAt?: string | null;
updatedAt?: string | null;
reviewStatus?: 'reviewed' | 'not-reviewed';
sourceChangeKind?: 'request' | 'verification-group';
sourceEntryIds?: string[];
}) {
const changedFiles = extractChangedFiles(args.answerText);
const diffBlocks = extractDiffBlocks(args.answerText);
const currentSourceFiles = extractCurrentSourceFiles(args.answerText);
const featureTags = deriveFeatureTags(changedFiles);
const hasSourceChanges = hasMeaningfulChatSourceArtifacts({
changedFiles,
currentSourceFiles,
diffBlocks,
});
const sourceChangedAt = args.answeredAt?.trim() || args.updatedAt?.trim() || new Date().toISOString();
return {
session_id: args.sessionId,
client_id: normalizeClientId(args.clientId),
request_id: args.requestId,
conversation_title: args.conversationTitle?.trim() || '새 대화',
chat_type_id: args.chatTypeId?.trim() || null,
chat_type_label: args.chatTypeLabel?.trim() || null,
request_title: createRequestTitle(args.questionText, args.requestId),
question_text: args.questionText,
answer_text: args.answerText,
status: args.status,
answered_at: args.answeredAt?.trim() || null,
source_changed_at: sourceChangedAt,
feature_tags_json: stringifyStringArray(featureTags),
changed_files_json: stringifyStringArray(changedFiles),
current_source_files_json: stringifyStringArray(currentSourceFiles),
diff_blocks_json: stringifyStringArray(diffBlocks),
has_source_changes: hasSourceChanges,
review_status: args.reviewStatus === 'reviewed' ? 'reviewed' : 'not-reviewed',
source_change_kind: args.sourceChangeKind === 'verification-group' ? 'verification-group' : 'request',
source_entry_ids_json: stringifyStringArray(args.sourceEntryIds ?? []),
conversation_deleted_at: null,
updated_at: db.fn.now(),
};
}
function normalizeClientId(clientId?: string | null) {
return clientId?.trim() || null;
}
function getTimeValue(value: string | null | undefined) {
if (!value) {
return 0;
}
const timestamp = new Date(value).getTime();
return Number.isFinite(timestamp) ? timestamp : 0;
}
function isRuntimeRequestActive(requestId?: string | null) {
const normalizedRequestId = requestId?.trim() || null;
if (!normalizedRequestId) {
return false;
}
const detail = chatRuntimeService.getJobDetail(normalizedRequestId);
return detail.item != null && detail.terminalStatus == null;
}
function isTerminalRequestStatus(status: ChatConversationRequestStatus | null | undefined) {
return status === 'completed' || status === 'failed' || status === 'cancelled' || status === 'removed';
}
function isPreparingChatReplyText(text?: string | null) {
const normalized = String(text ?? '').replace(/\s+/g, ' ').trim();
return normalized.startsWith('응답을 준비하고 있습니다');
}
function hasStoredRequestResponse(request: {
responseMessageId?: number | null;
responseText?: string | null;
}) {
const normalizedResponseText = String(request.responseText ?? '').trim();
if (normalizedResponseText.length > 0) {
return !isPreparingChatReplyText(normalizedResponseText);
}
return request.responseMessageId != null;
}
function isConversationRequestActive(
conversation: {
current_request_id?: unknown;
current_job_status?: unknown;
} | null | undefined,
requestId?: string | null,
) {
const normalizedRequestId = requestId?.trim() || null;
if (!normalizedRequestId) {
return false;
}
const currentRequestId = String(conversation?.current_request_id ?? '').trim() || null;
const currentJobStatus = String(conversation?.current_job_status ?? '').trim();
if (currentRequestId !== normalizedRequestId) {
return false;
}
return currentJobStatus === 'queued' || currentJobStatus === 'started';
}
function hasConversationMetadata(
conversation: {
title?: unknown;
request_badge_label?: unknown;
chat_type_id?: unknown;
last_chat_type_id?: unknown;
general_section_name?: unknown;
context_label?: unknown;
context_description?: unknown;
current_request_id?: unknown;
current_job_status?: unknown;
} | null | undefined,
) {
return [
conversation?.title,
conversation?.request_badge_label,
conversation?.chat_type_id,
conversation?.last_chat_type_id,
conversation?.general_section_name,
conversation?.context_label,
conversation?.context_description,
conversation?.current_request_id,
conversation?.current_job_status,
].some((value) => String(value ?? '').trim().length > 0);
}
export function normalizeStaleRequestItem(
item: ChatConversationRequestItem,
conversation: {
current_request_id?: unknown;
current_job_status?: unknown;
current_status_updated_at?: unknown;
} | null | undefined,
) {
const runtimeActive = isRuntimeRequestActive(item.requestId);
if (
shouldClearConversationJobState({
currentRequestId: String(conversation?.current_request_id ?? ''),
currentJobStatus:
conversation?.current_job_status == null
? null
: String(conversation.current_job_status) as ChatConversationItem['currentJobStatus'],
currentStatusUpdatedAt:
conversation?.current_status_updated_at == null ? null : String(conversation.current_status_updated_at),
runtimeActive,
request: item,
})
) {
return {
...item,
status: 'failed' as const,
statusMessage: item.statusMessage ?? '중단된 오래된 요청',
canDelete: true,
terminalAt: item.terminalAt ?? item.updatedAt,
};
}
return item;
}
export function shouldClearConversationJobState(params: {
currentRequestId?: string | null;
currentJobStatus?: ChatConversationItem['currentJobStatus'];
currentStatusUpdatedAt?: string | null;
runtimeActive?: boolean;
nowMs?: number;
request:
| {
requestId?: string | null;
status?: ChatConversationRequestStatus | null;
responseMessageId?: number | null;
responseText?: string | null;
terminalAt?: string | null;
updatedAt?: string | null;
}
| null
| undefined;
}) {
const currentJobStatus = params.currentJobStatus ?? null;
const requestStatus = params.request?.status ?? null;
const hasStoredResponse = hasStoredRequestResponse(params.request ?? {});
if (!currentJobStatus) {
return false;
}
const currentRequestId = params.currentRequestId?.trim() || null;
if (!currentRequestId) {
return true;
}
const requestId = params.request?.requestId?.trim() || null;
if (!requestId || requestId !== currentRequestId) {
return false;
}
const runtimeActive = params.runtimeActive === true;
const lastUpdatedAt = Math.max(
getTimeValue(params.currentStatusUpdatedAt),
getTimeValue(params.request?.updatedAt),
);
const nowMs = Number.isFinite(params.nowMs) ? Number(params.nowMs) : Date.now();
const isStaleInProgressState =
!runtimeActive &&
(currentJobStatus === 'queued' || currentJobStatus === 'started') &&
!hasStoredRequestResponse(params.request ?? {}) &&
!isTerminalRequestStatus(params.request?.status ?? null) &&
lastUpdatedAt > 0 &&
nowMs - lastUpdatedAt >= STALE_CHAT_REQUEST_TIMEOUT_MS;
return (
(requestStatus != null && requestStatus !== 'completed' && isTerminalRequestStatus(requestStatus)) ||
hasStoredResponse ||
isStaleInProgressState
);
}
function getRequestStatusRank(status: ChatConversationRequestStatus | null | undefined) {
switch (status) {
case 'accepted':
return 0;
case 'queued':
return 1;
case 'started':
return 2;
case 'completed':
case 'failed':
case 'cancelled':
case 'removed':
return 3;
default:
return -1;
}
}
function getDefaultChatConversationRequestStatusMessage(status: ChatConversationRequestStatus) {
switch (status) {
case 'accepted':
return '요청을 접수했습니다.';
case 'queued':
return '대기열 등록';
case 'started':
return '요청 처리 중';
case 'completed':
return '요청 처리 완료';
case 'failed':
return '요청 처리 실패';
case 'cancelled':
return '요청 실행 중단';
case 'removed':
return '요청 기록이 제거되었습니다.';
default:
return null;
}
}
export function mergeChatConversationRequestStatus(
currentStatus: ChatConversationRequestStatus | null | undefined,
incomingStatus: ChatConversationRequestStatus | null | undefined,
): ChatConversationRequestStatus {
const normalizedCurrent = currentStatus ?? null;
const normalizedIncoming = incomingStatus ?? null;
if (!normalizedCurrent && !normalizedIncoming) {
return 'accepted';
}
if (!normalizedCurrent) {
return normalizedIncoming ?? 'accepted';
}
if (!normalizedIncoming) {
return normalizedCurrent;
}
if (isTerminalRequestStatus(normalizedCurrent) && !isTerminalRequestStatus(normalizedIncoming)) {
return normalizedCurrent;
}
if (!isTerminalRequestStatus(normalizedCurrent) && isTerminalRequestStatus(normalizedIncoming)) {
return normalizedIncoming;
}
return getRequestStatusRank(normalizedIncoming) >= getRequestStatusRank(normalizedCurrent)
? normalizedIncoming
: normalizedCurrent;
}
export function buildChatConversationRequestPatchFromMessage(message: {
id: number;
author: StoredChatMessage['author'];
text: string;
clientRequestId?: string | null;
}): ChatConversationRequestStatusPatch | null {
const normalizedRequestId = message.clientRequestId?.trim() || null;
if (!normalizedRequestId) {
return null;
}
if (message.author === 'user') {
return {
requestId: normalizedRequestId,
status: 'accepted',
userMessageId: message.id,
userText: message.text,
};
}
if (message.author === 'codex') {
return {
requestId: normalizedRequestId,
status: 'started',
responseMessageId: message.id,
responseText: message.text,
};
}
return null;
}
export function selectChatConversationResponseCandidate(
request: {
requestId: string;
createdAt: string;
responseMessageId?: number | null;
},
nextRequest: {
createdAt: string;
} | undefined,
messages: ChatConversationResponseCandidate[],
) {
const normalizedRequestId = request.requestId.trim();
if (!normalizedRequestId) {
return null;
}
const directMatches = messages.filter(
(message) => message.author === 'codex' && message.clientRequestId?.trim() === normalizedRequestId,
);
if (directMatches.length > 0) {
return directMatches.at(-1) ?? null;
}
if (request.responseMessageId != null) {
const responseMessageMatch = messages.find(
(message) => message.author === 'codex' && message.messageId === request.responseMessageId,
);
if (responseMessageMatch) {
return responseMessageMatch;
}
}
const requestCreatedAt = getTimeValue(request.createdAt);
const nextRequestCreatedAt = getTimeValue(nextRequest?.createdAt);
const windowMatches = messages.filter((message) => {
if (message.author !== 'codex') {
return false;
}
const linkedRequestId = message.clientRequestId?.trim() || null;
if (linkedRequestId && linkedRequestId !== normalizedRequestId) {
return false;
}
const createdAt = getTimeValue(message.createdAt);
if (requestCreatedAt > 0 && createdAt > 0 && createdAt < requestCreatedAt) {
return false;
}
if (nextRequestCreatedAt > 0 && createdAt > 0 && createdAt >= nextRequestCreatedAt) {
return false;
}
return Boolean(String(message.text ?? '').trim());
});
return windowMatches.at(-1) ?? null;
}
async function getLatestPreviewableMessageMap(sessionIds: string[]) {
const normalizedSessionIds = sessionIds.map((sessionId) => sessionId.trim()).filter(Boolean);
if (normalizedSessionIds.length === 0) {
return new Map<string, { text: string; createdAt: string | null }>();
}
const rows = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.select('session_id', 'author', 'text', 'created_at')
.whereIn('session_id', normalizedSessionIds)
.whereIn('author', ['user', 'codex'])
.orderBy('session_id', 'asc')
.orderBy('created_at', 'desc')
.orderBy('id', 'desc');
const messageMap = new Map<string, { text: string; createdAt: string | null }>();
for (const row of rows) {
const sessionId = String(row.session_id ?? '').trim();
if (!sessionId || messageMap.has(sessionId) || !isPreviewableConversationMessage(row)) {
continue;
}
messageMap.set(sessionId, {
text: String(row.text ?? ''),
createdAt: normalizeDateTimeValue(row.created_at),
});
}
return messageMap;
}
async function getLatestRequestPreviewMap(sessionIds: string[]) {
const normalizedSessionIds = Array.from(new Set(sessionIds.map((sessionId) => sessionId.trim()).filter(Boolean)));
if (normalizedSessionIds.length === 0) {
return new Map<string, { text: string; createdAt: string | null }>();
}
const rows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('session_id', 'user_text', 'created_at', 'status', 'request_id')
.whereIn('session_id', normalizedSessionIds)
.whereNot('status', 'removed')
.orderBy('session_id', 'asc')
.orderBy('created_at', 'desc')
.orderBy('request_id', 'desc');
const requestMap = new Map<string, { text: string; createdAt: string | null }>();
const requestRowsBySession = new Map<string, Array<{ text: string; createdAt: string | null }>>();
const completedSessionIds = new Set<string>();
for (const row of rows) {
const sessionId = String(row.session_id ?? '').trim();
const userText = String(row.user_text ?? '').trim();
if (!sessionId || completedSessionIds.has(sessionId) || !userText) {
continue;
}
const requestRows = requestRowsBySession.get(sessionId) ?? [];
requestRows.push({
text: userText,
createdAt: normalizeDateTimeValue(row.created_at),
});
if (requestRows.length >= 5) {
completedSessionIds.add(sessionId);
}
requestRowsBySession.set(sessionId, requestRows);
if (completedSessionIds.size >= normalizedSessionIds.length) {
break;
}
}
for (const sessionId of normalizedSessionIds) {
const preview = buildLatestRequestPreview(requestRowsBySession.get(sessionId) ?? []);
if (!preview) {
continue;
}
requestMap.set(sessionId, preview);
}
return requestMap;
}
async function getLatestResponsePreviewMap(sessionIds: string[]) {
const normalizedSessionIds = Array.from(new Set(sessionIds.map((sessionId) => sessionId.trim()).filter(Boolean)));
if (normalizedSessionIds.length === 0) {
return new Map<string, { text: string; createdAt: string | null }>();
}
const rows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('session_id', 'response_text', 'answered_at', 'updated_at', 'status', 'request_id')
.whereIn('session_id', normalizedSessionIds)
.whereNot('status', 'removed')
.orderBy('session_id', 'asc')
.orderByRaw('COALESCE(answered_at, updated_at, created_at) desc')
.orderBy('request_id', 'desc');
const responseMap = new Map<string, { text: string; createdAt: string | null }>();
for (const row of rows) {
const sessionId = String(row.session_id ?? '').trim();
const responseText = String(row.response_text ?? '').trim();
if (!sessionId || responseMap.has(sessionId) || !responseText) {
continue;
}
responseMap.set(sessionId, {
text: responseText,
createdAt: normalizeDateTimeValue(row.answered_at ?? row.updated_at),
});
}
return responseMap;
}
function resolveConversationPreviewOverride(
mapped: ChatConversationItem,
latestMessage: { text: string; createdAt: string | null } | undefined,
latestRequest: { text: string; createdAt: string | null } | undefined,
) {
const latestMessageTime = getTimeValue(latestMessage?.createdAt);
const latestRequestTime = getTimeValue(latestRequest?.createdAt);
if (latestRequest && latestRequestTime > latestMessageTime) {
return {
...mapped,
lastMessagePreview: createPreview(latestRequest.text),
lastMessageAt: latestRequest.createdAt,
};
}
if (latestMessage) {
return {
...mapped,
lastMessagePreview: createPreview(latestMessage.text),
lastMessageAt: latestMessage.createdAt,
};
}
return mapped;
}
async function getLatestResponseMessageIdMap(sessionIds: string[]) {
const normalizedSessionIds = Array.from(new Set(sessionIds.map((sessionId) => sessionId.trim()).filter(Boolean)));
if (normalizedSessionIds.length === 0) {
return new Map<string, number>();
}
const rows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('session_id', 'response_message_id')
.whereIn('session_id', normalizedSessionIds)
.whereNotNull('response_message_id')
.orderBy('session_id', 'asc')
.orderBy('response_message_id', 'desc');
const responseMap = new Map<string, number>();
for (const row of rows) {
const sessionId = String(row.session_id ?? '').trim();
const responseMessageId =
row.response_message_id == null ? null : Number(row.response_message_id);
if (!sessionId || responseMessageId == null || responseMap.has(sessionId)) {
continue;
}
responseMap.set(sessionId, responseMessageId);
}
return responseMap;
}
async function getLatestCodexPromptPartsMap(sessionIds: string[]) {
const normalizedSessionIds = Array.from(new Set(sessionIds.map((sessionId) => sessionId.trim()).filter(Boolean)));
if (normalizedSessionIds.length === 0) {
return new Map<string, ChatMessagePart[]>();
}
const rows = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.select('session_id', 'parts_json', 'created_at', 'message_id')
.whereIn('session_id', normalizedSessionIds)
.andWhere('author', 'codex')
.orderBy('session_id', 'asc')
.orderBy('created_at', 'desc')
.orderBy('message_id', 'desc');
const promptPartMap = new Map<string, ChatMessagePart[]>();
for (const row of rows) {
const sessionId = String(row.session_id ?? '').trim();
if (!sessionId || promptPartMap.has(sessionId)) {
continue;
}
const parts = parseChatMessageParts(row.parts_json);
if ((parts ?? []).some((part) => part.type === 'prompt')) {
promptPartMap.set(sessionId, parts ?? []);
}
}
return promptPartMap;
}
async function getLatestResponseMessageId(sessionId: string) {
const responseMap = await getLatestResponseMessageIdMap([sessionId]);
return responseMap.get(sessionId.trim()) ?? null;
}
export async function ensureChatConversationTables() {
const hasConversationTable = await db.schema.hasTable(CHAT_CONVERSATION_TABLE);
if (!hasConversationTable) {
await db.schema.createTable(CHAT_CONVERSATION_TABLE, (table) => {
table.string('session_id', 120).primary();
table.string('client_id', 120).nullable().index();
table.string('title', 200).notNullable().defaultTo('새 대화');
table.string('request_badge_label', 120).nullable();
table.string('chat_type_id', 120).nullable();
table.string('last_chat_type_id', 120).nullable();
table.string('general_section_name', 120).nullable();
table.string('context_label', 200).nullable();
table.text('context_description').nullable();
table.boolean('notify_offline').notNullable().defaultTo(false);
table.string('current_request_id', 120).nullable();
table.string('current_job_status', 40).nullable();
table.text('current_job_message').nullable();
table.integer('current_queue_size').notNullable().defaultTo(0);
table.timestamp('current_status_updated_at', { useTz: true }).nullable();
table.text('last_message_preview').notNullable().defaultTo('');
table.timestamp('last_message_at', { useTz: true }).nullable();
table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
});
}
const requiredConversationColumns: Array<[string, (table: any) => void]> = [
['client_id', (table) => table.string('client_id', 120).nullable().index()],
['title', (table) => table.string('title', 200).notNullable().defaultTo('새 대화')],
['request_badge_label', (table) => table.string('request_badge_label', 120).nullable()],
['chat_type_id', (table) => table.string('chat_type_id', 120).nullable()],
['last_chat_type_id', (table) => table.string('last_chat_type_id', 120).nullable()],
['general_section_name', (table) => table.string('general_section_name', 120).nullable()],
['context_label', (table) => table.string('context_label', 200).nullable()],
['context_description', (table) => table.text('context_description').nullable()],
['notify_offline', (table) => table.boolean('notify_offline').notNullable().defaultTo(false)],
['current_request_id', (table) => table.string('current_request_id', 120).nullable()],
['current_job_status', (table) => table.string('current_job_status', 40).nullable()],
['current_job_message', (table) => table.text('current_job_message').nullable()],
['current_queue_size', (table) => table.integer('current_queue_size').notNullable().defaultTo(0)],
['current_status_updated_at', (table) => table.timestamp('current_status_updated_at', { useTz: true }).nullable()],
['last_message_preview', (table) => table.text('last_message_preview').notNullable().defaultTo('')],
['last_message_at', (table) => table.timestamp('last_message_at', { useTz: true }).nullable()],
['created_at', (table) => table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
['updated_at', (table) => table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredConversationColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_TABLE, (table) => {
createColumn(table);
});
}
}
const hasClientTable = await db.schema.hasTable(CHAT_CONVERSATION_CLIENT_TABLE);
if (!hasClientTable) {
await db.schema.createTable(CHAT_CONVERSATION_CLIENT_TABLE, (table) => {
table.increments('id').primary();
table.string('session_id', 120).notNullable().index();
table.string('client_id', 120).notNullable().index();
table.boolean('notify_offline').notNullable().defaultTo(false);
table.bigInteger('last_read_response_message_id').nullable();
table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.unique(['session_id', 'client_id']);
});
}
const requiredClientColumns: Array<[string, (table: any) => void]> = [
['session_id', (table) => table.string('session_id', 120).notNullable().index()],
['client_id', (table) => table.string('client_id', 120).notNullable().index()],
['notify_offline', (table) => table.boolean('notify_offline').notNullable().defaultTo(false)],
['last_read_response_message_id', (table) => table.bigInteger('last_read_response_message_id').nullable()],
['created_at', (table) => table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
['updated_at', (table) => table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredClientColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_CLIENT_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_CLIENT_TABLE, (table) => {
createColumn(table);
});
}
}
const hasMessageTable = await db.schema.hasTable(CHAT_CONVERSATION_MESSAGE_TABLE);
if (!hasMessageTable) {
await db.schema.createTable(CHAT_CONVERSATION_MESSAGE_TABLE, (table) => {
table.increments('id').primary();
table.string('session_id', 120).notNullable().index();
table.bigInteger('message_id').notNullable();
table.string('author', 20).notNullable();
table.text('text').notNullable();
table.text('parts_json').notNullable().defaultTo('[]');
table.string('display_timestamp', 40).notNullable().defaultTo('');
table.string('client_request_id', 120).nullable();
table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.unique(['session_id', 'message_id']);
});
}
const requiredMessageColumns: Array<[string, (table: any) => void]> = [
['session_id', (table) => table.string('session_id', 120).notNullable().index()],
['message_id', (table) => table.bigInteger('message_id').notNullable()],
['author', (table) => table.string('author', 20).notNullable().defaultTo('codex')],
['text', (table) => table.text('text').notNullable().defaultTo('')],
['parts_json', (table) => table.text('parts_json').notNullable().defaultTo('[]')],
['display_timestamp', (table) => table.string('display_timestamp', 40).notNullable().defaultTo('')],
['client_request_id', (table) => table.string('client_request_id', 120).nullable()],
['created_at', (table) => table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredMessageColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_MESSAGE_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_MESSAGE_TABLE, (table) => {
createColumn(table);
});
}
}
const hasRequestTable = await db.schema.hasTable(CHAT_CONVERSATION_REQUEST_TABLE);
if (!hasRequestTable) {
await db.schema.createTable(CHAT_CONVERSATION_REQUEST_TABLE, (table) => {
table.increments('id').primary();
table.string('session_id', 120).notNullable().index();
table.string('request_id', 120).notNullable();
table.string('requester_client_id', 200).nullable();
table.string('status', 40).notNullable().defaultTo('accepted');
table.text('status_message').nullable();
table.bigInteger('user_message_id').nullable();
table.text('user_text').notNullable().defaultTo('');
table.bigInteger('response_message_id').nullable();
table.text('response_text').notNullable().defaultTo('');
table.timestamp('answered_at', { useTz: true }).nullable();
table.timestamp('terminal_at', { useTz: true }).nullable();
table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.unique(['session_id', 'request_id']);
});
}
const requiredRequestColumns: Array<[string, (table: any) => void]> = [
['session_id', (table) => table.string('session_id', 120).notNullable().index()],
['request_id', (table) => table.string('request_id', 120).notNullable()],
['requester_client_id', (table) => table.string('requester_client_id', 200).nullable()],
['status', (table) => table.string('status', 40).notNullable().defaultTo('accepted')],
['status_message', (table) => table.text('status_message').nullable()],
['user_message_id', (table) => table.bigInteger('user_message_id').nullable()],
['user_text', (table) => table.text('user_text').notNullable().defaultTo('')],
['response_message_id', (table) => table.bigInteger('response_message_id').nullable()],
['response_text', (table) => table.text('response_text').notNullable().defaultTo('')],
['answered_at', (table) => table.timestamp('answered_at', { useTz: true }).nullable()],
['terminal_at', (table) => table.timestamp('terminal_at', { useTz: true }).nullable()],
['created_at', (table) => table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
['updated_at', (table) => table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredRequestColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_REQUEST_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_REQUEST_TABLE, (table) => {
createColumn(table);
});
}
}
const hasActivityTable = await db.schema.hasTable(CHAT_CONVERSATION_ACTIVITY_TABLE);
if (!hasActivityTable) {
await db.schema.createTable(CHAT_CONVERSATION_ACTIVITY_TABLE, (table) => {
table.increments('id').primary();
table.string('session_id', 120).notNullable().index();
table.string('request_id', 120).notNullable().index();
table.integer('line_no').notNullable();
table.text('text').notNullable();
table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.unique(['session_id', 'request_id', 'line_no']);
});
}
const requiredActivityColumns: Array<[string, (table: any) => void]> = [
['session_id', (table) => table.string('session_id', 120).notNullable().index()],
['request_id', (table) => table.string('request_id', 120).notNullable().index()],
['line_no', (table) => table.integer('line_no').notNullable()],
['text', (table) => table.text('text').notNullable().defaultTo('')],
['created_at', (table) => table.timestamp('created_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredActivityColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_ACTIVITY_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_ACTIVITY_TABLE, (table) => {
createColumn(table);
});
}
}
const hasSourceChangeTable = await db.schema.hasTable(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE);
if (!hasSourceChangeTable) {
await db.schema.createTable(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE, (table) => {
table.increments('id').primary();
table.string('session_id', 120).notNullable().index();
table.string('client_id', 120).nullable().index();
table.string('request_id', 120).notNullable();
table.string('conversation_title', 200).notNullable().defaultTo('새 대화');
table.string('chat_type_id', 120).nullable();
table.string('chat_type_label', 200).nullable();
table.string('request_title', 200).notNullable().defaultTo('');
table.text('question_text').notNullable().defaultTo('');
table.text('answer_text').notNullable().defaultTo('');
table.string('status', 40).notNullable().defaultTo('completed');
table.timestamp('answered_at', { useTz: true }).nullable();
table.timestamp('source_changed_at', { useTz: true }).nullable();
table.text('feature_tags_json').notNullable().defaultTo('[]');
table.text('changed_files_json').notNullable().defaultTo('[]');
table.text('current_source_files_json').notNullable().defaultTo('[]');
table.text('diff_blocks_json').notNullable().defaultTo('[]');
table.boolean('has_source_changes').notNullable().defaultTo(false);
table.string('review_status', 40).notNullable().defaultTo('not-reviewed');
table.string('source_change_kind', 40).notNullable().defaultTo('request');
table.text('source_entry_ids_json').notNullable().defaultTo('[]');
table.timestamp('conversation_deleted_at', { useTz: true }).nullable();
table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now());
table.unique(['session_id', 'request_id']);
});
}
const requiredSourceChangeColumns: Array<[string, (table: any) => void]> = [
['session_id', (table) => table.string('session_id', 120).notNullable().index()],
['client_id', (table) => table.string('client_id', 120).nullable().index()],
['request_id', (table) => table.string('request_id', 120).notNullable()],
['conversation_title', (table) => table.string('conversation_title', 200).notNullable().defaultTo('새 대화')],
['chat_type_id', (table) => table.string('chat_type_id', 120).nullable()],
['chat_type_label', (table) => table.string('chat_type_label', 200).nullable()],
['request_title', (table) => table.string('request_title', 200).notNullable().defaultTo('')],
['question_text', (table) => table.text('question_text').notNullable().defaultTo('')],
['answer_text', (table) => table.text('answer_text').notNullable().defaultTo('')],
['status', (table) => table.string('status', 40).notNullable().defaultTo('completed')],
['answered_at', (table) => table.timestamp('answered_at', { useTz: true }).nullable()],
['source_changed_at', (table) => table.timestamp('source_changed_at', { useTz: true }).nullable()],
['feature_tags_json', (table) => table.text('feature_tags_json').notNullable().defaultTo('[]')],
['changed_files_json', (table) => table.text('changed_files_json').notNullable().defaultTo('[]')],
['current_source_files_json', (table) => table.text('current_source_files_json').notNullable().defaultTo('[]')],
['diff_blocks_json', (table) => table.text('diff_blocks_json').notNullable().defaultTo('[]')],
['has_source_changes', (table) => table.boolean('has_source_changes').notNullable().defaultTo(false)],
['review_status', (table) => table.string('review_status', 40).notNullable().defaultTo('not-reviewed')],
['source_change_kind', (table) => table.string('source_change_kind', 40).notNullable().defaultTo('request')],
['source_entry_ids_json', (table) => table.text('source_entry_ids_json').notNullable().defaultTo('[]')],
['conversation_deleted_at', (table) => table.timestamp('conversation_deleted_at', { useTz: true }).nullable()],
['updated_at', (table) => table.timestamp('updated_at', { useTz: true }).notNullable().defaultTo(db.fn.now())],
];
for (const [columnName, createColumn] of requiredSourceChangeColumns) {
const hasColumn = await db.schema.hasColumn(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE, columnName);
if (!hasColumn) {
await db.schema.alterTable(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE, (table) => {
createColumn(table);
});
}
}
}
export async function getChatConversation(sessionId: string, clientId?: string | null) {
const normalizedSessionId = sessionId.trim();
let row = await db(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).first();
if (!row) {
return null;
}
const currentRequestId = String(row.current_request_id ?? '').trim() || null;
if (
shouldClearConversationJobState({
currentRequestId,
currentJobStatus: row.current_job_status == null ? null : String(row.current_job_status) as ChatConversationItem['currentJobStatus'],
currentStatusUpdatedAt: normalizeDateTimeValue(row.current_status_updated_at),
runtimeActive: isRuntimeRequestActive(currentRequestId),
request: currentRequestId
? await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: currentRequestId,
})
.first()
.then((requestRow) =>
requestRow
? {
requestId: String(requestRow.request_id ?? ''),
status: String(requestRow.status ?? '') as ChatConversationRequestStatus,
responseMessageId: requestRow.response_message_id == null ? null : Number(requestRow.response_message_id),
responseText: String(requestRow.response_text ?? ''),
terminalAt: normalizeDateTimeValue(requestRow.terminal_at),
updatedAt: normalizeDateTimeValue(requestRow.updated_at),
}
: null,
)
: null,
})
) {
const shouldMarkRequestFailed =
currentRequestId &&
!isRuntimeRequestActive(currentRequestId) &&
['queued', 'started'].includes(String(row.current_job_status ?? '').trim());
if (shouldMarkRequestFailed) {
await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: currentRequestId,
})
.whereIn('status', ['queued', 'started'])
.update({
status: 'failed',
status_message: '중단된 오래된 요청',
terminal_at: db.fn.now(),
updated_at: db.fn.now(),
});
}
await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
row = await db(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).first();
if (!row) {
return null;
}
}
const mapped = mapConversationRow(row);
const latestPreviewMessageMap = await getLatestPreviewableMessageMap([normalizedSessionId]);
const latestRequestPreviewMap = await getLatestRequestPreviewMap([normalizedSessionId]);
const previewResolvedConversation = resolveConversationPreviewOverride(
mapped,
latestPreviewMessageMap.get(normalizedSessionId),
latestRequestPreviewMap.get(normalizedSessionId),
);
const normalizedClientId = normalizeClientId(clientId);
if (!normalizedClientId) {
return previewResolvedConversation;
}
const preference = await getChatConversationClientPreference(sessionId, normalizedClientId);
const latestResponseMessageId = await getLatestResponseMessageId(normalizedSessionId);
return {
...previewResolvedConversation,
clientId: normalizedClientId,
notifyOffline: preference?.notifyOffline ?? previewResolvedConversation.notifyOffline,
hasUnreadResponse:
latestResponseMessageId != null &&
latestResponseMessageId > (preference?.lastReadResponseMessageId ?? 0),
};
}
export async function createChatConversation(payload: z.input<typeof conversationPayloadSchema>) {
const parsed = conversationPayloadSchema.parse(payload);
const normalizedClientId = normalizeClientId(parsed.clientId);
const notifyOffline = parsed.notifyOffline ?? true;
await db(CHAT_CONVERSATION_TABLE)
.insert({
session_id: parsed.sessionId,
client_id: normalizedClientId,
title: parsed.title?.trim() || '새 대화',
request_badge_label: parsed.requestBadgeLabel?.trim() || null,
chat_type_id: parsed.chatTypeId?.trim() || null,
last_chat_type_id: parsed.lastChatTypeId?.trim() || parsed.chatTypeId?.trim() || null,
general_section_name: parsed.generalSectionName?.trim() || null,
context_label: parsed.contextLabel?.trim() || null,
context_description: parsed.contextDescription?.trim() || null,
notify_offline: notifyOffline,
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: null,
last_message_preview: '',
last_message_at: null,
created_at: db.fn.now(),
updated_at: db.fn.now(),
})
.onConflict('session_id')
.ignore();
if (normalizedClientId) {
const existingPreference = await getChatConversationClientPreference(parsed.sessionId, normalizedClientId);
if (!existingPreference) {
await upsertChatConversationClientPreference(parsed.sessionId, normalizedClientId, notifyOffline);
}
}
const conversation = await getChatConversation(parsed.sessionId, normalizedClientId);
if (!conversation) {
throw new Error('채팅방을 저장했지만 다시 불러오지 못했습니다.');
}
return conversation;
}
export async function updateChatConversationContext(
sessionId: string,
payload: {
title?: string | null;
requestBadgeLabel?: string | null;
clientId?: string | null;
chatTypeId?: string | null;
lastChatTypeId?: string | null;
generalSectionName?: string | null;
contextLabel?: string | null;
contextDescription?: string | null;
notifyOffline?: boolean | null;
},
) {
const normalizedClientId = normalizeClientId(payload.clientId);
const current = await db(CHAT_CONVERSATION_TABLE).where({ session_id: sessionId.trim() }).first();
if (!current) {
return null;
}
const currentChatTypeId = String(current.chat_type_id ?? '').trim() || null;
const requestedChatTypeId = payload.chatTypeId?.trim() || null;
const nextChatTypeId = resolveNextConversationChatTypeId(currentChatTypeId, requestedChatTypeId);
const requestedContextLabel = payload.contextLabel?.trim() || null;
const requestedContextDescription = payload.contextDescription?.trim() || null;
const hasRequestedRequestBadgeLabel = Object.prototype.hasOwnProperty.call(payload, 'requestBadgeLabel');
const hasRequestedGeneralSectionName = Object.prototype.hasOwnProperty.call(payload, 'generalSectionName');
const hasRequestedContextLabel = Object.prototype.hasOwnProperty.call(payload, 'contextLabel');
const hasRequestedContextDescription = Object.prototype.hasOwnProperty.call(payload, 'contextDescription');
await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: sessionId.trim() })
.update({
title: payload.title?.trim() || current.title || '새 대화',
request_badge_label: resolveNextConversationContextValue(
current.request_badge_label,
payload.requestBadgeLabel,
hasRequestedRequestBadgeLabel,
),
client_id: normalizedClientId || current.client_id || null,
chat_type_id: nextChatTypeId,
last_chat_type_id: nextChatTypeId || payload.lastChatTypeId?.trim() || current.last_chat_type_id || null,
general_section_name: resolveNextConversationContextValue(
current.general_section_name,
payload.generalSectionName,
hasRequestedGeneralSectionName,
),
context_label: resolveNextConversationContextValue(current.context_label, requestedContextLabel, hasRequestedContextLabel),
context_description: resolveNextConversationContextValue(
current.context_description,
requestedContextDescription,
hasRequestedContextDescription,
),
notify_offline:
normalizedClientId == null && payload.notifyOffline != null
? payload.notifyOffline
: Boolean(current.notify_offline),
updated_at: db.fn.now(),
});
if (normalizedClientId && payload.notifyOffline != null) {
await upsertChatConversationClientPreference(sessionId, normalizedClientId, payload.notifyOffline);
}
return getChatConversation(sessionId, normalizedClientId);
}
export function resolveNextConversationChatTypeId(currentChatTypeId?: string | null, requestedChatTypeId?: string | null) {
const normalizedCurrentChatTypeId = String(currentChatTypeId ?? '').trim() || null;
const normalizedRequestedChatTypeId = String(requestedChatTypeId ?? '').trim() || null;
return normalizedRequestedChatTypeId ?? normalizedCurrentChatTypeId ?? null;
}
export function resolveNextConversationContextValue(
currentValue: string | null | undefined,
requestedValue: string | null | undefined,
hasRequestedValue: boolean,
) {
if (!hasRequestedValue) {
return String(currentValue ?? '').trim() || null;
}
return String(requestedValue ?? '').trim() || null;
}
export async function listChatConversations(
clientId?: string | null,
limit = 50,
unreadStateClientId?: string | null,
) {
const normalizedClientId = normalizeClientId(clientId);
const normalizedUnreadStateClientId = normalizeClientId(unreadStateClientId ?? clientId);
const normalizedLimit = Math.max(1, Math.min(200, Math.round(limit)));
let conversationListScopeClientId = normalizedClientId;
const buildConversationListQuery = (targetClientId?: string | null) => {
const query = db(CHAT_CONVERSATION_TABLE)
.select('*')
.orderByRaw('COALESCE(last_message_at, updated_at, created_at) DESC NULLS LAST')
.orderByRaw('last_message_at DESC NULLS LAST')
.orderByRaw('updated_at DESC NULLS LAST')
.orderByRaw('created_at DESC NULLS LAST')
.limit(normalizedLimit);
if (targetClientId) {
query.where((builder) => {
builder
.where({ client_id: targetClientId })
.orWhereExists(
db(CHAT_CONVERSATION_CLIENT_TABLE)
.select(db.raw('1'))
.whereRaw(`${CHAT_CONVERSATION_CLIENT_TABLE}.session_id = ${CHAT_CONVERSATION_TABLE}.session_id`)
.andWhere({ client_id: targetClientId }),
);
});
}
return query;
};
let rows = await buildConversationListQuery(normalizedClientId);
// Browser storage reset can regenerate client_id and hide existing rooms.
// When that happens, fall back to the recent global list so the user can recover.
if (normalizedClientId && rows.length === 0) {
conversationListScopeClientId = null;
rows = await buildConversationListQuery(null);
}
const sessionIds = rows.map((row) => String(row.session_id ?? '')).filter(Boolean);
const currentRequestIds = Array.from(
new Set(rows.map((row) => String(row.current_request_id ?? '').trim()).filter(Boolean)),
);
if (sessionIds.length > 0 && currentRequestIds.length > 0) {
const requestRows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('session_id', 'request_id', 'status', 'response_message_id', 'response_text', 'terminal_at')
.whereIn('session_id', sessionIds)
.whereIn('request_id', currentRequestIds);
const requestMap = new Map(
requestRows.map((requestRow) => [
`${String(requestRow.session_id ?? '').trim()}:${String(requestRow.request_id ?? '').trim()}`,
{
requestId: String(requestRow.request_id ?? ''),
status: String(requestRow.status ?? '') as ChatConversationRequestStatus,
responseMessageId: requestRow.response_message_id == null ? null : Number(requestRow.response_message_id),
responseText: String(requestRow.response_text ?? ''),
terminalAt: normalizeDateTimeValue(requestRow.terminal_at),
},
]),
);
const staleSessionIds = rows
.filter((row) =>
shouldClearConversationJobState({
currentRequestId: String(row.current_request_id ?? ''),
currentJobStatus: row.current_job_status == null ? null : String(row.current_job_status) as ChatConversationItem['currentJobStatus'],
currentStatusUpdatedAt: normalizeDateTimeValue(row.current_status_updated_at),
runtimeActive: isRuntimeRequestActive(String(row.current_request_id ?? '')),
request:
requestMap.get(`${String(row.session_id ?? '').trim()}:${String(row.current_request_id ?? '').trim()}`) ?? null,
}),
)
.map((row) => String(row.session_id ?? '').trim())
.filter(Boolean);
if (staleSessionIds.length > 0) {
const staleRequestIds = rows
.filter((row) => staleSessionIds.includes(String(row.session_id ?? '').trim()))
.map((row) => ({
sessionId: String(row.session_id ?? '').trim(),
requestId: String(row.current_request_id ?? '').trim(),
status: String(row.current_job_status ?? '').trim(),
}))
.filter((item) => item.requestId && (item.status === 'queued' || item.status === 'started'));
for (const staleItem of staleRequestIds) {
await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: staleItem.sessionId,
request_id: staleItem.requestId,
})
.whereIn('status', ['queued', 'started'])
.update({
status: 'failed',
status_message: '중단된 오래된 요청',
terminal_at: db.fn.now(),
updated_at: db.fn.now(),
});
}
await db(CHAT_CONVERSATION_TABLE)
.whereIn('session_id', staleSessionIds)
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
rows = await buildConversationListQuery(conversationListScopeClientId);
}
}
if (rows.length > 0) {
const candidateSessionIds = rows.map((row) => String(row.session_id ?? '').trim()).filter(Boolean);
const [messageSessionRows, requestSessionRows] = await Promise.all([
db(CHAT_CONVERSATION_MESSAGE_TABLE).distinct('session_id').whereIn('session_id', candidateSessionIds),
db(CHAT_CONVERSATION_REQUEST_TABLE).distinct('session_id').whereIn('session_id', candidateSessionIds),
]);
const visibleSessionIds = new Set(
[...messageSessionRows, ...requestSessionRows]
.map((row) => String(row.session_id ?? '').trim())
.filter(Boolean),
);
rows = rows.filter((row) => {
const sessionId = String(row.session_id ?? '').trim();
return visibleSessionIds.has(sessionId) || hasConversationMetadata(row);
});
}
const latestPreviewMessageMap = await getLatestPreviewableMessageMap(
rows.map((row) => String(row.session_id ?? '')),
);
const latestRequestPreviewMap = await getLatestRequestPreviewMap(
rows.map((row) => String(row.session_id ?? '')),
);
const latestResponsePreviewMap = await getLatestResponsePreviewMap(
rows.map((row) => String(row.session_id ?? '')),
);
const latestResponseMessageIdMap = await getLatestResponseMessageIdMap(
rows.map((row) => String(row.session_id ?? '')),
);
const latestCodexPromptPartsMap = await getLatestCodexPromptPartsMap(
rows.map((row) => String(row.session_id ?? '')),
);
if (!normalizedUnreadStateClientId) {
return rows
.map((row) => {
const mapped = mapConversationRow(row);
const pendingWorkState = resolvePendingWorkState({
requestText: latestRequestPreviewMap.get(mapped.sessionId)?.text ?? '',
responseText: latestResponsePreviewMap.get(mapped.sessionId)?.text ?? '',
latestCodexParts: latestCodexPromptPartsMap.get(mapped.sessionId),
});
return {
...resolveConversationPreviewOverride(
mapped,
latestPreviewMessageMap.get(mapped.sessionId),
latestRequestPreviewMap.get(mapped.sessionId),
),
...pendingWorkState,
lastRequestPreview: createPreview(latestRequestPreviewMap.get(mapped.sessionId)?.text ?? ''),
lastResponsePreview: createPreview(latestResponsePreviewMap.get(mapped.sessionId)?.text ?? ''),
hasUnreadResponse: false,
};
})
.sort((left, right) =>
(right.lastMessageAt ?? right.updatedAt).localeCompare(left.lastMessageAt ?? left.updatedAt),
);
}
if (rows.length === 0) {
return [];
}
const preferences = await db(CHAT_CONVERSATION_CLIENT_TABLE)
.select('*')
.where({ client_id: normalizedUnreadStateClientId })
.whereIn(
'session_id',
rows.map((row) => String(row.session_id ?? '')).filter(Boolean),
);
const preferenceMap = new Map(
preferences.map((row) => {
const mapped = mapClientPreferenceRow(row);
return [mapped.sessionId, mapped];
}),
);
return rows
.map((row) => {
const mapped = mapConversationRow(row);
const preference = preferenceMap.get(mapped.sessionId);
const latestPreviewMessage = latestPreviewMessageMap.get(mapped.sessionId);
const pendingWorkState = resolvePendingWorkState({
requestText: latestRequestPreviewMap.get(mapped.sessionId)?.text ?? '',
responseText: latestResponsePreviewMap.get(mapped.sessionId)?.text ?? '',
latestCodexParts: latestCodexPromptPartsMap.get(mapped.sessionId),
});
return {
...resolveConversationPreviewOverride(
mapped,
latestPreviewMessage,
latestRequestPreviewMap.get(mapped.sessionId),
),
...pendingWorkState,
lastRequestPreview: createPreview(latestRequestPreviewMap.get(mapped.sessionId)?.text ?? ''),
lastResponsePreview: createPreview(latestResponsePreviewMap.get(mapped.sessionId)?.text ?? ''),
clientId: normalizedUnreadStateClientId,
notifyOffline: preference?.notifyOffline ?? mapped.notifyOffline,
hasUnreadResponse:
(latestResponseMessageIdMap.get(mapped.sessionId) ?? 0) >
(preference?.lastReadResponseMessageId ?? 0),
};
})
.sort((left, right) =>
(right.lastMessageAt ?? right.updatedAt).localeCompare(left.lastMessageAt ?? left.updatedAt),
);
}
export async function listChatConversationMessages(
sessionId: string,
options: {
limit?: number;
beforeMessageId?: number | null;
} = {},
) {
const normalizedLimit = Math.max(1, Math.min(1000, Math.round(options.limit ?? 200)));
const normalizedBeforeMessageId =
Number.isFinite(options.beforeMessageId) && (options.beforeMessageId ?? 0) > 0
? Math.trunc(options.beforeMessageId as number)
: null;
const query = db(CHAT_CONVERSATION_MESSAGE_TABLE)
.select('*')
.where({ session_id: sessionId.trim() })
.modify((builder) => {
if (normalizedBeforeMessageId !== null) {
builder.where('message_id', '<', normalizedBeforeMessageId);
}
builder.andWhere((visibilityBuilder) => {
applyVisibleConversationMessageCondition(visibilityBuilder);
});
})
.orderBy('message_id', 'desc')
.orderBy('id', 'desc')
.limit(normalizedLimit);
const latestRows = await query;
return latestRows.reverse().map((row: Parameters<typeof mapMessageRow>[0]) => mapMessageRow(row));
}
async function listChatConversationActivityLogsByRequestIds(
sessionId: string,
requestIds: string[],
): Promise<ChatConversationActivityLogItem[]> {
const normalizedSessionId = sessionId.trim();
const normalizedRequestIds = Array.from(new Set(requestIds.map((item) => item.trim()).filter(Boolean)));
if (!normalizedSessionId || normalizedRequestIds.length === 0) {
return [];
}
const rows = await db(CHAT_CONVERSATION_ACTIVITY_TABLE)
.select('session_id', 'request_id', 'text', 'line_no', 'created_at')
.where({ session_id: normalizedSessionId })
.whereIn('request_id', normalizedRequestIds)
.orderBy('request_id', 'asc')
.orderBy('line_no', 'asc')
.orderBy('id', 'asc');
const activityMap = new Map<string, ChatConversationActivityLogItem>();
for (const row of rows) {
const requestId = String(row.request_id ?? '').trim();
if (!requestId) {
continue;
}
const existing = activityMap.get(requestId);
if (existing) {
existing.lines.push(String(row.text ?? ''));
existing.updatedAt = normalizeDateTimeValue(row.created_at) ?? existing.updatedAt;
continue;
}
activityMap.set(requestId, {
sessionId: String(row.session_id ?? normalizedSessionId),
requestId,
lines: [String(row.text ?? '')],
updatedAt: normalizeDateTimeValue(row.created_at),
});
}
return normalizedRequestIds
.map((requestId) => activityMap.get(requestId))
.filter(Boolean) as ChatConversationActivityLogItem[];
}
async function resolveConversationRequestCursor(sessionId: string, beforeMessageId: number) {
const normalizedSessionId = sessionId.trim();
const normalizedBeforeMessageId = Math.trunc(beforeMessageId);
if (!normalizedSessionId || normalizedBeforeMessageId <= 0) {
return null;
}
const directRequestRow = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('request_id', 'created_at')
.where({ session_id: normalizedSessionId })
.andWhere((builder) => {
builder.where('user_message_id', normalizedBeforeMessageId).orWhere('response_message_id', normalizedBeforeMessageId);
})
.first();
if (directRequestRow) {
return {
requestId: String(directRequestRow.request_id ?? '').trim(),
createdAt: normalizeDateTimeValue(directRequestRow.created_at) ?? '',
};
}
const messageRow = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.select('client_request_id')
.where({
session_id: normalizedSessionId,
message_id: normalizedBeforeMessageId,
})
.first();
const linkedRequestId = String(messageRow?.client_request_id ?? '').trim();
if (!linkedRequestId) {
return null;
}
const linkedRequestRow = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('request_id', 'created_at')
.where({
session_id: normalizedSessionId,
request_id: linkedRequestId,
})
.first();
if (!linkedRequestRow) {
return null;
}
return {
requestId: String(linkedRequestRow.request_id ?? '').trim(),
createdAt: normalizeDateTimeValue(linkedRequestRow.created_at) ?? '',
};
}
export async function listChatConversationDetailPage(
sessionId: string,
options: {
limit?: number;
beforeMessageId?: number | null;
} = {},
): Promise<ChatConversationDetailPage> {
const normalizedSessionId = sessionId.trim();
const conversation = await db(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).first();
const normalizedLimit = Math.max(1, Math.min(100, Math.round(options.limit ?? 8)));
const normalizedBeforeMessageId =
Number.isFinite(options.beforeMessageId) && (options.beforeMessageId ?? 0) > 0
? Math.trunc(options.beforeMessageId as number)
: null;
const requestCursor =
normalizedBeforeMessageId == null
? null
: await resolveConversationRequestCursor(normalizedSessionId, normalizedBeforeMessageId);
const requestRows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({ session_id: normalizedSessionId })
.whereNot('status', 'removed')
.modify((builder) => {
if (!requestCursor) {
return;
}
builder.andWhere((cursorBuilder) => {
cursorBuilder
.where('created_at', '<', requestCursor.createdAt)
.orWhere((sameTimeBuilder) => {
sameTimeBuilder.where('created_at', '=', requestCursor.createdAt).andWhere('request_id', '<', requestCursor.requestId);
});
});
})
.orderBy('created_at', 'desc')
.orderBy('request_id', 'desc')
.limit(normalizedLimit);
const orderedRequestRows = [...requestRows].reverse();
const requests = orderedRequestRows.map((row) => normalizeStaleRequestItem(mapRequestRow(row), conversation));
const requestIds = requests.map((item) => item.requestId.trim()).filter(Boolean);
if (requestIds.length === 0) {
return {
messages: [],
requests,
activityLogs: [],
oldestLoadedMessageId: null,
hasOlderMessages: false,
};
}
const messageRows = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.select('*')
.where({ session_id: normalizedSessionId })
.whereIn('client_request_id', requestIds)
.andWhere((builder) => {
applyVisibleConversationMessageCondition(builder);
})
.orderBy('message_id', 'asc')
.orderBy('id', 'asc');
const messages = messageRows.map((row: Parameters<typeof mapMessageRow>[0]) => mapMessageRow(row));
const activityLogs = await listChatConversationActivityLogsByRequestIds(normalizedSessionId, requestIds);
const oldestLoadedMessageId =
requests.reduce<number | null>((oldestId, request) => {
const candidateIds = [request.userMessageId, request.responseMessageId].filter(
(value): value is number => typeof value === 'number' && Number.isInteger(value) && value > 0,
);
if (candidateIds.length === 0) {
return oldestId;
}
const nextCandidateId = Math.min(...candidateIds);
return oldestId == null ? nextCandidateId : Math.min(oldestId, nextCandidateId);
}, null) ?? messages[0]?.id ?? null;
const oldestRequest = requests[0] ?? null;
const hasOlderMessages = oldestRequest
? Boolean(
await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({ session_id: normalizedSessionId })
.whereNot('status', 'removed')
.andWhere((builder) => {
builder
.where('created_at', '<', oldestRequest.createdAt)
.orWhere((sameTimeBuilder) => {
sameTimeBuilder.where('created_at', '=', oldestRequest.createdAt).andWhere('request_id', '<', oldestRequest.requestId);
});
})
.first(),
)
: false;
return {
messages,
requests,
activityLogs,
oldestLoadedMessageId,
hasOlderMessages,
};
}
export async function listChatConversationRequests(sessionId: string, limit = 200) {
const normalizedSessionId = sessionId.trim();
const conversation = await db(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).first();
const rows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({ session_id: normalizedSessionId })
.orderBy('created_at', 'asc')
.limit(Math.max(1, Math.min(1000, Math.round(limit))));
return rows.map((row) => normalizeStaleRequestItem(mapRequestRow(row), conversation));
}
export async function listChatSourceChangeSnapshots(clientId?: string | null, limit = 200) {
await ensureChatConversationTables();
await db(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.where({ has_source_changes: true })
.andWhere('changed_files_json', '[]')
.andWhere('current_source_files_json', '[]')
.andWhere('diff_blocks_json', '[]')
.update({
has_source_changes: false,
updated_at: db.fn.now(),
});
const normalizedClientId = normalizeClientId(clientId);
const normalizedLimit = Math.max(1, Math.min(500, Math.round(limit)));
const buildQuery = (targetClientId?: string | null) =>
db(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.select('*')
.where({ has_source_changes: true })
.modify((builder) => {
if (targetClientId) {
builder.where({ client_id: targetClientId });
}
})
.orderByRaw('COALESCE(source_changed_at, answered_at, updated_at) DESC NULLS LAST')
.orderBy('updated_at', 'desc')
.limit(normalizedLimit);
let rows = await buildQuery(normalizedClientId);
if (normalizedClientId && rows.length === 0) {
rows = await buildQuery(null);
}
return rows.map((row: Parameters<typeof mapSourceChangeSnapshotRow>[0]) => mapSourceChangeSnapshotRow(row));
}
async function applyVerifiedSourceChangeGrouping(args: {
sessionId: string;
clientId?: string | null;
conversationTitle?: string | null;
chatTypeId?: string | null;
chatTypeLabel?: string | null;
requestId: string;
questionText: string;
answerText: string;
status: ChatConversationRequestStatus;
answeredAt?: string | null;
updatedAt?: string | null;
}) {
const metadata = parseSourceChangeVerificationMetadata(args.questionText);
if (!metadata) {
return;
}
const selectedFeatureKeys = parseSelectedPromptValues(args.answerText);
const entryRefs = metadata.features.flatMap((feature) => feature.entryRefs);
if (entryRefs.length === 0) {
return;
}
const sourceRows = await db(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.select('*')
.where((builder) => {
entryRefs.forEach((entryRef, index) => {
const method = index === 0 ? 'where' : 'orWhere';
builder[method]((nestedBuilder: any) => {
nestedBuilder.where({
session_id: entryRef.sessionId,
request_id: entryRef.requestId,
});
});
});
});
const sourceRowMap = new Map<string, Record<string, unknown>>();
sourceRows.forEach((row) => {
const sessionId = String(row.session_id ?? '').trim();
const requestId = String(row.request_id ?? '').trim();
if (sessionId && requestId) {
sourceRowMap.set(`${sessionId}:${requestId}`, row);
}
});
const activeSourceIds = Array.from(new Set(entryRefs.map((entryRef) => `${entryRef.sessionId}:${entryRef.requestId}`)));
await db.transaction(async (trx) => {
if (activeSourceIds.length > 0) {
await trx(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.where((builder) => {
activeSourceIds.forEach((entryId, index) => {
const dividerIndex = entryId.indexOf(':');
const sessionId = dividerIndex >= 0 ? entryId.slice(0, dividerIndex) : '';
const requestId = dividerIndex >= 0 ? entryId.slice(dividerIndex + 1) : '';
const method = index === 0 ? 'where' : 'orWhere';
builder[method]((nestedBuilder: any) => {
nestedBuilder.where({
session_id: sessionId,
request_id: requestId,
});
});
});
})
.update({
has_source_changes: false,
updated_at: db.fn.now(),
});
}
await Promise.all(metadata.features.map(async (feature, index) => {
const featureRows = feature.entryRefs
.map((entryRef) => sourceRowMap.get(`${entryRef.sessionId}:${entryRef.requestId}`) ?? null)
.filter((row): row is Record<string, unknown> => row != null);
if (featureRows.length === 0) {
return;
}
const changedFiles = stringifyStringArray(featureRows.flatMap((row) => parseStringArray(row.changed_files_json)));
const currentSourceFiles = stringifyStringArray(featureRows.flatMap((row) => parseStringArray(row.current_source_files_json)));
const diffBlocks = stringifyStringArray(featureRows.flatMap((row) => parseStringArray(row.diff_blocks_json)));
const payload = buildSourceChangeSnapshotPayload({
sessionId: args.sessionId,
clientId: args.clientId ?? null,
conversationTitle: args.conversationTitle ?? null,
chatTypeId: args.chatTypeId ?? null,
chatTypeLabel: args.chatTypeLabel ?? null,
requestId: buildVerificationGroupRequestId(args.requestId, feature.key, index),
status: args.status,
questionText: args.questionText,
answerText: args.answerText,
answeredAt: args.answeredAt ?? null,
updatedAt: args.updatedAt ?? null,
reviewStatus: selectedFeatureKeys.has(feature.key) ? 'reviewed' : 'not-reviewed',
sourceChangeKind: 'verification-group',
sourceEntryIds: feature.entryRefs.map((entryRef) => `${entryRef.sessionId}:${entryRef.requestId}`),
});
await trx(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.insert({
...payload,
request_title: `${feature.label} 검증`,
feature_tags_json: stringifyStringArray([feature.label]),
changed_files_json: changedFiles,
current_source_files_json: currentSourceFiles,
diff_blocks_json: diffBlocks,
has_source_changes: hasMeaningfulChatSourceArtifacts({
changedFiles: parseStringArray(changedFiles),
currentSourceFiles: parseStringArray(currentSourceFiles),
diffBlocks: parseStringArray(diffBlocks),
}),
})
.onConflict(['session_id', 'request_id'])
.merge();
}));
});
}
export async function syncChatSourceChangeSnapshot(sessionId: string, requestId: string) {
await ensureChatConversationTables();
const normalizedSessionId = sessionId.trim();
const normalizedRequestId = requestId.trim();
if (!normalizedSessionId || !normalizedRequestId) {
return null;
}
const [conversation, request] = await Promise.all([
db(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).first(),
db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first(),
]);
if (!request) {
return null;
}
const status = String(request.status ?? 'accepted') as ChatConversationRequestStatus;
const questionText = String(request.user_text ?? '').trim();
const answerText = String(request.response_text ?? '').trim();
if (status !== 'completed' || !answerText || isPreparingChatReplyText(answerText)) {
return null;
}
const payload = buildSourceChangeSnapshotPayload({
sessionId: normalizedSessionId,
clientId: conversation?.client_id == null ? null : String(conversation.client_id),
conversationTitle: String(conversation?.title ?? '새 대화'),
chatTypeId: conversation?.chat_type_id == null ? null : String(conversation.chat_type_id),
chatTypeLabel: conversation?.context_label == null ? null : String(conversation.context_label),
requestId: normalizedRequestId,
status,
questionText,
answerText,
answeredAt: normalizeDateTimeValue(request.answered_at),
updatedAt: normalizeDateTimeValue(request.updated_at),
});
await db(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.insert(payload)
.onConflict(['session_id', 'request_id'])
.merge(payload);
await applyVerifiedSourceChangeGrouping({
sessionId: normalizedSessionId,
clientId: conversation?.client_id == null ? null : String(conversation.client_id),
conversationTitle: String(conversation?.title ?? '새 대화'),
chatTypeId: conversation?.chat_type_id == null ? null : String(conversation.chat_type_id),
chatTypeLabel: conversation?.context_label == null ? null : String(conversation.context_label),
requestId: normalizedRequestId,
questionText,
answerText,
status,
answeredAt: normalizeDateTimeValue(request.answered_at),
updatedAt: normalizeDateTimeValue(request.updated_at),
});
const row = await db(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first();
return row ? mapSourceChangeSnapshotRow(row) : null;
}
export async function getChatConversationRequest(sessionId: string, requestId: string) {
const normalizedSessionId = sessionId.trim();
const normalizedRequestId = requestId.trim();
if (!normalizedSessionId || !normalizedRequestId) {
return null;
}
const conversation = await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.first();
const row = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first();
return row ? normalizeStaleRequestItem(mapRequestRow(row), conversation) : null;
}
async function refreshConversationPreview(sessionId: string) {
const latestMessage = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.where({ session_id: sessionId.trim() })
.whereIn('author', ['user', 'codex'])
.orderBy('created_at', 'desc')
.orderBy('id', 'desc')
.first();
await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: sessionId.trim() })
.update({
last_message_preview: latestMessage && isPreviewableConversationMessage(latestMessage) ? createPreview(String(latestMessage.text ?? '')) : '',
last_message_at: latestMessage && isPreviewableConversationMessage(latestMessage) ? latestMessage.created_at : null,
updated_at: db.fn.now(),
});
}
export async function appendChatConversationMessage(
conversationPayload: z.input<typeof conversationPayloadSchema>,
messagePayload: z.input<typeof conversationMessagePayloadSchema>,
) {
const conversation = conversationPayloadSchema.parse(conversationPayload);
const message = conversationMessagePayloadSchema.parse(messagePayload);
await createChatConversation(conversation);
const currentConversation = await db(CHAT_CONVERSATION_TABLE).where({ session_id: conversation.sessionId }).first();
const resolvedClientRequestId =
message.clientRequestId?.trim() ||
(message.author === 'codex' ? String(currentConversation?.current_request_id ?? '').trim() || null : null);
await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.insert({
session_id: message.sessionId,
message_id: message.messageId,
author: message.author,
text: message.text,
parts_json: stringifyChatMessageParts(message.parts),
display_timestamp: message.timestamp,
client_request_id: resolvedClientRequestId,
created_at: db.fn.now(),
})
.onConflict(['session_id', 'message_id'])
.merge({
author: message.author,
text: message.text,
parts_json: stringifyChatMessageParts(message.parts),
display_timestamp: message.timestamp,
client_request_id: resolvedClientRequestId,
});
const currentTitle = String(currentConversation?.title ?? '새 대화').trim() || '새 대화';
const nextTitle =
message.author === 'user' && (!currentTitle || currentTitle === '새 대화')
? buildConversationTitle(message.text)
: currentTitle;
await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: conversation.sessionId })
.update({
client_id: normalizeClientId(conversation.clientId) || currentConversation?.client_id || null,
title: nextTitle,
chat_type_id: currentConversation?.chat_type_id || conversation.chatTypeId?.trim() || null,
last_chat_type_id:
currentConversation?.chat_type_id ||
currentConversation?.last_chat_type_id ||
conversation.chatTypeId?.trim() ||
conversation.lastChatTypeId?.trim() ||
null,
context_label:
currentConversation?.chat_type_id || currentConversation?.context_label
? currentConversation?.context_label || null
: conversation.contextLabel?.trim() || null,
context_description:
currentConversation?.chat_type_id || currentConversation?.context_description
? currentConversation?.context_description || null
: conversation.contextDescription?.trim() || null,
notify_offline:
conversation.notifyOffline == null ? Boolean(currentConversation?.notify_offline) : conversation.notifyOffline,
updated_at: db.fn.now(),
});
await refreshConversationPreview(conversation.sessionId);
if (normalizeClientId(conversation.clientId) && conversation.notifyOffline != null) {
await upsertChatConversationClientPreference(
conversation.sessionId,
normalizeClientId(conversation.clientId)!,
conversation.notifyOffline,
);
}
const requestPatch = buildChatConversationRequestPatchFromMessage({
id: message.messageId,
author: message.author,
text: message.text,
clientRequestId: resolvedClientRequestId,
});
if (requestPatch) {
await upsertChatConversationRequest(conversation.sessionId, {
requestId: requestPatch.requestId,
requesterClientId:
message.author === 'user'
? normalizeClientId(conversation.clientId)
: undefined,
status: requestPatch.status,
userMessageId: requestPatch.userMessageId,
userText: requestPatch.userText,
responseMessageId: requestPatch.responseMessageId,
responseText: requestPatch.responseText,
});
}
}
export async function appendChatConversationActivityLine(
sessionId: string,
requestId: string,
line: string,
lineNo?: number,
) {
const normalizedSessionId = sessionId.trim();
const normalizedRequestId = requestId.trim();
const normalizedLine = line.trim();
if (!normalizedSessionId || !normalizedRequestId || !normalizedLine) {
return null;
}
const normalizedLineNo =
Number.isInteger(lineNo) && Number(lineNo) > 0 ? Number(lineNo) : undefined;
let nextLineNo = normalizedLineNo ?? null;
if (nextLineNo == null) {
const existingLineCountRow = await db(CHAT_CONVERSATION_ACTIVITY_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.count<{ count?: string | number }>('id as count')
.first();
nextLineNo = Number(existingLineCountRow?.count ?? 0) + 1;
}
await db(CHAT_CONVERSATION_ACTIVITY_TABLE)
.insert({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
line_no: nextLineNo,
text: normalizedLine,
created_at: db.fn.now(),
})
.onConflict(['session_id', 'request_id', 'line_no'])
.merge({
text: normalizedLine,
created_at: db.fn.now(),
});
return nextLineNo;
}
export async function listChatConversationActivityLogs(
sessionId: string,
limitRequests = 500,
): Promise<ChatConversationActivityLogItem[]> {
const normalizedSessionId = sessionId.trim();
if (!normalizedSessionId) {
return [];
}
const requestRows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.select('request_id')
.where({ session_id: normalizedSessionId })
.whereNot('status', 'removed')
.orderBy('created_at', 'desc')
.orderBy('request_id', 'desc')
.limit(limitRequests);
const requestIds = requestRows
.map((row) => String(row.request_id ?? '').trim())
.filter(Boolean);
if (requestIds.length === 0) {
return [];
}
const rows = await db(CHAT_CONVERSATION_ACTIVITY_TABLE)
.select('session_id', 'request_id', 'text', 'line_no', 'created_at')
.where({ session_id: normalizedSessionId })
.whereIn('request_id', requestIds)
.orderBy('request_id', 'asc')
.orderBy('line_no', 'asc')
.orderBy('id', 'asc');
const activityMap = new Map<string, ChatConversationActivityLogItem>();
for (const row of rows) {
const requestId = String(row.request_id ?? '').trim();
if (!requestId) {
continue;
}
const existing = activityMap.get(requestId);
if (existing) {
existing.lines.push(String(row.text ?? ''));
existing.updatedAt = normalizeDateTimeValue(row.created_at) ?? existing.updatedAt;
continue;
}
activityMap.set(requestId, {
sessionId: String(row.session_id ?? normalizedSessionId),
requestId,
lines: [String(row.text ?? '')],
updatedAt: normalizeDateTimeValue(row.created_at),
});
}
return requestIds.map((requestId) => activityMap.get(requestId)).filter(Boolean) as ChatConversationActivityLogItem[];
}
export async function listRecoverableChatConversationRequests(): Promise<RecoverableChatConversationRequestItem[]> {
await ensureChatConversationTables();
const rows = await db(`${CHAT_CONVERSATION_REQUEST_TABLE} as request`)
.join(`${CHAT_CONVERSATION_TABLE} as conversation`, 'conversation.session_id', 'request.session_id')
.select(
'request.session_id',
'request.request_id',
'request.status',
'request.user_text',
'request.created_at',
'conversation.client_id',
'conversation.chat_type_id',
'conversation.last_chat_type_id',
'conversation.general_section_name',
'conversation.context_label',
'conversation.context_description',
'conversation.current_request_id',
'conversation.current_job_status',
)
.whereIn('request.status', ['accepted', 'queued', 'started'])
.andWhere((builder) => {
builder.whereNull('request.terminal_at');
})
.andWhere((builder) => {
builder.whereNull('request.response_message_id').orWhere('request.response_message_id', 0);
})
.orderByRaw(
"case when request.request_id = conversation.current_request_id then 0 else 1 end asc",
)
.orderBy('request.session_id', 'asc')
.orderBy('request.created_at', 'asc')
.orderBy('request.request_id', 'asc');
return rows
.map((row) => {
const sessionId = String(row.session_id ?? '').trim();
const requestId = String(row.request_id ?? '').trim();
const userText = String(row.user_text ?? '').trim();
const createdAt = normalizeDateTimeValue(row.created_at) ?? '';
if (!sessionId || !requestId || !userText || !createdAt) {
return null;
}
return {
sessionId,
clientId: row.client_id == null ? null : String(row.client_id),
chatTypeId: row.chat_type_id == null ? null : String(row.chat_type_id),
lastChatTypeId: row.last_chat_type_id == null ? null : String(row.last_chat_type_id),
generalSectionName: row.general_section_name == null ? null : String(row.general_section_name),
contextLabel: row.context_label == null ? null : String(row.context_label),
contextDescription: row.context_description == null ? null : String(row.context_description),
currentRequestId: row.current_request_id == null ? null : String(row.current_request_id),
currentJobStatus:
row.current_job_status == null ? null : (String(row.current_job_status) as ChatConversationItem['currentJobStatus']),
requestId,
status: String(row.status ?? 'accepted') as ChatConversationRequestStatus,
userText,
createdAt,
} satisfies RecoverableChatConversationRequestItem;
})
.filter(Boolean) as RecoverableChatConversationRequestItem[];
}
export async function updateChatConversationJobState(
sessionId: string,
payload: {
requestId?: string | null;
status?: 'queued' | 'started' | 'completed' | 'failed' | null;
message?: string | null;
queueSize?: number | null;
clear?: boolean;
},
) {
const current = await db(CHAT_CONVERSATION_TABLE).where({ session_id: sessionId.trim() }).first();
if (!current) {
return null;
}
const shouldClear = payload.clear === true;
await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: sessionId.trim() })
.update({
current_request_id: shouldClear ? null : payload.requestId?.trim() || current.current_request_id || null,
current_job_status: shouldClear ? null : payload.status ?? current.current_job_status ?? null,
current_job_message: shouldClear ? null : payload.message?.trim() || null,
current_queue_size: shouldClear ? 0 : Math.max(0, Math.round(payload.queueSize ?? current.current_queue_size ?? 0)),
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
const row = await db(CHAT_CONVERSATION_TABLE).where({ session_id: sessionId.trim() }).first();
return row ? mapConversationRow(row) : null;
}
async function clearConversationJobStateForRequest(sessionId: string, requestId: string) {
await db(CHAT_CONVERSATION_TABLE)
.where({
session_id: sessionId,
current_request_id: requestId,
})
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
}
export async function upsertChatConversationRequest(
sessionId: string,
payload: {
requestId: string;
requesterClientId?: string | null;
status?: ChatConversationRequestStatus;
statusMessage?: string | null;
userMessageId?: number | null;
userText?: string | null;
responseMessageId?: number | null;
responseText?: string | null;
},
) {
const normalizedSessionId = sessionId.trim();
const normalizedRequestId = payload.requestId.trim();
const normalizedRequesterClientId = payload.requesterClientId?.trim() || null;
if (!normalizedSessionId || !normalizedRequestId) {
return null;
}
let nextRow:
| {
session_id: string;
request_id: string;
requester_client_id: string | null;
status: ChatConversationRequestStatus;
status_message: string | null;
user_message_id: number | null;
user_text: string;
response_message_id: number | null;
response_text: string;
answered_at: unknown;
terminal_at: unknown;
created_at: unknown;
updated_at: unknown;
}
| null = null;
let nextStatus: ChatConversationRequestStatus = payload.status ?? 'accepted';
for (let attempt = 0; attempt < 3; attempt += 1) {
const current = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first();
nextStatus = mergeChatConversationRequestStatus(
(current?.status as ChatConversationRequestStatus | undefined) ?? null,
payload.status ?? null,
);
const currentStatus = (current?.status as ChatConversationRequestStatus | undefined) ?? null;
const defaultStatusMessage =
payload.status && payload.status !== currentStatus
? getDefaultChatConversationRequestStatusMessage(nextStatus)
: null;
const terminalStatus = ['completed', 'failed', 'cancelled', 'removed'].includes(nextStatus)
? db.fn.now()
: current?.terminal_at ?? null;
const answeredAt =
payload.responseMessageId != null || (payload.responseText?.trim() ?? '').length > 0
? current?.answered_at ?? db.fn.now()
: current?.answered_at ?? null;
nextRow = {
session_id: normalizedSessionId,
request_id: normalizedRequestId,
requester_client_id: normalizedRequesterClientId ?? current?.requester_client_id ?? null,
status: nextStatus,
status_message: payload.statusMessage?.trim() || defaultStatusMessage || current?.status_message || null,
user_message_id: payload.userMessageId ?? current?.user_message_id ?? null,
user_text: payload.userText ?? current?.user_text ?? '',
response_message_id: payload.responseMessageId ?? current?.response_message_id ?? null,
response_text: payload.responseText ?? current?.response_text ?? '',
answered_at: answeredAt,
terminal_at: terminalStatus,
created_at: current?.created_at ?? db.fn.now(),
updated_at: db.fn.now(),
};
if (current) {
await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.update(nextRow);
break;
}
const insertedRows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.insert(nextRow)
.onConflict(['session_id', 'request_id'])
.ignore()
.returning(['session_id']);
if (insertedRows.length > 0) {
break;
}
}
if (!nextRow) {
return null;
}
const row = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first();
if (
shouldClearConversationJobState({
currentRequestId: normalizedRequestId,
currentJobStatus: 'started',
request: {
requestId: normalizedRequestId,
status: nextStatus,
responseMessageId: nextRow.response_message_id,
responseText: nextRow.response_text,
terminalAt: nextRow.terminal_at == null ? null : String(nextRow.terminal_at),
},
})
) {
await clearConversationJobStateForRequest(normalizedSessionId, normalizedRequestId);
}
if (
nextStatus === 'completed' &&
(nextRow.response_message_id != null || String(nextRow.response_text ?? '').trim().length > 0)
) {
await syncChatSourceChangeSnapshot(normalizedSessionId, normalizedRequestId);
}
return row ? mapRequestRow(row) : null;
}
export async function repairChatConversationRequestLinks(sessionId?: string | null) {
await ensureChatConversationTables();
const normalizedSessionId = sessionId?.trim() || null;
const sessionRows = normalizedSessionId
? [{ session_id: normalizedSessionId }]
: await db(CHAT_CONVERSATION_REQUEST_TABLE).distinct('session_id').orderBy('session_id', 'asc');
let repairedRequestCount = 0;
let linkedMessageCount = 0;
let completedStatusCount = 0;
const touchedSessions: string[] = [];
for (const sessionRow of sessionRows) {
const currentSessionId = String(sessionRow.session_id ?? '').trim();
if (!currentSessionId) {
continue;
}
const requestRows = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({ session_id: currentSessionId })
.orderBy('created_at', 'asc')
.orderBy('request_id', 'asc');
const messageRows = await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.where({ session_id: currentSessionId })
.select('id', 'message_id', 'author', 'text', 'parts_json', 'client_request_id', 'created_at')
.orderBy('created_at', 'asc')
.orderBy('message_id', 'asc')
.orderBy('id', 'asc');
let sessionTouched = false;
const responseMessages: ChatConversationResponseCandidate[] = messageRows.map((row) => ({
id: Number(row.id ?? 0),
messageId: Number(row.message_id ?? 0),
author: String(row.author ?? 'codex') as StoredChatMessage['author'],
text: String(row.text ?? ''),
clientRequestId: row.client_request_id == null ? null : String(row.client_request_id),
createdAt: normalizeDateTimeValue(row.created_at),
}));
for (let index = 0; index < requestRows.length; index += 1) {
const requestRow = requestRows[index];
const nextRequestRow = requestRows[index + 1];
const requestId = String(requestRow.request_id ?? '').trim();
if (!requestId) {
continue;
}
const candidate = selectChatConversationResponseCandidate(
{
requestId,
createdAt: normalizeDateTimeValue(requestRow.created_at) ?? '',
responseMessageId: requestRow.response_message_id == null ? null : Number(requestRow.response_message_id),
},
nextRequestRow
? {
createdAt: normalizeDateTimeValue(nextRequestRow.created_at) ?? '',
}
: undefined,
responseMessages,
);
if (!candidate) {
continue;
}
if ((candidate.clientRequestId?.trim() || null) !== requestId) {
await db(CHAT_CONVERSATION_MESSAGE_TABLE)
.where({
session_id: currentSessionId,
message_id: candidate.messageId,
})
.update({
client_request_id: requestId,
});
candidate.clientRequestId = requestId;
linkedMessageCount += 1;
sessionTouched = true;
}
const shouldPromoteToCompleted =
!isTerminalRequestStatus(String(requestRow.status ?? '') as ChatConversationRequestStatus) &&
requestRow.terminal_at != null;
const nextStatus = shouldPromoteToCompleted ? 'completed' : undefined;
const previousStatus = String(requestRow.status ?? '').trim();
await upsertChatConversationRequest(currentSessionId, {
requestId,
status: nextStatus,
responseMessageId: candidate.messageId,
responseText: candidate.text,
});
if (
requestRow.response_message_id == null ||
String(requestRow.response_text ?? '') !== candidate.text ||
requestRow.answered_at == null
) {
repairedRequestCount += 1;
sessionTouched = true;
}
if (nextStatus === 'completed' && previousStatus !== 'completed') {
completedStatusCount += 1;
sessionTouched = true;
}
}
if (sessionTouched) {
touchedSessions.push(currentSessionId);
}
}
return {
sessionCount: sessionRows.length,
touchedSessions,
repairedRequestCount,
linkedMessageCount,
completedStatusCount,
};
}
export async function deleteUnansweredChatConversationRequest(sessionId: string, requestId: string) {
const normalizedSessionId = sessionId.trim();
const normalizedRequestId = requestId.trim();
const current = await db(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.first();
if (!current) {
return { deleted: false, reason: 'not_found' as const };
}
const conversation = await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.first();
const mapped = normalizeStaleRequestItem(mapRequestRow(current), conversation);
if (mapped.hasResponse) {
return { deleted: false, reason: 'answered' as const };
}
if (mapped.status === 'queued' || mapped.status === 'started') {
return { deleted: false, reason: 'active' as const };
}
await db.transaction(async (trx) => {
await trx(CHAT_CONVERSATION_MESSAGE_TABLE)
.where({
session_id: normalizedSessionId,
client_request_id: normalizedRequestId,
})
.del();
await trx(CHAT_CONVERSATION_REQUEST_TABLE)
.where({
session_id: normalizedSessionId,
request_id: normalizedRequestId,
})
.del();
const conversation = await trx(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.first();
if (conversation?.current_request_id === normalizedRequestId) {
await trx(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
}
});
await refreshConversationPreview(normalizedSessionId);
return { deleted: true, reason: null as null };
}
export async function clearAllChatConversationJobStates() {
await ensureChatConversationTables();
await db(CHAT_CONVERSATION_TABLE)
.whereNotNull('current_job_status')
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: db.fn.now(),
updated_at: db.fn.now(),
});
}
export async function deleteChatConversation(sessionId: string) {
const normalizedSessionId = sessionId.trim();
return db.transaction(async (trx) => {
await trx(CHAT_CONVERSATION_SOURCE_CHANGE_TABLE)
.where({ session_id: normalizedSessionId })
.update({
conversation_deleted_at: db.fn.now(),
updated_at: db.fn.now(),
});
await trx(CHAT_CONVERSATION_CLIENT_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_ACTIVITY_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_REQUEST_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_MESSAGE_TABLE).where({ session_id: normalizedSessionId }).del();
const deletedCount = await trx(CHAT_CONVERSATION_TABLE).where({ session_id: normalizedSessionId }).del();
return deletedCount > 0;
});
}
export async function clearChatConversationData(sessionId: string, clientId?: string | null) {
const normalizedSessionId = sessionId.trim();
await db.transaction(async (trx) => {
await trx(CHAT_CONVERSATION_ACTIVITY_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_REQUEST_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_MESSAGE_TABLE).where({ session_id: normalizedSessionId }).del();
await trx(CHAT_CONVERSATION_CLIENT_TABLE)
.where({ session_id: normalizedSessionId })
.update({
last_read_response_message_id: null,
updated_at: db.fn.now(),
});
await trx(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.update({
current_request_id: null,
current_job_status: null,
current_job_message: null,
current_queue_size: 0,
current_status_updated_at: null,
last_message_preview: '',
last_message_at: null,
updated_at: db.fn.now(),
});
});
return getChatConversation(normalizedSessionId, clientId);
}
export async function getChatConversationClientPreference(sessionId: string, clientId: string) {
const row = await db(CHAT_CONVERSATION_CLIENT_TABLE)
.where({
session_id: sessionId.trim(),
client_id: clientId.trim(),
})
.first();
return row ? mapClientPreferenceRow(row) : null;
}
async function listRegisteredNotificationClientIds(clientIds: string[]) {
const normalizedClientIds = [...new Set(clientIds.map((item) => String(item ?? '').trim()).filter(Boolean))];
if (!normalizedClientIds.length) {
return new Set<string>();
}
const [webPushRows, tokenRows] = await Promise.all([
db(WEB_PUSH_SUBSCRIPTION_TABLE)
.where({ is_enabled: true })
.whereIn('device_id', normalizedClientIds)
.select('device_id'),
db(NOTIFICATION_TOKEN_TABLE)
.where({ is_enabled: true })
.whereIn('device_id', normalizedClientIds)
.select('device_id'),
]);
return new Set(
[...webPushRows, ...tokenRows]
.map((row) => String(row.device_id ?? '').trim())
.filter(Boolean),
);
}
export async function cleanupStaleChatConversationOfflineNotificationClients(
sessionId: string,
options?: {
keepClientIds?: Iterable<string | null | undefined>;
},
) {
const normalizedSessionId = sessionId.trim();
if (!normalizedSessionId) {
return [] as string[];
}
const rows = await db(CHAT_CONVERSATION_CLIENT_TABLE)
.where({
session_id: normalizedSessionId,
notify_offline: true,
})
.select('client_id');
const optedInClientIds = rows
.map((row) => String(row.client_id ?? '').trim())
.filter(Boolean);
const registeredClientIds = await listRegisteredNotificationClientIds(optedInClientIds);
const staleClientIds = selectStaleOfflineNotificationClientIds(
optedInClientIds.map((clientId) => ({
clientId,
notifyOffline: true,
hasActivePushRegistration: registeredClientIds.has(clientId),
})),
options,
);
if (!staleClientIds.length) {
return [] as string[];
}
await db(CHAT_CONVERSATION_CLIENT_TABLE)
.where({ session_id: normalizedSessionId })
.whereIn('client_id', staleClientIds)
.update({
notify_offline: false,
updated_at: db.fn.now(),
});
await cleanupNotificationMessagesForStaleTargetClients({
sessionId: normalizedSessionId,
staleClientIds,
});
return staleClientIds;
}
export async function listChatConversationOfflineNotificationClientIds(
sessionId: string,
options?: {
keepClientIds?: Iterable<string | null | undefined>;
},
) {
await cleanupStaleChatConversationOfflineNotificationClients(sessionId, options);
const rows = await db(CHAT_CONVERSATION_CLIENT_TABLE)
.where({
session_id: sessionId.trim(),
notify_offline: true,
})
.select('client_id');
return rows
.map((row) => String(row.client_id ?? '').trim())
.filter(Boolean);
}
export async function upsertChatConversationClientPreference(sessionId: string, clientId: string, notifyOffline: boolean) {
const normalizedSessionId = sessionId.trim();
const normalizedClientId = clientId.trim();
await db(CHAT_CONVERSATION_CLIENT_TABLE)
.insert({
session_id: normalizedSessionId,
client_id: normalizedClientId,
notify_offline: notifyOffline,
last_read_response_message_id: null,
created_at: db.fn.now(),
updated_at: db.fn.now(),
})
.onConflict(['session_id', 'client_id'])
.merge({
notify_offline: notifyOffline,
updated_at: db.fn.now(),
});
if (notifyOffline) {
await cleanupStaleChatConversationOfflineNotificationClients(normalizedSessionId, {
keepClientIds: [normalizedClientId],
});
}
return getChatConversationClientPreference(normalizedSessionId, normalizedClientId);
}
export async function markChatConversationResponsesRead(sessionId: string, clientId: string) {
const normalizedSessionId = sessionId.trim();
const normalizedClientId = clientId.trim();
if (!normalizedSessionId || !normalizedClientId) {
return null;
}
const currentConversation = await db(CHAT_CONVERSATION_TABLE)
.where({ session_id: normalizedSessionId })
.first();
if (!currentConversation) {
return null;
}
const latestResponseMessageId = await getLatestResponseMessageId(normalizedSessionId);
await db(CHAT_CONVERSATION_CLIENT_TABLE)
.insert({
session_id: normalizedSessionId,
client_id: normalizedClientId,
notify_offline: Boolean(currentConversation.notify_offline),
last_read_response_message_id: latestResponseMessageId,
created_at: db.fn.now(),
updated_at: db.fn.now(),
})
.onConflict(['session_id', 'client_id'])
.merge({
last_read_response_message_id: latestResponseMessageId,
updated_at: db.fn.now(),
});
await cleanupStaleChatConversationOfflineNotificationClients(normalizedSessionId, {
keepClientIds: [normalizedClientId],
});
return {
sessionId: normalizedSessionId,
lastReadResponseMessageId: latestResponseMessageId,
};
}