
Event-Driven Architecture with Apache Kafka and Node.js


Introduction

Apache Kafka provides high-throughput, durable message streaming, which makes it a natural backbone for event-driven architectures. This guide walks through integrating Kafka with Node.js, using the KafkaJS client and TypeScript, to build scalable microservices.

Prerequisites

- Node.js 18 or later and npm
- A running Kafka broker (the examples assume localhost:9092)
- Basic familiarity with TypeScript and Express.js

Step 1: Install Kafka Client

npm install kafkajs

KafkaJS ships with its own TypeScript type definitions, so no separate @types package is needed.

Step 2: Kafka Connection Setup

Create lib/kafka.ts:

import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'nodejs-app',
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
  logLevel: logLevel.INFO,
  retry: {
    initialRetryTime: 100, // ms before the first retry; backs off exponentially
    retries: 8,
  },
  connectionTimeout: 3000, // ms to wait for a successful connection
  authenticationTimeout: 1000, // ms to wait for a successful authentication
  reauthenticationThreshold: 10000, // re-authenticate this many ms before session expiry
});

export default kafka;

// Create admin client for topic management
export const admin = kafka.admin();

// Helper function to create topics
export async function createTopics(topics: string[]) {
  await admin.connect();
  
  const topicConfigs = topics.map(topic => ({
    topic,
    numPartitions: 3,
    replicationFactor: 1,
    configEntries: [
      { name: 'retention.ms', value: '86400000' }, // 1 day
      { name: 'compression.type', value: 'gzip' },
    ],
  }));
  
  await admin.createTopics({
    topics: topicConfigs,
    waitForLeaders: true,
  });
  
  await admin.disconnect();
}
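
The helper is safe to call on every startup, since KafkaJS's createTopics resolves (to false) rather than throwing when the topics already exist. A minimal sketch of a bootstrap script (the file name and topic names are illustrative):

// scripts/bootstrap-topics.ts (hypothetical)
import { createTopics } from '../lib/kafka';

async function bootstrap() {
  // Resolves without error even if the topics already exist,
  // so re-running this script is harmless.
  await createTopics(['user-events', 'order-events']);
  console.log('Topics ready');
}

bootstrap().catch((error) => {
  console.error('Topic creation failed:', error);
  process.exit(1);
});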

Step 3: Event Producer Service

Create services/event-producer.ts:

import kafka from '../lib/kafka';
import { Producer, ProducerRecord } from 'kafkajs';

class EventProducer {
  private producer: Producer;
  private isConnected = false;

  constructor() {
    this.producer = kafka.producer({
      maxInFlightRequests: 1,
      idempotent: true,
      // To use publishWithTransaction below, KafkaJS also requires a stable
      // transactional id, e.g.: transactionalId: 'nodejs-app-producer',
      transactionTimeout: 30000,
      retry: {
        retries: 5,
        initialRetryTime: 300,
      },
    });
  }

  async connect(): Promise<void> {
    if (!this.isConnected) {
      await this.producer.connect();
      this.isConnected = true;
      console.log('Kafka producer connected');
    }
  }

  async disconnect(): Promise<void> {
    if (this.isConnected) {
      await this.producer.disconnect();
      this.isConnected = false;
      console.log('Kafka producer disconnected');
    }
  }

  async publishEvent(topic: string, event: any, key?: string): Promise<void> {
    await this.connect();
    
    const message = {
      key,
      value: JSON.stringify({
        ...event,
        timestamp: new Date().toISOString(),
        eventId: this.generateEventId(),
      }),
      headers: {
        'event-type': event.type,
        'service': 'nodejs-app',
      },
    };

    await this.producer.send({
      topic,
      messages: [message],
    });

    console.log(`Event published to ${topic}:`, event.type);
  }

  async publishBatch(topic: string, events: any[]): Promise<void> {
    await this.connect();
    
    const messages = events.map(event => ({
      key: event.userId || event.id,
      value: JSON.stringify({
        ...event,
        timestamp: new Date().toISOString(),
        eventId: this.generateEventId(),
      }),
      headers: {
        'event-type': event.type,
        'service': 'nodejs-app',
      },
    }));

    await this.producer.send({
      topic,
      messages,
    });

    console.log(`Batch of ${events.length} events published to ${topic}`);
  }

  // Note: producer.transaction() throws unless the producer was created
  // with a transactionalId (see the constructor above).
  async publishWithTransaction(records: ProducerRecord[]): Promise<void> {
    await this.connect();
    
    const transaction = await this.producer.transaction();
    
    try {
      for (const record of records) {
        await transaction.send(record);
      }
      
      await transaction.commit();
      console.log('Transactional publish completed');
    } catch (error) {
      await transaction.abort();
      console.error('Transaction aborted:', error);
      throw error;
    }
  }

  private generateEventId(): string {
    return `${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
  }
}

export default EventProducer;
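
A minimal usage sketch (the topic name and payload are illustrative). Because publishEvent connects lazily, no explicit connect() call is needed:

import EventProducer from './services/event-producer';

async function main() {
  const producer = new EventProducer();

  // publishEvent calls connect() internally on first use
  await producer.publishEvent(
    'user-events',
    { type: 'USER_CREATED', userId: '42', email: 'ada@example.com', name: 'Ada' },
    '42' // key: events with the same key keep their order within a partition
  );

  await producer.disconnect();
}

main().catch(console.error);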

Step 4: Event Consumer Service

Create services/event-consumer.ts:

import kafka from '../lib/kafka';
import { Consumer, EachMessagePayload } from 'kafkajs';

type EventHandler = (event: any) => Promise<void>;

class EventConsumer {
  private consumer: Consumer;
  private eventHandlers: Map<string, EventHandler[]> = new Map();

  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      rebalanceTimeout: 60000,
      heartbeatInterval: 3000,
      retry: {
        retries: 10,
        initialRetryTime: 300,
      },
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
    console.log('Kafka consumer connected');
  }

  async disconnect(): Promise<void> {
    await this.consumer.disconnect();
    console.log('Kafka consumer disconnected');
  }

  async subscribe(topics: string[]): Promise<void> {
    await this.consumer.subscribe({
      topics,
      fromBeginning: false,
    });
  }

  registerEventHandler(eventType: string, handler: EventHandler): void {
    if (!this.eventHandlers.has(eventType)) {
      this.eventHandlers.set(eventType, []);
    }
    
    this.eventHandlers.get(eventType)!.push(handler);
    console.log(`Registered handler for event type: ${eventType}`);
  }

  async startConsuming(): Promise<void> {
    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        await this.handleMessage(payload);
      },
    });
  }

  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;
    
    try {
      const eventData = JSON.parse(message.value?.toString() || '{}');
      const eventType = message.headers?.['event-type']?.toString() || eventData.type;
      
      console.log(`Processing event from ${topic}:${partition}`, {
        eventType,
        eventId: eventData.eventId,
        offset: message.offset,
      });

      const handlers = this.eventHandlers.get(eventType) || [];
      
      // Process handlers in parallel
      await Promise.all(
        handlers.map(async (handler) => {
          try {
            await handler(eventData);
          } catch (error) {
            console.error(`Handler failed for event ${eventType}:`, error);
            // Implement dead letter queue logic here
            await this.handleFailedEvent(eventData, error);
          }
        })
      );

      console.log(`Event processed successfully: ${eventType}`);
    } catch (error) {
      console.error('Failed to process message:', error);
      throw error; // The offset is not committed, so the message will be redelivered
    }
  }

  private async handleFailedEvent(event: any, error: any): Promise<void> {
    // Send to a dead letter topic. A short-lived producer is created per
    // failure for simplicity; reuse a shared producer in production.
    const producer = kafka.producer();
    await producer.connect();
    
    await producer.send({
      topic: 'dead-letter-queue',
      messages: [{
        key: event.eventId,
        value: JSON.stringify({
          originalEvent: event,
          error: error.message,
          failedAt: new Date().toISOString(),
        }),
      }],
    });
    
    await producer.disconnect();
  }
}

export default EventConsumer;
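
A minimal sketch of running the consumer standalone (the group id and topic are illustrative):

import EventConsumer from './services/event-consumer';

async function main() {
  const consumer = new EventConsumer('analytics-group');

  consumer.registerEventHandler('ORDER_PLACED', async (event) => {
    console.log(`Order ${event.orderId} totals ${event.totalAmount}`);
  });

  await consumer.connect();
  await consumer.subscribe(['order-events']);
  await consumer.startConsuming(); // resolves once the run loop has started
}

main().catch(console.error);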

Step 5: Event Schemas and Types

Create types/events.ts:

export interface BaseEvent {
  type: string;
  timestamp: string;
  eventId: string;
  userId?: string;
  correlationId?: string;
}

export interface UserCreatedEvent extends BaseEvent {
  type: 'USER_CREATED';
  userId: string;
  email: string;
  name: string;
}

export interface OrderPlacedEvent extends BaseEvent {
  type: 'ORDER_PLACED';
  orderId: string;
  userId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
}

export interface PaymentProcessedEvent extends BaseEvent {
  type: 'PAYMENT_PROCESSED';
  orderId: string;
  paymentId: string;
  amount: number;
  status: 'SUCCESS' | 'FAILED';
}

export interface InventoryUpdatedEvent extends BaseEvent {
  type: 'INVENTORY_UPDATED';
  productId: string;
  quantity: number;
  operation: 'INCREASE' | 'DECREASE';
}

export type DomainEvent = 
  | UserCreatedEvent 
  | OrderPlacedEvent 
  | PaymentProcessedEvent 
  | InventoryUpdatedEvent;
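
Because DomainEvent is a discriminated union on the type field, TypeScript narrows each case in a switch, and a never-typed default gives a compile-time exhaustiveness check. A sketch:

import { DomainEvent } from './types/events';

function describeEvent(event: DomainEvent): string {
  switch (event.type) {
    case 'USER_CREATED':
      return `New user ${event.name} <${event.email}>`;
    case 'ORDER_PLACED':
      return `Order ${event.orderId} for ${event.totalAmount}`;
    case 'PAYMENT_PROCESSED':
      return `Payment ${event.paymentId}: ${event.status}`;
    case 'INVENTORY_UPDATED':
      return `${event.operation} ${event.quantity} of product ${event.productId}`;
    default: {
      // Unreachable if every event type is handled above
      const exhaustive: never = event;
      return exhaustive;
    }
  }
}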

Step 6: Express.js Integration

Create app.ts:

import express from 'express';
import EventProducer from './services/event-producer';
import EventConsumer from './services/event-consumer';
import { createTopics } from './lib/kafka';
import { UserCreatedEvent, OrderPlacedEvent } from './types/events';

const app = express();
const producer = new EventProducer();
const consumer = new EventConsumer('user-service-group');

app.use(express.json());

// Initialize Kafka topics
async function initializeTopics() {
  await createTopics([
    'user-events',
    'order-events',
    'payment-events',
    'inventory-events',
    'dead-letter-queue',
  ]);
}

// API Routes
app.post('/users', async (req, res) => {
  try {
    const { name, email } = req.body;
    const userId = Date.now().toString();
    
    // Create user in database (not shown)
    // ...
    
    // Publish event
    const event: UserCreatedEvent = {
      type: 'USER_CREATED',
      userId,
      email,
      name,
      timestamp: new Date().toISOString(),
      eventId: '', // populated by EventProducer.publishEvent
    };
    
    await producer.publishEvent('user-events', event, userId);
    
    res.status(201).json({ userId, name, email });
  } catch (error) {
    console.error('Error creating user:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.post('/orders', async (req, res) => {
  try {
    const { userId, items } = req.body;
    const orderId = Date.now().toString();
    const totalAmount = items.reduce((sum: number, item: any) => 
      sum + (item.price * item.quantity), 0);
    
    // Create order in database (not shown)
    // ...
    
    // Publish event
    const event: OrderPlacedEvent = {
      type: 'ORDER_PLACED',
      orderId,
      userId,
      items,
      totalAmount,
      timestamp: new Date().toISOString(),
      eventId: '', // populated by EventProducer.publishEvent
    };
    
    await producer.publishEvent('order-events', event, orderId);
    
    res.status(201).json({ orderId, userId, items, totalAmount });
  } catch (error) {
    console.error('Error creating order:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

// Event Handlers
consumer.registerEventHandler('USER_CREATED', async (event: UserCreatedEvent) => {
  console.log('Processing user created event:', event.userId);
  // Send welcome email, update analytics, etc.
});

consumer.registerEventHandler('ORDER_PLACED', async (event: OrderPlacedEvent) => {
  console.log('Processing order placed event:', event.orderId);
  // Update inventory, send confirmation email, etc.
});

// Start server
async function startServer() {
  await initializeTopics();
  await producer.connect();
  await consumer.connect();
  await consumer.subscribe(['user-events', 'order-events']);
  await consumer.startConsuming();
  
  const PORT = process.env.PORT || 3000;
  app.listen(PORT, () => {
    console.log(`Server running on port ${PORT}`);
  });
}

// Graceful shutdown on Ctrl+C and container stop signals
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, async () => {
    console.log('Shutting down...');
    await producer.disconnect();
    await consumer.disconnect();
    process.exit(0);
  });
}

startServer().catch(console.error);

export default app;
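
To verify the flow end to end, POST to the API and watch the consumer log the handled event. A hypothetical smoke test using Node's built-in fetch (Node 18+):

async function smokeTest() {
  const res = await fetch('http://localhost:3000/users', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ name: 'Ada', email: 'ada@example.com' }),
  });
  console.log(await res.json()); // { userId, name, email }
  // The USER_CREATED handler registered above should log shortly after.
}

smokeTest().catch(console.error);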

Step 7: Stream Processing

Create services/stream-processor.ts:

import kafka from '../lib/kafka';

class StreamProcessor {
  private consumer = kafka.consumer({ groupId: 'stream-processor' });
  private producer = kafka.producer();

  async processOrderStream() {
    await this.consumer.connect();
    await this.producer.connect();
    
    await this.consumer.subscribe({ topics: ['order-events'] });
    
    const orderTotals = new Map<string, number>(); // in-memory aggregate; resets on restart
    
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value!.toString());
        
        if (event.type === 'ORDER_PLACED') {
          const userId = event.userId;
          const currentTotal = orderTotals.get(userId) || 0;
          const newTotal = currentTotal + event.totalAmount;
          
          orderTotals.set(userId, newTotal);
          
          // Publish an aggregated event once the lifetime total passes the
          // threshold (note: this fires again on every subsequent order)
          if (newTotal > 1000) {
            await this.producer.send({
              topic: 'user-events',
              messages: [{
                key: userId,
                value: JSON.stringify({
                  type: 'USER_VIP_STATUS_ACHIEVED',
                  userId,
                  totalSpent: newTotal,
                  timestamp: new Date().toISOString(),
                }),
              }],
            });
          }
        }
      },
    });
  }
}

export default StreamProcessor;
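
A sketch of running the processor as its own process. Because orderTotals is held in memory, aggregates reset on restart; a production system would persist this state or use a dedicated stream-processing framework:

import StreamProcessor from './services/stream-processor';

const processor = new StreamProcessor();

processor.processOrderStream().catch((error) => {
  console.error('Stream processor failed:', error);
  process.exit(1);
});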

Summary

Apache Kafka and Node.js together enable event-driven architectures built on reliable message streaming. Use producers to publish events, consumer groups to process them, and dead letter queues to handle failures, keeping your microservices resilient.

