Scaling WebSocket Connections with Redis and Clustering
Introduction
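Scaling WebSocket connections across multiple server processes needs two things. First, a shared message bus (here, Redis pub/sub through the Socket.IO Redis adapter), so that a broadcast on one instance reaches clients connected to every instance. Second, a load balancer with sticky sessions, so that each client keeps talking to the same instance.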
Prerequisites
- Node.js >=14
- Redis server
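- Socket.IO v4, Express, and the `redis` (node-redis v4+) client. All examples use the Socket.IO API; the Redis adapter shown here does not work with the plain `ws` library.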
Step 1: Install Dependencies
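npm install socket.io @socket.io/redis-adapter redis express
npm install socket.io-client   # only needed by the client in Step 6
npm install -D typescript @types/node @types/express   # socket.io and redis ship their own types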
Step 2: Basic WebSocket Server with Redis Adapter
Create `src/websocket-server.ts`:
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import http from 'http';
import express from 'express';
/**
 * Socket.IO server meant to run as one of several instances behind a load
 * balancer. Broadcasts to chat rooms (`room:<id>`) and per-user rooms
 * (`user:<id>`) go through the Redis adapter, so they reach clients connected
 * to any instance that shares the same Redis server.
 */
class ScalableWebSocketServer {
  /** Sliding window for the per-user connection rate limit. */
  private static readonly RATE_LIMIT_WINDOW_MS = 60000; // 1 minute
  /** Handshakes allowed per user within one window. */
  private static readonly RATE_LIMIT_MAX_CONNECTIONS = 100;
  /** Number of most recent messages kept per room in Redis. */
  private static readonly MESSAGE_HISTORY_LENGTH = 100;

  private readonly io: Server;
  private readonly app: express.Application;
  private readonly server: http.Server;
  /** Adapter publisher; also used for ordinary commands such as message history. */
  private redisPub?: ReturnType<typeof createClient>;
  /**
   * Adapter subscriber. A node-redis client in subscriber mode rejects regular
   * commands, so this connection must not be used for anything else.
   */
  private redisSub?: ReturnType<typeof createClient>;
  /**
   * Handshake timestamps per user id, kept on the instance so they persist
   * across connections. Local to this process (not shared across a cluster);
   * NOTE: entries are never evicted for users who stop connecting.
   */
  private readonly connectionAttempts = new Map<string, number[]>();
  /** Per-process counter that keeps message ids unique within one millisecond. */
  private messageSeq = 0;
  /** Memoized shutdown so repeated stop() calls (e.g. a double SIGINT) are harmless. */
  private stopPromise?: Promise<void>;

  /** @param port TCP port the HTTP/Socket.IO server listens on. */
  constructor(private port: number = 3000) {
    this.app = express();
    this.server = http.createServer(this.app);
    this.io = new Server(this.server, {
      cors: {
        // NOTE: "*" accepts any origin; restrict this in production.
        origin: "*",
        methods: ["GET", "POST"]
      }
    });
  }

  /** Connects Redis, then installs middleware and event handlers. Call once before listening. */
  async initialize(): Promise<void> {
    await this.setupRedisAdapter();
    this.setupMiddleware();
    this.setupEventHandlers();
  }

  /** Opens the pub/sub Redis connections and plugs them into the Socket.IO adapter. */
  private async setupRedisAdapter(): Promise<void> {
    const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
    const pub = createClient({ url: redisUrl });
    const sub = pub.duplicate();
    // node-redis emits 'error' on connection problems; an 'error' event with
    // no listener is thrown and would crash the process.
    pub.on('error', (err: unknown) => console.error('Redis pub client error:', err));
    sub.on('error', (err: unknown) => console.error('Redis sub client error:', err));
    // Keep references before connecting so stop() can clean up after a partial failure.
    this.redisPub = pub;
    this.redisSub = sub;
    await Promise.all([pub.connect(), sub.connect()]);
    // Use Redis adapter for scaling across multiple instances
    this.io.adapter(createAdapter(pub, sub));
    console.log('Redis adapter configured');
  }

  /** Installs handshake middleware: authentication, then per-user connection rate limiting. */
  private setupMiddleware(): void {
    // Authentication: reject the handshake unless the token validates.
    this.io.use((socket, next) => {
      const token: unknown = socket.handshake.auth.token;
      if (this.validateToken(token)) {
        socket.data.userId = this.getUserIdFromToken(token);
        next();
      } else {
        next(new Error('Authentication error'));
      }
    });

    // Rate limiting. Socket.IO middleware runs once per handshake, so this caps
    // connection attempts per user, not messages.
    this.io.use((socket, next) => {
      const userId: string = socket.data.userId;
      const now = Date.now();
      const windowMs = ScalableWebSocketServer.RATE_LIMIT_WINDOW_MS;
      const recentRequests = (this.connectionAttempts.get(userId) ?? [])
        .filter((time) => now - time < windowMs);
      if (recentRequests.length >= ScalableWebSocketServer.RATE_LIMIT_MAX_CONNECTIONS) {
        this.connectionAttempts.set(userId, recentRequests); // drop expired timestamps
        next(new Error('Rate limit exceeded'));
        return;
      }
      recentRequests.push(now);
      this.connectionAttempts.set(userId, recentRequests);
      next();
    });
  }

  /**
   * Wires per-connection event handlers. Malformed client payloads (missing or
   * non-string ids/messages) are ignored rather than allowed to throw.
   */
  private setupEventHandlers(): void {
    this.io.on('connection', (socket) => {
      const userId: string = socket.data.userId;
      console.log(`User ${userId} connected on process ${process.pid}`);

      // Per-user room: any instance can reach this user via io.to(`user:<id>`).
      socket.join(`user:${userId}`);

      // Handle joining chat rooms
      socket.on('join:room', (roomId: unknown) => {
        if (!ScalableWebSocketServer.isId(roomId)) return;
        socket.join(`room:${roomId}`);
        socket.emit('joined:room', roomId);
        // Notify the other members (socket.to excludes the sender)
        socket.to(`room:${roomId}`).emit('user:joined', {
          userId,
          timestamp: Date.now()
        });
      });

      // Handle leaving rooms
      socket.on('leave:room', (roomId: unknown) => {
        if (!ScalableWebSocketServer.isId(roomId)) return;
        socket.leave(`room:${roomId}`);
        socket.to(`room:${roomId}`).emit('user:left', {
          userId,
          timestamp: Date.now()
        });
      });

      // Handle chat messages
      socket.on('message:send', (data: { roomId?: unknown; message?: unknown } | null | undefined) => {
        const roomId = data?.roomId;
        const message = data?.message;
        if (!ScalableWebSocketServer.isId(roomId) || typeof message !== 'string') return;
        const now = Date.now();
        const messageData = {
          id: this.nextMessageId(now),
          userId,
          message,
          timestamp: now
        };
        // Broadcast to all clients in room (across all server instances)
        this.io.to(`room:${roomId}`).emit('message:received', messageData);
        // Persist asynchronously; a storage failure must not kill the handler.
        this.storeMessage(roomId, messageData).catch((err: unknown) => {
          console.error(`Failed to store message for room ${roomId}:`, err);
        });
      });

      // Handle private messages
      socket.on('message:private', (data: { targetUserId?: unknown; message?: unknown } | null | undefined) => {
        const targetUserId = data?.targetUserId;
        const message = data?.message;
        if (!ScalableWebSocketServer.isId(targetUserId) || typeof message !== 'string') return;
        const now = Date.now();
        const messageData = {
          id: this.nextMessageId(now),
          fromUserId: userId,
          message,
          timestamp: now
        };
        // Send to specific user (might be on different server instance)
        this.io.to(`user:${targetUserId}`).emit('message:private:received', messageData);
      });

      // Handle typing indicators
      socket.on('typing:start', (roomId: unknown) => {
        if (!ScalableWebSocketServer.isId(roomId)) return;
        socket.to(`room:${roomId}`).emit('typing:user:start', userId);
      });
      socket.on('typing:stop', (roomId: unknown) => {
        if (!ScalableWebSocketServer.isId(roomId)) return;
        socket.to(`room:${roomId}`).emit('typing:user:stop', userId);
      });

      // Handle disconnect
      socket.on('disconnect', () => {
        console.log(`User ${userId} disconnected`);
      });
    });
  }

  /** True for a non-empty string: the only shape accepted for room/user ids sent by clients. */
  private static isId(value: unknown): value is string {
    return typeof value === 'string' && value.length > 0;
  }

  /**
   * Builds a message id from the timestamp, process id, and a per-process
   * counter. Unique within one host; add a host identifier for multi-host setups.
   */
  private nextMessageId(now: number): string {
    this.messageSeq += 1;
    return `${now}-${process.pid}-${this.messageSeq}`;
  }

  /** Placeholder check: accepts any non-empty string. TODO: verify a JWT signature. */
  private validateToken(token: unknown): token is string {
    return typeof token === 'string' && token.length > 0;
  }

  /** Placeholder: treats the whole token as the user id. TODO: extract from JWT claims. */
  private getUserIdFromToken(token: string): string {
    return token; // Simplified for example
  }

  /**
   * Appends a message to the room's Redis list and trims the list to the newest
   * MESSAGE_HISTORY_LENGTH entries. Both commands run in one MULTI transaction.
   * Uses the publisher connection because the subscriber cannot run regular commands.
   *
   * @throws Error if called before initialize()
   */
  private async storeMessage(roomId: string, messageData: object): Promise<void> {
    const client = this.redisPub;
    if (!client) {
      throw new Error('Redis is not initialized');
    }
    const key = `room:${roomId}:messages`;
    await client
      .multi()
      .lPush(key, JSON.stringify(messageData))
      .lTrim(key, 0, ScalableWebSocketServer.MESSAGE_HISTORY_LENGTH - 1)
      .exec();
  }

  /**
   * Initializes Redis and handlers, then listens on `port`. Resolves once the
   * server is listening; rejects on setup or listen errors (e.g. EADDRINUSE).
   */
  public async start(): Promise<void> {
    await this.initialize();
    await new Promise<void>((resolve, reject) => {
      this.server.once('error', reject);
      this.server.listen(this.port, () => {
        this.server.off('error', reject);
        console.log(`WebSocket server running on port ${this.port} (PID: ${process.pid})`);
        resolve();
      });
    });
  }

  /** Gracefully shuts down. Safe to call more than once; later calls share the first shutdown. */
  public stop(): Promise<void> {
    if (!this.stopPromise) {
      this.stopPromise = this.shutdown();
    }
    return this.stopPromise;
  }

  /** Closes Socket.IO (clients and HTTP server) first, then the Redis connections that are open. */
  private async shutdown(): Promise<void> {
    // Closing Socket.IO first ensures no handler touches Redis after it is gone.
    await new Promise<void>((resolve) => {
      this.io.close(() => resolve());
    });
    const pub = this.redisPub;
    const sub = this.redisSub;
    await Promise.all([
      pub?.isOpen ? pub.quit() : undefined,
      sub?.isOpen ? sub.quit() : undefined
    ]);
  }
}
export { ScalableWebSocketServer };
Step 3: Cluster Management
Create `src/cluster.ts`:
import cluster from 'cluster';
import os from 'os';
import { ScalableWebSocketServer } from './websocket-server';
/**
 * Runs one ScalableWebSocketServer per worker process using Node's cluster
 * module, and restarts workers that die.
 *
 * Each worker listens on its own port (basePort, basePort + 1, ...), which it
 * receives through the PORT environment variable. Put a load balancer with
 * sticky sessions in front of those ports. If all workers shared one port, the
 * cluster primary would spread a client's HTTP long-polling requests across
 * workers, and Socket.IO sessions would fail.
 */
class WebSocketCluster {
  /** Delay before replacing a dead worker, so a worker that fails at startup cannot spin the CPU. */
  private static readonly RESPAWN_DELAY_MS = 1000;
  /** How long the primary waits for workers to exit on SIGINT before exiting anyway. */
  private static readonly SHUTDOWN_TIMEOUT_MS = 5000;

  private readonly workers: number;
  private readonly basePort: number;

  /**
   * @param workers  number of worker processes; defaults to the CPU count.
   *                 Values below 1 (os.cpus() can return an empty array) become 1.
   * @param basePort port of the first worker; defaults to $PORT, else 3000
   */
  constructor(workers: number = os.cpus().length, basePort: number = WebSocketCluster.portFromEnv()) {
    this.workers = Number.isFinite(workers) && workers >= 1 ? Math.floor(workers) : 1;
    this.basePort = basePort;
  }

  /** Starts the primary (fork and supervise) or, inside a forked worker, the server. */
  start(): void {
    if (cluster.isPrimary) {
      this.startPrimary();
    } else {
      this.startWorker();
    }
  }

  /** Forks the workers, replaces any that die, and coordinates shutdown on SIGINT. */
  private startPrimary(): void {
    console.log(`Master process ${process.pid} is running`);
    console.log(`Starting ${this.workers} workers`);

    // Port owned by each live worker, so a replacement reuses the same port.
    const portByWorkerId = new Map<number, number>();
    let shuttingDown = false;

    const forkOnPort = (port: number): void => {
      const worker = cluster.fork({ PORT: String(port) });
      portByWorkerId.set(worker.id, port);
      console.log(`Worker ${worker.process.pid} started on port ${port}`);
    };

    for (let i = 0; i < this.workers; i++) {
      forkOnPort(this.basePort + i);
    }

    // Handle worker exits
    cluster.on('exit', (worker, code, signal) => {
      const port = portByWorkerId.get(worker.id);
      portByWorkerId.delete(worker.id);
      console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);
      if (shuttingDown) {
        // Do not respawn during shutdown; exit as soon as the last worker is gone.
        if (portByWorkerId.size === 0) process.exit(0);
        return;
      }
      if (port === undefined) return;
      console.log('Starting a new worker');
      setTimeout(() => {
        if (!shuttingDown) forkOnPort(port);
      }, WebSocketCluster.RESPAWN_DELAY_MS);
    });

    // Graceful shutdown
    process.on('SIGINT', () => {
      if (shuttingDown) return;
      shuttingDown = true;
      console.log('Master received SIGINT, shutting down workers...');
      for (const worker of Object.values(cluster.workers ?? {})) {
        worker?.kill('SIGINT');
      }
      setTimeout(() => {
        process.exit(0);
      }, WebSocketCluster.SHUTDOWN_TIMEOUT_MS);
    });
  }

  /** Runs the server in this worker and stops it cleanly on SIGINT. */
  private startWorker(): void {
    const server = new ScalableWebSocketServer(WebSocketCluster.portFromEnv());
    server.start().catch((err: unknown) => {
      // Exit so the primary notices and starts a replacement.
      console.error(`Worker ${process.pid} failed to start:`, err);
      process.exit(1);
    });

    // Ctrl+C reaches the worker twice (from the terminal and from the primary),
    // so only the first SIGINT triggers shutdown.
    let stopping = false;
    process.on('SIGINT', () => {
      if (stopping) return;
      stopping = true;
      console.log(`Worker ${process.pid} shutting down...`);
      server.stop().then(
        () => process.exit(0),
        (err: unknown) => {
          console.error(`Worker ${process.pid} failed to shut down cleanly:`, err);
          process.exit(1);
        }
      );
    });
  }

  /** Parses $PORT as a positive base-10 integer, falling back to `fallback`. */
  private static portFromEnv(fallback: number = 3000): number {
    const parsed = Number.parseInt(process.env.PORT ?? '', 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
  }
}
export { WebSocketCluster };
// Entry point: only when this module is run directly (not imported) do we
// boot a cluster with the default worker count.
if (require.main === module) {
  new WebSocketCluster().start();
}
Step 4: Load Balancer Configuration
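Nginx configuration for WebSocket load balancing. Socket.IO's HTTP long-polling fallback needs sticky sessions: every request of a session must reach the same instance.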
upstream websocket_backend {
    # Sticky sessions: with Socket.IO's HTTP long-polling transport a session
    # spans several requests, and all of them must reach the instance that
    # created it. Hashing the client address pins each client to one backend
    # (least_conn does not). "ip_hash;" is an alternative. Behind another
    # proxy or CDN, hash on the real client address instead.
    hash $remote_addr consistent;
    # One entry per Socket.IO server instance.
    server 127.0.0.1:3000;
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
    server 127.0.0.1:3003;
}
server {
    listen 80;
    server_name your-domain.com;
    location /socket.io/ {
        proxy_pass http://websocket_backend;
        # The WebSocket upgrade needs HTTP/1.1 plus the Upgrade/Connection headers.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-Host $server_name;
        proxy_cache_bypass $http_upgrade;
    }
}
Step 5: Health Check and Monitoring
Create `src/health-check.ts`:
import { createClient } from 'redis';
/**
 * Reports the health of one server process and its Redis connection, plus a
 * few coarse metrics. Call `initialize()` before use. Until then,
 * `checkHealth()` reports Redis as disconnected and `getMetrics()` reports zero
 * stored messages.
 */
class HealthChecker {
  private redisClient?: ReturnType<typeof createClient>;

  /**
   * @param getActiveConnections optional callback returning this process's
   *   open socket count (for example `() => io.engine.clientsCount`). When
   *   omitted, the legacy `globalThis.activeConnections` value is used if it is
   *   a number, else 0.
   * @param pingTimeoutMs how long a Redis PING may take before the instance is
   *   reported unhealthy. node-redis queues commands while disconnected, so an
   *   unbounded PING could hang the health check.
   */
  constructor(
    private readonly getActiveConnections?: () => number,
    private readonly pingTimeoutMs: number = 2000
  ) {}

  /** Opens the Redis connection used for health checks and metrics. */
  async initialize(): Promise<void> {
    const client = createClient({
      url: process.env.REDIS_URL || 'redis://localhost:6379'
    });
    // Without an 'error' listener, a Redis connection error is an unhandled
    // 'error' event and crashes the process.
    client.on('error', (err: unknown) => console.error('Health-check Redis client error:', err));
    this.redisClient = client;
    await client.connect();
  }

  /** Closes the Redis connection if it is open. Safe to call more than once. */
  async close(): Promise<void> {
    const client = this.redisClient;
    this.redisClient = undefined;
    if (client?.isOpen) {
      await client.quit();
    }
  }

  /**
   * Reports process info and Redis reachability. Status is 'unhealthy' when the
   * PING fails, times out, or the checker was never initialized. Never throws.
   */
  async checkHealth() {
    const health = {
      status: 'healthy',
      timestamp: new Date().toISOString(),
      process: {
        pid: process.pid,
        uptime: process.uptime(),
        memory: process.memoryUsage()
      },
      redis: {
        connected: false,
        latency: 0
      }
    };
    try {
      const client = this.redisClient;
      if (!client) {
        throw new Error('HealthChecker.initialize() has not been called');
      }
      const start = Date.now();
      await this.withTimeout(client.ping(), this.pingTimeoutMs);
      health.redis.connected = true;
      health.redis.latency = Date.now() - start;
    } catch {
      health.status = 'unhealthy';
      health.redis.connected = false;
    }
    return health;
  }

  /**
   * Returns connection count, stored `total:messages` counter (as a number;
   * 0 if missing, non-numeric, or not initialized), and process uptime.
   */
  async getMetrics() {
    const legacy: unknown = Reflect.get(globalThis, 'activeConnections');
    const activeConnections =
      this.getActiveConnections?.() ?? (typeof legacy === 'number' ? legacy : 0);
    const raw = this.redisClient ? await this.redisClient.get('total:messages') : null;
    const totalMessages = Number(raw ?? 0);
    return {
      activeConnections,
      totalMessages: Number.isFinite(totalMessages) ? totalMessages : 0,
      uptime: process.uptime()
    };
  }

  /** Settles like `promise`, or rejects if it has not settled within `ms` milliseconds. */
  private async withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Redis PING timed out after ${ms} ms`)), ms);
    });
    try {
      return await Promise.race([promise, timeout]);
    } finally {
      clearTimeout(timer);
    }
  }
}
export { HealthChecker };
Step 6: Client-Side Connection Management
import { io, Socket } from 'socket.io-client';
/**
 * Client wrapper for the chat server. It authenticates with a token, logs
 * connection lifecycle events, and exposes room and message helpers.
 * Reconnection is performed by the Socket.IO Manager; this class only logs it.
 */
class WebSocketClient {
  private readonly socket: Socket;
  private readonly maxReconnectAttempts = 5;

  /**
   * @param url   server URL (typically the load balancer)
   * @param token credential sent as `auth.token` in the handshake
   */
  constructor(private url: string, private token: string) {
    this.socket = this.connect();
  }

  /** Creates the Socket.IO connection and attaches its listeners. */
  private connect(): Socket {
    const socket = io(this.url, {
      auth: { token: this.token },
      transports: ['websocket', 'polling'],
      upgrade: true,
      reconnection: true,
      reconnectionAttempts: this.maxReconnectAttempts,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000
    });
    this.setupEventListeners(socket);
    return socket;
  }

  /** Logs connection, handshake-failure, disconnection and reconnection events. */
  private setupEventListeners(socket: Socket): void {
    socket.on('connect', () => {
      console.log('Connected to WebSocket server');
    });

    // Fired when a handshake fails, including when server middleware rejects
    // it (e.g. an authentication error).
    socket.on('connect_error', (error) => {
      console.error('Connection error:', error.message);
    });

    socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    // Since Socket.IO v3 the reconnection events are emitted by the Manager
    // (socket.io), not by the Socket; listeners on the Socket never fire.
    socket.io.on('reconnect', (attemptNumber) => {
      console.log('Reconnected after', attemptNumber, 'attempts');
    });
    socket.io.on('reconnect_error', (error) => {
      console.error('Reconnection failed:', error);
    });
    // Emitted once all `reconnectionAttempts` have been used up.
    socket.io.on('reconnect_failed', () => {
      console.error('Max reconnection attempts reached');
    });
  }

  /** Asks the server to add this client to the given chat room. */
  joinRoom(roomId: string): void {
    this.socket.emit('join:room', roomId);
  }

  /** Sends a chat message to every member of the room. */
  sendMessage(roomId: string, message: string): void {
    this.socket.emit('message:send', { roomId, message });
  }

  /** Registers a handler for room messages broadcast by the server. */
  onMessage(callback: (data: any) => void): void {
    this.socket.on('message:received', callback);
  }

  /** Closes the connection; the Manager will not reconnect afterwards. */
  disconnect(): void {
    this.socket.disconnect();
  }
}
export { WebSocketClient };
Summary
Scaling WebSockets with Redis and clustering enables horizontal scaling across multiple Node.js instances. Use Redis pub/sub for message broadcasting, implement proper load balancing with sticky sessions, and monitor connection health for optimal performance.