Event Stream Processors

Event Stream processors enable real-time, asynchronous message processing in social.dev applications using Apache Kafka. They allow modules and plugins to consume events from topics using consumer groups, ensuring reliable and scalable event-driven architectures.

Overview

Event Stream processors are built on top of Kafka's consumer group model, providing:

  • Reliable message processing with automatic offset management
  • Scalable consumption through consumer groups
  • Network isolation via automatic context propagation
  • Topic management with automatic creation

Creating an Event Stream Processor

To create an event stream processor, implement the EventStreamProcessorInterface:

import { Injectable } from '@nestjs/common';
import { EventStreamProcessorInterface } from '@social.dev/server-sdk/core/event-stream/event-stream-processor.interface';
import { Message } from '@social.dev/server-sdk/core/event-stream/types';

@Injectable()
export class MyEventProcessor implements EventStreamProcessorInterface {
  // Consumer group ID (unique per processor)
  readonly groupId = 'my-processor-group';

  // Topics to subscribe to
  readonly topics = ['user-events', 'post-events'];

  // Message handler (an async method must declare a Promise return type)
  async consume(event: Message): Promise<void> {
    const data = JSON.parse(event.value);

    // Process your message
    console.log('Processing event:', data);

    // Commit the message when done; commit() returns a Promise, so await it
    await event.commit();
  }
}

Registering Processors

Register your processor with the EventStreamModule in your module:

import { Module } from '@nestjs/common';
import { EventStreamModule } from '@social.dev/server-sdk/core/event-stream/event-stream.module';
import { MyEventProcessor } from './my-event.processor';

@Module({
  imports: [
    EventStreamModule.forFeature({
      processors: [MyEventProcessor],
      topics: ['user-events', 'post-events'], // Declare topics to be created
    }),
  ],
  providers: [MyEventProcessor],
})
export class MyFeatureModule {}

Message Structure

Messages received by processors have the following structure:

interface Message {
  topic: string;                // Topic the message came from
  partition: number;            // Partition number
  offset: bigint;               // Message offset
  key: string | null;           // Message key
  value: string;                // Message payload (typically JSON)
  timestamp: number;            // Message timestamp
  headers: Map<string, string>; // Message headers
  commit(): Promise<void>;      // Commit function
}
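
Because value arrives as a string that is only typically JSON, a processor usually has to parse and validate it before use. The following is a minimal sketch of that step, not part of the SDK: the UserEvent shape and the isUserEvent and parseUserEvent helpers are hypothetical names chosen for illustration, and MessageLike is a local stand-in for the two Message fields the helper reads.

```typescript
// Local stand-in for the Message fields used here (the real type comes from the SDK).
interface MessageLike {
  topic: string;
  value: string;
}

// Hypothetical payload shape, for illustration only.
interface UserEvent {
  type: string;
  userId: number;
}

// Type guard: checks that an unknown value has the fields we rely on.
function isUserEvent(data: unknown): data is UserEvent {
  if (typeof data !== 'object' || data === null) return false;
  const candidate = data as Record<string, unknown>;
  return typeof candidate.type === 'string' && typeof candidate.userId === 'number';
}

// Returns the parsed payload, or null when the value is not valid JSON
// or does not match the expected shape.
function parseUserEvent(message: MessageLike): UserEvent | null {
  try {
    const data: unknown = JSON.parse(message.value);
    return isUserEvent(data) ? data : null;
  } catch {
    return null;
  }
}
```

Returning null instead of throwing lets consume() decide explicitly what to do with a malformed message, for example log it and commit so the consumer can move on.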

Publishing Events

Use the EventStreamService to publish events to topics:

import { Injectable } from '@nestjs/common';
import { EventStreamService } from '@social.dev/server-sdk/core/event-stream/event-stream.service';

@Injectable()
export class MyService {
  constructor(private eventStream: EventStreamService) {}

  async publishEvent(data: any): Promise<void> {
    await this.eventStream.send({
      topic: 'user-events',
      messages: [
        {
          key: `user-${data.userId}`,
          value: JSON.stringify(data),
        },
      ],
    });
  }
}
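
The publishing example takes data: any and sends one message at a time. As a sketch of a typed alternative, the helper below builds the send() argument for a batch of events. The UserEvent shape and the buildUserEventsRequest name are hypothetical, and SendRequest only mirrors the fields shown in the example above rather than the SDK's actual type. Under Kafka's usual key-based partitioning, a per-user key keeps each user's events on one partition and therefore in order.

```typescript
// Hypothetical event shape, for illustration only.
interface UserEvent {
  userId: number;
  type: string;
}

// Mirrors the fields passed to EventStreamService.send() in the example above.
interface SendRequest {
  topic: string;
  messages: { key: string; value: string }[];
}

// Builds one request carrying all events, keyed per user.
function buildUserEventsRequest(events: UserEvent[]): SendRequest {
  return {
    topic: 'user-events',
    messages: events.map((event) => ({
      key: `user-${event.userId}`,
      value: JSON.stringify(event),
    })),
  };
}
```

Inside a service, the result would be passed along as await this.eventStream.send(buildUserEventsRequest(events)).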

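Advanced Configuration

Consumer Options

Customize consumer behavior through the opts property:

import { MessagesStreamModes } from '@platformatic/kafka';
import { EventStreamProcessorInterface } from '@social.dev/server-sdk/core/event-stream/event-stream-processor.interface';
import { Message } from '@social.dev/server-sdk/core/event-stream/types';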

export class AdvancedProcessor implements EventStreamProcessorInterface {
  readonly groupId = 'advanced-processor';
  readonly topics = ['events'];

  // Custom consumer options
  readonly opts = {
    mode: MessagesStreamModes.LATEST, // Start from latest messages
    autocommit: false,                // Manual commit (default)
    sessionTimeout: 30000,            // Session timeout in ms
    heartbeatInterval: 3000,          // Heartbeat interval in ms
  };

  async consume(event: Message): Promise<void> {
    // Process event
    await event.commit();
  }
}

Error Handling

Implement proper error handling to prevent message loss:

// processEvent, shouldRetry, retryEvent, and logError are helpers you define on the processor
async consume(event: Message): Promise<void> {
  try {
    const data = JSON.parse(event.value);

    // Process the event
    await this.processEvent(data);

    // Commit on success
    await event.commit();
  } catch (error) {
    console.error('Failed to process event:', error);

    // Decide whether to:
    // 1. Retry processing
    // 2. Send to dead letter queue
    // 3. Skip and commit anyway

    // Example: Retry logic
    if (this.shouldRetry(error)) {
      await this.retryEvent(event);
    } else {
      // Log error and commit to move forward
      await this.logError(event, error);
      await event.commit();
    }
  }
}
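
The handler above leaves shouldRetry and the retry mechanism undefined. One possible way to fill them in is bounded retry with exponential backoff, sketched below. The withRetry helper, the retryable flag convention, and the default attempt count and delay are all assumptions made for illustration, not SDK behavior.

```typescript
// Assumed convention: an error is retryable when it carries `retryable: true`.
function shouldRetry(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { retryable?: unknown }).retryable === true
  );
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `task` up to `maxAttempts` times, doubling the delay after each
// retryable failure. Rethrows the last error when attempts run out or
// when the error is not retryable.
async function withRetry<T>(
  task: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await task();
    } catch (error) {
      if (attempt >= maxAttempts || !shouldRetry(error)) throw error;
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

Inside consume(), the processing call could then be wrapped as await withRetry(() => this.processEvent(data)), with the commit following only once the wrapped call resolves. Keep the total retry time well below the consumer's session timeout so that a slow retry loop does not look like a dead consumer.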

Real-World Example: Chat AI Processor

Here's a complete example from the AI plugin that processes chat messages:

import { Injectable } from '@nestjs/common';
import { EventStreamProcessorInterface } from '@social.dev/server-sdk/core/event-stream/event-stream-processor.interface';
import { Message } from '@social.dev/server-sdk/core/event-stream/types';
import { ChatMessage } from '@social.dev/server-sdk/chat/entities/message.entity';
import { OpenAiService } from './openai';

@Injectable()
export class AiEventStreamProcessor implements EventStreamProcessorInterface {
  readonly groupId = 'plugin-ai-chat';
  readonly topics = ['chat'];

  constructor(private openai: OpenAiService) {}

  async consume(event: Message): Promise<void> {
    const { message }: { message: ChatMessage } = JSON.parse(event.value);

    // Process message with AI
    await this.openai.onMessage({ message });

    // Commit when done
    await event.commit();
  }
}

Testing Event Stream Processors

import { Test } from '@nestjs/testing';
import { MyEventProcessor } from './my-event.processor';

describe('MyEventProcessor', () => {
  let processor: MyEventProcessor;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [MyEventProcessor],
    }).compile();

    processor = module.get(MyEventProcessor);
  });

  it('should process messages correctly', async () => {
    const mockMessage = {
      topic: 'test-topic',
      value: JSON.stringify({ type: 'test', data: 'example' }),
      commit: jest.fn(),
      headers: new Map([['networkId', '1']]),
    } as any;

    await processor.consume(mockMessage);

    expect(mockMessage.commit).toHaveBeenCalled();
  });
});
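
The mock above relies on an as any cast, which hides missing fields. As an alternative sketch, a small factory can fill in every field of the documented Message structure and count commits without a mocking library. The createMockMessage and MockMessage names and the default values are hypothetical, and the Message interface is copied locally so the snippet stands alone.

```typescript
// Local copy of the Message structure documented above.
interface Message {
  topic: string;
  partition: number;
  offset: bigint;
  key: string | null;
  value: string;
  timestamp: number;
  headers: Map<string, string>;
  commit(): Promise<void>;
}

// A Message that also records how many times commit() was called.
interface MockMessage extends Message {
  commitCount: number;
}

// Builds a fully populated message; `overrides` replaces any default field.
function createMockMessage(
  payload: unknown,
  overrides: Partial<Message> = {},
): MockMessage {
  const message: MockMessage = {
    topic: 'test-topic',
    partition: 0,
    offset: BigInt(0),
    key: null,
    value: JSON.stringify(payload),
    timestamp: Date.now(),
    headers: new Map<string, string>(),
    commitCount: 0,
    async commit(): Promise<void> {
      message.commitCount++;
    },
    ...overrides,
  };
  return message;
}
```

A test could then call await processor.consume(createMockMessage({ type: 'test' })) and assert on commitCount, including the case where a failing handler is expected to leave it at zero.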

Best Practices

  1. Idempotent Processing: Design processors to handle duplicate messages safely
  2. Error Handling: Always handle errors gracefully to prevent consumer group stalls
  3. Commit Strategy: Commit messages only after successful processing
  4. Monitoring: Log processing metrics and errors for observability
  5. Testing: Write unit tests for your processors with mock messages
  6. Graceful Shutdown: Processors automatically handle shutdown signals
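
To illustrate the first practice, one simple way to make a processor tolerate redelivery is to remember which topic, partition, and offset positions it has already handled. The sketch below is an in-memory illustration only. The ProcessedTracker class is a hypothetical helper, its state is lost on restart, and a production setup would more likely rely on a durable store or on naturally idempotent writes.

```typescript
// The fields of Message that identify a position in the stream.
interface MessagePosition {
  topic: string;
  partition: number;
  offset: bigint;
}

// Remembers recently processed positions so redelivered messages can be skipped.
class ProcessedTracker {
  private readonly seen = new Set<string>();
  private readonly maxEntries: number;

  constructor(maxEntries = 10000) {
    this.maxEntries = maxEntries;
  }

  private keyOf(position: MessagePosition): string {
    return `${position.topic}:${position.partition}:${position.offset}`;
  }

  has(position: MessagePosition): boolean {
    return this.seen.has(this.keyOf(position));
  }

  mark(position: MessagePosition): void {
    const key = this.keyOf(position);
    if (this.seen.has(key)) return;
    this.seen.add(key);
    // A Set iterates in insertion order, so the first value is the oldest entry.
    if (this.seen.size > this.maxEntries) {
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
  }
}
```

In consume(), a processor would check tracker.has(event) first, commit and return if the message was already handled, and call tracker.mark(event) after processing succeeds.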

Configuration

Event Stream processors connect to Kafka using these default settings:

  • Client ID: social.dev
  • Brokers: localhost:9092 (configurable via environment)
  • Consumer Group ID: social.dev-{your-groupId}
  • Auto Topic Creation: Enabled for declared topics

Troubleshooting

Consumer Not Receiving Messages

  • Verify the topic exists and has messages
  • Check consumer group offset position
  • Ensure network context matches published messages

Processing Delays

  • Check session timeout and heartbeat settings
  • Monitor consumer lag metrics
  • Consider increasing parallelism with more consumer instances

Message Loss

  • Ensure proper commit handling in error scenarios
  • Implement retry logic for transient failures
  • Consider dead letter queue for permanent failures
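
As a sketch of the dead letter queue idea, the helper below wraps a failed message and its error into a request shaped like the send() argument used in Publishing Events. The toDeadLetter name, the .dlq topic suffix, and the envelope fields are assumptions made for illustration; the SDK does not define them.

```typescript
// The Message fields needed to describe a failure.
interface FailedMessage {
  topic: string;
  partition: number;
  offset: bigint;
  key: string | null;
  value: string;
}

// Mirrors the fields passed to EventStreamService.send() in Publishing Events.
interface SendRequest {
  topic: string;
  messages: { key: string; value: string }[];
}

// Builds a request that republishes the failed message to a dead letter topic.
function toDeadLetter(message: FailedMessage, error: unknown): SendRequest {
  const reason = error instanceof Error ? error.message : String(error);
  const position = `${message.topic}:${message.partition}:${message.offset}`;
  return {
    topic: `${message.topic}.dlq`, // hypothetical naming convention
    messages: [
      {
        key: message.key ?? position,
        value: JSON.stringify({
          originalTopic: message.topic,
          partition: message.partition,
          offset: message.offset.toString(), // bigint cannot be JSON-serialized directly
          payload: message.value,
          error: reason,
          failedAt: new Date().toISOString(),
        }),
      },
    ],
  };
}
```

In a catch block, a processor could call await this.eventStream.send(toDeadLetter(event, error)) and then commit the original message so the consumer keeps moving. The dead letter topic would presumably need to be declared in the topics list passed to EventStreamModule.forFeature so that it gets created.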