Event-Driven Architecture with Apache Kafka and Node.js
Introduction
Apache Kafka enables building event-driven architectures with high-throughput message streaming. This guide covers Kafka integration with Node.js for scalable microservices.
Prerequisites
- Apache Kafka cluster
- Node.js >=16
- Basic understanding of event-driven patterns
Step 1: Install Kafka Client
npm install kafkajs
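npm install --save-dev typescript @types/node
Note: kafkajs bundles its own TypeScript definitions, so no separate @types/kafkajs package is required.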
Step 2: Kafka Connection Setup
Create lib/kafka.ts:
import { Kafka, logLevel } from 'kafkajs';
/**
 * Parses a comma-separated broker list such as "host1:9092, host2:9092".
 *
 * Entries are trimmed and empty entries are dropped, so an unset, empty or
 * whitespace-only value falls back to the local development broker instead
 * of producing an unusable `['']` broker list.
 *
 * @param raw Raw value of the KAFKA_BROKERS environment variable.
 * @returns A non-empty list of "host:port" strings.
 */
function parseBrokerList(raw: string | undefined): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  return brokers.length > 0 ? brokers : ['localhost:9092'];
}

/** Shared KafkaJS client from which producers, consumers and the admin client are created. */
const kafka = new Kafka({
  clientId: 'nodejs-app',
  brokers: parseBrokerList(process.env.KAFKA_BROKERS),
  logLevel: logLevel.INFO,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
  // Timeouts and thresholds below are expressed in milliseconds.
  connectionTimeout: 3000,
  authenticationTimeout: 1000,
  reauthenticationThreshold: 10000,
});
export default kafka;
// Create admin client for topic management
export const admin = kafka.admin();
/**
 * Creates the given topics through the shared admin client.
 *
 * Every topic is created with 3 partitions, a replication factor of 1,
 * one day of retention and gzip compression. The call waits for partition
 * leaders to be elected before resolving.
 *
 * The admin connection is always released, even when topic creation fails,
 * so a failed start-up does not leave a dangling broker connection.
 *
 * @param topics Names of the topics to create. An empty list is a no-op.
 * @throws Whatever the admin client throws while connecting or creating topics.
 */
export async function createTopics(topics: string[]): Promise<void> {
  if (topics.length === 0) {
    // Nothing to create: do not open an admin connection for no work.
    return;
  }

  const topicConfigs = topics.map((topic) => ({
    topic,
    numPartitions: 3,
    // NOTE: a replication factor of 1 keeps a single copy of each partition;
    // raise it when running against a multi-broker cluster.
    replicationFactor: 1,
    configEntries: [
      { name: 'retention.ms', value: '86400000' }, // 1 day
      { name: 'compression.type', value: 'gzip' },
    ],
  }));

  await admin.connect();
  try {
    await admin.createTopics({
      topics: topicConfigs,
      waitForLeaders: true,
    });
  } finally {
    await admin.disconnect();
  }
}
Step 3: Event Producer Service
Create services/event-producer.ts:
import kafka from '../lib/kafka';
import { Producer, ProducerRecord } from 'kafkajs';
/**
 * Publishes JSON-encoded events through a single idempotent KafkaJS producer.
 *
 * The broker connection is opened lazily by the first publish call and is
 * reused until `disconnect()` is called. Every published event is stamped
 * with a fresh `timestamp` and `eventId`, overriding any values the caller
 * supplied for those two fields.
 */
class EventProducer {
  private readonly producer: Producer;
  private readonly transactionalId: string | undefined;
  private isConnected = false;

  /**
   * @param transactionalId Transactional id to configure on the producer.
   *   Defaults to the KAFKA_TRANSACTIONAL_ID environment variable. When
   *   neither is set the producer is created exactly as a plain idempotent
   *   producer and `publishWithTransaction` is unavailable. Use a distinct
   *   id per running producer instance.
   */
  constructor(transactionalId: string | undefined = process.env.KAFKA_TRANSACTIONAL_ID) {
    this.transactionalId = transactionalId?.trim() || undefined;
    this.producer = kafka.producer({
      maxInFlightRequests: 1,
      idempotent: true,
      transactionTimeout: 30000,
      retry: {
        retries: 5,
        initialRetryTime: 300,
      },
      // Only add the key when an id was provided so the default
      // configuration stays unchanged.
      ...(this.transactionalId ? { transactionalId: this.transactionalId } : {}),
    });
  }

  /** Opens the producer connection if it is not already open. */
  async connect(): Promise<void> {
    if (!this.isConnected) {
      await this.producer.connect();
      this.isConnected = true;
      console.log('Kafka producer connected');
    }
  }

  /** Closes the producer connection if it is open. */
  async disconnect(): Promise<void> {
    if (this.isConnected) {
      await this.producer.disconnect();
      this.isConnected = false;
      console.log('Kafka producer disconnected');
    }
  }

  /**
   * Publishes a single event.
   *
   * @param topic Destination topic.
   * @param event Event payload; `event.type` is copied into the `event-type` header.
   * @param key Optional message key.
   */
  async publishEvent(topic: string, event: any, key?: string): Promise<void> {
    await this.connect();
    await this.producer.send({
      topic,
      messages: [this.buildMessage(event, key)],
    });
    console.log(`Event published to ${topic}:`, event.type);
  }

  /**
   * Publishes several events to one topic in a single send call.
   * Each message is keyed by `event.userId`, falling back to `event.id`.
   *
   * @param topic Destination topic.
   * @param events Events to publish; an empty list is a no-op.
   */
  async publishBatch(topic: string, events: any[]): Promise<void> {
    if (events.length === 0) {
      return; // nothing to send, so do not connect or issue an empty request
    }
    await this.connect();
    const messages = events.map((event) => this.buildMessage(event, event.userId || event.id));
    await this.producer.send({
      topic,
      messages,
    });
    console.log(`Batch of ${events.length} events published to ${topic}`);
  }

  /**
   * Sends all records inside one Kafka transaction: either every record is
   * committed or the transaction is aborted.
   *
   * @param records Producer records to send within the transaction.
   * @throws Error if no transactional id was configured for this producer.
   * @throws The original send/commit error after the transaction was aborted.
   */
  async publishWithTransaction(records: ProducerRecord[]): Promise<void> {
    if (!this.transactionalId) {
      // KafkaJS refuses to start a transaction on a producer without a
      // transactional id; fail early with an actionable message instead.
      throw new Error(
        'EventProducer: publishWithTransaction requires a transactionalId ' +
          '(pass one to the constructor or set KAFKA_TRANSACTIONAL_ID)'
      );
    }
    await this.connect();
    const transaction = await this.producer.transaction();
    try {
      for (const record of records) {
        await transaction.send(record);
      }
      await transaction.commit();
      console.log('Transactional publish completed');
    } catch (error) {
      try {
        await transaction.abort();
      } catch (abortError) {
        // Do not let a failed abort hide the error that caused it.
        console.error('Failed to abort transaction:', abortError);
      }
      console.error('Transaction aborted:', error);
      throw error;
    }
  }

  /** Builds the Kafka message (key, JSON value, headers) for one event. */
  private buildMessage(event: any, key?: string) {
    return {
      key,
      value: JSON.stringify({
        ...event,
        timestamp: new Date().toISOString(),
        eventId: this.generateEventId(),
      }),
      headers: {
        'event-type': event.type,
        'service': 'nodejs-app',
      },
    };
  }

  /** Returns an id made of the current epoch milliseconds and a short random base-36 suffix. */
  private generateEventId(): string {
    return `${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
  }
}
export default EventProducer;
Step 4: Event Consumer Service
Create services/event-consumer.ts:
import kafka from '../lib/kafka';
import { Consumer, EachMessagePayload } from 'kafkajs';
type EventHandler = (event: any) => Promise<void>;

/** Producer type used for dead-letter publishing, derived from the shared client. */
type DeadLetterProducer = ReturnType<typeof kafka.producer>;

/**
 * Consumes JSON events and dispatches them to handlers registered per event type.
 *
 * Failure policy:
 * - A payload that is not valid JSON can never succeed, so it is written to
 *   the `dead-letter-queue` topic and skipped instead of being retried.
 * - A handler that throws has its event written to `dead-letter-queue`;
 *   the remaining handlers still run.
 * - If writing to the dead-letter topic itself fails, the error is rethrown
 *   so the message is retried rather than silently lost.
 */
class EventConsumer {
  private consumer: Consumer;
  private eventHandlers: Map<string, EventHandler[]> = new Map();
  // Lazily created and shared so failed events do not each open and close
  // their own broker connection. Stored as a promise so handlers failing in
  // parallel share a single connect() call.
  private deadLetterProducer: Promise<DeadLetterProducer> | undefined;

  /** @param groupId Consumer group this consumer joins. */
  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      rebalanceTimeout: 60000,
      heartbeatInterval: 3000,
      retry: {
        retries: 10,
        initialRetryTime: 300,
      },
    });
  }

  /** Opens the consumer connection. */
  async connect(): Promise<void> {
    await this.consumer.connect();
    console.log('Kafka consumer connected');
  }

  /** Closes the consumer connection and, if one was opened, the dead-letter producer. */
  async disconnect(): Promise<void> {
    try {
      await this.consumer.disconnect();
      console.log('Kafka consumer disconnected');
    } finally {
      await this.closeDeadLetterProducer();
    }
  }

  /** Subscribes to the given topics, starting from the latest offsets. */
  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.subscribe({
      topics,
      fromBeginning: false,
    });
  }

  /** Adds a handler for an event type; several handlers may share one type. */
  registerEventHandler(eventType: string, handler: EventHandler): void {
    if (!this.eventHandlers.has(eventType)) {
      this.eventHandlers.set(eventType, []);
    }
    this.eventHandlers.get(eventType)!.push(handler);
    console.log(`Registered handler for event type: ${eventType}`);
  }

  /** Starts the message loop; each message is routed through `handleMessage`. */
  async startConsuming(): Promise<void> {
    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        await this.handleMessage(payload);
      },
    });
  }

  /**
   * Parses one message and runs every handler registered for its event type.
   * The type is read from the `event-type` header, falling back to the
   * payload's `type` field. A missing value is treated as an empty object.
   */
  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;
    const rawValue = message.value?.toString() || '{}';

    let eventData: any;
    try {
      eventData = JSON.parse(rawValue);
    } catch (parseError) {
      // Retrying cannot fix an unparseable payload; park it and move on so
      // the partition is not blocked by a poison message.
      console.error(
        `Skipping malformed message from ${topic}:${partition} at offset ${message.offset}:`,
        parseError
      );
      await this.handleFailedEvent(rawValue, parseError);
      return;
    }

    try {
      const eventType = message.headers?.['event-type']?.toString() || eventData?.type;
      console.log(`Processing event from ${topic}:${partition}`, {
        eventType,
        eventId: eventData?.eventId,
        offset: message.offset,
      });
      const handlers = this.eventHandlers.get(eventType) || [];
      // Process handlers in parallel; each handler's failure is isolated.
      const outcomes = await Promise.all(
        handlers.map(async (handler) => {
          try {
            await handler(eventData);
            return true;
          } catch (error) {
            console.error(`Handler failed for event ${eventType}:`, error);
            await this.handleFailedEvent(eventData, error);
            return false;
          }
        })
      );
      if (outcomes.every(Boolean)) {
        console.log(`Event processed successfully: ${eventType}`);
      } else {
        console.warn(`Event processed with handler failures: ${eventType}`);
      }
    } catch (error) {
      console.error('Failed to process message:', error);
      throw error; // This will cause the message to be retried
    }
  }

  /**
   * Publishes a failed event, the failure message and the failure time to
   * the `dead-letter-queue` topic, keyed by the event's `eventId` when present.
   */
  private async handleFailedEvent(event: any, error: unknown): Promise<void> {
    const producer = await this.getDeadLetterProducer();
    await producer.send({
      topic: 'dead-letter-queue',
      messages: [{
        key: event?.eventId,
        value: JSON.stringify({
          originalEvent: event,
          error: error instanceof Error ? error.message : String(error),
          failedAt: new Date().toISOString(),
        }),
      }],
    });
  }

  /** Returns the shared dead-letter producer, connecting it on first use. */
  private getDeadLetterProducer(): Promise<DeadLetterProducer> {
    if (!this.deadLetterProducer) {
      const producer = kafka.producer();
      this.deadLetterProducer = producer.connect().then(
        () => producer,
        (connectError: unknown) => {
          // Forget the failed attempt so the next failure can retry connecting.
          this.deadLetterProducer = undefined;
          throw connectError;
        }
      );
    }
    return this.deadLetterProducer;
  }

  /** Disconnects the dead-letter producer if one was ever created. */
  private async closeDeadLetterProducer(): Promise<void> {
    const pending = this.deadLetterProducer;
    if (!pending) {
      return;
    }
    this.deadLetterProducer = undefined;
    try {
      const producer = await pending;
      await producer.disconnect();
    } catch (error) {
      console.error('Failed to disconnect dead-letter producer:', error);
    }
  }
}
export default EventConsumer;
Step 5: Event Schemas and Types
Create types/events.ts:
/**
 * Fields shared by every event.
 *
 * `type` is the discriminant that the concrete events narrow to a literal.
 * `timestamp` and `eventId` are required by the type even though
 * EventProducer replaces both when the event is published.
 */
export interface BaseEvent {
  type: string;
  timestamp: string;
  eventId: string;
  userId?: string;
  correlationId?: string;
}

/** Emitted after a user has been created. */
export interface UserCreatedEvent extends BaseEvent {
  type: 'USER_CREATED';
  userId: string;
  email: string;
  name: string;
}

/** Emitted after an order has been placed; carries the line items and their total. */
export interface OrderPlacedEvent extends BaseEvent {
  type: 'ORDER_PLACED';
  orderId: string;
  userId: string;
  items: {
    productId: string;
    quantity: number;
    price: number;
  }[];
  totalAmount: number;
}

/** Outcome of a payment attempt for an order. */
export interface PaymentProcessedEvent extends BaseEvent {
  type: 'PAYMENT_PROCESSED';
  orderId: string;
  paymentId: string;
  amount: number;
  status: 'SUCCESS' | 'FAILED';
}

/** A change to the stock level of one product. */
export interface InventoryUpdatedEvent extends BaseEvent {
  type: 'INVENTORY_UPDATED';
  productId: string;
  quantity: number;
  operation: 'INCREASE' | 'DECREASE';
}

/** Union of all known events, discriminated by `type`. */
export type DomainEvent =
  | UserCreatedEvent
  | OrderPlacedEvent
  | PaymentProcessedEvent
  | InventoryUpdatedEvent;
Step 6: Express.js Integration
Create app.ts:
import express from 'express';
import EventProducer from './services/event-producer';
import EventConsumer from './services/event-consumer';
import { createTopics } from './lib/kafka';
import { UserCreatedEvent, OrderPlacedEvent } from './types/events';
// HTTP application; routes are registered on it below and it is the module's default export.
const app = express();
// Producer used by the route handlers to publish events.
const producer = new EventProducer();
// Consumer that joins the 'user-service-group' consumer group.
const consumer = new EventConsumer('user-service-group');
// Parse JSON request bodies so the route handlers can read req.body.
app.use(express.json());
/**
 * Creates the topics this application uses: the four event topics plus the
 * dead-letter topic. Delegates to `createTopics`.
 */
async function initializeTopics() {
  const requiredTopics = [
    'user-events',
    'order-events',
    'payment-events',
    'inventory-events',
    'dead-letter-queue',
  ];
  await createTopics(requiredTopics);
}
// API Routes
/**
 * POST /users
 *
 * Creates a user and publishes a USER_CREATED event to `user-events`, keyed
 * by the new user id.
 *
 * Responses:
 * - 201 with `{ userId, name, email }` on success.
 * - 400 when `name` or `email` is missing, empty or not a string.
 * - 500 when publishing the event fails.
 */
app.post('/users', async (req, res) => {
  try {
    // req.body is client-controlled and may be absent; never destructure it blindly.
    const { name, email } = req.body ?? {};
    const isNonEmptyString = (value: unknown): value is string =>
      typeof value === 'string' && value.trim() !== '';
    if (!isNonEmptyString(name) || !isNonEmptyString(email)) {
      res.status(400).json({ error: 'name and email are required' });
      return;
    }

    // NOTE(review): a millisecond timestamp is not unique under concurrent
    // requests; replace with a database-generated id once persistence exists.
    const userId = Date.now().toString();
    // Create user in database (not shown)
    // ...
    // Publish event
    const event: UserCreatedEvent = {
      type: 'USER_CREATED',
      userId,
      email,
      name,
      timestamp: new Date().toISOString(),
      eventId: '', // placeholder: the producer assigns the real id on publish
    };
    await producer.publishEvent('user-events', event, userId);
    res.status(201).json({ userId, name, email });
  } catch (error) {
    console.error('Error creating user:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});
/**
 * POST /orders
 *
 * Creates an order, computes its total as the sum of `price * quantity`
 * over all items, and publishes an ORDER_PLACED event to `order-events`,
 * keyed by the new order id.
 *
 * Responses:
 * - 201 with `{ orderId, userId, items, totalAmount }` on success.
 * - 400 when `userId` is missing or `items` is not a non-empty array of
 *   `{ productId: string, quantity: number > 0, price: number >= 0 }`.
 * - 500 when publishing the event fails.
 */
app.post('/orders', async (req, res) => {
  try {
    // req.body is client-controlled and may be absent; validate before use so
    // bad input yields a 400 instead of a 500 or a NaN total in the event.
    const { userId, items } = req.body ?? {};
    const isValidItem = (
      item: { productId?: unknown; quantity?: unknown; price?: unknown } | null | undefined
    ): boolean =>
      item != null &&
      typeof item.productId === 'string' &&
      typeof item.quantity === 'number' &&
      Number.isFinite(item.quantity) &&
      item.quantity > 0 &&
      typeof item.price === 'number' &&
      Number.isFinite(item.price) &&
      item.price >= 0;
    if (
      typeof userId !== 'string' ||
      userId.trim() === '' ||
      !Array.isArray(items) ||
      items.length === 0 ||
      !items.every(isValidItem)
    ) {
      res.status(400).json({
        error: 'userId and a non-empty items array (productId, quantity, price) are required',
      });
      return;
    }

    // NOTE(review): a millisecond timestamp is not unique under concurrent
    // requests; replace with a database-generated id once persistence exists.
    const orderId = Date.now().toString();
    const totalAmount = items.reduce(
      (sum: number, item: { price: number; quantity: number }) => sum + item.price * item.quantity,
      0
    );
    // Create order in database (not shown)
    // ...
    // Publish event
    const event: OrderPlacedEvent = {
      type: 'ORDER_PLACED',
      orderId,
      userId,
      items,
      totalAmount,
      timestamp: new Date().toISOString(),
      eventId: '', // placeholder: the producer assigns the real id on publish
    };
    await producer.publishEvent('order-events', event, orderId);
    res.status(201).json({ orderId, userId, items, totalAmount });
  } catch (error) {
    console.error('Error creating order:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});
// Event Handlers

/** Handles USER_CREATED events; currently only logs the user id. */
async function onUserCreated(event: UserCreatedEvent): Promise<void> {
  console.log('Processing user created event:', event.userId);
  // Send welcome email, update analytics, etc.
}

/** Handles ORDER_PLACED events; currently only logs the order id. */
async function onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
  console.log('Processing order placed event:', event.orderId);
  // Update inventory, send confirmation email, etc.
}

consumer.registerEventHandler('USER_CREATED', onUserCreated);
consumer.registerEventHandler('ORDER_PLACED', onOrderPlaced);
/**
 * Boots the service. Steps run strictly one after another: create topics,
 * connect the producer, connect the consumer, subscribe to the user and
 * order topics, start consuming, and finally open the HTTP listener on
 * PORT (default 3000).
 */
async function startServer() {
  const bootSteps: Array<() => Promise<void>> = [
    () => initializeTopics(),
    () => producer.connect(),
    () => consumer.connect(),
    () => consumer.subscribe(['user-events', 'order-events']),
    () => consumer.startConsuming(),
  ];
  for (const runStep of bootSteps) {
    await runStep();
  }

  const PORT = process.env.PORT || 3000;
  const announce = () => {
    console.log(`Server running on port ${PORT}`);
  };
  app.listen(PORT, announce);
}
// Graceful shutdown
// Guards against a second signal (e.g. Ctrl+C pressed twice) re-entering the shutdown.
let isShuttingDown = false;

/**
 * Disconnects the Kafka clients and terminates the process.
 *
 * The consumer is closed first so no new messages are picked up while the
 * producer is being closed. Each client is closed independently: a failure
 * to disconnect one does not prevent closing the other, and the process
 * always exits, with status 1 if any disconnect failed and 0 otherwise.
 *
 * @param signal Name of the signal that triggered the shutdown (used in logs).
 */
async function shutdownGracefully(signal: string): Promise<void> {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;
  console.log('Shutting down...');

  let exitCode = 0;
  const closers: Array<[string, () => Promise<void>]> = [
    ['consumer', () => consumer.disconnect()],
    ['producer', () => producer.disconnect()],
  ];
  for (const [clientName, close] of closers) {
    try {
      await close();
    } catch (error) {
      exitCode = 1;
      console.error(`Failed to disconnect Kafka ${clientName} on ${signal}:`, error);
    }
  }
  process.exit(exitCode);
}

// SIGINT covers Ctrl+C; SIGTERM is what process managers and container
// runtimes usually send to stop a service.
process.on('SIGINT', () => {
  void shutdownGracefully('SIGINT');
});
process.on('SIGTERM', () => {
  void shutdownGracefully('SIGTERM');
});
// Start the service when this module is loaded. A startup failure is logged
// and recorded in the exit status so a supervisor can tell that the service
// did not come up; the process is not killed here because this module is
// also importable for its `app` export.
startServer().catch((error: unknown) => {
  console.error('Failed to start server:', error);
  process.exitCode = 1;
});
export default app;
Step 7: Stream Processing
Create services/stream-processor.ts:
import kafka from '../lib/kafka';
/**
 * Aggregates ORDER_PLACED events per user and announces VIP status.
 *
 * Totals are kept in an in-memory map for the lifetime of one
 * `processOrderStream` call, so they start from zero whenever the processor
 * is restarted.
 */
class StreamProcessor {
  private consumer = kafka.consumer({ groupId: 'stream-processor' });
  private producer = kafka.producer();

  /**
   * Consumes `order-events`, keeps a running total spent per user, and
   * publishes one USER_VIP_STATUS_ACHIEVED event to `user-events` when a
   * user's total moves from at-or-below the threshold to above it.
   *
   * Messages without a value, with invalid JSON, with another event type,
   * or with an unusable `userId`/`totalAmount` are skipped.
   *
   * @param vipThreshold Total spend a user must exceed to become VIP.
   */
  async processOrderStream(vipThreshold = 1000) {
    await this.consumer.connect();
    await this.producer.connect();
    await this.consumer.subscribe({ topics: ['order-events'] });
    const orderTotals = new Map<string, number>();
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const order = this.parseOrderPlaced(message.value);
        if (!order) {
          return;
        }
        const { userId, totalAmount } = order;
        const previousTotal = orderTotals.get(userId) ?? 0;
        const newTotal = previousTotal + totalAmount;
        orderTotals.set(userId, newTotal);

        // Publish only on the order that crosses the threshold; otherwise
        // every later order from the same user would re-announce VIP status.
        const crossedThreshold = previousTotal <= vipThreshold && newTotal > vipThreshold;
        if (crossedThreshold) {
          await this.producer.send({
            topic: 'user-events',
            messages: [{
              key: userId,
              value: JSON.stringify({
                type: 'USER_VIP_STATUS_ACHIEVED',
                userId,
                totalSpent: newTotal,
                timestamp: new Date().toISOString(),
              }),
            }],
          });
        }
      },
    });
  }

  /**
   * Extracts the fields needed for aggregation from a raw message value.
   *
   * @returns `{ userId, totalAmount }` for a well-formed ORDER_PLACED event,
   *   or `undefined` when the message should be skipped.
   */
  private parseOrderPlaced(
    value: { toString(): string } | null | undefined
  ): { userId: string; totalAmount: number } | undefined {
    if (value == null) {
      return undefined; // message without a payload
    }
    let event: any;
    try {
      event = JSON.parse(value.toString());
    } catch (error) {
      // Throwing here would make the same unparseable message be redelivered.
      console.warn('Skipping order message with invalid JSON:', error);
      return undefined;
    }
    if (event == null || event.type !== 'ORDER_PLACED') {
      return undefined;
    }
    const { userId, totalAmount } = event;
    if (typeof userId !== 'string' || userId === '' ||
        typeof totalAmount !== 'number' || !Number.isFinite(totalAmount)) {
      // A non-numeric amount would turn the user's running total into NaN.
      console.warn('Skipping ORDER_PLACED event with invalid userId/totalAmount');
      return undefined;
    }
    return { userId, totalAmount };
  }
}
export default StreamProcessor;
Summary
Apache Kafka with Node.js enables event-driven architecture through reliable message streaming. Use producers for publishing events, consumers for processing, and implement proper error handling with dead letter queues for resilient microservices.