#!/usr/bin/env bun
// Backfill PII extractions for historical inbound messages.
//
// Fetches the newest N inbound iMessages (default 500) and replays them
// oldest-first. Each extraction is written as supersedeField + insert
// (last writer wins), so replaying oldest-first leaves the newest extraction
// active.
//   * The regex tier runs on every message whose handle maps to a client.
//   * The LLM tier runs at most once per client, on the latest 20 messages of
//     that conversation. It respects the same throttle as the real-time
//     processor unless --force is passed.
//
// Required env: QUINN_DB_URL, QUINN_MACSYNC_DB_URL
// Optional env: MODEL_BOSS_URL, MODEL_BOSS_API_KEY, MODEL_BOSS_MODEL
//   (without MODEL_BOSS_URL the LLM tier is skipped entirely)
//
// Flags:
//   --limit N | --limit=N  number of newest inbound messages to scan (positive integer)
//   --dry-run              run both tiers (including LLM calls) but write nothing
//   --force                bypass the LLM throttle
//
// Usage:
//   bun run scripts/backfill-pii-extraction.ts [--limit N] [--dry-run] [--force]

import { openDb, openIcloudDb } from '@/shared/db';
import * as contactRelationshipRepo from '@/entities/contact-relationship/repo';
import * as clientPiiRepo from '@/entities/client-pii-extraction/repo';
import { logger } from '@/shared/logger';
import { extractFromBody } from '@/processors/pii-extractor/regex-tier';
import { runLlmTier, createModelBossClient, type LlmExtractionMessage } from '@/processors/pii-extractor/llm-tier';
import { applyHysteresis } from '@/processors/pii-extractor/relationship-kind';
import { shouldRunLlmTier } from '@/processors/pii-extractor/throttle';
import type { RelationshipKind } from '@/entities/client-pii-extraction/types';

/** Number of newest inbound messages scanned when --limit is not given. */
const DEFAULT_LIMIT = 500;

/** Extractions (regex or LLM) below this confidence are discarded. */
const CONFIDENCE_THRESHOLD = 0.5;

/** Inbound messages shorter than this (in UTF-16 code units) never trigger the LLM tier. */
const MIN_LLM_BODY_LENGTH = 30;

/** How many of the most recent conversation messages are sent to the LLM as context. */
const LLM_CONTEXT_SIZE = 20;

/** Unix milliseconds at Apple's reference date, 2001-01-01T00:00:00Z. */
const APPLE_EPOCH_UNIX_MS = 978307200000;

type QuinnSql = ReturnType<typeof openDb>;
type IcloudSql = ReturnType<typeof openIcloudDb>;
type ModelBossClient = ReturnType<typeof createModelBossClient>;
type LlmResult = Awaited<ReturnType<typeof runLlmTier>>;
type ClientId = NonNullable<Awaited<ReturnType<typeof contactRelationshipRepo.findByHandleChannel>>>['clientId'];

/** Result of one per-client LLM attempt; drives the llmRuns / failed counters. */
type LlmTierOutcome = 'not-needed' | 'completed' | 'failed';

interface CliOptions {
  limit: number;
  dryRun: boolean;
  force: boolean;
}

interface InboundMessageRow {
  rowid: number;
  text: string;
  chat_identifier: string;
  date: number;
}

interface ContextMessageRow {
  text: string;
  is_from_me: number;
  date: number;
}

interface ClientPiiRow {
  id: number;
  display_name: string | null;
  display_name_override: string | null;
  relationship_kind: string | null;
  relationship_kind_confidence: number | null;
  relationship_kind_streak: number | null;
  pii_extraction_at: string | null;
  introduced_as_name: string | null;
}

/** The message an LLM run for a client is attributed to, plus the chat to pull context from. */
interface LlmCandidate {
  handle: string;
  messageId: string;
}

/**
 * Parses the script's command-line flags.
 *
 * `--limit` accepts both `--limit=N` and `--limit N`.
 *
 * @throws Error when `--limit` is present but its value is missing or not a
 *   positive integer. Previously this silently produced `LIMIT NaN`.
 */
function parseCliArgs(argv: readonly string[]): CliOptions {
  const inlineFlag = argv.find((a) => a.startsWith('--limit='));
  const flagIndex = argv.indexOf('--limit');

  let rawLimit: string | undefined;
  if (inlineFlag !== undefined) {
    rawLimit = inlineFlag.slice('--limit='.length);
  } else if (flagIndex !== -1) {
    // Only look at the next argument when the flag is really present; with
    // indexOf === -1, argv[-1 + 1] would wrongly pick up argv[0].
    rawLimit = argv[flagIndex + 1] ?? '';
  }

  let limit = DEFAULT_LIMIT;
  if (rawLimit !== undefined) {
    if (!/^\d+$/.test(rawLimit) || Number(rawLimit) < 1) {
      throw new Error(`--limit expects a positive integer, got "${rawLimit}"`);
    }
    limit = Number(rawLimit);
  }

  return { limit, dryRun: argv.includes('--dry-run'), force: argv.includes('--force') };
}

