Skip to content
Go back

Scaling WebSocket Connections with Redis and Clustering

Scaling WebSocket Connections with Redis and Clustering

Introduction

Scaling WebSocket connections across multiple servers requires Redis pub/sub for message broadcasting and proper load balancing to maintain real-time communication.

Prerequisites

Step 1: Install Dependencies

npm install socket.io socket.io-redis redis
npm install -D @types/socket.io @types/redis

Step 2: Basic WebSocket Server with Redis Adapter

Create src/websocket-server.ts:

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import http from 'http';
import express from 'express';

class ScalableWebSocketServer {
  private io: Server;
  private app: express.Application;
  private server: http.Server;
  private redisClient: any;
  private redisPub: any;

  constructor(private port: number = 3000) {
    this.app = express();
    this.server = http.createServer(this.app);
    this.io = new Server(this.server, {
      cors: {
        origin: "*",
        methods: ["GET", "POST"]
      }
    });
  }

  async initialize() {
    await this.setupRedisAdapter();
    this.setupMiddleware();
    this.setupEventHandlers();
  }

  private async setupRedisAdapter() {
    const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
    
    this.redisClient = createClient({ url: redisUrl });
    this.redisPub = this.redisClient.duplicate();

    await Promise.all([
      this.redisClient.connect(),
      this.redisPub.connect()
    ]);

    // Use Redis adapter for scaling across multiple instances
    this.io.adapter(createAdapter(this.redisPub, this.redisClient));
    
    console.log('Redis adapter configured');
  }

  private setupMiddleware() {
    // Authentication middleware
    this.io.use((socket, next) => {
      const token = socket.handshake.auth.token;
      if (this.validateToken(token)) {
        socket.userId = this.getUserIdFromToken(token);
        next();
      } else {
        next(new Error('Authentication error'));
      }
    });

    // Rate limiting middleware
    this.io.use((socket, next) => {
      const rateLimiter = new Map();
      const userId = socket.userId;
      const now = Date.now();
      const windowMs = 60000; // 1 minute
      const maxRequests = 100;

      const userRequests = rateLimiter.get(userId) || [];
      const recentRequests = userRequests.filter((time: number) => now - time < windowMs);

      if (recentRequests.length >= maxRequests) {
        next(new Error('Rate limit exceeded'));
      } else {
        recentRequests.push(now);
        rateLimiter.set(userId, recentRequests);
        next();
      }
    });
  }

  private setupEventHandlers() {
    this.io.on('connection', (socket) => {
      console.log(`User ${socket.userId} connected on process ${process.pid}`);
      
      // Join user-specific room
      socket.join(`user:${socket.userId}`);
      
      // Handle joining chat rooms
      socket.on('join:room', (roomId: string) => {
        socket.join(`room:${roomId}`);
        socket.emit('joined:room', roomId);
        
        // Broadcast to room that user joined
        socket.to(`room:${roomId}`).emit('user:joined', {
          userId: socket.userId,
          timestamp: Date.now()
        });
      });

      // Handle leaving rooms
      socket.on('leave:room', (roomId: string) => {
        socket.leave(`room:${roomId}`);
        socket.to(`room:${roomId}`).emit('user:left', {
          userId: socket.userId,
          timestamp: Date.now()
        });
      });

      // Handle chat messages
      socket.on('message:send', (data: { roomId: string; message: string }) => {
        const messageData = {
          id: Date.now().toString(),
          userId: socket.userId,
          message: data.message,
          timestamp: Date.now()
        };

        // Broadcast to all clients in room (across all server instances)
        this.io.to(`room:${data.roomId}`).emit('message:received', messageData);
        
        // Store message in database or cache
        this.storeMessage(data.roomId, messageData);
      });

      // Handle private messages
      socket.on('message:private', (data: { targetUserId: string; message: string }) => {
        const messageData = {
          id: Date.now().toString(),
          fromUserId: socket.userId,
          message: data.message,
          timestamp: Date.now()
        };

        // Send to specific user (might be on different server instance)
        this.io.to(`user:${data.targetUserId}`).emit('message:private:received', messageData);
      });

      // Handle typing indicators
      socket.on('typing:start', (roomId: string) => {
        socket.to(`room:${roomId}`).emit('typing:user:start', socket.userId);
      });

      socket.on('typing:stop', (roomId: string) => {
        socket.to(`room:${roomId}`).emit('typing:user:stop', socket.userId);
      });

      // Handle disconnect
      socket.on('disconnect', () => {
        console.log(`User ${socket.userId} disconnected`);
      });
    });
  }

  private validateToken(token: string): boolean {
    // Implement JWT validation logic
    return token && token.length > 0;
  }

  private getUserIdFromToken(token: string): string {
    // Extract user ID from JWT token
    return token; // Simplified for example
  }

  private async storeMessage(roomId: string, messageData: any) {
    // Store in Redis or database
    await this.redisClient.lpush(`room:${roomId}:messages`, JSON.stringify(messageData));
    await this.redisClient.ltrim(`room:${roomId}:messages`, 0, 99); // Keep last 100 messages
  }

