Real-Time Data Processing with Apache Flink and Kafka
Introduction
Apache Flink provides low-latency stream processing with exactly-once state consistency. This guide covers building real-time data pipelines with Kafka: consuming events, windowed aggregation, pattern detection with CEP, and stateful enrichment.
Prerequisites
- Apache Flink cluster
- Apache Kafka cluster
- Java 11+
- Maven or Gradle
Step 1: Setup Flink Project
Create the Maven project (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-kafka-processor</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.18.0</flink.version>
<kafka.version>3.5.0</kafka.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<!-- The Kafka connector is released separately from Flink core; 3.0.2-1.18 matches Flink 1.18 -->
<version>3.0.2-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Required for the CEP-based fraud detection in Step 5 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
</project>
Step 2: Define Event Models
Create event POJOs:
package com.example.events;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
public class UserEvent {
@JsonProperty("userId")
public String userId;
@JsonProperty("eventType")
public String eventType;
@JsonProperty("timestamp")
public long timestamp;
@JsonProperty("sessionId")
public String sessionId;
@JsonProperty("properties")
public java.util.Map<String, Object> properties;
// Constructors
public UserEvent() {}
public UserEvent(String userId, String eventType, String sessionId) {
this.userId = userId;
this.eventType = eventType;
this.sessionId = sessionId;
this.timestamp = Instant.now().toEpochMilli();
this.properties = new java.util.HashMap<>();
}
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getSessionId() { return sessionId; }
public void setSessionId(String sessionId) { this.sessionId = sessionId; }
}
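The same Jackson annotations drive both serialization and deserialization, so a quick round trip shows the JSON layout the user-events topic is expected to carry. A minimal, illustrative sketch (the class name and sample values are made up):

package com.example.events;

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative round trip: serialize a UserEvent and read it back, confirming
// the field names match the @JsonProperty annotations above.
public class UserEventJsonDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        UserEvent event = new UserEvent("user-42", "LOGIN", "session-7");
        event.properties.put("status", "SUCCESS");

        String json = mapper.writeValueAsString(event);
        // e.g. {"userId":"user-42","eventType":"LOGIN","timestamp":...,"sessionId":"session-7","properties":{"status":"SUCCESS"}}
        System.out.println(json);

        UserEvent parsed = mapper.readValue(json, UserEvent.class);
        System.out.println(parsed.userId + " / " + parsed.eventType);
    }
}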
package com.example.events;
public class SessionMetrics {
public String sessionId;
public String userId;
public long startTime;
public long endTime;
public int eventCount;
public long duration;
public SessionMetrics() {}
public SessionMetrics(String sessionId, String userId) {
this.sessionId = sessionId;
this.userId = userId;
this.eventCount = 0;
this.startTime = Long.MAX_VALUE;
this.endTime = Long.MIN_VALUE;
}
public void addEvent(UserEvent event) {
this.eventCount++;
this.startTime = Math.min(this.startTime, event.timestamp);
this.endTime = Math.max(this.endTime, event.timestamp);
this.duration = this.endTime - this.startTime;
}
}
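Because addEvent only tracks the earliest and latest timestamps seen, the session duration falls out of whichever events arrive, in any order. A tiny, illustrative check (class name and timestamps are made up):

package com.example.events;

// Illustrative only: two events 30 seconds apart produce eventCount == 2
// and duration == 30000 ms.
public class SessionMetricsDemo {
    public static void main(String[] args) {
        SessionMetrics metrics = new SessionMetrics("session-7", "user-42");

        UserEvent first = new UserEvent("user-42", "PAGE_VIEW", "session-7");
        first.timestamp = 1_700_000_000_000L;
        UserEvent second = new UserEvent("user-42", "CLICK", "session-7");
        second.timestamp = first.timestamp + 30_000L;

        metrics.addEvent(first);
        metrics.addEvent(second);
        System.out.println(metrics.eventCount + " events, " + metrics.duration + " ms");
    }
}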
Step 3: Kafka Source Configuration
Create Kafka source:
package com.example.sources;
import com.example.events.UserEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import java.io.IOException;
public class KafkaSourceFactory {
public static KafkaSource<UserEvent> createUserEventSource() {
return KafkaSource.<UserEvent>builder()
.setBootstrapServers("localhost:9092")
.setTopics("user-events")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new JsonDeserializationSchema<>(UserEvent.class))
.setProperty("partition.discovery.interval.ms", "10000")
.build();
}
public static KafkaSource<String> createRawEventSource() {
return KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("raw-events")
.setGroupId("flink-raw-consumer")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
}
// Note: this shadows Flink's built-in org.apache.flink.api.common.serialization.SimpleStringSchema,
// which could be used directly instead of this custom implementation.
private static class SimpleStringSchema implements DeserializationSchema<String> {
@Override
public String deserialize(byte[] message) throws IOException {
// Use an explicit charset rather than the platform default
return new String(message, java.nio.charset.StandardCharsets.UTF_8);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
}
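To exercise the source locally, something has to publish JSON matching UserEvent to the user-events topic. A rough sketch of a development-only producer, assuming the plain kafka-clients library is available on the classpath (it is not declared explicitly in the pom above) and the broker address used by the source factory:

package com.example.tools;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Development-only helper that pushes a handful of hand-written JSON events
// into the user-events topic consumed by KafkaSourceFactory.
public class TestEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                String json = String.format(
                    "{\"userId\":\"user-%d\",\"eventType\":\"LOGIN\",\"timestamp\":%d," +
                    "\"sessionId\":\"session-%d\",\"properties\":{\"status\":\"SUCCESS\"}}",
                    i, System.currentTimeMillis(), i);
                // Key by userId so all events for one user land on the same partition
                producer.send(new ProducerRecord<>("user-events", "user-" + i, json));
            }
            producer.flush();
        }
    }
}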
Step 4: Stream Processing Job
Create main processing job:
package com.example.jobs;
import com.example.events.UserEvent;
import com.example.events.SessionMetrics;
import com.example.sources.KafkaSourceFactory;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class UserSessionAnalyzer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure checkpointing for fault tolerance
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Create Kafka source
DataStream<UserEvent> userEvents = env.fromSource(
KafkaSourceFactory.createUserEventSource(),
WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp),
"user-events-source"
);
// Filter and transform events
DataStream<UserEvent> validEvents = userEvents
.filter(event -> event.userId != null && !event.userId.isEmpty())
.filter(event -> event.sessionId != null && !event.sessionId.isEmpty());
// Per-session aggregation in fixed 5-minute tumbling windows
// (EventTimeSessionWindows.withGap(...) would give true session windows)
DataStream<SessionMetrics> sessionMetrics = validEvents
.keyBy(event -> event.sessionId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SessionAggregator(), new SessionWindowProcessor());
// Real-time metrics calculation
DataStream<Tuple2<String, Long>> eventCounts = validEvents
.keyBy(event -> event.eventType)
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
.aggregate(new CountAggregator());
// Without a sink this branch produces nothing; print() keeps the example simple
eventCounts.print("EVENT_COUNTS");
// Write results to Kafka
KafkaSink<String> sessionSink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("session-metrics")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();
sessionMetrics
.map(metrics -> String.format(
"{\"sessionId\":\"%s\",\"userId\":\"%s\",\"eventCount\":%d,\"duration\":%d}",
metrics.sessionId, metrics.userId, metrics.eventCount, metrics.duration))
.sinkTo(sessionSink);
// Execute the job
env.execute("User Session Analyzer");
}
// Aggregate function for session metrics
private static class SessionAggregator implements AggregateFunction<UserEvent, SessionMetrics, SessionMetrics> {
@Override
public SessionMetrics createAccumulator() {
return new SessionMetrics();
}
@Override
public SessionMetrics add(UserEvent event, SessionMetrics accumulator) {
if (accumulator.sessionId == null) {
accumulator.sessionId = event.sessionId;
accumulator.userId = event.userId;
}
accumulator.addEvent(event);
return accumulator;
}
@Override
public SessionMetrics getResult(SessionMetrics accumulator) {
return accumulator;
}
@Override
public SessionMetrics merge(SessionMetrics a, SessionMetrics b) {
// Merge logic for combining accumulators
SessionMetrics merged = new SessionMetrics(a.sessionId, a.userId);
merged.eventCount = a.eventCount + b.eventCount;
merged.startTime = Math.min(a.startTime, b.startTime);
merged.endTime = Math.max(a.endTime, b.endTime);
merged.duration = merged.endTime - merged.startTime;
return merged;
}
}
// Process window function for session processing
private static class SessionWindowProcessor extends ProcessWindowFunction<SessionMetrics, SessionMetrics, String, TimeWindow> {
@Override
public void process(String key, Context context,
Iterable<SessionMetrics> elements,
Collector<SessionMetrics> out) {
SessionMetrics metrics = elements.iterator().next();
// Keep the event-derived start/end from the aggregator; overwriting them with the
// window bounds would make duration inconsistent. The window is still available
// via context.window().getStart() and getEnd() if needed.
out.collect(metrics);
}
}
// Count aggregator for event types; the accumulator carries the event type so the
// emitted tuple is keyed by type rather than a generic "count" label
private static class CountAggregator implements AggregateFunction<UserEvent, Tuple2<String, Long>, Tuple2<String, Long>> {
@Override
public Tuple2<String, Long> createAccumulator() {
return new Tuple2<>("", 0L);
}
@Override
public Tuple2<String, Long> add(UserEvent event, Tuple2<String, Long> accumulator) {
return new Tuple2<>(event.eventType, accumulator.f1 + 1);
}
@Override
public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
return accumulator;
}
@Override
public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
return new Tuple2<>(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
}
}
}
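The session sink above uses the connector's default at-least-once delivery. Since the introduction mentions exactly-once semantics, here is a hedged sketch of an exactly-once variant of the same sink; it relies on the checkpointing enabled in main(), needs an import of org.apache.flink.connector.base.DeliveryGuarantee, and assumes the broker's transaction.max.timeout.ms is at least as large as the timeout set here:

// Sketch: an exactly-once variant of the session-metrics sink that could replace
// sessionSink in main(). Needs: import org.apache.flink.connector.base.DeliveryGuarantee;
private static KafkaSink<String> createExactlyOnceSessionSink() {
    return KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("session-metrics")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("session-metrics-sink")
        // Kafka aborts transactions that outlive transaction.timeout.ms; keep it
        // comfortably larger than the checkpoint interval
        .setProperty("transaction.timeout.ms", "600000")
        .build();
}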
Step 5: Complex Event Processing
Create pattern detection:
package com.example.jobs;
import com.example.events.UserEvent;
import com.example.sources.KafkaSourceFactory;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserEvent> userEvents = env.fromSource(
KafkaSourceFactory.createUserEventSource(),
WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp),
"user-events-source"
);
// Filter login events
DataStream<UserEvent> loginEvents = userEvents
.filter(event -> "LOGIN".equals(event.eventType));
// Define fraud pattern: Multiple failed logins followed by successful login
Pattern<UserEvent, ?> fraudPattern = Pattern.<UserEvent>begin("failed_logins")
.where(new SimpleCondition<UserEvent>() {
@Override
public boolean filter(UserEvent event) {
return event.properties.containsKey("status") &&
"FAILED".equals(event.properties.get("status"));
}
})
.times(3).consecutive()
.next("successful_login")
.where(new SimpleCondition<UserEvent>() {
@Override
public boolean filter(UserEvent event) {
return event.properties.containsKey("status") &&
"SUCCESS".equals(event.properties.get("status"));
}
})
// A pattern sequence takes a single temporal constraint; if several within() calls
// are made, the smallest one applies, so the whole sequence is bounded to 5 minutes here.
.within(Time.minutes(5));
// Apply pattern to the stream
PatternStream<UserEvent> patternStream = CEP.pattern(
loginEvents.keyBy(event -> event.userId),
fraudPattern
);
// Process matched patterns
DataStream<String> alerts = patternStream.process(
new PatternProcessFunction<UserEvent, String>() {
@Override
public void processMatch(Map<String, List<UserEvent>> match,
Context ctx,
Collector<String> out) {
List<UserEvent> failedLogins = match.get("failed_logins");
List<UserEvent> successfulLogin = match.get("successful_login");
String alert = String.format(
"FRAUD ALERT: User %s had %d failed logins followed by successful login at %d",
failedLogins.get(0).userId,
failedLogins.size(),
successfulLogin.get(0).timestamp
);
out.collect(alert);
}
}
);
// Output alerts
alerts.print("FRAUD_ALERT");
env.execute("Fraud Detection Job");
}
}
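Both jobs build the same watermark strategy inline, and with pure event time a Kafka partition that receives no traffic holds the watermark back, stalling windows and CEP matches. A small, hypothetical helper that both jobs could share, adding an idleness timeout:

package com.example.sources;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import com.example.events.UserEvent;

// Hypothetical shared factory: same bounded-out-of-orderness strategy as in the jobs,
// plus withIdleness so event time keeps advancing when a partition goes quiet.
public class Watermarks {
    public static WatermarkStrategy<UserEvent> userEvents() {
        return WatermarkStrategy
            .<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.timestamp)
            .withIdleness(Duration.ofMinutes(1));
    }
}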
Step 6: State Management
Create stateful processing:
package com.example.functions;
import com.example.events.UserEvent;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class UserProfileEnricher extends KeyedProcessFunction<String, UserEvent, UserEvent> {
private ValueState<UserProfile> profileState;
@Override
public void open(Configuration parameters) {
profileState = getRuntimeContext().getState(
new ValueStateDescriptor<>("user-profile", UserProfile.class)
);
}
@Override
public void processElement(UserEvent event, Context ctx, Collector<UserEvent> out) throws Exception {
UserProfile profile = profileState.value();
if (profile == null) {
profile = new UserProfile(event.userId);
}
// Update profile based on event
profile.updateFromEvent(event);
// Enrich event with profile data
event.properties.put("userSegment", profile.getSegment());
event.properties.put("lifetimeValue", profile.getLifetimeValue());
profileState.update(profile);
out.collect(event);
}
private static class UserProfile {
String userId;
int eventCount;
long firstSeen;
long lastSeen;
double lifetimeValue;
public UserProfile(String userId) {
this.userId = userId;
this.eventCount = 0;
this.firstSeen = System.currentTimeMillis();
this.lifetimeValue = 0.0;
}
public void updateFromEvent(UserEvent event) {
this.eventCount++;
this.lastSeen = event.timestamp;
if (event.properties != null && event.properties.containsKey("value")) {
try {
this.lifetimeValue += Double.parseDouble(event.properties.get("value").toString());
} catch (NumberFormatException ignored) {
// Skip malformed values rather than failing the whole job
}
}
}
public String getSegment() {
if (lifetimeValue > 1000) return "PREMIUM";
if (lifetimeValue > 100) return "STANDARD";
return "BASIC";
}
public double getLifetimeValue() {
return lifetimeValue;
}
}
}
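UserProfileEnricher is a KeyedProcessFunction, so it has to run after a keyBy on userId, and its profile state otherwise grows with every user ever seen. A hedged sketch of both the wiring (assuming the validEvents stream from Step 4) and a state-TTL variant of open(); the TTL classes used are org.apache.flink.api.common.state.StateTtlConfig and org.apache.flink.api.common.time.Time:

// Sketch: wiring the enricher into the Step 4 pipeline (validEvents comes from there).
DataStream<UserEvent> enrichedEvents = validEvents
    .keyBy(event -> event.userId)
    .process(new UserProfileEnricher());

// Sketch: an open() that expires idle profiles after 30 days so the keyed state stays bounded.
@Override
public void open(Configuration parameters) {
    StateTtlConfig ttl = StateTtlConfig
        .newBuilder(Time.days(30))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
    ValueStateDescriptor<UserProfile> descriptor =
        new ValueStateDescriptor<>("user-profile", UserProfile.class);
    descriptor.enableTimeToLive(ttl);
    profileState = getRuntimeContext().getState(descriptor);
}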
Summary
Apache Flink paired with Kafka enables low-latency, real-time analytics. Use windowing for time-based aggregations, CEP for pattern detection, and keyed state for enriching events with user profiles, and enable checkpointing so the pipelines recover with consistent state.