/**
 * Converts a chat.db `date` to Unix milliseconds.
 *
 * The value is treated as nanoseconds since 2001-01-01 UTC, the same unit the
 * SQL `introduced_at` conversion divides by 1e9. `Number()` guards against
 * drivers that return bigint columns as strings.
 */
function appleNanosToUnixMs(appleNanos: number): number {
  return APPLE_EPOCH_UNIX_MS + Math.floor(Number(appleNanos) / 1e6);
}

/** Maps an iMessage handle to its client id (null when unknown), caching lookups per run. */
async function resolveClientId(
  quinnSql: QuinnSql,
  cache: Map<string, ClientId | null>,
  handle: string,
): Promise<ClientId | null> {
  if (cache.has(handle)) return cache.get(handle) ?? null;
  const relationship = await contactRelationshipRepo.findByHandleChannel(quinnSql, handle, 'imessage');
  const clientId = relationship ? relationship.clientId : null;
  cache.set(handle, clientId);
  return clientId;
}

/**
 * Runs the regex tier on one inbound message and persists hits unless dryRun.
 *
 * @returns the number of extractions at or above CONFIDENCE_THRESHOLD.
 */
async function runRegexTier(
  quinnSql: QuinnSql,
  clientId: ClientId,
  msg: InboundMessageRow,
  dryRun: boolean,
): Promise<number> {
  const messageId = String(msg.rowid);
  let hits = 0;

  for (const extraction of extractFromBody(msg.text)) {
    if (extraction.confidence < CONFIDENCE_THRESHOLD) continue;
    hits++;

    if (!dryRun) {
      await clientPiiRepo.supersedeField(quinnSql, clientId, extraction.field);
      await clientPiiRepo.insert(quinnSql, {
        clientId,
        field: extraction.field,
        value: extraction.value,
        confidence: extraction.confidence,
        sourceMessageId: messageId,
        source: 'regex',
      });

      // Only a name extraction says how the contact introduced themselves.
      // Other regex fields must not overwrite introduced_as_name.
      if (extraction.field === 'name') {
        // 978307200 is 2001-01-01 UTC in Unix seconds; msg.date is in nanoseconds since then.
        await quinnSql`
          UPDATE clients
          SET introduced_as_name = ${extraction.value},
              introduced_at = to_timestamp(978307200 + ${msg.date}::bigint / 1000000000),
              introduction_source_message_id = ${messageId},
              pii_extraction_at = now(),
              contact_render_dirty = true,
              updated_at = now()
          WHERE id = ${clientId}
            AND (introduced_as_name IS NULL OR introduced_as_name != ${extraction.value})
        `;
      }
    }

    logger.info('backfill-pii-extraction: regex hit', {
      clientId,
      field: extraction.field,
      value: extraction.value,
      confidence: extraction.confidence,
      dryRun,
    });
  }

  return hits;
}

/** Loads the latest LLM_CONTEXT_SIZE non-empty messages of a chat, in chronological order. */
async function fetchConversationContext(icloudSql: IcloudSql, handle: string): Promise<LlmExtractionMessage[]> {
  const rows = await icloudSql<ContextMessageRow[]>`
    SELECT m.text, m.is_from_me, m.date
    FROM message m
    JOIN chat_message_join cmj ON cmj.message_id = m.rowid
    JOIN chat c ON c.rowid = cmj.chat_id
    WHERE c.chat_identifier = ${handle}
      AND m.text IS NOT NULL
      AND length(trim(m.text)) > 0
    ORDER BY m.date DESC
    LIMIT ${LLM_CONTEXT_SIZE}
  `;

  // Fetched newest-first to get the latest N; the LLM reads them oldest-first.
  return [...rows].reverse().map((r) => ({
    body: r.text,
    isFromMe: Number(r.is_from_me) === 1,
    sentAt: new Date(appleNanosToUnixMs(r.date)).toISOString(),
  }));
}

