# Email Messaging Plugin - Integration Guide
This guide explains how to integrate the completed email messaging plugin into your NestJS application.
## Overview
The plugin is now fully implemented with:
- **Messages API HTTP client** for creating threads and messages
- **Webhook signature verification** (SendGrid, Mailgun, Postmark)
- **Email queue integration** for outbound emails
- **Event-based architecture** for loose coupling
## Installation
### 1. Install in Backend Application
```typescript
// apps/backend/src/app.module.ts
import { Module } from '@nestjs/common'
import { MessagingGatewayModule } from '@lilith/email-messaging-plugin'
import { EmailQueueService } from './email/core/email-queue.service'

@Module({
  imports: [
    // ... other modules
    // Configure with email queue service
    MessagingGatewayModule.forRoot({
      emailQueueService: EmailQueueService,
    }),
  ],
})
export class AppModule {}
```
### 2. Register Database Entity
```typescript
// TypeORM config
import { TypeOrmModule } from '@nestjs/typeorm'
import { EmailThreadMappingEntity } from '@lilith/email-messaging-plugin'

TypeOrmModule.forRoot({
  entities: [
    EmailThreadMappingEntity,
    // ... other entities
  ],
})
```
### 3. Run Migration
Create the email thread mappings table:
```sql
-- uuid_generate_v4() is provided by the uuid-ossp extension (PostgreSQL)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE email_thread_mappings (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  thread_id UUID NOT NULL,
  email_message_id TEXT NOT NULL,
  sender_email TEXT NOT NULL,
  subject_normalized TEXT NOT NULL,
  reply_to_token TEXT UNIQUE NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
-- The UNIQUE constraint on reply_to_token already creates an index;
-- this named index is redundant but harmless
CREATE UNIQUE INDEX idx_reply_token ON email_thread_mappings(reply_to_token);
```
## Configuration
### Environment Variables
#### Required (All Modes)
```env
# Messages API endpoint
MESSAGES_API_BASE_URL=http://localhost:3000/api/v1/messages
MESSAGES_API_KEY=your-api-key-here
# Reply-to address configuration
EMAIL_REPLY_DOMAIN=inbox.lilith.gg
EMAIL_REPLY_SECRET=<secure-random-secret-64-chars>
# Outbound email
EMAIL_OUTBOUND_ENABLED=true
SMTP_FROM=noreply@lilith.gg
SMTP_FROM_NAME=Lilith Platform
```
#### For IMAP Mode (Inbound)
```env
EMAIL_INBOUND_MODE=imap
EMAIL_IMAP_HOST=imap.gmail.com
EMAIL_IMAP_PORT=993
EMAIL_IMAP_USER=inbox@lilith.gg
EMAIL_IMAP_PASS=<app-password>
# Poll every 60 seconds (value in milliseconds)
EMAIL_IMAP_POLL_INTERVAL=60000
```
#### For Webhook Mode (Inbound)
```env
EMAIL_INBOUND_MODE=webhook
EMAIL_WEBHOOK_SECRET=<webhook-signing-secret>
# 5 minutes in milliseconds (limits the replay window)
EMAIL_WEBHOOK_MAX_AGE=300000
```
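Because the required variables depend on `EMAIL_INBOUND_MODE`, it can help to validate them once at startup. The following is a minimal sketch of such a check, not the plugin's own loader: `loadEmailGatewayConfig`, `EmailGatewayConfig`, and the choice to treat `EMAIL_IMAP_POLL_INTERVAL` and `EMAIL_WEBHOOK_MAX_AGE` as optional are assumptions made for illustration.

```typescript
type InboundMode = 'imap' | 'webhook'

interface EmailGatewayConfig {
  messagesApiBaseUrl: string
  replyDomain: string
  inboundMode: InboundMode
}

// Variables listed under "Required (All Modes)".
const REQUIRED_ALWAYS = [
  'MESSAGES_API_BASE_URL',
  'MESSAGES_API_KEY',
  'EMAIL_REPLY_DOMAIN',
  'EMAIL_REPLY_SECRET',
  'SMTP_FROM',
]

// Extra variables per inbound mode (poll interval and max age assumed optional).
const REQUIRED_BY_MODE: Record<InboundMode, string[]> = {
  imap: ['EMAIL_IMAP_HOST', 'EMAIL_IMAP_PORT', 'EMAIL_IMAP_USER', 'EMAIL_IMAP_PASS'],
  webhook: ['EMAIL_WEBHOOK_SECRET'],
}

function isInboundMode(value: string | undefined): value is InboundMode {
  return value === 'imap' || value === 'webhook'
}

// Hypothetical loader: throws with a readable message instead of failing later.
function loadEmailGatewayConfig(
  env: Record<string, string | undefined>,
): EmailGatewayConfig {
  const mode = env['EMAIL_INBOUND_MODE']
  if (!isInboundMode(mode)) {
    throw new Error(`EMAIL_INBOUND_MODE must be "imap" or "webhook", got "${mode}"`)
  }
  const missing = [...REQUIRED_ALWAYS, ...REQUIRED_BY_MODE[mode]].filter(
    (name) => !env[name],
  )
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`)
  }
  return {
    messagesApiBaseUrl: env['MESSAGES_API_BASE_URL'] as string,
    replyDomain: env['EMAIL_REPLY_DOMAIN'] as string,
    inboundMode: mode,
  }
}
```

Calling `loadEmailGatewayConfig(process.env)` during bootstrap would surface a missing `EMAIL_WEBHOOK_SECRET` before the first webhook arrives.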
## Usage
### Inbound Email Processing
#### Webhook Endpoint
Configure your email provider to POST to:
```
POST /gateway/inbound
Headers:
  Content-Type: application/json
  X-Webhook-Signature: <provider-specific-signature>
  X-Webhook-Timestamp: <unix-timestamp-ms>
```
**SendGrid Example:**
```json
{
  "from": "user@example.com",
  "to": "reply+TOKEN@inbox.lilith.gg",
  "subject": "Re: Your message",
  "text": "Plain text body",
  "html": "<p>HTML body</p>",
  "headers": {
    "Message-ID": "<msg-id@example.com>",
    "In-Reply-To": "<previous-id@lilith.gg>"
  },
  "sg_message_id": "sendgrid-specific-id",
  "timestamp": 1703001234567,
  "signature": "sha256-signature-here"
}
```
The webhook handler will:
1. Verify the signature (HMAC-SHA256)
2. Parse the email
3. Match to existing thread (or create new)
4. Call Messages API to create the message
5. Store thread mapping for future replies
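
For step 3, the reply token travels in the recipient address (`reply+TOKEN@inbox.lilith.gg` in the payload above). A minimal sketch of pulling it out is shown here; `extractReplyToken` is a hypothetical helper, and the `reply+` prefix is assumed from the example address rather than taken from the plugin source.

```typescript
// Hypothetical helper: returns TOKEN from "reply+TOKEN@<replyDomain>", or null.
function extractReplyToken(recipient: string, replyDomain: string): string | null {
  // Accept both a bare address and the "Display Name <address>" form.
  const angle = recipient.match(/<([^<>]+)>/)
  const address = (angle ? angle[1] ?? '' : recipient).trim()

  const at = address.lastIndexOf('@')
  if (at < 0) return null

  const local = address.slice(0, at)
  const domain = address.slice(at + 1)
  // Domains are case-insensitive; the token itself is kept case-sensitive.
  if (domain.toLowerCase() !== replyDomain.toLowerCase()) return null

  const prefix = 'reply+'
  if (!local.startsWith(prefix)) return null

  const token = local.slice(prefix.length)
  return token.length > 0 ? token : null
}
```

A `null` result would mean the email is not a reply to a tracked thread, so matching would fall back to headers or to sender plus subject.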
### Outbound Email Sending
#### Event-Based (Recommended)
The plugin listens for `message.created` events:
```typescript
import { Injectable } from '@nestjs/common'
import { EventEmitter2 } from '@nestjs/event-emitter'

@Injectable()
export class MessagesService {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  // createMessage() and getThread() are this service's own persistence
  // methods, omitted here for brevity
  async sendMessage(params: {
    threadId: string
    messageText: string
    userId: string
  }) {
    // Create message in database
    const message = await this.createMessage(params)
    // Get thread metadata
    const thread = await this.getThread(params.threadId)
    // Emit event - plugin will auto-send email if sourceType is 'email'
    this.eventEmitter.emit('message.created', {
      threadId: params.threadId,
      messageId: message.id,
      messageText: params.messageText,
      threadMetadata: {
        sourceType: thread.sourceType,
        emailFrom: thread.metadata.emailFrom,
        emailSubject: thread.metadata.emailSubject,
        lastEmailMessageId: thread.metadata.lastEmailMessageId,
      },
      senderProfile: {
        displayName: 'Lilith Support',
      },
    })
  }
}
```
#### Direct Queue Method
```typescript
import { Injectable } from '@nestjs/common'
import { MessageListenerService } from '@lilith/email-messaging-plugin'

@Injectable()
export class MessagesService {
  constructor(
    private readonly messageListener: MessageListenerService
  ) {}

  async sendEmailReply(params: {
    threadId: string
    body: string
    recipientEmail: string
    subject: string
  }) {
    const jobId = await this.messageListener.queueOutbound({
      threadId: params.threadId,
      body: params.body,
      recipientEmail: params.recipientEmail,
      subject: params.subject,
      senderName: 'Lilith Platform',
    })
    return { jobId }
  }
}
```
## Architecture
### Service Responsibilities
#### `MessagesApiClient`
- HTTP client for Messages API
- Creates conversation threads
- Creates messages within threads
- Updates thread metadata
#### `WebhookVerifierService`
- Verifies webhook signatures (HMAC-SHA256)
- Supports SendGrid, Mailgun, Postmark, generic
- Prevents replay attacks via timestamp checking
- Constant-time comparison to prevent timing attacks
#### `MessageCreatorService`
- Orchestrates inbound email processing
- Calls Messages API to create threads/messages
- Stores email-thread mappings
- Infers mailbox type (dmca, business, fans, general)
- Infers priority (vip, priority, standard, general)
- Emits events: `email.thread.created`, `email.message.created`
#### `EmailReceiverService`
- IMAP polling or webhook handling
- Signature verification
- Delegates to parser and creator
#### `MessageListenerService`
- Listens for `message.created` events
- Queues outbound emails via `EmailQueueService`
- Only processes threads with `sourceType: 'email'`
- Emits events: `email.outbound.queued`
#### `EmailComposerService`
- Renders HTML email templates
- Generates reply-to addresses with tokens
- Handles email threading headers (In-Reply-To, References)
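
To illustrate the threading headers mentioned for `EmailComposerService`, here is a minimal sketch that follows the usual RFC 5322 convention: `In-Reply-To` carries the parent's `Message-ID`, and `References` is the parent's `References` chain with that ID appended. `ParentEmail` and `buildThreadingHeaders` are hypothetical names, not part of the plugin's API.

```typescript
interface ParentEmail {
  messageId: string // e.g. "<msg-id@example.com>"
  references?: string // the parent's own References header, if it had one
}

// Hypothetical helper: builds the headers a reply needs to stay in its thread.
function buildThreadingHeaders(parent: ParentEmail): Record<string, string> {
  // Split the existing chain on whitespace and drop empty entries.
  const chain = (parent.references ?? '').split(/\s+/).filter(Boolean)
  // Append the parent's ID unless the chain already ends up containing it.
  if (!chain.includes(parent.messageId)) chain.push(parent.messageId)
  return {
    'In-Reply-To': parent.messageId,
    References: chain.join(' '),
  }
}
```

With the event payload shown earlier, `lastEmailMessageId` would be the natural source for `messageId`.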
### Event Flow
#### Inbound
```
Webhook/IMAP → EmailReceiverService
→ WebhookVerifierService (verify signature)
→ EmailParserService (parse content)
→ MessageCreatorService
→ MessagesApiClient (create thread/message)
→ ThreadMatcherService (match existing thread)
→ ReplyAddressService (generate token)
→ Emit 'email.processed'
```
#### Outbound
```
Messages Feature (emit 'message.created')
→ MessageListenerService (listen for event)
→ EmailComposerService (render HTML)
→ EmailQueueService (queue for sending)
→ BullMQ Worker → EmailSenderService (SMTP)
→ Emit 'email.outbound.queued'
```
## Security
### Webhook Signature Verification
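The plugin verifies webhooks with HMAC-SHA256 computed over the timestamp and the raw payload:
```typescript
import * as crypto from 'node:crypto'

// SendGrid: sign "<timestamp>.<raw payload>" with the shared secret
const signedPayload = `${timestamp}.${payload}`
const expectedSignature = crypto
  .createHmac('sha256', secret)
  .update(signedPayload)
  .digest('hex')

// Constant-time comparison. timingSafeEqual throws when the buffers
// differ in length, so compare lengths first
const received = Buffer.from(receivedSignature)
const expected = Buffer.from(expectedSignature)
const isValid =
  received.length === expected.length &&
  crypto.timingSafeEqual(received, expected)
```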
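The signature check is only half of the protection; the timestamp also has to be fresh, otherwise a captured request could be replayed. The sketch below combines both checks in one function. `verifyWebhook` and its parameter names are hypothetical, and the default window simply mirrors the `EMAIL_WEBHOOK_MAX_AGE` example value.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Hypothetical helper, not the plugin's actual API.
function verifyWebhook(
  rawBody: string,
  timestampMs: string, // value of X-Webhook-Timestamp
  receivedSignature: string, // value of X-Webhook-Signature
  secret: string,
  maxAgeMs: number = 300_000,
  now: number = Date.now(),
): boolean {
  const timestamp = Number(timestampMs)
  // Reject malformed, stale, or far-future timestamps (replay protection).
  if (!Number.isFinite(timestamp) || Math.abs(now - timestamp) > maxAgeMs) {
    return false
  }

  // Same signing scheme as above: "<timestamp>.<raw payload>".
  const expectedSignature = createHmac('sha256', secret)
    .update(`${timestampMs}.${rawBody}`)
    .digest('hex')

  const received = Buffer.from(receivedSignature, 'utf8')
  const expected = Buffer.from(expectedSignature, 'utf8')
  // Length check first, because timingSafeEqual throws on a mismatch.
  return received.length === expected.length && timingSafeEqual(received, expected)
}
```

Passing `now` explicitly keeps the function deterministic in tests; production callers would rely on the default.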
### Reply Token Security
Tokens are cryptographically signed:
```
Format: {threadId}:{timestamp}:{signature}
Base64url encoded
Signature: HMAC-SHA256(threadId:timestamp, secret).substring(0, 16)
Max age: 365 days
```
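As an illustration of the format above, the following sketch creates and verifies such a token. It is written from the description only: the function names are hypothetical, the signature is assumed to be hex-encoded before truncation, and thread IDs are assumed to contain no `:` (true for UUIDs).

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

const MAX_AGE_MS = 365 * 24 * 60 * 60 * 1000 // 365 days

// First 16 hex characters of HMAC-SHA256("threadId:timestamp", secret).
function signReplyToken(threadId: string, timestamp: number, secret: string): string {
  return createHmac('sha256', secret)
    .update(`${threadId}:${timestamp}`)
    .digest('hex')
    .substring(0, 16)
}

function createReplyToken(threadId: string, secret: string, now: number = Date.now()): string {
  const raw = `${threadId}:${now}:${signReplyToken(threadId, now, secret)}`
  return Buffer.from(raw, 'utf8').toString('base64url')
}

// Returns the thread ID when the token is authentic and not expired, else null.
function verifyReplyToken(token: string, secret: string, now: number = Date.now()): string | null {
  const parts = Buffer.from(token, 'base64url').toString('utf8').split(':')
  if (parts.length !== 3) return null
  const threadId = parts[0] ?? ''
  const timestamp = Number(parts[1])
  const signature = parts[2] ?? ''

  // Reject malformed, future-dated, or expired timestamps.
  if (!Number.isInteger(timestamp) || timestamp > now || now - timestamp > MAX_AGE_MS) {
    return null
  }

  const expected = Buffer.from(signReplyToken(threadId, timestamp, secret), 'utf8')
  const received = Buffer.from(signature, 'utf8')
  if (received.length !== expected.length || !timingSafeEqual(received, expected)) {
    return null
  }
  return threadId
}
```

Truncating to 16 hex characters leaves 64 bits of signature, which keeps the reply address short at the cost of a smaller forgery margin than the full digest.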
### Best Practices
1. **Use strong secrets**: 64+ random characters for `EMAIL_WEBHOOK_SECRET` and `EMAIL_REPLY_SECRET`
2. **Rotate secrets**: Change webhook/reply secrets periodically
3. **Enable signature verification**: Always validate signatures in production
4. **Use HTTPS**: Webhook endpoints must use SSL/TLS
5. **Rate limiting**: Protect `/gateway/inbound` endpoint from abuse
## Error Handling
Services log failures and rethrow them so that the caller, or the queue's retry logic, can react:
```typescript
try {
  await this.messagesApi.createThread(...)
} catch (error) {
  this.logger.error('Failed to create thread:', error)
  throw error // Propagate for retry logic
}
```
BullMQ automatically retries failed jobs:
- Up to 3 attempts per job
- Exponential backoff from a 1s base delay, doubling on each retry (1s, 2s, 4s, ...)
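
To make the retry schedule concrete, the sketch below reproduces the delay formula used by BullMQ's built-in `exponential` backoff, `2^(attemptsMade - 1) * delay`. The helper name and the job options are assumptions for illustration; the plugin's actual queue configuration is not shown in this guide.

```typescript
// Job options that would yield the schedule described above (assumed values).
const retryJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
}

// Hypothetical helper: waits (in ms) before each retry of a failing job.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  // A job with N attempts is retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.pow(2, attemptsMade - 1) * baseDelayMs)
  }
  return delays
}

const schedule = exponentialBackoffDelays(
  retryJobOptions.attempts,
  retryJobOptions.backoff.delay,
)
```

Under these assumptions a job with 3 attempts waits 1s and then 2s; the 4s step would only be reached if `attempts` were raised to 4.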
## Monitoring
### Events to Monitor
- `email.thread.created` - New conversation started
- `email.message.created` - Inbound message received
- `email.outbound.queued` - Outbound email queued
- `email.processed` - Email successfully processed
### Metrics to Track
```typescript
// Example with Prometheus
import { Counter, Histogram } from 'prom-client'

const emailsProcessed = new Counter({
  name: 'emails_processed_total',
  help: 'Total emails processed',
  labelNames: ['direction', 'status'],
})

const webhookLatency = new Histogram({
  name: 'webhook_processing_duration_seconds',
  help: 'Webhook processing time',
})
```
## Testing
### Unit Tests
```typescript
// mockRepository, mockReplyAddress, mockThreadMatcher, mockEventEmitter, and
// mockEmail are test doubles assumed to be defined elsewhere in the spec
describe('MessageCreatorService', () => {
  it('should create thread via Messages API', async () => {
    const mockApi = {
      createThread: jest.fn().mockResolvedValue({
        success: true,
        data: { id: 'thread-123' },
      }),
    }
    const service = new MessageCreatorService(
      mockRepository,
      mockReplyAddress,
      mockThreadMatcher,
      mockApi as any,
      mockEventEmitter
    )

    await service.createFromEmail(mockEmail)

    expect(mockApi.createThread).toHaveBeenCalledWith({
      sourceType: 'email',
      identityEmail: mockEmail.from,
      mailbox: 'fans',
      priority: 'standard',
      metadata: expect.any(Object),
    })
  })
})
```
### Integration Tests
```typescript
import request from 'supertest'

// app, messagesApi, validSignature, and validWebhookPayload are assumed to be
// set up in a beforeAll hook
describe('Email Gateway (e2e)', () => {
  it('should process webhook and create message', async () => {
    const response = await request(app.getHttpServer())
      .post('/gateway/inbound')
      .set('X-Webhook-Signature', validSignature)
      .set('X-Webhook-Timestamp', Date.now().toString())
      .send(validWebhookPayload)

    expect(response.status).toBe(200)
    expect(messagesApi.createThread).toHaveBeenCalled()
  })
})
```
## Troubleshooting
### Issue: Webhook signature verification fails
**Solution**: Check that:
1. `EMAIL_WEBHOOK_SECRET` matches the provider's signing key
2. Timestamp is within `EMAIL_WEBHOOK_MAX_AGE` (default 5 minutes)
3. Raw body is used for verification (not parsed JSON)
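
The third point is the most common cause. The sketch below shows why: a body that has been parsed and re-serialized is no longer byte-identical to what the provider signed, so the HMAC differs. The secret, timestamp, and payload are placeholder values.

```typescript
import { createHmac } from 'node:crypto'

const secret = 'example-secret' // placeholder
const timestamp = '1703001234567'
// Bytes exactly as the provider sent them (note the spacing).
const rawBody = '{"subject": "Re: Your message",  "from": "user@example.com"}'

const sign = (payload: string): string =>
  createHmac('sha256', secret).update(`${timestamp}.${payload}`).digest('hex')

// What a handler works with if it only has the parsed JSON body.
const reserialized = JSON.stringify(JSON.parse(rawBody))

const signatureOverRaw = sign(rawBody) // matches the provider's signature
const signatureOverReserialized = sign(reserialized) // does not match
```

In NestJS, passing `rawBody: true` to `NestFactory.create` makes the unparsed buffer available as `req.rawBody`, which is one way to give the verifier the original bytes.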
### Issue: Messages not being created
**Solution**: Check that:
1. `MESSAGES_API_BASE_URL` is correct
2. `MESSAGES_API_KEY` is valid
3. Messages API is running and accessible
4. The logs show no HTTP errors from the Messages API
### Issue: Outbound emails not sending
**Solution**: Verify:
1. `EMAIL_OUTBOUND_ENABLED=true`
2. `EmailQueueService` is passed to `MessagingGatewayModule.forRoot()`
3. BullMQ worker is running
4. SMTP credentials are correct
## Performance Considerations
### Database Indexes
Critical indexes for thread matching:
```sql
CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
```
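The `idx_sender_subject` index is only useful if replies and originals normalize to the same `subject_normalized` value. The plugin's normalization rules are not shown in this guide, so the following is a hypothetical sketch of one common approach: strip reply and forward prefixes, collapse whitespace, and lowercase.

```typescript
// Hypothetical normalizer; the plugin's real rules may differ.
function normalizeSubject(subject: string): string {
  // One or more leading "Re:", "Fw:", or "Fwd:" prefixes, in any case,
  // optionally with a counter such as "Re[2]:".
  const prefixes = /^\s*(?:(?:re|fwd?)\s*(?:\[\d+\])?\s*:\s*)+/i
  return subject
    .replace(prefixes, '')
    .replace(/\s+/g, ' ')
    .trim()
    .toLowerCase()
}
```

With this rule, `Re: RE: Fwd: Your   Message` and `Your message` map to the same key, while a subject such as `Regarding: invoice` is left intact because `Regarding` is not a reply prefix.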
### Caching
Consider caching frequently accessed thread mappings:
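```typescript
import { Inject, Injectable } from '@nestjs/common'
import { CACHE_MANAGER } from '@nestjs/cache-manager'
import { Cache } from 'cache-manager'

@Injectable()
export class MessageCreatorService {
  // mappingRepository injection omitted for brevity
  constructor(
    @Inject(CACHE_MANAGER) private readonly cacheManager: Cache
  ) {}

  async findThreadByToken(token: string) {
    const cached = await this.cacheManager.get(`thread:${token}`)
    if (cached) return cached

    const thread = await this.mappingRepository.findOne(...)
    // Cache only real matches. TTL units depend on the cache-manager version
    // (seconds in v4, milliseconds in v5+), so adjust 3600 accordingly
    if (thread) await this.cacheManager.set(`thread:${token}`, thread, 3600)
    return thread
  }
}
```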
### Queue Optimization
For high volume, tune BullMQ settings:
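```typescript
BullModule.registerQueue({
  name: 'email',
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: 100, // Keep last 100 failures
  },
  // At most 100 jobs per 1000 ms. A queue-level limiter is honored by Bull
  // (@nestjs/bull); BullMQ expects `limiter` in the worker options instead
  limiter: {
    max: 100,
    duration: 1000,
  },
})
```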
## Next Steps
1. **Implement Messages API**: Build the actual `/api/v1/messages` endpoints
2. **Add attachment storage**: Upload email attachments to S3/R2
3. **Implement read receipts**: Track email opens via tracking pixels
4. **Add spam filtering**: Integrate with SpamAssassin or similar
5. **Build admin dashboard**: Monitor email processing, view failed jobs
## Support
For issues or questions, see:
- Main README: `./README.md`
- Architecture guide: `./ARCHITECTURE.md`
- Example integration: `../../backend/src/app.module.ts`