  public async start() {
    await this.initialize();
    
    this.server.listen(this.port, () => {
      console.log(`WebSocket server running on port ${this.port} (PID: ${process.pid})`);
    });
  }

  public async stop() {
    await Promise.all([
      this.redisClient?.quit(),
      this.redisPub?.quit()
    ]);
    this.server.close();
  }
}

export { ScalableWebSocketServer };

Step 3: Cluster Management

Create src/cluster.ts:

import cluster from 'cluster';
import os from 'os';
import { ScalableWebSocketServer } from './websocket-server';

class WebSocketCluster {
  private workers: number;

  constructor(workers: number = os.cpus().length) {
    this.workers = workers;
  }

  start() {
    if (cluster.isPrimary) {
      console.log(`Master process ${process.pid} is running`);
      console.log(`Starting ${this.workers} workers`);

      // Fork workers
      for (let i = 0; i < this.workers; i++) {
        const worker = cluster.fork();
        console.log(`Worker ${worker.process.pid} started`);
      }

      // Handle worker exits
      cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);
        console.log('Starting a new worker');
        cluster.fork();
      });

      // Graceful shutdown
      process.on('SIGINT', () => {
        console.log('Master received SIGINT, shutting down workers...');
        
        Object.values(cluster.workers!).forEach((worker) => {
          worker?.kill('SIGINT');
        });

        setTimeout(() => {
          process.exit(0);
        }, 5000);
      });

    } else {
      // Worker process
      const server = new ScalableWebSocketServer(process.env.PORT ? parseInt(process.env.PORT) : 3000);
      
      server.start().catch(console.error);

      // Handle graceful shutdown in worker
      process.on('SIGINT', async () => {
        console.log(`Worker ${process.pid} shutting down...`);
        await server.stop();
        process.exit(0);
      });
    }
  }
}

export { WebSocketCluster };

// Start cluster if this file is executed directly
if (require.main === module) {
  const cluster = new WebSocketCluster();
  cluster.start();
}

Step 4: Load Balancer Configuration

Nginx configuration for WebSocket load balancing:

upstream websocket_backend {
    least_conn;
    server 127.0.0.1:3000;
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
    server 127.0.0.1:3003;
}

server {
    listen 80;
    server_name your-domain.com;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Important for sticky sessions
        proxy_set_header X-Forwarded-Host $server_name;
        proxy_cache_bypass $http_upgrade;
    }
}

Step 5: Health Check and Monitoring

Create src/health-check.ts:

import { createClient } from 'redis';

class HealthChecker {
  private redisClient: any;

  async initialize() {
    this.redisClient = createClient({
      url: process.env.REDIS_URL || 'redis://localhost:6379'
    });
    await this.redisClient.connect();
  }

  async checkHealth() {
    const health = {
      status: 'healthy',
      timestamp: new Date().toISOString(),
      process: {
        pid: process.pid,
        uptime: process.uptime(),
        memory: process.memoryUsage()
      },
      redis: {
        connected: false,
        latency: 0
      }
    };

    try {
      const start = Date.now();
      await this.redisClient.ping();
      health.redis.connected = true;
      health.redis.latency = Date.now() - start;
    } catch (error) {
      health.status = 'unhealthy';
      health.redis.connected = false;
    }

    return health;
  }

  async getMetrics() {
    return {
      activeConnections: global.activeConnections || 0,
      totalMessages: await this.redisClient.get('total:messages') || 0,
      uptime: process.uptime()
    };
  }
}

export { HealthChecker };

Step 6: Client-Side Connection Management

import { io, Socket } from 'socket.io-client';

class WebSocketClient {
  private socket: Socket;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  constructor(private url: string, private token: string) {
    this.connect();
  }

  private connect() {
    this.socket = io(this.url, {
      auth: { token: this.token },
      transports: ['websocket', 'polling'],
      upgrade: true,
      reconnection: true,
      reconnectionAttempts: this.maxReconnectAttempts,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000
    });

    this.setupEventListeners();
  }

  private setupEventListeners() {
    this.socket.on('connect', () => {
      console.log('Connected to WebSocket server');
      this.reconnectAttempts = 0;
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    this.socket.on('reconnect', (attemptNumber) => {
      console.log('Reconnected after', attemptNumber, 'attempts');
    });

    this.socket.on('reconnect_error', (error) => {
      this.reconnectAttempts++;
      console.error('Reconnection failed:', error);
      
      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
        console.error('Max reconnection attempts reached');
      }
    });
  }

  joinRoom(roomId: string) {
    this.socket.emit('join:room', roomId);
  }

  sendMessage(roomId: string, message: string) {
    this.socket.emit('message:send', { roomId, message });
  }

  onMessage(callback: (data: any) => void) {
    this.socket.on('message:received', callback);
  }

  disconnect() {
    this.socket.disconnect();
  }
}

export { WebSocketClient };

Summary

Scaling WebSockets with Redis and clustering enables horizontal scaling across multiple Node.js instances. Use Redis pub/sub for message broadcasting, implement proper load balancing with sticky sessions, and monitor connection health for optimal performance.


Share this post on:

Previous Post
Implementing GraphQL Federation with Apollo Server
Next Post
Building gRPC Services with Node.js and TypeScript