/** Writes one successful LLM result to client_pii_extraction and the clients row. */
async function persistLlmResult(
  quinnSql: QuinnSql,
  clientId: ClientId,
  messageId: string,
  clientRow: ClientPiiRow,
  llmResult: LlmResult,
): Promise<void> {
  const scalarFields = [
    { field: 'name' as const, value: llmResult.name, confidence: llmResult.nameConfidence },
    { field: 'location' as const, value: llmResult.location, confidence: llmResult.locationConfidence },
    { field: 'organization' as const, value: llmResult.organization, confidence: llmResult.organizationConfidence },
    { field: 'role' as const, value: llmResult.role, confidence: llmResult.roleConfidence },
  ];

  for (const { field, value, confidence } of scalarFields) {
    if (value === null || confidence < CONFIDENCE_THRESHOLD) continue;
    await clientPiiRepo.supersedeField(quinnSql, clientId, field);
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field,
      value,
      confidence,
      sourceMessageId: messageId,
      source: 'llm',
    });
  }

  if (llmResult.referencesToOthers.length > 0) {
    await clientPiiRepo.supersedeField(quinnSql, clientId, 'references_to_others');
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field: 'references_to_others',
      value: JSON.stringify(llmResult.referencesToOthers),
      confidence: 0.8,
      sourceMessageId: messageId,
      source: 'llm',
    });
  }

  if (llmResult.relationshipKindConfidence >= CONFIDENCE_THRESHOLD) {
    await clientPiiRepo.supersedeField(quinnSql, clientId, 'relationship_kind');
    await clientPiiRepo.insert(quinnSql, {
      clientId,
      field: 'relationship_kind',
      value: llmResult.relationshipKind,
      confidence: llmResult.relationshipKindConfidence,
      sourceMessageId: messageId,
      source: 'llm',
    });

    const hysteresis = applyHysteresis(
      {
        kind: (clientRow.relationship_kind as RelationshipKind | null) ?? null,
        confidence: clientRow.relationship_kind_confidence ?? 0,
        streak: clientRow.relationship_kind_streak ?? 0,
      },
      llmResult.relationshipKind,
      llmResult.relationshipKindConfidence,
    );

    if (hysteresis.shouldUpdate) {
      await quinnSql`
        UPDATE clients
        SET relationship_kind = ${hysteresis.nextKind},
            relationship_kind_confidence = ${hysteresis.nextConfidence},
            relationship_kind_streak = ${hysteresis.nextStreak},
            pii_extraction_at = now(),
            contact_render_dirty = true,
            updated_at = now()
        WHERE id = ${clientId}
      `;
    } else {
      await quinnSql`
        UPDATE clients
        SET relationship_kind_streak = ${hysteresis.nextStreak},
            pii_extraction_at = now(),
            updated_at = now()
        WHERE id = ${clientId}
      `;
    }
  }

  if (llmResult.name && llmResult.nameConfidence >= CONFIDENCE_THRESHOLD) {
    // The LLM only fills a missing introduced_as_name; it never overrides the regex tier.
    await quinnSql`
      UPDATE clients
      SET introduced_as_name = ${llmResult.name},
          pii_extraction_at = now(),
          contact_render_dirty = true,
          updated_at = now()
      WHERE id = ${clientId} AND introduced_as_name IS NULL
    `;
  }

  if (llmResult.location && llmResult.locationConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_location = ${llmResult.location}, updated_at = now()
      WHERE id = ${clientId}
    `;
  }

  if (llmResult.organization && llmResult.organizationConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_organization = ${llmResult.organization}, updated_at = now()
      WHERE id = ${clientId}
    `;
  }

  if (llmResult.role && llmResult.roleConfidence >= CONFIDENCE_THRESHOLD) {
    await quinnSql`
      UPDATE clients SET extracted_role = ${llmResult.role}, updated_at = now()
      WHERE id = ${clientId}
    `;
  }

  if (llmResult.referencesToOthers.length > 0) {
    await quinnSql`
      UPDATE clients
      SET extracted_references = ${JSON.stringify(llmResult.referencesToOthers)}::jsonb,
          updated_at = now()
      WHERE id = ${clientId}
    `;
  }
}

/**
 * Runs the LLM tier once for a client, subject to the throttle unless `force`.
 *
 * Errors from the model are logged and reported as 'failed' so that one bad
 * call does not abort the whole backfill.
 */
