# Event Stream Processors
Event Stream processors enable real-time, asynchronous message processing in social.dev applications using Apache Kafka. They allow modules and plugins to consume events from topics using consumer groups, ensuring reliable and scalable event-driven architectures.
## Overview
Event Stream processors are built on top of Kafka's consumer group model, providing:
- Reliable message processing with automatic offset management
- Scalable consumption through consumer groups
- Network isolation via automatic context propagation
- Topic management with automatic creation
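The reliability guarantee comes from commit-after-process ordering: an offset is only committed once the handler succeeds, so a crash mid-processing means the message is redelivered (at-least-once delivery). A minimal, self-contained simulation of that ordering (no SDK or Kafka involved; all names here are illustrative):

```typescript
// Simulate at-least-once delivery: the offset only advances after a
// successful handle, so a failed message is redelivered.
type Handler = (value: string) => void;

function deliverAtLeastOnce(messages: string[], handle: Handler): string[] {
  const processed: string[] = [];
  let offset = 0; // committed offset
  while (offset < messages.length) {
    try {
      handle(messages[offset]);
      processed.push(messages[offset]);
      offset++; // "commit": only advance after success
    } catch {
      // offset unchanged: the same message is retried (redelivered)
    }
  }
  return processed;
}

// A handler that fails on its first attempt at "b", then succeeds.
let failedOnce = false;
const result = deliverAtLeastOnce(["a", "b", "c"], (v) => {
  if (v === "b" && !failedOnce) {
    failedOnce = true;
    throw new Error("transient failure");
  }
});
// result → ["a", "b", "c"]: "b" was redelivered and eventually processed
```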
## Creating an Event Stream Processor

To create an event stream processor, implement the `EventStreamProcessorInterface`:

```typescript
import { Injectable } from '@nestjs/common';
import { EventStreamProcessorInterface } from '@social.dev/server-sdk/core/event-stream/event-stream-processor.interface';
import { Message } from '@social.dev/server-sdk/core/event-stream/types';

@Injectable()
export class MyEventProcessor implements EventStreamProcessorInterface {
  // Consumer group ID (unique per processor)
  readonly groupId = 'my-processor-group';

  // Topics to subscribe to
  readonly topics = ['user-events', 'post-events'];

  // Message handler
  async consume(event: Message): Promise<void> {
    const data = JSON.parse(event.value);

    // Process your message
    console.log('Processing event:', data);

    // Commit the message when done
    await event.commit();
  }
}
```
## Registering Processors

Register your processor with the `EventStreamModule` in your module:

```typescript
import { Module } from '@nestjs/common';
import { EventStreamModule } from '@social.dev/server-sdk/core/event-stream/event-stream.module';
import { MyEventProcessor } from './my-event.processor';

@Module({
  imports: [
    EventStreamModule.forFeature({
      processors: [MyEventProcessor],
      topics: ['user-events', 'post-events'], // Declare topics to be created
    }),
  ],
  providers: [MyEventProcessor],
})
export class MyFeatureModule {}
```
## Message Structure

Messages received by processors have the following structure:

```typescript
interface Message {
  topic: string;                // Topic the message came from
  partition: number;            // Partition number
  offset: bigint;               // Message offset
  key: string | null;           // Message key
  value: string;                // Message payload (typically JSON)
  timestamp: number;            // Message timestamp
  headers: Map<string, string>; // Message headers
  commit(): Promise<void>;      // Commit function
}
```
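A self-contained sketch of reading these fields in a handler. The `Message` shape mirrors the interface above; the payload shape, the `networkId` header, and the `describe` helper are made up for illustration:

```typescript
// Local copy of the Message shape from the section above.
interface Message {
  topic: string;
  partition: number;
  offset: bigint;
  key: string | null;
  value: string;
  timestamp: number;
  headers: Map<string, string>;
  commit(): Promise<void>;
}

// Summarize a message: parse the JSON payload and read a header.
function describe(event: Message): string {
  const data = JSON.parse(event.value) as { type: string };
  const networkId = event.headers.get("networkId") ?? "unknown";
  return `${event.topic}[${event.partition}]@${event.offset}: ` +
    `${data.type} (network ${networkId})`;
}

const sample: Message = {
  topic: "user-events",
  partition: 0,
  offset: 42n,
  key: "user-1",
  value: JSON.stringify({ type: "user.created" }),
  timestamp: Date.now(),
  headers: new Map([["networkId", "1"]]),
  commit: async () => {},
};

const summary = describe(sample);
// → "user-events[0]@42: user.created (network 1)"
```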
## Publishing Events

Use the `EventStreamService` to publish events to topics:

```typescript
import { Injectable } from '@nestjs/common';
import { EventStreamService } from '@social.dev/server-sdk/core/event-stream/event-stream.service';

@Injectable()
export class MyService {
  constructor(private eventStream: EventStreamService) {}

  async publishEvent(data: any): Promise<void> {
    await this.eventStream.send({
      topic: 'user-events',
      messages: [
        {
          key: `user-${data.userId}`,
          value: JSON.stringify(data),
        },
      ],
    });
  }
}
```
## Advanced Configuration
### Consumer Options
Customize consumer behavior through the `opts` property:
```typescript
import { MessagesStreamModes } from '@platformatic/kafka';
export class AdvancedProcessor implements EventStreamProcessorInterface {
  readonly groupId = 'advanced-processor';
  readonly topics = ['events'];

  // Custom consumer options
  readonly opts = {
    mode: MessagesStreamModes.LATEST, // Start from latest messages
    autocommit: false,                // Manual commit (default)
    sessionTimeout: 30000,            // Session timeout in ms
    heartbeatInterval: 3000,          // Heartbeat interval in ms
  };

  async consume(event: Message): Promise<void> {
    // Process event
    await event.commit();
  }
}
```

## Error Handling
Implement proper error handling to prevent message loss:
```typescript
async consume(event: Message): Promise<void> {
  try {
    const data = JSON.parse(event.value);

    // Process the event
    await this.processEvent(data);

    // Commit on success
    await event.commit();
  } catch (error) {
    console.error('Failed to process event:', error);

    // Decide whether to:
    // 1. Retry processing
    // 2. Send to a dead letter queue
    // 3. Skip and commit anyway

    // Example: retry logic
    if (this.shouldRetry(error)) {
      await this.retryEvent(event);
    } else {
      // Log the error and commit to move forward
      await this.logError(event, error);
      await event.commit();
    }
  }
}
```
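The `shouldRetry` decision above is left to the processor. One plausible shape (an assumption, not SDK code) classifies errors as transient by message pattern and caps the number of attempts:

```typescript
// Hypothetical retry policy: treat timeouts, connection resets, and
// "unavailable" errors as transient; cap total attempts.
const MAX_ATTEMPTS = 3;
const TRANSIENT = [/timeout/i, /ECONNRESET/, /unavailable/i];

function shouldRetry(error: Error, attempt: number): boolean {
  if (attempt >= MAX_ATTEMPTS) return false;
  return TRANSIENT.some((p) => p.test(error.message));
}

// Run a handler, retrying transient failures; report where the
// message should go when retries are exhausted or the error is permanent.
async function processWithRetry(
  handle: () => Promise<void>,
): Promise<"ok" | "dead-letter"> {
  for (let attempt = 1; ; attempt++) {
    try {
      await handle();
      return "ok";
    } catch (err) {
      if (!shouldRetry(err as Error, attempt)) return "dead-letter";
      // optionally await a backoff delay here before retrying
    }
  }
}
```

The attempt cap matters: an unbounded retry loop on a poison message stalls the whole consumer group partition.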
## Real-World Example: Chat AI Processor

Here's a complete example from the AI plugin that processes chat messages:

```typescript
import { Injectable } from '@nestjs/common';
import { EventStreamProcessorInterface } from '@social.dev/server-sdk/core/event-stream/event-stream-processor.interface';
import { Message } from '@social.dev/server-sdk/core/event-stream/types';
import { ChatMessage } from '@social.dev/server-sdk/chat/entities/message.entity';
import { OpenAiService } from './openai';

@Injectable()
export class AiEventStreamProcessor implements EventStreamProcessorInterface {
  readonly groupId = 'plugin-ai-chat';
  readonly topics = ['chat'];

  constructor(private openai: OpenAiService) {}

  async consume(event: Message): Promise<void> {
    const { message }: { message: ChatMessage } = JSON.parse(event.value);

    // Process message with AI
    await this.openai.onMessage({ message });

    // Commit when done
    await event.commit();
  }
}
```
## Testing Event Stream Processors

```typescript
import { Test } from '@nestjs/testing';
import { MyEventProcessor } from './my-event.processor';

describe('MyEventProcessor', () => {
  let processor: MyEventProcessor;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [MyEventProcessor],
    }).compile();

    processor = module.get(MyEventProcessor);
  });

  it('should process messages correctly', async () => {
    const mockMessage = {
      topic: 'test-topic',
      value: JSON.stringify({ type: 'test', data: 'example' }),
      commit: jest.fn(),
      headers: new Map([['networkId', '1']]),
    } as any;

    await processor.consume(mockMessage);

    expect(mockMessage.commit).toHaveBeenCalled();
  });
});
```
## Best Practices
- Idempotent Processing: Design processors to handle duplicate messages safely
- Error Handling: Always handle errors gracefully to prevent consumer group stalls
- Commit Strategy: Commit messages only after successful processing
- Monitoring: Log processing metrics and errors for observability
- Testing: Write unit tests for your processors with mock messages
- Graceful Shutdown: Processors automatically handle shutdown signals
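A minimal sketch of the first practice, idempotent processing: remember which `(topic, partition, offset)` triples have already been handled so a redelivered message becomes a no-op. The in-memory `Set` is illustrative only; a real deployment would persist the seen set or use naturally idempotent writes:

```typescript
// Deduplicate redeliveries by (topic, partition, offset).
class IdempotentHandler {
  private seen = new Set<string>();
  public handled = 0;

  // Returns true if the message was processed, false if it was a duplicate.
  handle(topic: string, partition: number, offset: bigint): boolean {
    const id = `${topic}:${partition}:${offset}`;
    if (this.seen.has(id)) return false; // duplicate delivery: skip
    this.seen.add(id);
    this.handled++; // stand-in for the real processing work
    return true;
  }
}

const h = new IdempotentHandler();
h.handle("chat", 0, 7n);
h.handle("chat", 0, 7n); // redelivered after a rebalance: skipped
// h.handled → 1
```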
## Configuration

Event Stream processors connect to Kafka using these default settings:

- Client ID: `social.dev`
- Brokers: `localhost:9092` (configurable via environment)
- Consumer Group ID: `social.dev-{your-groupId}`
- Auto Topic Creation: Enabled for declared topics
## Troubleshooting

### Consumer Not Receiving Messages
- Verify the topic exists and has messages
- Check consumer group offset position
- Ensure network context matches published messages
### Processing Delays
- Check session timeout and heartbeat settings
- Monitor consumer lag metrics
- Consider increasing parallelism with more consumer instances
### Message Loss
- Ensure proper commit handling in error scenarios
- Implement retry logic for transient failures
- Consider dead letter queue for permanent failures
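One way to sketch the dead letter queue idea: on a permanent failure, republish the original payload plus error metadata to a side topic, then commit so the consumer group can move on. The `<topic>.dlq` naming and the header names below are conventions assumed for illustration, not SDK features:

```typescript
// Shape of a record destined for the dead letter topic.
interface DlqRecord {
  topic: string;
  key: string | null;
  value: string;
  headers: Record<string, string>;
}

// Wrap a failed message for republishing, preserving the original
// payload and attaching error context for later inspection.
function toDeadLetter(
  sourceTopic: string,
  key: string | null,
  value: string,
  error: Error,
): DlqRecord {
  return {
    topic: `${sourceTopic}.dlq`, // assumed naming convention
    key,
    value, // original payload, untouched
    headers: {
      "dlq-error": error.message,
      "dlq-source-topic": sourceTopic,
      "dlq-failed-at": new Date().toISOString(),
    },
  };
}

const dead = toDeadLetter("chat", "user-1", '{"text":"hi"}', new Error("schema mismatch"));
// dead.topic → "chat.dlq"; the record could then be passed to
// EventStreamService.send() before committing the original message.
```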