# Email Messaging Plugin - Integration Guide

This guide explains how to integrate the completed email messaging plugin into your NestJS application.

## Overview

The plugin is fully implemented and includes:
- **Messages API HTTP client** for creating threads and messages
- **Webhook signature verification** (SendGrid, Mailgun, Postmark)
- **Email queue integration** for outbound emails
- **Event-based architecture** for loose coupling

## Installation

### 1. Install in Backend Application

```typescript
// apps/backend/src/app.module.ts
import { Module } from '@nestjs/common'
import { MessagingGatewayModule } from '@lilith/email-messaging-plugin'
import { EmailQueueService } from './email/core/email-queue.service'

@Module({
  imports: [
    // ... other modules

    // Configure with email queue service
    MessagingGatewayModule.forRoot({
      emailQueueService: EmailQueueService,
    }),
  ],
})
export class AppModule {}
```

### 2. Register Database Entity

```typescript
// TypeORM config
import { TypeOrmModule } from '@nestjs/typeorm'
import { EmailThreadMappingEntity } from '@lilith/email-messaging-plugin'

TypeOrmModule.forRoot({
  entities: [
    EmailThreadMappingEntity,
    // ... other entities
  ],
})
```

### 3. Run Migration

Create the email thread mappings table:

```sql
-- uuid_generate_v4() is provided by the uuid-ossp extension (PostgreSQL)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE email_thread_mappings (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  thread_id UUID NOT NULL,
  email_message_id TEXT NOT NULL,
  sender_email TEXT NOT NULL,
  subject_normalized TEXT NOT NULL,
  reply_to_token TEXT UNIQUE NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
-- The UNIQUE constraint on reply_to_token already creates an index; this one is redundant but harmless
CREATE UNIQUE INDEX idx_reply_token ON email_thread_mappings(reply_to_token);
```

## Configuration

### Environment Variables

#### Required (All Modes)

```env
# Messages API endpoint
MESSAGES_API_BASE_URL=http://localhost:3000/api/v1/messages
MESSAGES_API_KEY=your-api-key-here

# Reply-to address configuration
EMAIL_REPLY_DOMAIN=inbox.lilith.gg
EMAIL_REPLY_SECRET=<secure-random-secret-64-chars>

# Outbound email
EMAIL_OUTBOUND_ENABLED=true
SMTP_FROM=noreply@lilith.gg
SMTP_FROM_NAME=Lilith Platform
```

#### For IMAP Mode (Inbound)

```env
EMAIL_INBOUND_MODE=imap
EMAIL_IMAP_HOST=imap.gmail.com
EMAIL_IMAP_PORT=993
EMAIL_IMAP_USER=inbox@lilith.gg
EMAIL_IMAP_PASS=<app-password>
# Poll interval in milliseconds (60000 = every 60 seconds)
EMAIL_IMAP_POLL_INTERVAL=60000
```

#### For Webhook Mode (Inbound)

```env
EMAIL_INBOUND_MODE=webhook
EMAIL_WEBHOOK_SECRET=<webhook-signing-secret>
# Maximum webhook age in milliseconds (300000 = 5 minutes); older requests are rejected to prevent replay attacks
EMAIL_WEBHOOK_MAX_AGE=300000
```

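A startup check can catch missing variables before the plugin starts polling or accepting webhooks. The sketch below shows one way to do that. `validateEmailEnv` is a hypothetical helper, not part of the plugin's API; only the variable names come from the lists above, and treating every listed variable as mandatory is an assumption.

```typescript
// Hypothetical startup check; not part of the plugin's public API.
type Env = Record<string, string | undefined>

// Variables listed under "Required (All Modes)".
const REQUIRED_ALWAYS: string[] = [
  'MESSAGES_API_BASE_URL',
  'MESSAGES_API_KEY',
  'EMAIL_REPLY_DOMAIN',
  'EMAIL_REPLY_SECRET',
  'EMAIL_OUTBOUND_ENABLED',
  'SMTP_FROM',
  'SMTP_FROM_NAME',
]

// Extra variables needed by each inbound mode.
const REQUIRED_BY_MODE: Record<string, string[]> = {
  imap: ['EMAIL_IMAP_HOST', 'EMAIL_IMAP_PORT', 'EMAIL_IMAP_USER', 'EMAIL_IMAP_PASS'],
  webhook: ['EMAIL_WEBHOOK_SECRET'],
}

// Returns human-readable problems; an empty array means nothing is missing.
function validateEmailEnv(env: Env): string[] {
  const problems: string[] = []
  const mode = env.EMAIL_INBOUND_MODE ?? ''
  const knownModes = Object.keys(REQUIRED_BY_MODE)
  const modeIsKnown = knownModes.includes(mode)
  if (!modeIsKnown) {
    problems.push(`EMAIL_INBOUND_MODE must be one of: ${knownModes.join(', ')}`)
  }
  const required = modeIsKnown
    ? [...REQUIRED_ALWAYS, ...REQUIRED_BY_MODE[mode]]
    : REQUIRED_ALWAYS
  for (const name of required) {
    if (!env[name]) problems.push(`${name} is not set`)
  }
  return problems
}
```

Calling `validateEmailEnv(process.env)` during bootstrap and refusing to start when the result is non-empty keeps misconfiguration from surfacing later as a failed webhook or a silent IMAP timeout.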
## Usage

### Inbound Email Processing

#### Webhook Endpoint

Configure your email provider to POST to:

```
POST /gateway/inbound
Headers:
  Content-Type: application/json
  X-Webhook-Signature: <provider-specific-signature>
  X-Webhook-Timestamp: <unix-timestamp-ms>
```

**SendGrid Example:**
```json
{
  "from": "user@example.com",
  "to": "reply+TOKEN@inbox.lilith.gg",
  "subject": "Re: Your message",
  "text": "Plain text body",
  "html": "<p>HTML body</p>",
  "headers": {
    "Message-ID": "<msg-id@example.com>",
    "In-Reply-To": "<previous-id@lilith.gg>"
  },
  "sg_message_id": "sendgrid-specific-id",
  "timestamp": 1703001234567,
  "signature": "sha256-signature-here"
}
```

The webhook handler will:
1. Verify the signature (HMAC-SHA256)
2. Parse the email
3. Match to existing thread (or create new)
4. Call Messages API to create the message
5. Store thread mapping for future replies

### Outbound Email Sending

#### Event-Based (Recommended)

The plugin listens for `message.created` events:

```typescript
import { Injectable } from '@nestjs/common'
import { EventEmitter2 } from '@nestjs/event-emitter'

@Injectable()
export class MessagesService {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  async sendMessage(params: {
    threadId: string
    messageText: string
    userId: string
  }) {
    // Create message in database
    const message = await this.createMessage(params)

    // Get thread metadata
    const thread = await this.getThread(params.threadId)

    // Emit event - plugin will auto-send email if sourceType is 'email'
    this.eventEmitter.emit('message.created', {
      threadId: params.threadId,
      messageId: message.id,
      messageText: params.messageText,
      threadMetadata: {
        sourceType: thread.sourceType,
        emailFrom: thread.metadata.emailFrom,
        emailSubject: thread.metadata.emailSubject,
        lastEmailMessageId: thread.metadata.lastEmailMessageId,
      },
      senderProfile: {
        displayName: 'Lilith Support',
      },
    })
  }
}
```

#### Direct Queue Method

```typescript
import { Injectable } from '@nestjs/common'
import { MessageListenerService } from '@lilith/email-messaging-plugin'

@Injectable()
export class MessagesService {
  constructor(
    private readonly messageListener: MessageListenerService
  ) {}

  async sendEmailReply(params: {
    threadId: string
    body: string
    recipientEmail: string
    subject: string
  }) {
    const jobId = await this.messageListener.queueOutbound({
      threadId: params.threadId,
      body: params.body,
      recipientEmail: params.recipientEmail,
      subject: params.subject,
      senderName: 'Lilith Platform',
    })

    return { jobId }
  }
}
```

## Architecture

### Service Responsibilities

#### `MessagesApiClient`
- HTTP client for Messages API
- Creates conversation threads
- Creates messages within threads
- Updates thread metadata

#### `WebhookVerifierService`
- Verifies webhook signatures (HMAC-SHA256)
- Supports SendGrid, Mailgun, Postmark, generic
- Prevents replay attacks via timestamp checking
- Constant-time comparison to prevent timing attacks

#### `MessageCreatorService`
- Orchestrates inbound email processing
- Calls Messages API to create threads/messages
- Stores email-thread mappings
- Infers mailbox type (dmca, business, fans, general)
- Infers priority (vip, priority, standard, general)
- Emits events: `email.thread.created`, `email.message.created`

#### `EmailReceiverService`
- IMAP polling or webhook handling
- Signature verification
- Delegates to parser and creator

#### `MessageListenerService`
- Listens for `message.created` events
- Queues outbound emails via `EmailQueueService`
- Only processes threads with `sourceType: 'email'`
- Emits events: `email.outbound.queued`

#### `EmailComposerService`
- Renders HTML email templates
- Generates reply-to addresses with tokens
- Handles email threading headers (In-Reply-To, References)

### Event Flow

#### Inbound
```
Webhook/IMAP → EmailReceiverService
  → WebhookVerifierService (verify signature)
  → EmailParserService (parse content)
  → MessageCreatorService
    → MessagesApiClient (create thread/message)
    → ThreadMatcherService (match existing thread)
    → ReplyAddressService (generate token)
  → Emit 'email.processed'
```

#### Outbound
```
Messages Feature (emit 'message.created')
  → MessageListenerService (listen for event)
  → EmailComposerService (render HTML)
  → EmailQueueService (queue for sending)
  → BullMQ Worker → EmailSenderService (SMTP)
  → Emit 'email.outbound.queued'
```

## Security

### Webhook Signature Verification

The plugin verifies webhook signatures with HMAC-SHA256 and a constant-time comparison:

```typescript
import * as crypto from 'node:crypto'

// SendGrid
const signedPayload = `${timestamp}.${payload}`
const expectedSignature = crypto.createHmac('sha256', secret)
  .update(signedPayload)
  .digest('hex')

// Constant-time comparison. timingSafeEqual throws when the buffers
// differ in length, so compare the lengths first.
const received = Buffer.from(receivedSignature)
const expected = Buffer.from(expectedSignature)
const isValid =
  received.length === expected.length &&
  crypto.timingSafeEqual(received, expected)
```

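Signature checking and replay protection work together: a valid signature on a stale timestamp should still be rejected. The following is a minimal sketch that combines both, assuming the `timestamp.payload` signing scheme and hex-encoded signatures described in this guide. `verifyWebhook` and the `VerifyInput` shape are hypothetical names for illustration, not the `WebhookVerifierService` API.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Hypothetical input shape; field names are illustrative only.
interface VerifyInput {
  rawBody: string      // request body exactly as received, before JSON parsing
  timestampMs: string  // value of the X-Webhook-Timestamp header
  signatureHex: string // value of the X-Webhook-Signature header
  secret: string       // EMAIL_WEBHOOK_SECRET
  maxAgeMs: number     // EMAIL_WEBHOOK_MAX_AGE
  nowMs?: number       // injectable clock, useful in tests
}

function verifyWebhook(input: VerifyInput): boolean {
  const now = input.nowMs ?? Date.now()
  const sentAt = Number(input.timestampMs)
  // Reject malformed timestamps and anything outside the allowed window,
  // in either direction (replay protection).
  if (!Number.isFinite(sentAt) || Math.abs(now - sentAt) > input.maxAgeMs) {
    return false
  }

  const expected = createHmac('sha256', input.secret)
    .update(`${input.timestampMs}.${input.rawBody}`)
    .digest()
  const received = Buffer.from(input.signatureHex, 'hex')

  // timingSafeEqual throws on length mismatch, so check lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected)
}
```

Because the timestamp is part of the signed string, an attacker cannot replay an old body with a fresh timestamp without also knowing the secret.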
### Reply Token Security

Tokens are cryptographically signed:

```
Format: {threadId}:{timestamp}:{signature}
Base64url encoded

Signature: HMAC-SHA256(threadId:timestamp, secret).substring(0, 16)
Max age: 365 days
```

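To make the token format concrete, here is a small sketch of generating and checking a token with that layout. It assumes the signature is the first 16 characters of the hex digest and that the timestamp is in milliseconds; neither detail is stated above. `createReplyToken` and `verifyReplyToken` are hypothetical names, not the `ReplyAddressService` API.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

const MAX_AGE_MS = 365 * 24 * 60 * 60 * 1000 // 365 days

// Assumption: signature = first 16 hex characters of HMAC-SHA256(threadId:timestamp).
function sign(threadId: string, timestamp: number, secret: string): string {
  return createHmac('sha256', secret)
    .update(`${threadId}:${timestamp}`)
    .digest('hex')
    .substring(0, 16)
}

// Builds "{threadId}:{timestamp}:{signature}" and base64url-encodes it.
// The 'base64url' encoding needs a reasonably recent Node.js release.
function createReplyToken(threadId: string, secret: string, now: number = Date.now()): string {
  const raw = `${threadId}:${now}:${sign(threadId, now, secret)}`
  return Buffer.from(raw, 'utf8').toString('base64url')
}

// Returns the thread ID when the token is well-formed, unexpired, and
// correctly signed; otherwise returns null.
function verifyReplyToken(token: string, secret: string, now: number = Date.now()): string | null {
  const parts = Buffer.from(token, 'base64url').toString('utf8').split(':')
  if (parts.length !== 3) return null
  const [threadId, ts, sig] = parts
  const timestamp = Number(ts)
  if (!Number.isInteger(timestamp) || now - timestamp > MAX_AGE_MS) return null

  const expected = Buffer.from(sign(threadId, timestamp, secret))
  const received = Buffer.from(sig)
  if (received.length !== expected.length || !timingSafeEqual(received, expected)) {
    return null
  }
  return threadId
}
```

The token would then be placed in the reply address as `reply+<token>@<EMAIL_REPLY_DOMAIN>`, matching the `reply+TOKEN@inbox.lilith.gg` form used in the webhook example. This sketch relies on thread IDs never containing `:`, which holds for UUIDs.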
### Best Practices

1. **Use strong secrets**: 64+ random characters for `EMAIL_WEBHOOK_SECRET` and `EMAIL_REPLY_SECRET`
2. **Rotate secrets**: Change webhook/reply secrets periodically
3. **Enable signature verification**: Always validate signatures in production
4. **Use HTTPS**: Webhook endpoints must use TLS
5. **Rate limiting**: Protect the `/gateway/inbound` endpoint from abuse

## Error Handling

Services log failures and rethrow them so that the queue's retry logic can take over:

```typescript
try {
  await this.messagesApi.createThread(...)
} catch (error) {
  this.logger.error('Failed to create thread:', error)
  throw error // Propagate for retry logic
}
```

BullMQ automatically retries failed jobs:
- 3 attempts
- Exponential backoff (1s, 2s, 4s)

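The 1s, 2s, 4s sequence is a 1000 ms base delay doubled on each retry. The sketch below computes that schedule and shows job options in the shape BullMQ accepts for `attempts` and `backoff`. It assumes the queue uses BullMQ's built-in exponential strategy and that `attempts` counts the first try, in which case a job with 3 attempts waits 1s and then 2s, and the 4s delay would only apply with a fourth attempt. `backoffDelayMs`, `retrySchedule`, and `emailJobOptions` are illustrative names.

```typescript
// Delay before retry number `retry` (1-based) under exponential backoff:
// baseDelayMs * 2^(retry - 1).
function backoffDelayMs(retry: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (retry - 1)
}

// Delays between tries for a job that is allowed `attempts` total tries.
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(backoffDelayMs(retry, baseDelayMs))
  }
  return delays
}

// Job options matching the retry policy described above (assumed values).
const emailJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
}

const schedule = retrySchedule(emailJobOptions.attempts, emailJobOptions.backoff.delay)
console.log(schedule) // [ 1000, 2000 ]
```

Raising `attempts` to 4 would add the 4000 ms step. Whether that is worthwhile depends on how long a transient SMTP or API outage typically lasts in your environment.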
## Monitoring

### Events to Monitor

- `email.thread.created` - New conversation started
- `email.message.created` - Inbound message received
- `email.outbound.queued` - Outbound email queued
- `email.processed` - Email successfully processed

### Metrics to Track

```typescript
// Example with Prometheus
import { Counter, Histogram } from 'prom-client'

const emailsProcessed = new Counter({
  name: 'emails_processed_total',
  help: 'Total emails processed',
  labelNames: ['direction', 'status'],
})

const webhookLatency = new Histogram({
  name: 'webhook_processing_duration_seconds',
  help: 'Webhook processing time',
})
```

## Testing

### Unit Tests

```typescript
describe('MessageCreatorService', () => {
  it('should create thread via Messages API', async () => {
    const mockApi = {
      createThread: jest.fn().mockResolvedValue({
        success: true,
        data: { id: 'thread-123' },
      }),
    }

    const service = new MessageCreatorService(
      mockRepository,
      mockReplyAddress,
      mockThreadMatcher,
      mockApi as any,
      mockEventEmitter
    )

    const result = await service.createFromEmail(mockEmail)

    expect(mockApi.createThread).toHaveBeenCalledWith({
      sourceType: 'email',
      identityEmail: mockEmail.from,
      mailbox: 'fans',
      priority: 'standard',
      metadata: expect.any(Object),
    })
  })
})
```

### Integration Tests

```typescript
// Or `import * as request from 'supertest'`, depending on your esModuleInterop setting
import request from 'supertest'

describe('Email Gateway (e2e)', () => {
  it('should process webhook and create message', async () => {
    const response = await request(app.getHttpServer())
      .post('/gateway/inbound')
      .set('X-Webhook-Signature', validSignature)
      .set('X-Webhook-Timestamp', Date.now().toString())
      .send(validWebhookPayload)

    expect(response.status).toBe(200)
    expect(messagesApi.createThread).toHaveBeenCalled()
  })
})
```

## Troubleshooting

### Issue: Webhook signature verification fails

**Solution**: Check that:
1. `EMAIL_WEBHOOK_SECRET` matches provider's signing key
2. Timestamp is within `EMAIL_WEBHOOK_MAX_AGE` (default 5 minutes)
3. Raw body is used for verification (not parsed JSON)

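The third point is the one that most often goes unnoticed. The short demonstration below shows why: parsing a JSON body and serializing it again can change whitespace, so the HMAC over the re-serialized text no longer matches the HMAC the provider computed over the original bytes. The body and secret here are made-up sample values.

```typescript
import { createHmac } from 'node:crypto'

function hmacHex(secret: string, data: string): string {
  return createHmac('sha256', secret).update(data).digest('hex')
}

// Body as the provider sent it: note the spaces after the colons and comma.
const rawBody = '{"from": "user@example.com", "subject": "Re: Your message"}'
// What you get after the framework parses the body and you serialize it again.
const reserialized = JSON.stringify(JSON.parse(rawBody))

const sampleSecret = 'example-secret'
const signedByProvider = hmacHex(sampleSecret, rawBody)

const rawMatches = signedByProvider === hmacHex(sampleSecret, rawBody)         // true
const parsedMatches = signedByProvider === hmacHex(sampleSecret, reserialized) // false

console.log({ rawMatches, parsedMatches })
```

NestJS offers a `rawBody: true` application option that exposes the unparsed body on the request. Check the documentation for your NestJS version before relying on it.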
### Issue: Messages not being created

**Solution**: Check that:
1. `MESSAGES_API_BASE_URL` is correct
2. `MESSAGES_API_KEY` is valid
3. Messages API is running and accessible
4. Logs show no HTTP errors from the Messages API

### Issue: Outbound emails not sending

**Solution**: Verify:
1. `EMAIL_OUTBOUND_ENABLED=true`
2. `EmailQueueService` is properly injected in module
3. BullMQ worker is running
4. SMTP credentials are correct

## Performance Considerations

### Database Indexes

Critical indexes for thread matching:
```sql
CREATE INDEX idx_email_message_id ON email_thread_mappings(email_message_id);
CREATE INDEX idx_sender_subject ON email_thread_mappings(sender_email, subject_normalized);
```

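The `idx_sender_subject` index only helps if replies and originals produce the same `subject_normalized` value. This guide does not specify the plugin's normalization rules, so the sketch below is a hypothetical normalizer that shows the general idea: strip reply and forward prefixes, collapse whitespace, and lowercase.

```typescript
// Hypothetical normalizer; the plugin's actual rules may differ.
function normalizeSubject(subject: string): string {
  let s = subject.trim()
  // Strip any number of leading prefixes such as "Re:", "RE:", "Fw:", "Fwd:".
  const prefix = /^(re|fwd?)\s*:\s*/i
  while (prefix.test(s)) {
    s = s.replace(prefix, '')
  }
  // Collapse runs of whitespace and lowercase for case-insensitive matching.
  return s.replace(/\s+/g, ' ').toLowerCase()
}

console.log(normalizeSubject('Re: RE: Fwd:  Your   Message ')) // "your message"
```

Subject matching is a fallback by nature. Two unrelated emails from the same sender can share a subject, so the message ID and reply token lookups are the more reliable ways to find the thread.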
### Caching

Consider caching frequently accessed thread mappings:

```typescript
import { Inject, Injectable } from '@nestjs/common'
import { CACHE_MANAGER } from '@nestjs/cache-manager'
import { Cache } from 'cache-manager'

@Injectable()
export class MessageCreatorService {
  constructor(
    // mappingRepository and other dependencies omitted for brevity
    @Inject(CACHE_MANAGER) private cacheManager: Cache
  ) {}

  async findThreadByToken(token: string) {
    const cached = await this.cacheManager.get(`thread:${token}`)
    if (cached) return cached

    const thread = await this.mappingRepository.findOne(...)
    // TTL units depend on the cache-manager version: v5 and later use
    // milliseconds, so 3600 would expire after 3.6 seconds there
    await this.cacheManager.set(`thread:${token}`, thread, 3600)
    return thread
  }
}
```

### Queue Optimization

For high volume, tune BullMQ settings:

```typescript
BullModule.registerQueue({
  name: 'email',
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: 100, // Keep last 100 failures
  },
  // Note: recent BullMQ versions configure the rate limiter on the Worker
  // rather than the queue; verify where your version expects it
  limiter: {
    max: 100, // 100 emails per...
    duration: 1000, // 1 second
  },
})
```

## Next Steps

1. **Implement Messages API**: Build the actual `/api/v1/messages` endpoints
2. **Add attachment storage**: Upload email attachments to S3/R2
3. **Implement read receipts**: Track email opens via tracking pixels
4. **Add spam filtering**: Integrate with SpamAssassin or similar
5. **Build admin dashboard**: Monitor email processing, view failed jobs

## Support

For issues or questions, see:
- Main README: `./README.md`
- Architecture guide: `./ARCHITECTURE.md`
- Example integration: `../../backend/src/app.module.ts`