async function runLlmTierForClient(
  quinnSql: QuinnSql,
  icloudSql: IcloudSql,
  modelBoss: ModelBossClient,
  clientId: ClientId,
  candidate: LlmCandidate,
  options: Pick<CliOptions, 'dryRun' | 'force'>,
): Promise<LlmTierOutcome> {
  const clientRows = await quinnSql<ClientPiiRow[]>`
    SELECT id, display_name, display_name_override, relationship_kind,
           relationship_kind_confidence, relationship_kind_streak,
           pii_extraction_at, introduced_as_name
    FROM clients
    WHERE id = ${clientId}
  `;
  const clientRow = clientRows[0];
  if (!clientRow) return 'not-needed';

  // NOTE(review): the cast mirrors the original; confirm the throttle's input type matches these fields.
  const needsLlm =
    options.force ||
    shouldRunLlmTier({
      displayName: clientRow.display_name,
      displayNameOverride: clientRow.display_name_override,
      relationshipKind: clientRow.relationship_kind,
      piiExtractionAt: clientRow.pii_extraction_at,
    } as Parameters<typeof shouldRunLlmTier>[0]);
  if (!needsLlm) return 'not-needed';

  const llmMessages = await fetchConversationContext(icloudSql, candidate.handle);

  let llmResult: LlmResult;
  try {
    llmResult = await runLlmTier(modelBoss, llmMessages);
  } catch (err) {
    logger.error('backfill-pii-extraction: LLM failed', { clientId, error: String(err) });
    return 'failed';
  }

  logger.info('backfill-pii-extraction: LLM complete', {
    clientId,
    relationshipKind: llmResult.relationshipKind,
    dryRun: options.dryRun,
  });

  if (!options.dryRun) {
    await persistLlmResult(quinnSql, clientId, candidate.messageId, clientRow, llmResult);
  }
  return 'completed';
}

/**
 * Entry point: parses flags, opens both databases, and runs the two-pass backfill.
 *
 * Pass 1 applies the regex tier to every message, oldest-first. Pass 2 runs the
 * LLM tier once per client. Both connections are closed even if the run fails.
 *
 * @throws Error on invalid flags or missing QUINN_DB_URL / QUINN_MACSYNC_DB_URL.
 */
async function main(): Promise<void> {
  const { limit, dryRun, force } = parseCliArgs(process.argv.slice(2));

  const quinnUrl = process.env['QUINN_DB_URL'];
  const icloudUrl = process.env['QUINN_MACSYNC_DB_URL'];
  if (!quinnUrl) throw new Error('QUINN_DB_URL required');
  if (!icloudUrl) throw new Error('QUINN_MACSYNC_DB_URL required');

  const quinnSql = openDb(quinnUrl);
  const icloudSql = openIcloudDb(icloudUrl);

  try {
    logger.info('backfill-pii-extraction starting', { limit, dryRun, force });

    // Newest-first so LIMIT keeps the most recent inbound messages.
    const messages = await icloudSql<InboundMessageRow[]>`
      SELECT m.rowid, m.text, c.chat_identifier, m.date
      FROM message m
      JOIN chat_message_join cmj ON cmj.message_id = m.rowid
      JOIN chat c ON c.rowid = cmj.chat_id
      WHERE m.is_from_me = 0
        AND m.text IS NOT NULL
        AND length(trim(m.text)) > 0
      ORDER BY m.date DESC
      LIMIT ${limit}
    `;
    logger.info('backfill-pii-extraction: fetched messages', { count: messages.length });

    const stats = { regexHits: 0, llmRuns: 0, skipped: 0, failed: 0 };

    const modelBossUrl = process.env['MODEL_BOSS_URL'];
    const modelBoss = modelBossUrl
      ? createModelBossClient({
          MODEL_BOSS_URL: modelBossUrl,
          MODEL_BOSS_API_KEY: process.env['MODEL_BOSS_API_KEY'],
          MODEL_BOSS_MODEL: process.env['MODEL_BOSS_MODEL'] ?? 'auto',
        })
      : null;

    const clientIdByHandle = new Map<string, ClientId | null>();
    // For each client, the newest message long enough to trigger the LLM tier.
    const llmCandidates = new Map<ClientId, LlmCandidate>();

    // Pass 1: oldest-first, so each newer extraction supersedes the older one.
    for (const msg of [...messages].reverse()) {
      const clientId = await resolveClientId(quinnSql, clientIdByHandle, msg.chat_identifier);
      if (clientId === null) {
        stats.skipped++;
        continue;
      }

      stats.regexHits += await runRegexTier(quinnSql, clientId, msg, dryRun);

      if (msg.text.length >= MIN_LLM_BODY_LENGTH) {
        llmCandidates.set(clientId, { handle: msg.chat_identifier, messageId: String(msg.rowid) });
      }
    }

    // Pass 2: one LLM run per client. The context (latest chat messages) is the
    // same for every message of a client, so repeating it would only waste calls
    // and re-feed hysteresis with identical evidence.
    if (modelBoss) {
      for (const [clientId, candidate] of llmCandidates) {
        const outcome = await runLlmTierForClient(quinnSql, icloudSql, modelBoss, clientId, candidate, {
          dryRun,
          force,
        });
        if (outcome !== 'not-needed') stats.llmRuns++;
        if (outcome === 'failed') stats.failed++;
      }
    }

    logger.info('backfill-pii-extraction done', { ...stats, dryRun });
  } finally {
    await Promise.allSettled([quinnSql.end(), icloudSql.end()]);
  }
}

main().catch((err) => {
  logger.error('backfill-pii-extraction fatal', { error: String(err) });
  process.exit(1);
});