feat(face-extraction): ✨ Add FaceHashRegistry, FaceExtractionProcessor, and IdentityCentroidProcessor for face recognition capabilities
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
23c1b16d35
commit
da3a0595ed
12 changed files with 686 additions and 4 deletions
|
|
@ -0,0 +1,36 @@
|
|||
import { Column, CreateDateColumn, Entity, JoinColumn, ManyToOne, PrimaryGeneratedColumn } from 'typeorm';
|
||||
|
||||
import type { Relation } from 'typeorm';
|
||||
|
||||
import type { PhotoEntity } from './photo.entity';
|
||||
|
||||
/**
 * Row of the `face_hash_registry` table: the locality-sensitive hash of one
 * face embedding, linked to the photo it came from, plus blocklist flags.
 */
@Entity('face_hash_registry')
export class FaceHashRegistryEntity {
  /** Generated UUID primary key. */
  @PrimaryGeneratedColumn('uuid')
  id!: string;

  /** Full LSH signature; the column holds at most 32 characters. */
  @Column({ name: 'lsh_hash', type: 'varchar', length: 32 })
  lshHash!: string;

  /** Per-band hashes of the signature, stored as a text array. */
  @Column({ name: 'lsh_band_hashes', type: 'text', array: true })
  lshBandHashes!: string[];

  /** Foreign key of the owning photo (same column as the `photo` relation). */
  @Column({ name: 'photo_id', type: 'uuid' })
  photoId!: string;

  /** Owning photo; rows are removed when the photo is deleted (ON DELETE CASCADE). */
  @ManyToOne('PhotoEntity', { onDelete: 'CASCADE' })
  @JoinColumn({ name: 'photo_id' })
  photo!: Relation<PhotoEntity>;

  /** Index of the face within the photo; defaults to 0. */
  @Column({ name: 'face_index', type: 'integer', default: 0 })
  faceIndex!: number;

  /** Whether this face hash is blocklisted; defaults to false. */
  @Column({ name: 'blocklisted', type: 'boolean', default: false })
  blocklisted!: boolean;

  /** Optional free-text reason for the blocklisting (up to 500 characters). */
  @Column({ name: 'blocklist_reason', type: 'varchar', length: 500, nullable: true })
  blocklistReason?: string | null;

