
Real-Time Data Processing with Apache Flink and Kafka


Introduction

Apache Flink provides low-latency stream processing with exactly-once semantics. This guide covers building real-time data pipelines with Kafka integration.

Prerequisites

You need Java 11 or later, Maven, and a running Kafka broker; the examples assume a broker at localhost:9092.

Step 1: Project Setup

Create the Maven project structure:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-kafka-processor</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.18.0</flink.version>
        <flink.kafka.connector.version>3.1.0-1.18</flink.kafka.connector.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.kafka.connector.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <!-- Required for the pattern detection in Step 5 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
    </dependencies>
</project>
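
To confirm the dependencies resolve before any Kafka code is added, a throwaway job can be run straight from the IDE. This is only a sketch; the SetupCheck class is not part of the project that follows.

package com.example.jobs;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SetupCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A tiny bounded pipeline: three printed lines mean the Flink runtime is on the classpath
        env.fromElements("flink", "kafka", "ready")
           .map(word -> "setup-check: " + word)
           .print();

        env.execute("Setup Check");
    }
}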

Step 2: Define Event Models

Create event POJOs:

package com.example.events;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;

public class UserEvent {
    @JsonProperty("userId")
    public String userId;
    
    @JsonProperty("eventType")
    public String eventType;
    
    @JsonProperty("timestamp")
    public long timestamp;
    
    @JsonProperty("sessionId")
    public String sessionId;
    
    @JsonProperty("properties")
    public java.util.Map<String, Object> properties;
    
    // Constructors
    public UserEvent() {}
    
    public UserEvent(String userId, String eventType, String sessionId) {
        this.userId = userId;
        this.eventType = eventType;
        this.sessionId = sessionId;
        this.timestamp = Instant.now().toEpochMilli();
        this.properties = new java.util.HashMap<>();
    }
    
    // Getters and setters
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    
    public String getSessionId() { return sessionId; }
    public void setSessionId(String sessionId) { this.sessionId = sessionId; }
}

Add a session metrics POJO in the same package:

package com.example.events;

public class SessionMetrics {
    public String sessionId;
    public String userId;
    public long startTime;
    public long endTime;
    public int eventCount;
    public long duration;
    
    public SessionMetrics() {
        // Sentinel values so addEvent() can take min/max over the first event
        this.startTime = Long.MAX_VALUE;
        this.endTime = Long.MIN_VALUE;
    }
    
    public SessionMetrics(String sessionId, String userId) {
        this.sessionId = sessionId;
        this.userId = userId;
        this.eventCount = 0;
        this.startTime = Long.MAX_VALUE;
        this.endTime = Long.MIN_VALUE;
    }
    
    public void addEvent(UserEvent event) {
        this.eventCount++;
        this.startTime = Math.min(this.startTime, event.timestamp);
        this.endTime = Math.max(this.endTime, event.timestamp);
        this.duration = this.endTime - this.startTime;
    }
}
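
To see the JSON shape these POJOs map to, a quick Jackson round-trip can be run locally. This is a sketch; the EventJsonCheck class exists only for illustration and uses the jackson-databind dependency already declared in the pom.

package com.example.events;

import com.fasterxml.jackson.databind.ObjectMapper;

public class EventJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        UserEvent event = new UserEvent("user-42", "LOGIN", "session-7");
        event.properties.put("status", "SUCCESS");

        // Serialize to the JSON layout the user-events topic is expected to carry
        String json = mapper.writeValueAsString(event);
        System.out.println(json);

        // Deserialize back to verify the @JsonProperty mappings
        UserEvent parsed = mapper.readValue(json, UserEvent.class);
        System.out.println(parsed.userId + " / " + parsed.eventType + " / " + parsed.sessionId);
    }
}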

Step 3: Kafka Source Configuration

Create Kafka source:

package com.example.sources;

import com.example.events.UserEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KafkaSourceFactory {
    
    public static KafkaSource<UserEvent> createUserEventSource() {
        return KafkaSource.<UserEvent>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("user-events")
                .setGroupId("flink-consumer-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(UserEvent.class))
                .setProperty("partition.discovery.interval.ms", "10000")
                .build();
    }
    
    public static KafkaSource<String> createRawEventSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("raw-events")
                .setGroupId("flink-raw-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
    
    // Minimal string deserializer; Flink's built-in
    // org.apache.flink.api.common.serialization.SimpleStringSchema could be used instead.
    private static class SimpleStringSchema implements DeserializationSchema<String> {
        @Override
        public String deserialize(byte[] message) throws IOException {
            return new String(message, StandardCharsets.UTF_8);
        }
        
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
}
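
A quick way to verify the source factory against a running broker is to print whatever arrives on the topic. A minimal sketch (the SourceSmokeTest class name is illustrative):

package com.example.jobs;

import com.example.sources.KafkaSourceFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume the user-events topic and print each deserialized event;
        // no watermarks are needed for a plain print
        env.fromSource(
                KafkaSourceFactory.createUserEventSource(),
                WatermarkStrategy.noWatermarks(),
                "user-events-smoke-test")
           .print();

        env.execute("Kafka Source Smoke Test");
    }
}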

Step 4: Stream Processing Job

Create main processing job:

package com.example.jobs;

import com.example.events.UserEvent;
import com.example.events.SessionMetrics;
import com.example.sources.KafkaSourceFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class UserSessionAnalyzer {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure checkpointing for fault tolerance
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        
        // Create Kafka source
        DataStream<UserEvent> userEvents = env.fromSource(
            KafkaSourceFactory.createUserEventSource(),
            WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.timestamp),
            "user-events-source"
        );
        
        // Filter and transform events
        DataStream<UserEvent> validEvents = userEvents
                .filter(event -> event.userId != null && !event.userId.isEmpty())
                .filter(event -> event.sessionId != null && !event.sessionId.isEmpty());
        
        // Session-based aggregation
        DataStream<SessionMetrics> sessionMetrics = validEvents
                .keyBy(event -> event.sessionId)
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .aggregate(new SessionAggregator(), new SessionWindowProcessor());
        
        // Real-time per-event-type counts over a sliding window (printed for visibility)
        DataStream<Tuple2<String, Long>> eventCounts = validEvents
                .keyBy(event -> event.eventType)
                .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
                .aggregate(new CountAggregator());
        eventCounts.print("EVENT_COUNTS");
        
        // Write results to Kafka
        KafkaSink<String> sessionSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("session-metrics")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        
        sessionMetrics
                .map(metrics -> String.format(
                    "{\"sessionId\":\"%s\",\"userId\":\"%s\",\"eventCount\":%d,\"duration\":%d}",
                    metrics.sessionId, metrics.userId, metrics.eventCount, metrics.duration))
                .sinkTo(sessionSink);
        
        // Execute the job
        env.execute("User Session Analyzer");
    }
    
    // Aggregate function for session metrics
    private static class SessionAggregator implements AggregateFunction<UserEvent, SessionMetrics, SessionMetrics> {
        @Override
        public SessionMetrics createAccumulator() {
            return new SessionMetrics();
        }
        
        @Override
        public SessionMetrics add(UserEvent event, SessionMetrics accumulator) {
            if (accumulator.sessionId == null) {
                accumulator.sessionId = event.sessionId;
                accumulator.userId = event.userId;
            }
            accumulator.addEvent(event);
            return accumulator;
        }
        
        @Override
        public SessionMetrics getResult(SessionMetrics accumulator) {
            return accumulator;
        }
        
        @Override
        public SessionMetrics merge(SessionMetrics a, SessionMetrics b) {
            // Combine partial accumulators; either side may still be empty
            SessionMetrics merged = new SessionMetrics(
                    a.sessionId != null ? a.sessionId : b.sessionId,
                    a.userId != null ? a.userId : b.userId);
            merged.eventCount = a.eventCount + b.eventCount;
            merged.startTime = Math.min(a.startTime, b.startTime);
            merged.endTime = Math.max(a.endTime, b.endTime);
            merged.duration = merged.endTime - merged.startTime;
            return merged;
        }
    }
    
    // Process window function for session processing
    private static class SessionWindowProcessor extends ProcessWindowFunction<SessionMetrics, SessionMetrics, String, TimeWindow> {
        @Override
        public void process(String key, Context context, 
                          Iterable<SessionMetrics> elements, 
                          Collector<SessionMetrics> out) {
            SessionMetrics metrics = elements.iterator().next();
            // Keep the event-derived start/end/duration; the window bounds are
            // available via context.window().getStart()/getEnd() if needed downstream
            out.collect(metrics);
        }
    }
    
    // Count aggregator for event types; the accumulator carries the event type
    // so the emitted tuple identifies which type was counted
    private static class CountAggregator implements AggregateFunction<UserEvent, Tuple2<String, Long>, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> createAccumulator() {
            return new Tuple2<>("", 0L);
        }
        
        @Override
        public Tuple2<String, Long> add(UserEvent event, Tuple2<String, Long> accumulator) {
            return new Tuple2<>(event.eventType, accumulator.f1 + 1);
        }
        
        @Override
        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
            return accumulator;
        }
        
        @Override
        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            return new Tuple2<>(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
        }
    }
}
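
The job above slices each sessionId into fixed five-minute tumbling windows. If a session should instead be considered closed after a period of inactivity, Flink's event-time session windows fit better. A sketch of the alternative windowing step inside main(), reusing the same aggregator and window processor (requires importing org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows):

        // Alternative: close a session after 5 minutes of inactivity instead of
        // using fixed tumbling windows
        DataStream<SessionMetrics> inactivitySessions = validEvents
                .keyBy(event -> event.sessionId)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .aggregate(new SessionAggregator(), new SessionWindowProcessor());
        
        inactivitySessions.print("SESSIONS_BY_INACTIVITY");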

Step 5: Complex Event Processing

Create pattern detection:

package com.example.jobs;

import com.example.events.UserEvent;
import com.example.sources.KafkaSourceFactory;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class FraudDetectionJob {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStream<UserEvent> userEvents = env.fromSource(
            KafkaSourceFactory.createUserEventSource(),
            WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.timestamp),
            "user-events-source"
        );
        
        // Filter login events
        DataStream<UserEvent> loginEvents = userEvents
                .filter(event -> "LOGIN".equals(event.eventType));
        
        // Define fraud pattern: three consecutive failed logins followed by a
        // successful login, with the whole sequence completing within five minutes
        Pattern<UserEvent, ?> fraudPattern = Pattern.<UserEvent>begin("failed_logins")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) {
                        return "FAILED".equals(event.properties.get("status"));
                    }
                })
                .times(3).consecutive()
                .next("successful_login")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) {
                        return "SUCCESS".equals(event.properties.get("status"));
                    }
                })
                .within(Time.minutes(5)); // within() constrains the entire pattern sequence
        
        // Apply pattern to the stream
        PatternStream<UserEvent> patternStream = CEP.pattern(
            loginEvents.keyBy(event -> event.userId), 
            fraudPattern
        );
        
        // Process matched patterns
        DataStream<String> alerts = patternStream.process(
            new PatternProcessFunction<UserEvent, String>() {
                @Override
                public void processMatch(Map<String, List<UserEvent>> match, 
                                       Context ctx, 
                                       Collector<String> out) {
                    List<UserEvent> failedLogins = match.get("failed_logins");
                    List<UserEvent> successfulLogin = match.get("successful_login");
                    
                    String alert = String.format(
                        "FRAUD ALERT: User %s had %d failed logins followed by successful login at %d",
                        failedLogins.get(0).userId,
                        failedLogins.size(),
                        successfulLogin.get(0).timestamp
                    );
                    
                    out.collect(alert);
                }
            }
        );
        
        // Output alerts
        alerts.print("FRAUD_ALERT");
        
        env.execute("Fraud Detection Job");
    }
}
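
Printing alerts is convenient during development; in practice they would usually land on their own topic. A sketch that mirrors the sink setup from Step 4 (it assumes a fraud-alerts topic exists and reuses the KafkaSink, KafkaRecordSerializationSchema, and SimpleStringSchema imports shown there):

        // Inside main(), after building the alerts stream: write alerts to Kafka
        // instead of (or in addition to) printing them
        KafkaSink<String> alertSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("fraud-alerts")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        
        alerts.sinkTo(alertSink);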

Step 6: State Management

Create stateful processing:

package com.example.functions;

import com.example.events.UserEvent;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UserProfileEnricher extends KeyedProcessFunction<String, UserEvent, UserEvent> {
    
    private ValueState<UserProfile> profileState;
    
    @Override
    public void open(Configuration parameters) {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("user-profile", UserProfile.class)
        );
    }
    
    @Override
    public void processElement(UserEvent event, Context ctx, Collector<UserEvent> out) throws Exception {
        UserProfile profile = profileState.value();
        
        if (profile == null) {
            profile = new UserProfile(event.userId);
        }
        
        // Events produced without a properties map would otherwise cause an NPE below
        if (event.properties == null) {
            event.properties = new java.util.HashMap<>();
        }
        
        // Update profile based on event
        profile.updateFromEvent(event);
        
        // Enrich event with profile data
        event.properties.put("userSegment", profile.getSegment());
        event.properties.put("lifetimeValue", profile.getLifetimeValue());
        
        profileState.update(profile);
        out.collect(event);
    }
    
    private static class UserProfile {
        String userId;
        int eventCount;
        long firstSeen;
        long lastSeen;
        double lifetimeValue;
        
        public UserProfile(String userId) {
            this.userId = userId;
            this.eventCount = 0;
            this.firstSeen = System.currentTimeMillis();
            this.lifetimeValue = 0.0;
        }
        
        public void updateFromEvent(UserEvent event) {
            this.eventCount++;
            this.lastSeen = event.timestamp;
            
            if (event.properties.containsKey("value")) {
                this.lifetimeValue += Double.parseDouble(event.properties.get("value").toString());
            }
        }
        
        public String getSegment() {
            if (lifetimeValue > 1000) return "PREMIUM";
            if (lifetimeValue > 100) return "STANDARD";
            return "BASIC";
        }
        
        public double getLifetimeValue() {
            return lifetimeValue;
        }
    }
}
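
The enricher is keyed by userId, so it plugs in right after keying the event stream. A sketch of how it could be wired into the Step 4 job (the enrichedEvents name is illustrative, and com.example.functions.UserProfileEnricher must be imported there):

        // Inside UserSessionAnalyzer.main(), after validEvents is defined:
        DataStream<UserEvent> enrichedEvents = validEvents
                .keyBy(event -> event.userId)
                .process(new UserProfileEnricher());
        
        enrichedEvents.print("ENRICHED");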

Summary

Apache Flink provides low-latency stream processing with Kafka integration for real-time analytics. Use windowing for time-based aggregations, CEP for pattern detection, and state management for enriching events with user profiles.