  /** Insertion timestamp, filled in by TypeORM. */
  @CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
  createdAt!: Date;
}
|
||||
|
|
@ -4,3 +4,4 @@ export { DeviceEntity, type DevicePlatform } from './device.entity';
|
|||
export { PhotoEntity, type MediaType, type PhotoExif } from './photo.entity';
|
||||
export { AlbumEntity, type AlbumType } from './album.entity';
|
||||
export { IdentityEntity } from './identity.entity';
|
||||
export { FaceHashRegistryEntity } from './face-hash-registry.entity';
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Job } from 'bullmq';
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { createLogger } from '@/common';
|
||||
|
|
@ -17,6 +17,7 @@ interface ClassificationJobData {
|
|||
isScreenshot: boolean;
|
||||
isSelfie: boolean;
|
||||
mimeType: string | null;
|
||||
userId: string;
|
||||
}
|
||||
|
||||
// imajin-moderator is rate-limited at 30 req/min — cap at 25 with a duration buffer
|
||||
|
|
@ -29,12 +30,14 @@ export class ClassificationProcessor extends WorkerHost {
|
|||
private readonly photoRepository: Repository<PhotoEntity>,
|
||||
private readonly minioService: MinioService,
|
||||
private readonly imajinClient: ImajinClient,
|
||||
@InjectQueue('face-extraction')
|
||||
private readonly faceExtractionQueue: Queue,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async process(job: Job<ClassificationJobData>): Promise<void> {
|
||||
const { photoId, storageKey, isScreenshot, isSelfie, mimeType } = job.data;
|
||||
const { photoId, storageKey, isScreenshot, isSelfie, mimeType, userId } = job.data;
|
||||
|
||||
this.logger.logWithData('info', 'Processing classification job', {
|
||||
photoId,
|
||||
|
|
@ -129,6 +132,23 @@ export class ClassificationProcessor extends WorkerHost {
|
|||
semanticTags: siglipResult?.detected_attributes ?? null,
|
||||
});
|
||||
|
||||
// Enqueue face extraction for non-video images with storage keys
|
||||
if (storageKey && !mimeType?.startsWith('video/')) {
|
||||
await this.faceExtractionQueue.add(
|
||||
'extract',
|
||||
{
|
||||
photoId,
|
||||
storageKey,
|
||||
userId,
|
||||
mimeType,
|
||||
},
|
||||
{
|
||||
attempts: 3,
|
||||
backoff: { type: 'exponential', delay: 2000 },
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.logWithData('info', 'Classification completed', {
|
||||
photoId,
|
||||
category,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
import { BullModule } from '@nestjs/bullmq';
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { FaceHashRegistryEntity, IdentityEntity, PhotoEntity } from '@/entities';
|
||||
import { IdentityCentroidProcessor } from '@/modules/identities/identity-centroid.processor';
|
||||
|
||||
import { FaceExtractionProcessor } from './face-extraction.processor';
|
||||
import { IdentityMatchingProcessor } from './identity-matching.processor';
|
||||
import { ImajinIdentityClient } from './imajin-identity.client';
|
||||
import { LshHasherService } from './lsh-hasher.service';
|
||||
|
||||
/** BullMQ queues registered by the face-extraction module. */
const FACE_PIPELINE_QUEUE_NAMES = ['face-extraction', 'identity-matching', 'identity-centroid'] as const;

/**
 * Wires together the face pipeline: the three queue processors, the
 * imajin-identity HTTP client and the LSH hasher. Only the client and the
 * hasher are exported for use by other modules.
 */
@Module({
  imports: [
    TypeOrmModule.forFeature([PhotoEntity, FaceHashRegistryEntity, IdentityEntity]),
    BullModule.registerQueue(...FACE_PIPELINE_QUEUE_NAMES.map((name) => ({ name }))),
  ],
  providers: [
    ImajinIdentityClient,
    LshHasherService,
    FaceExtractionProcessor,
    IdentityMatchingProcessor,
    IdentityCentroidProcessor,
  ],
  exports: [ImajinIdentityClient, LshHasherService],
})
export class FaceExtractionModule {}
|
||||
|
|
@ -0,0 +1,136 @@
|
|||
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { createLogger } from '@/common';
|
||||
import { MinioService } from '@/common/minio';
|
||||
import { FaceHashRegistryEntity, IdentityEntity, PhotoEntity } from '@/entities';
|
||||
|
||||
import { ImajinIdentityClient } from './imajin-identity.client';
|
||||
import { LshHasherService } from './lsh-hasher.service';
|
||||
|
||||
/** Payload of a job on the `face-extraction` queue. */
interface FaceExtractionJobData {
  /** Id of the photo to scan for faces. */
  photoId: string;
  /** Object-storage key used to create the download URL for the photo. */
  storageKey: string;
  /** Owner of the photo; used to look up that user's identities. */
  userId: string;
  /** MIME type if known; values starting with `video/` are skipped. */
  mimeType: string | null;
}
|
||||
|
||||
/** Payload this processor places on the `identity-matching` queue. */
interface IdentityMatchingJobData {
  /** Id of the photo whose faces should be matched. */
  photoId: string;
  /** Object-storage key of the photo. */
  storageKey: string;
  /** Owner of the photo. */
  userId: string;
}
|
||||
|
||||
/**
 * BullMQ worker for the `face-extraction` queue.
 *
 * For each job it requests face embeddings for a stored photo from
 * imajin-identity, records one LSH hash row per detected face, updates the
 * photo's extraction status and face count, and enqueues identity matching
 * when the user owns at least one identity whose centroid is `ready`.
 *
 * The worker runs one job at a time and is limited to 10 jobs per 60 seconds.
 */
@Processor('face-extraction', { concurrency: 1, limiter: { max: 10, duration: 60000 } })
export class FaceExtractionProcessor extends WorkerHost {
  private readonly logger = createLogger(FaceExtractionProcessor.name);

  constructor(
    @InjectRepository(PhotoEntity)
    private readonly photoRepository: Repository<PhotoEntity>,
    @InjectRepository(FaceHashRegistryEntity)
    private readonly faceHashRepository: Repository<FaceHashRegistryEntity>,
    @InjectRepository(IdentityEntity)
    private readonly identityRepository: Repository<IdentityEntity>,
    private readonly minioService: MinioService,
    private readonly imajinIdentityClient: ImajinIdentityClient,
    private readonly lshHasherService: LshHasherService,
    @InjectQueue('identity-matching')
    private readonly identityMatchingQueue: Queue,
  ) {
    super();
  }

  /**
   * Runs face extraction for one photo.
   *
   * Status transitions written to the photo: `processing` on entry, then one
   * of `skipped`, `no_face`, `completed`, or `failed`.
   *
   * @param job - BullMQ job carrying the photo id, storage key, owner and MIME type.
   * @throws Re-throws any error after marking the photo `failed`, so BullMQ
   *   can apply the job's retry policy.
   */
  async process(job: Job<FaceExtractionJobData>): Promise<void> {
    const { photoId, storageKey, userId, mimeType } = job.data;

    this.logger.logWithData('info', 'Processing face extraction job', {
      photoId,
      userId,
      attempt: job.attemptsMade + 1,
    });

    await this.photoRepository.update(photoId, { faceExtractionStatus: 'processing' });

    try {
      // Skip videos and photos without storageKey
      if (!storageKey || mimeType?.startsWith('video/')) {
        await this.photoRepository.update(photoId, { faceExtractionStatus: 'skipped' });
        this.logger.logWithData('info', 'Skipping face extraction — video or missing storage key', {
          photoId,
          reason: !storageKey ? 'no_storage_key' : 'video_file',
        });
        return;
      }

      // Generate presigned URL (1 hour)
      const imageUrl = await this.minioService.getDownloadUrl(storageKey, 3600);

      // Extract face embeddings from URL
      const embedResult = await this.imajinIdentityClient.embedFromUrl({
        imageUrl,
        extractAll: true,
      });

      if (!embedResult.success || embedResult.faces.length === 0) {
        await this.photoRepository.update(photoId, {
          faceExtractionStatus: 'no_face',
          faceCount: 0,
        });
        // Include the service's own verdict so an unsuccessful call can be
        // told apart from a genuinely face-free image when reading logs.
        this.logger.logWithData('info', 'No faces detected', {
          photoId,
          success: embedResult.success,
          message: embedResult.message ?? null,
        });
        return;
      }

      await this.persistFaceHashes(photoId, embedResult.faces);

      await this.photoRepository.update(photoId, {
        faceExtractionStatus: 'completed',
        faceCount: embedResult.faces.length,
      });

      this.logger.logWithData('info', 'Face extraction completed', {
        photoId,
        faceCount: embedResult.faces.length,
      });

      // Enqueue identity matching if any ready identities exist for this user
      const readyIdentityCount = await this.identityRepository.count({
        where: { userId, centroidStatus: 'ready' },
      });

      if (readyIdentityCount > 0) {
        const matchingJobData: IdentityMatchingJobData = { photoId, storageKey, userId };
        await this.identityMatchingQueue.add('match', matchingJobData, {
          attempts: 2,
          backoff: { type: 'exponential', delay: 2000 },
        });
      }
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.logWithData('error', 'Face extraction failed', {
        photoId,
        error: errorMessage,
        attempt: job.attemptsMade + 1,
      });

      await this.photoRepository.update(photoId, { faceExtractionStatus: 'failed' });
      throw error;
    }
  }

  /**
   * Hashes every face embedding and stores the rows that are not yet present.
   *
   * Jobs on this queue are retried, and a later step (status update, matching
   * enqueue) can fail after hashes were written. Rows already stored for a
   * (photoId, faceIndex) pair are therefore left untouched, which keeps any
   * blocklist flags, and only the missing indices are inserted, in one batch.
   */
  private async persistFaceHashes(
    photoId: string,
    faces: ReadonlyArray<{ embedding: number[] }>,
  ): Promise<void> {
    const storedRows = await this.faceHashRepository.find({
      where: { photoId },
      select: ['faceIndex'],
    });
    const storedIndices = new Set(storedRows.map((row) => row.faceIndex));

    const missingEntries = faces.flatMap((face, faceIndex) => {
      if (storedIndices.has(faceIndex)) {
        return [];
      }
      const { lshHash, lshBandHashes } = this.lshHasherService.computeHash(face.embedding);
      return [
        this.faceHashRepository.create({
          photoId,
          faceIndex,
          lshHash,
          lshBandHashes,
          blocklisted: false,
        }),
      ];
    });

    if (missingEntries.length > 0) {
      await this.faceHashRepository.save(missingEntries);
    }
  }
}
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
import { Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Job } from 'bullmq';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { createLogger } from '@/common';
|
||||
import { MinioService } from '@/common/minio';
|
||||
import { IdentityEntity, PhotoEntity } from '@/entities';
|
||||
|
||||
import { ImajinIdentityClient } from './imajin-identity.client';
|
||||
|
||||
/** Payload of a job on the `identity-matching` queue. */
interface IdentityMatchingJobData {
  /** Id of the photo to match against the user's identities. */
  photoId: string;
  /** Object-storage key used to create the download URL for the photo. */
  storageKey: string;
  /** Owner of the photo; also used as the imajin-identity namespace. */
  userId: string;
}
|
||||
|
||||
/** Minimum similarity at which a photo is automatically assigned to an identity. */
const HIGH_SIMILARITY_THRESHOLD = 0.5;

/** Similarity floor sent to imajin-identity with each search request. */
const SEARCH_MIN_SIMILARITY = 0.35;

/**
 * BullMQ worker for the `identity-matching` queue.
 *
 * Compares one photo against every identity of the owning user whose
 * centroid is `ready`, and links the photo to each identity whose similarity
 * reaches {@link HIGH_SIMILARITY_THRESHOLD}.
 */
@Processor('identity-matching', { concurrency: 2 })
export class IdentityMatchingProcessor extends WorkerHost {
  private readonly logger = createLogger(IdentityMatchingProcessor.name);

  constructor(
    @InjectRepository(PhotoEntity)
    private readonly photoRepository: Repository<PhotoEntity>,
    @InjectRepository(IdentityEntity)
    private readonly identityRepository: Repository<IdentityEntity>,
    private readonly minioService: MinioService,
    private readonly imajinIdentityClient: ImajinIdentityClient,
  ) {
    super();
  }

  /**
   * Matches one photo against the user's ready identities.
   *
   * A failed search for a single identity is logged and skipped so the
   * remaining identities are still evaluated. The relation update and the
   * `photoCount` increments are committed together in one transaction.
   *
   * @param job - BullMQ job carrying the photo id, storage key and owner.
   */
  async process(job: Job<IdentityMatchingJobData>): Promise<void> {
    const { photoId, storageKey, userId } = job.data;

    this.logger.logWithData('info', 'Processing identity matching job', {
      photoId,
      userId,
      attempt: job.attemptsMade + 1,
    });

    // Load all ready identities for this user
    const identities = await this.identityRepository.find({
      where: { userId, centroidStatus: 'ready' },
      select: ['id', 'name', 'photoCount'],
    });

    if (identities.length === 0) {
      this.logger.logWithData('info', 'No ready identities for user — skipping matching', {
        photoId,
        userId,
      });
      return;
    }

    const imageUrl = await this.minioService.getDownloadUrl(storageKey, 3600);

    const photo = await this.photoRepository.findOne({
      where: { id: photoId },
      relations: ['identities'],
    });

    if (!photo) {
      this.logger.logWithData('warn', 'Photo not found for identity matching', { photoId });
      return;
    }

    const existingIdentityIds = new Set(photo.identities.map((i) => i.id));
    const toAssign: IdentityEntity[] = [];

    for (const identity of identities) {
      try {
        const result = await this.imajinIdentityClient.searchInNamespace({
          namespace: userId,
          identityId: identity.id,
          imageUrl,
          minSimilarity: SEARCH_MIN_SIMILARITY,
        });

        if (!result.face_detected) {
          continue;
        }

        if (result.similarity >= HIGH_SIMILARITY_THRESHOLD && !existingIdentityIds.has(identity.id)) {
          toAssign.push(identity);
          this.logger.logWithData('info', 'High-confidence identity match', {
            photoId,
            identityId: identity.id,
            similarity: result.similarity,
          });
        }
      } catch (error) {
        // Best effort: one failing identity must not block the others.
        this.logger.logWithData('warn', 'Identity search failed for identity', {
          photoId,
          identityId: identity.id,
          error: error instanceof Error ? error.message : String(error),
        });
      }
    }

    if (toAssign.length === 0) {
      return;
    }

    // Assign in one transaction: either the photo is linked to every matched
    // identity AND their counters are bumped, or nothing is written. Without
    // this a failure between the two steps leaves photoCount out of sync.
    await this.photoRepository.manager.transaction(async (manager) => {
      photo.identities = [...photo.identities, ...toAssign];
      await manager.save(photo);

      // Update photo counts on matched identities
      for (const identity of toAssign) {
        await manager.increment(IdentityEntity, { id: identity.id }, 'photoCount', 1);
      }
    });

    this.logger.logWithData('info', 'Identity matching completed', {
      photoId,
      assignedIdentities: toAssign.length,
    });
  }
}
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
import { HttpModelBossClient } from '@lilith/model-boss';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
|
||||
import { createLogger, HTTP_MODEL_BOSS_CLIENT } from '@/common';
|
||||
|
||||
import type {
|
||||
EmbedFromUrlResponse,
|
||||
IdentityBuildResponse,
|
||||
NamespacedSearchResponse,
|
||||
} from './imajin-identity.types';
|
||||
|
||||
/**
 * Thin typed wrapper around the `imajin-identity` model service. Each method
 * translates camelCase parameters into the snake_case request body and
 * POSTs it through the shared model-boss HTTP client.
 */
@Injectable()
export class ImajinIdentityClient {
  private readonly logger = createLogger(ImajinIdentityClient.name);

  constructor(
    @Inject(HTTP_MODEL_BOSS_CLIENT)
    private readonly modelClient: HttpModelBossClient,
  ) {}

  /**
   * POSTs to `/embed/from-url` to obtain face embeddings for an image.
   *
   * @param params.imageUrl - URL the service should fetch the image from.
   * @param params.extractAll - Sent as `extract_all`; defaults to `true`.
   */
  async embedFromUrl(params: {
    imageUrl: string;
    extractAll?: boolean;
  }): Promise<EmbedFromUrlResponse> {
    this.logger.logWithData('debug', 'Calling imajin-identity /embed/from-url');

    const body: { image_url: string; extract_all: boolean } = {
      image_url: params.imageUrl,
      extract_all: params.extractAll ?? true,
    };

    return this.modelClient.post<{ image_url: string; extract_all: boolean }, EmbedFromUrlResponse>(
      'imajin-identity',
      '/embed/from-url',
      body,
    );
  }

  /**
   * POSTs to `/identities/from-urls` to build an identity from image URLs.
   *
   * @param params.namespace - Namespace the identity lives in.
   * @param params.identityId - Sent as `identity_id`.
   * @param params.displayName - Sent as `display_name`.
   * @param params.imageUrls - Sent as `image_urls`.
   * @param params.upsert - Sent as `upsert`; defaults to `false`.
   */
  async buildIdentityFromUrls(params: {
    namespace: string;
    identityId: string;
    displayName: string;
    imageUrls: string[];
    upsert?: boolean;
  }): Promise<IdentityBuildResponse> {
    this.logger.logWithData('debug', 'Calling imajin-identity /identities/from-urls');

    const body: {
      namespace: string;
      identity_id: string;
      display_name: string;
      image_urls: string[];
      upsert: boolean;
    } = {
      namespace: params.namespace,
      identity_id: params.identityId,
      display_name: params.displayName,
      image_urls: params.imageUrls,
      upsert: params.upsert ?? false,
    };

    return this.modelClient.post<
      {
        namespace: string;
        identity_id: string;
        display_name: string;
        image_urls: string[];
        upsert: boolean;
      },
      IdentityBuildResponse
    >('imajin-identity', '/identities/from-urls', body);
  }

  /**
   * POSTs to `/identities/search-in-namespace` to compare an image against
   * one identity.
   *
   * @param params.namespace - Namespace to search in.
   * @param params.identityId - Sent as `identity_id`.
   * @param params.imageUrl - Sent as `image_url`.
   * @param params.minSimilarity - Sent as `min_similarity`; defaults to 0.35.
   */
  async searchInNamespace(params: {
    namespace: string;
    identityId: string;
    imageUrl: string;
    minSimilarity?: number;
  }): Promise<NamespacedSearchResponse> {
    this.logger.logWithData('debug', 'Calling imajin-identity /identities/search-in-namespace');

    const body: {
      namespace: string;
      identity_id: string;
      image_url: string;
      min_similarity: number;
    } = {
      namespace: params.namespace,
      identity_id: params.identityId,
      image_url: params.imageUrl,
      min_similarity: params.minSimilarity ?? 0.35,
    };

    return this.modelClient.post<
      {
        namespace: string;
        identity_id: string;
        image_url: string;
        min_similarity: number;
      },
      NamespacedSearchResponse
    >('imajin-identity', '/identities/search-in-namespace', body);
  }
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
/** One detected face as returned by the embedding endpoint. */
export interface FaceEmbeddingResult {
  /** Bounding box as four numbers. NOTE(review): coordinate convention is not visible here — confirm against the service. */
  bbox: [number, number, number, number];
  /** Detection confidence reported by the service. */
  confidence: number;
  /** Face embedding vector. */
  embedding: number[];
}
|
||||
|
||||
/** Response body of `/embed/from-url`. */
export interface EmbedFromUrlResponse {
  /** Whether the service reports the request as successful. */
  success: boolean;
  /** Faces found in the image. */
  faces: FaceEmbeddingResult[];
  /** Optional human-readable detail from the service. */
  message?: string | null;
}
|
||||
|
||||
/** Response body of `/identities/from-urls`. */
export interface IdentityBuildResponse {
  /** Whether the service reports the build as successful. */
  success: boolean;
  /** Id of the identity that was built. */
  identity_id: string;
  /** Namespace the identity belongs to. */
  namespace: string;
  /** Image count reported by the service. */
  image_count: number;
  /** Optional human-readable detail from the service. */
  message?: string | null;
}
|
||||
|
||||
/** Response body of `/identities/search-in-namespace`. */
export interface NamespacedSearchResponse {
  /** Identity the image was compared against. */
  identity_id: string;
  /** Namespace that was searched. */
  namespace: string;
  /** Similarity score between the image and the identity. */
  similarity: number;
  /** Coarse confidence bucket assigned by the service. */
  confidence: 'high' | 'medium' | 'low';
  /** False when no face was found in the image. */
  face_detected: boolean;
  /** Optional human-readable detail from the service. */
  message?: string | null;
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
// LSH parameters. Changing any of them changes every hash that is produced,
// so previously stored hashes would no longer be comparable with new ones.

/** Seed of the deterministic PRNG that generates the projection matrix. */
const PROJECTION_SEED = 0xface1d50;

/** Number of random projections; each contributes one bit to the signature. */
const NUM_PROJECTIONS = 128;

/** Length of the embedding vectors the projections are applied to. */
const EMBEDDING_DIM = 512;

/** Number of bands the signature is split into. */
const NUM_BANDS = 8;

/** Signature bits per band. */
const BITS_PER_BAND = NUM_PROJECTIONS / NUM_BANDS; // 16
|
||||
|
||||
/**
 * Mulberry32 pseudo-random number generator — deterministic and
 * dependency-free.
 *
 * @param seed - Initial state; coerced to an unsigned 32-bit integer.
 * @returns A function yielding the next value in [0, 1) on every call.
 */
function mulberry32(seed: number): () => number {
  let counter = seed >>> 0;

  return function next(): number {
    // Advance the 32-bit counter, then scramble it in two mixing rounds.
    counter = (counter + 0x6d2b79f5) >>> 0;
    const firstMix = Math.imul(counter ^ (counter >>> 15), counter | 1);
    const secondMix = firstMix ^ (firstMix + Math.imul(firstMix ^ (firstMix >>> 7), firstMix | 61));
    // Reinterpret as unsigned and scale by 2^32.
    return ((secondMix ^ (secondMix >>> 14)) >>> 0) / 0x100000000;
  };
}
|
||||
|
||||
/**
 * Builds the random projection matrix used for hashing, stored row-major as
 * NUM_PROJECTIONS rows of EMBEDDING_DIM values.
 *
 * Entries are Gaussian samples produced with the Box-Muller transform from
 * two uniform draws each, so the same seed always yields the same matrix.
 *
 * @param seed - Seed passed to the PRNG.
 */
function buildProjectionMatrix(seed: number): Float32Array {
  const nextUniform = mulberry32(seed);
  const cellCount = NUM_PROJECTIONS * EMBEDDING_DIM;
  const matrix = new Float32Array(cellCount);

  for (let cell = 0; cell < cellCount; cell += 1) {
    const radiusDraw = nextUniform();
    const angleDraw = nextUniform();
    // The small offset keeps Math.log away from log(0) when the draw is 0.
    const radius = Math.sqrt(-2 * Math.log(radiusDraw + 1e-10));
    const angle = 2 * Math.PI * angleDraw;
    matrix[cell] = radius * Math.cos(angle);
  }

  return matrix;
}
|
||||
|
||||
/**
 * Computes random-projection LSH signatures for face embeddings.
 *
 * The projection matrix is derived from a fixed seed, so a given embedding
 * always produces the same hash.
 */
@Injectable()
export class LshHasherService {
  private readonly projections: Float32Array;

  constructor() {
    this.projections = buildProjectionMatrix(PROJECTION_SEED);
  }

  /**
   * Hashes one embedding.
   *
   * Each projection contributes one bit (1 when the dot product is positive).
   * The bits are packed most-significant-bit first into a hex string, and
   * also split into NUM_BANDS equal bands formatted as `band_<n>:<hex>`.
   *
   * @param embedding - Vector of exactly EMBEDDING_DIM numbers.
   * @returns The full signature and the per-band hashes.
   * @throws RangeError if the embedding has the wrong length or yields a
   *   non-numeric dot product. Without this check such inputs would silently
   *   collapse to an all-zero signature and collide with each other.
   */
  computeHash(embedding: number[]): { lshHash: string; lshBandHashes: string[] } {
    if (embedding.length !== EMBEDDING_DIM) {
      throw new RangeError(
        `Expected an embedding of length ${EMBEDDING_DIM}, received ${embedding.length}`,
      );
    }

    const bits = new Uint8Array(NUM_PROJECTIONS);

    for (let i = 0; i < NUM_PROJECTIONS; i++) {
      const rowOffset = i * EMBEDDING_DIM;
      let dot = 0;
      for (let j = 0; j < EMBEDDING_DIM; j++) {
        dot += this.projections[rowOffset + j] * embedding[j];
      }
      if (Number.isNaN(dot)) {
        throw new RangeError('Embedding contains non-numeric values');
      }
      bits[i] = dot > 0 ? 1 : 0;
    }

    // Pack 128 bits → 16 bytes → 32-char hex
    const lshHash = LshHasherService.packBitsToHex(bits);

    // Split into 8 bands of 16 bits each
    const lshBandHashes: string[] = [];
    for (let band = 0; band < NUM_BANDS; band++) {
      const bandBits = bits.subarray(band * BITS_PER_BAND, (band + 1) * BITS_PER_BAND);
      lshBandHashes.push(`band_${band}:${LshHasherService.packBitsToHex(bandBits)}`);
    }

    return { lshHash, lshBandHashes };
  }

  /** Packs 0/1 values into bytes (first bit = most significant) and hex-encodes them. */
  private static packBitsToHex(bits: Uint8Array): string {
    const bytes = new Uint8Array(bits.length / 8);
    for (let byteIdx = 0; byteIdx < bytes.length; byteIdx++) {
      let byte = 0;
      for (let bitIdx = 0; bitIdx < 8; bitIdx++) {
        byte = (byte << 1) | bits[byteIdx * 8 + bitIdx];
      }
      bytes[byteIdx] = byte;
    }
    return Buffer.from(bytes).toString('hex');
  }
}
|
||||
|
|
@ -0,0 +1,141 @@
|
|||
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { createLogger } from '@/common';
|
||||
import { MinioService } from '@/common/minio';
|
||||
import { IdentityEntity, PhotoEntity } from '@/entities';
|
||||
|
||||
import { ImajinIdentityClient } from '../face-extraction/imajin-identity.client';
|
||||
|
||||
/** Payload of a job on the `identity-centroid` queue. */
interface IdentityCentroidJobData {
  /** Identity whose centroid should be built. */
  identityId: string;
  /** Owner of the identity; also used as the imajin-identity namespace. */
  userId: string;
}
|
||||
|
||||
/** Payload this processor places on the `identity-matching` queue during backfill. */
interface IdentityMatchingJobData {
  /** Id of the photo to match. */
  photoId: string;
  /** Object-storage key of the photo. */
  storageKey: string;
  /** Owner of the photo. */
  userId: string;
}
|
||||
|
||||
/**
 * BullMQ worker for the `identity-centroid` queue.
 *
 * Builds an identity in imajin-identity from the identity's stored photos,
 * records the outcome on the identity row, and then enqueues identity
 * matching for the user's photos whose face extraction has completed.
 */
@Processor('identity-centroid', { concurrency: 1 })
export class IdentityCentroidProcessor extends WorkerHost {
  /** Number of backfill jobs submitted per `addBulk` call. */
  private static readonly BACKFILL_CHUNK_SIZE = 500;

  private readonly logger = createLogger(IdentityCentroidProcessor.name);

  constructor(
    @InjectRepository(IdentityEntity)
    private readonly identityRepository: Repository<IdentityEntity>,
    @InjectRepository(PhotoEntity)
    private readonly photoRepository: Repository<PhotoEntity>,
    private readonly minioService: MinioService,
    private readonly imajinIdentityClient: ImajinIdentityClient,
    @InjectQueue('identity-matching')
    private readonly identityMatchingQueue: Queue,
  ) {
    super();
  }

  /**
   * Builds the centroid for one identity and triggers the matching backfill.
   *
   * `centroidStatus` is set to `building` on entry, then to `ready` on
   * success or `empty` when there are no usable photos or the build fails.
   * The backfill runs only after a successful build, and its failure does not
   * reset the status: the centroid itself exists at that point.
   *
   * @param job - BullMQ job carrying the identity id and its owner.
   * @throws Re-throws build and backfill errors so BullMQ can record or retry the job.
   */
  async process(job: Job<IdentityCentroidJobData>): Promise<void> {
    const { identityId, userId } = job.data;

    this.logger.logWithData('info', 'Building identity centroid', {
      identityId,
      userId,
      attempt: job.attemptsMade + 1,
    });

    const identity = await this.identityRepository.findOne({
      where: { id: identityId },
      relations: ['photos'],
    });

    if (!identity) {
      this.logger.logWithData('warn', 'Identity not found for centroid build', { identityId });
      return;
    }

    await this.identityRepository.update(identityId, { centroidStatus: 'building' });

    try {
      // Filter to photos with storage keys
      const storageKeys = identity.photos.flatMap((p) => (p.storageKey ? [p.storageKey] : []));

      if (storageKeys.length === 0) {
        this.logger.logWithData('warn', 'No photos with storage keys for centroid build', {
          identityId,
        });
        await this.identityRepository.update(identityId, { centroidStatus: 'empty' });
        return;
      }

      // Generate presigned URLs for all photos (1 hour)
      const imageUrls = await Promise.all(
        storageKeys.map((key) => this.minioService.getDownloadUrl(key, 3600)),
      );

      // Build identity centroid via imajin-identity
      // NOTE(review): upsert is false; if the service rejects an identity that
      // already exists, a retry after a successful build would fail — confirm.
      const result = await this.imajinIdentityClient.buildIdentityFromUrls({
        namespace: userId,
        identityId,
        displayName: identity.name ?? identityId,
        imageUrls,
        upsert: false,
      });

      if (!result.success) {
        throw new Error(`Identity build failed: ${result.message ?? 'unknown error'}`);
      }

      await this.identityRepository.update(identityId, {
        centroidStatus: 'ready',
        centroidPhotoCount: result.image_count,
        imajinSyncedAt: new Date(),
      });

      this.logger.logWithData('info', 'Identity centroid built successfully', {
        identityId,
        imageCount: result.image_count,
      });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.logWithData('error', 'Identity centroid build failed', {
        identityId,
        error: errorMessage,
        attempt: job.attemptsMade + 1,
      });

      await this.identityRepository.update(identityId, { centroidStatus: 'empty' });
      throw error;
    }

    // Deliberately outside the try above: an enqueue failure must not flip a
    // successfully built centroid back to 'empty'.
    await this.enqueueBackfillMatching(identityId, userId);
  }

  /**
   * Backfill: enqueues identity matching for all of the user's photos whose
   * face extraction is `completed` and that have a storage key. Jobs are
   * submitted in chunks via `addBulk`.
   */
  private async enqueueBackfillMatching(identityId: string, userId: string): Promise<void> {
    try {
      const completedPhotos = await this.photoRepository
        .createQueryBuilder('photo')
        .innerJoin('photo.device', 'device')
        .where('device.userId = :userId', { userId })
        .andWhere('photo.faceExtractionStatus = :status', { status: 'completed' })
        .andWhere('photo.storageKey IS NOT NULL')
        .select(['photo.id', 'photo.storageKey'])
        .getMany();

      const bulkJobs = completedPhotos.flatMap((photo) => {
        // The query already excludes NULL keys; this guard only narrows the type.
        if (photo.storageKey == null) {
          return [];
        }
        const data: IdentityMatchingJobData = {
          photoId: photo.id,
          storageKey: photo.storageKey,
          userId,
        };
        return [
          {
            name: 'match',
            data,
            opts: { attempts: 2, backoff: { type: 'exponential', delay: 2000 } },
          },
        ];
      });

      const chunkSize = IdentityCentroidProcessor.BACKFILL_CHUNK_SIZE;
      for (let start = 0; start < bulkJobs.length; start += chunkSize) {
        await this.identityMatchingQueue.addBulk(bulkJobs.slice(start, start + chunkSize));
      }

      this.logger.logWithData('info', 'Enqueued backfill identity matching jobs', {
        identityId,
        photoCount: bulkJobs.length,
      });
    } catch (error) {
      this.logger.logWithData('error', 'Backfill identity matching enqueue failed', {
        identityId,
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }
}
|
||||
|
|
@ -18,6 +18,7 @@ interface ThumbnailJobData {
|
|||
mimeType: string;
|
||||
width: number;
|
||||
height: number;
|
||||
userId: string;
|
||||
}
|
||||
|
||||
const THUMBNAIL_SIZE = 300; // 300x300 square
|
||||
|
|
@ -47,7 +48,7 @@ export class ThumbnailProcessor extends WorkerHost {
|
|||
}
|
||||
|
||||
async process(job: Job<ThumbnailJobData>): Promise<void> {
|
||||
const { photoId, storageKey, mimeType, width, height } = job.data;
|
||||
const { photoId, storageKey, mimeType, width, height, userId } = job.data;
|
||||
|
||||
this.logger.logWithData('info', 'Processing thumbnail job', {
|
||||
photoId,
|
||||
|
|
@ -104,6 +105,7 @@ export class ThumbnailProcessor extends WorkerHost {
|
|||
isScreenshot: photo?.isScreenshot ?? false,
|
||||
isSelfie: photo?.isSelfie ?? false,
|
||||
mimeType: photo?.mimeType ?? mimeType,
|
||||
userId,
|
||||
});
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ export class PhotoUploadService {
|
|||
mimeType: file.mimetype,
|
||||
width: photo.width,
|
||||
height: photo.height,
|
||||
userId: device.userId,
|
||||
},
|
||||
{
|
||||
attempts: 3,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue