Apache Flink Architecture
Technical Analysis
A comprehensive reference covering runtime internals, consistency models, performance tuning, and architectural trade-offs for production streaming stacks.
1.1 — Runtime Stack: JobManager, TaskManager & Slot Allocation
The Flink runtime is a two-tier distributed system: a single (HA-enabled) JobManager acts as the cluster master, while one or more TaskManagers execute the actual stream processing workloads. A lightweight Client translates the user program into a serialized JobGraph and submits it — it is not part of the runtime execution path.
The JM contains three components: ResourceManager (slot lifecycle), Dispatcher (REST API + WebUI, one-to-many JobMasters), and JobMaster (one per job: schedules ExecutionGraph, coordinates checkpoints, handles failures). In HA mode, multiple JMs run with leader election via ZooKeeper or Kubernetes.
Each TM is a JVM process offering one or more task slots. A slot is the minimum unit of resource scheduling. Slots divide the TM's managed memory equally but do not provide CPU isolation. By default, all operators belong to the same SlotSharingGroup, so subtasks of different operators can co-locate in one slot — maximising utilisation without over-provisioning.
Production Insight: Set taskmanager.numberOfTaskSlots to match physical CPU cores. With fine-grained resource management (Flink 1.14+), you can request fractional slot profiles (e.g., 0.25 Core, 1 GB) per operator, eliminating wasteful uniform slot sizing for heterogeneous pipelines.
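The effect of fractional profiles can be illustrated framework-free: a slot of fixed capacity admits several small requests that uniform slot sizing would waste a whole slot on. The `Profile` and `Slot` types below are hypothetical stand-ins, not Flink's `ResourceProfile` API — a minimal sketch of the matching logic only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of fine-grained slot matching: a slot admits operator
// resource requests first-fit until its CPU/memory capacity is exhausted.
public class FineGrainedSlot {
    record Profile(double cpuCores, int memoryMb) {}

    static class Slot {
        double cpuLeft; int memLeft;
        final List<Profile> placed = new ArrayList<>();
        Slot(double cpu, int mem) { cpuLeft = cpu; memLeft = mem; }
        boolean tryPlace(Profile p) {
            if (p.cpuCores() > cpuLeft || p.memoryMb() > memLeft) return false;
            cpuLeft -= p.cpuCores(); memLeft -= p.memoryMb(); placed.add(p);
            return true;
        }
    }

    // Count how many of the given requests fit into one slot.
    public static int place(Slot slot, List<Profile> requests) {
        int placed = 0;
        for (Profile p : requests) if (slot.tryPlace(p)) placed++;
        return placed;
    }
}
```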
1.2 — Dataflow Graph Transformation Pipeline
Flink implements a four-stage compilation from user code to physical execution. Each stage applies progressively more physical reality — from logical semantics to actual parallelised tasks running on TaskManagers.
StreamGraph (Client-side)
Built from the DataStream API transformation chain. Each DataStream.map(), .keyBy(), etc. creates a StreamNode. Directed edges (StreamEdge) capture data flow. The StreamGraph is a logical plan — no physical resources assigned yet.
JobGraph (Operator Chaining — Client-side optimisation)
The StreamingJobGraphGenerator traverses the DAG from its sources and identifies chainable operators (same parallelism, forward partitioner, no shuffle boundary). Chainable operators are merged into a single JobVertex, running in one thread with no serialization overhead between them. The result is submitted to the Dispatcher as a serialized blob with attached JARs.
ExecutionGraph (JobManager — parallel expansion)
The JobMaster inflates the JobGraph into a physical DAG. Each JobVertex with parallelism p spawns p ExecutionVertex instances. An operator with p=100 → 1 JobVertex, 100 ExecutionVertices. The CheckpointCoordinator, KvStateLocationRegistry, and failure recovery scaffolding are also initialised at this stage.
Physical Tasks (TaskManager — runtime)
The JobMaster maps each ExecutionVertex → Execution → deployed as a Task in a TM slot. The Task.invoke() method starts the operator loop. Each Execution has a unique ExecutionAttemptID to support retries and recovery across failure events.
// Client-side — DataStream API → StreamGraph → JobGraph
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

DataStream<Order> orders = env
    .addSource(new FlinkKafkaConsumer<>("orders", schema, props)) // StreamNode 1
    .map(Order::enrich)                                           // chained → same JobVertex
    .keyBy(Order::getCustomerId)                                  // shuffle boundary → new JobVertex
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(OrderAgg::merge);                                     // window operator in second JobVertex

// Calling execute() triggers: StreamGraph → JobGraph → submit to JobManager
env.execute("Order Aggregation Pipeline");
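The chainability check applied when generating the JobGraph (stage 2 above) can be sketched as a plain-Java predicate. The `Op` record and its fields are illustrative stand-ins, not Flink's `StreamEdge`/`StreamNode` API.

```java
// Illustrative chaining check: two connected operators are merged into one
// JobVertex only if no shuffle, parallelism change, or group boundary
// separates them.
public class ChainingRule {
    enum Partitioner { FORWARD, HASH, REBALANCE, BROADCAST }

    record Op(int parallelism, boolean chainingEnabled, String slotSharingGroup) {}

    public static boolean isChainable(Op upstream, Op downstream, Partitioner edge) {
        return edge == Partitioner.FORWARD                        // no shuffle boundary
            && upstream.parallelism() == downstream.parallelism() // same parallelism
            && upstream.chainingEnabled() && downstream.chainingEnabled()
            && upstream.slotSharingGroup().equals(downstream.slotSharingGroup());
    }
}
```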
1.3 — State Backends: HashMapStateBackend vs. EmbeddedRocksDB
Flink separates two orthogonal concerns: in-flight state storage (the State Backend) and durable checkpoint storage (the Checkpoint Storage). The former determines runtime access latency; the latter determines fault-tolerance durability.
| Property | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| Storage location | JVM heap (Java HashMap) | Off-heap RocksDB on local disk |
| Max state size | Limited by heap (GC pressure) | Limited by local disk (TBs possible) |
| Read/Write cost | Nanoseconds (direct object access) | µs–ms (serialise + RocksDB lookup) |
| Incremental CP | No (full snapshots only)* | Yes (SSTable diffs) |
| GC sensitivity | High — objects on heap | Low — off-heap native memory |
| Object reuse | Unsafe (direct refs) | Safe (serialise/deserialise) |
| Best for | Small state (<100 MB), low-latency | Large stateful jobs, long windows, HA |
* Flink 1.18+ introduces generalised incremental CP via the Changelog State Backend (DSTL), enabling incremental snapshots for HashMapStateBackend.
RocksDB internals: State is stored in column families per Flink state descriptor. Keys are composite: keyGroup | key | namespace (serialised bytes). On incremental checkpoint, Flink leverages RocksDB's native checkpoint (hard-links to SSTables) to upload only new/changed SSTable files to durable storage (S3/HDFS), dramatically reducing checkpoint size for slowly-changing state.
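The composite key layout and the key-group routing behind it can be mimicked in plain Java. Flink's real implementation hashes the key's serialised bytes with a Murmur hash; the `hashCode()` below is a deliberate simplification, and the byte layout is a sketch rather than the exact on-disk format.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the RocksDB key layout keyGroup | key | namespace, plus the
// key-group assignment that makes state rescaling possible.
public class KeyGroupLayout {
    // Each key is routed to one of maxParallelism key groups...
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism); // simplified hash
    }

    // ...and each operator subtask owns a contiguous range of key groups.
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Composite key: 2-byte key-group prefix, then key bytes, then namespace bytes.
    public static byte[] compositeKey(int keyGroup, String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] ns = namespace.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[2 + k.length + ns.length];
        out[0] = (byte) (keyGroup >>> 8);
        out[1] = (byte) keyGroup;
        System.arraycopy(k, 0, out, 2, k.length);
        System.arraycopy(ns, 0, out, 2 + k.length, ns.length);
        return out;
    }
}
```

Because the key group is the prefix, all state for one subtask's key-group range is a contiguous byte range, which is what makes SSTable-level incremental checkpoints and rescaling cheap.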
1.4 — Chandy-Lamport Variant: Asynchronous Barrier Snapshotting (ABS)
Flink uses a variant of the classic Chandy-Lamport distributed snapshot algorithm, adapted for directed acyclic streaming topologies. The core innovation is the checkpoint barrier — a special control record injected into all source streams simultaneously by the CheckpointCoordinator.
Trigger
CheckpointCoordinator sends a triggerCheckpoint(cpId) RPC to all source tasks. Sources record their input offsets (e.g., Kafka partition offsets) and inject barrier B#N into their output streams.
Barrier Alignment (Aligned mode) / Unaligned Mode
In aligned mode, an operator waits for barriers from all input channels before snapshotting. Records arriving after a barrier on channel X but before alignment are buffered. In unaligned mode (Flink 1.11+, closer to original Chandy-Lamport), in-flight records are included in the checkpoint state, eliminating alignment stall at the cost of larger checkpoint size.
Async Snapshot
Synchronous phase: For HashMap backend — copy-on-write hash table snapshot (ms). For RocksDB — flush MemTable to disk, hard-link to SSTables (ms). Stream processing resumes. Asynchronous phase: Background thread uploads state to durable storage. Upload does not block processing.
Commit
Each operator sends an ACK with a StateHandle (pointer to durable state) to the Coordinator. Once all ACKs arrive, the Coordinator marks the checkpoint as complete. Transactional sinks then commit their pending writes (2PC commit phase).
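The alignment step above can be simulated framework-free: once barrier B#N arrives on one channel, that channel's subsequent records are buffered until every channel has delivered B#N, at which point the snapshot fires and the buffers drain. The class below is illustrative, not Flink's `CheckpointBarrierHandler`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Minimal simulation of aligned barrier snapshotting for one operator with
// multiple input channels.
public class BarrierAligner {
    private final boolean[] barrierSeen;
    private final Queue<String>[] buffered;
    public final List<String> processed = new ArrayList<>();
    public boolean snapshotTaken = false;

    @SuppressWarnings("unchecked")
    public BarrierAligner(int numChannels) {
        barrierSeen = new boolean[numChannels];
        buffered = new Queue[numChannels];
        for (int i = 0; i < numChannels; i++) buffered[i] = new ArrayDeque<>();
    }

    public void onRecord(int channel, String record) {
        if (barrierSeen[channel]) buffered[channel].add(record); // hold back post-barrier data
        else processed.add(record);
    }

    public void onBarrier(int channel) {
        barrierSeen[channel] = true;
        for (boolean seen : barrierSeen) if (!seen) return;   // still waiting on a channel
        snapshotTaken = true;                                 // alignment complete → snapshot
        for (Queue<String> q : buffered) processed.addAll(q); // release buffered records
    }
}
```

Unaligned mode removes exactly this buffering stall: the buffered records would instead be written into the checkpoint itself.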
# flink-conf.yaml — production checkpoint config
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://prod-bucket/flink-checkpoints
execution.checkpointing.interval: 60000 # 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 300000 # 5min max
execution.checkpointing.min-pause: 30000
state.backend.rocksdb.memory.managed: true
taskmanager.memory.managed.fraction: 0.4 # 40% for RocksDB
2.1 — Event Time vs. Processing Time
Flink distinguishes three notions of time: event time, processing time, and ingestion time (a rarely used hybrid that stamps records on arrival at the source). Only Event Time provides deterministic, reproducible results regardless of processing delays, network jitter, or replay scenarios.
Processing Time
Uses the wall-clock of the machine processing the event. Lowest latency, but non-deterministic: two identical jobs may produce different results if they process data at different wall-clock times. Suitable for monitoring dashboards tolerant of imprecision.
Event Time
Uses the timestamp embedded in the event payload (e.g., Kafka message timestamp or application-level field). Deterministic, reproducible, and handles out-of-order events, but requires watermarks. Ideal for financial, IoT, and fraud-detection pipelines.
2.2 — Watermark Mechanics
A watermark W(t) is a monotonically increasing timestamp assertion: "no further events with event-time ≤ t are expected". Watermarks flow as special records through the operator DAG. Every window operator and time-based join uses the watermark to decide when to close a window and emit results.
The idle partition problem: If one Kafka partition is idle, its watermark stays frozen at its last seen event. The combined watermark (the minimum across all active input partitions) is pinned, blocking window triggering. Mitigation: use WatermarkStrategy.withIdleness(Duration.ofSeconds(30)) to mark idle sources and exclude them from the min computation.
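The min-combination and the idleness escape hatch reduce to a few lines of plain Java. The class below is illustrative, not Flink's `WatermarkOutputMultiplexer`.

```java
import java.util.Arrays;

// Illustrative combined-watermark computation: the operator watermark is the
// minimum over all non-idle input channels, so marking a channel idle
// excludes it and lets downstream windows fire again.
public class CombinedWatermark {
    private final long[] watermarks;
    private final boolean[] idle;

    public CombinedWatermark(int channels) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no progress assertion yet
        idle = new boolean[channels];
    }

    public void advance(int channel, long wm) { watermarks[channel] = wm; idle[channel] = false; }

    public void markIdle(int channel) { idle[channel] = true; }

    public long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) { min = Math.min(min, watermarks[i]); anyActive = true; }
        }
        return anyActive ? min : Long.MIN_VALUE; // all idle → no assertion possible
    }
}
```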
// BoundedOutOfOrderness watermark strategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))       // max expected out-of-orderness
    .withTimestampAssigner((order, ts) -> order.getEventTimestamp())
    .withIdleness(Duration.ofSeconds(30));                         // unblock idle partitions

// Side-output tag for records arriving after the allowed lateness
final OutputTag<Order> lateTag = new OutputTag<Order>("late-orders") {};

DataStream<Order> stream = env
    .fromSource(kafkaSource, strategy, "Kafka Orders")
    .keyBy(Order::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))   // re-fire window for late arrivals
    .sideOutputLateData(lateTag)        // route very-late records to side output
    .reduce(OrderAgg::merge);
2.3 — Exactly-Once Semantics & Two-Phase Commit
Flink's internal checkpoint mechanism guarantees exactly-once state updates within the processing graph. To extend this guarantee to external sinks (Kafka, JDBC, filesystem), Flink uses the Two-Phase Commit (2PC) protocol, tightly coupled to checkpoint lifecycle events.
beginTransaction()
Called at the start of each checkpoint interval. The sink opens a new Kafka transactional producer (or DB transaction). All downstream invoke() calls write records into this open transaction.
preCommit()
Called when the checkpoint barrier reaches the sink. The sink flushes buffered records and transitions the Kafka producer to the prepared state (no commit yet). The transaction handle is serialized into the checkpoint state via snapshotState().
commit()
Called only after the CheckpointCoordinator confirms the checkpoint is globally complete (all ACKs received). The Kafka transaction is committed, making records visible to consumers. If the job crashes between preCommit and commit, recovery replays from the last checkpoint and retries the commit (idempotent commit guarantee).
abort()
If the checkpoint fails for any reason, abort() rolls back the open Kafka transaction. No partially-processed records become visible.
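The four callbacks above condense into a small state machine. The sketch below uses a `MockTransaction` stand-in for a Kafka transactional producer and only tracks record visibility; it is a conceptual model, not Flink's `TwoPhaseCommittingSink` interface.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal 2PC sink state machine: records written into an open transaction
// become visible only after the checkpoint completes and commit() runs.
public class TwoPhaseCommitSink {
    static class MockTransaction {
        final List<String> pending = new ArrayList<>();
        boolean committed = false;
    }

    private MockTransaction current = new MockTransaction(); // beginTransaction()
    public final List<String> visibleToConsumers = new ArrayList<>();

    public void invoke(String record) { current.pending.add(record); }

    // Barrier reached the sink: flush, prepare, and open the next transaction.
    public MockTransaction preCommit() {
        MockTransaction prepared = current;
        current = new MockTransaction();
        return prepared;
    }

    // Checkpoint globally complete. Idempotent: safe to retry after recovery.
    public void commit(MockTransaction tx) {
        if (!tx.committed) {
            tx.committed = true;
            visibleToConsumers.addAll(tx.pending);
        }
    }

    // Checkpoint failed: roll back, nothing becomes visible.
    public void abort(MockTransaction tx) { tx.pending.clear(); }
}
```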
// End-to-end exactly-once Kafka sink
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-enriched")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // Enables the 2PC protocol internally
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // Must be unique per sink to survive restarts; ties tx IDs to checkpoint IDs
    .setTransactionalIdPrefix("fraud-pipeline-sink-v2")
    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(10 * 60 * 1000)) // must be < broker transaction.max.timeout.ms
    .build();
3.1 — Native Streaming vs. Micro-batching
The fundamental architectural divergence stems from each framework's origin story: Spark was built for batch processing and retrofitted with streaming; Flink was designed as a streaming-first engine from day one.
⚡ Apache Flink — Native Streaming
- One-at-a-time model: Each record flows through the operator DAG as it arrives — no accumulation delay.
- Pipelined execution: Operators run continuously; no re-planning per "batch".
- Latency: Sub-millisecond to low-millisecond for simple pipelines.
- Backpressure: Propagated natively via credit-based network buffers — no external throttling needed.
- State: Per-key state maintained locally in operator; checkpointed asynchronously.
- Exactly-once: Operator-level, coordinated via checkpointing + 2PC sinks.
🔥 Apache Spark — Micro-batch (Structured Streaming)
- Micro-batch model: Data accumulated over a configurable interval (100ms–seconds) then processed as a mini-batch.
- Planning overhead: Each batch incurs Spark query planning, task serialization, and scheduling overhead.
- Latency: Typically 200ms–seconds, constrained by batch boundary wait.
- Continuous Processing (experimental): Epoch-based, approaches native streaming, but limited operator support.
- State: Managed via HDFS/RocksDB StateStore; updated at batch boundaries.
- Exactly-once: Via idempotent sinks + WAL or transactional writes at batch commit.
3.2 — Latency vs. Throughput Trade-offs
| Dimension | Apache Flink | Spark Structured Streaming | Winner |
|---|---|---|---|
| End-to-end latency | Single-digit to low milliseconds (pipelined) | 100 ms–seconds (batch boundary) | Flink |
| Peak throughput | Very high (limited vectorisation) | Very high (vectorised Tungsten execution) | Spark (ETL) |
| Stateful complexity | Native keyed state, arbitrary operators | Limited stateful ops in batch model | Flink |
| Exactly-once | Operator-level, comprehensive | Sink-level, requires idempotency | Flink |
| CEP (pattern matching) | Native FlinkCEP library | Not natively supported | Flink |
| Ecosystem & libraries | Growing (Kafka, Iceberg, Paimon) | Mature (MLlib, Delta, SparkSQL) | Spark |
| Operational maturity | Strong on Kubernetes / Ververica | Dominant on Databricks / EMR | Context-dependent |
| Event time semantics | First-class, watermarks native | Supported but more complex to configure | Flink |
| Batch processing | Supported (bounded stream) | Excellent (primary use case) | Spark |
Spark RTM (Real-Time Mode, 2024+): Databricks introduced RTM in Structured Streaming to break the micro-batch barrier via record-at-a-time continuous processing. However, RTM still requires checkpoint coordination at epoch boundaries and doesn't yet match Flink's full operator-level stateful streaming capabilities. For the majority of low-latency, stateful production workloads, Flink retains a clear architectural advantage.
4.1 — Where Flink is the Clear Winner
Complex Event Processing (FlinkCEP)
The FlinkCEP library enables pattern detection across event sequences with gap conditions, quantifiers, and time constraints — in a single stateful operator. Use for fraud rings, network intrusion, anomaly detection.
Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("high")
    .where(new SimpleCondition<Transaction>() {   // .where() takes an IterativeCondition,
        @Override                                 // not a bare lambda
        public boolean filter(Transaction t) { return t.amount > 5000; }
    })
    .followedBy("foreign")
    .where(new SimpleCondition<Transaction>() {
        @Override
        public boolean filter(Transaction t) { return !t.isHomeCountry(); }
    })
    .within(Time.minutes(10));
Streaming Enrichment & Temporal Joins
Temporal table joins and async I/O allow Flink to enrich every event against multi-TB RocksDB state or external databases with millisecond latency. Avoids a full shuffle join at each batch boundary.
Sub-100ms Decisioning
Credit card fraud alerts, infrastructure health signals, and real-time bidding decisions all require sub-100ms end-to-end latency. Flink's pipelined, event-at-a-time model is the strongest production-ready fit at this latency tier.
4.2 — Kappa Architecture & Table API/SQL
The Kappa Architecture (Jay Kreps, 2014) discards the Lambda architecture's separate batch and speed layers. Instead, a single streaming engine handles both real-time and historical reprocessing — by replaying events from the durable log (Kafka).
Flink's Table API and SQL are the primary enablers of Kappa in production, providing a unified relational abstraction that executes identically on bounded (batch replay) and unbounded (live stream) data. A single SQL query can backfill historical results and continue processing live events without code changes.
-- Flink SQL: unified stream + batch query (Kappa pattern)
-- Works on live Kafka stream AND bounded Iceberg snapshot
CREATE TABLE orders (
order_id BIGINT,
customer_id STRING,
amount DECIMAL(10,2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka', 'topic' = 'orders', ...);
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
customer_id,
SUM(amount) AS revenue,
COUNT(*) AS order_count
FROM orders
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTE),
customer_id;
-- Identical query runs on Kafka (streaming) or Iceberg (batch backfill)
5.1 — Backpressure & Credit-Based Flow Control
Backpressure in Flink is a first-class, desirable signal — it indicates a downstream operator cannot consume data as fast as the upstream produces it. Rather than dropping data (like Storm) or relying on external throttling (like early Spark Streaming), Flink propagates backpressure naturally through its credit-based network layer.
Each network connection between TaskManagers uses Netty for transport. A receiver publishes credit announcements after recycling buffers. The sender only transmits data buffers equal to the announced credits, preventing a fast producer from overwhelming a slow consumer. This decouples Flink's application-level flow control from the underlying TCP window, so one slow task doesn't block unrelated tasks sharing the same TCP connection.
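The credit mechanism reduces to a simple invariant: the sender may have at most as many unacknowledged buffers in flight as the receiver has announced credits. A framework-free sketch (not Flink's Netty-level implementation):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of credit-based flow control: the sender transmits one buffer per
// announced credit; recycling a buffer at the receiver announces a new credit.
public class CreditFlowControl {
    private int credits;                               // announced by the receiver
    private final Queue<String> sendQueue = new ArrayDeque<>();
    public final Queue<String> inFlight = new ArrayDeque<>();

    public CreditFlowControl(int initialCredits) { this.credits = initialCredits; }

    public void enqueue(String buffer) { sendQueue.add(buffer); pump(); }

    // Receiver processed a buffer and recycled it → one more credit.
    public void recycleBuffer() { credits++; pump(); }

    private void pump() {                              // transmit only while credits remain
        while (credits > 0 && !sendQueue.isEmpty()) {
            inFlight.add(sendQueue.poll());
            credits--;
        }
    }

    public int backloggedBuffers() { return sendQueue.size(); }
}
```

A slow receiver stops recycling buffers, credits run out, and the backlog grows at the sender: that backlog is exactly the backpressure signal the Web UI metrics expose.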
Diagnosing backpressure: In the Flink Web UI, identify the operator with high backpressure score closest to the sink — its downstream neighbour is the actual bottleneck (it is processing slowly, unable to recycle buffers fast enough). Key metrics: backPressuredTimeMsPerSecond, outPoolUsage, inPoolUsage. The bottleneck operator shows near-zero backpressure (it's working flat out) while all upstream operators show near-100% backpressure.
5.2 — Data Skew Strategies
Skew occurs when a keyBy() operation routes disproportionate event volumes to one or a few subtasks. One overloaded subtask becomes the global bottleneck, starving all upstream operators of buffer credits.
Key Salting (Two-Stage Aggregation)
Append a random salt in [0, N) to hot keys for a two-stage aggregation. Stage 1 aggregates locally per salted key; Stage 2 merges the partials across salts.
// Stage 1: assign a random salt ONCE per record. Key selectors must be
// deterministic, so the salt is materialised into the event first —
// calling Math.random() inside keyBy() would break Flink's key routing.
stream.map(e -> { e.salt = ThreadLocalRandom.current().nextInt(32); return e; })
      .keyBy(e -> e.key + "_" + e.salt)
      .reduce(localAgg)
// Stage 2: strip the salt and merge partial aggregates per original key
      .keyBy(e -> e.originalKey)
      .reduce(globalMerge);
Mini-Batch Aggregation
Buffer a configurable number of input records per key before flushing to state. Amortises the serialization cost of RocksDB state access across many events, dramatically reducing state overhead under skew.
-- Enable mini-batch in Table API
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.exec.mini-batch.allow-latency' = '200 ms';
Split Distinct Aggregation
The SQL optimiser automatically rewrites COUNT(DISTINCT user_id) into a two-phase plan with bucket keys, distributing the hotspot load. Enable via table.optimizer.distinct-agg.split.enabled=true.
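The rewrite is correct because bucketing partitions the key space: each user hashes to exactly one bucket, so per-bucket distinct counts can be computed independently and summed. A plain-Java sketch of the two phases (not the optimiser's actual plan):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Two-phase distinct count: phase 1 computes a partial distinct set per
// bucket (spread across many subtasks); phase 2 sums the bucket counts.
public class SplitDistinct {
    public static long countDistinct(List<String> userIds, int numBuckets) {
        // Phase 1: partial distinct keyed by (originalKey, hash(user) % numBuckets)
        Map<Integer, Set<String>> buckets = new HashMap<>();
        for (String user : userIds) {
            int bucket = Math.floorMod(user.hashCode(), numBuckets);
            buckets.computeIfAbsent(bucket, b -> new HashSet<>()).add(user);
        }
        // Phase 2: final aggregation merges the partial counts
        return buckets.values().stream().mapToLong(Set::size).sum();
    }
}
```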
Targeted Parallelism
Increase the parallelism of the skewed operator alone via per-operator .setParallelism(N). Combined with a good hash function, this distributes the key space more evenly. For source-side skew (one slow or idle Kafka partition), use withIdleness() or, on Flink 1.15+, watermark alignment.
// Async I/O for slow external lookups — prevents blocking the stream
DataStream<EnrichedOrder> enriched = AsyncDataStream.unorderedWait(
    orders,
    new AsyncCustomerLookup(redisClient),
    500, TimeUnit.MILLISECONDS, // timeout per async request
    100                         // max concurrent in-flight requests
);
// RocksDB tuning for write-heavy workloads. setRocksDBOptions() takes a
// RocksDBOptionsFactory (two methods), not a lambda over one options object.
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental checkpoints
backend.setRocksDBOptions(new RocksDBOptionsFactory() {
    public DBOptions createDBOptions(DBOptions cur, Collection<AutoCloseable> h) { return cur; }
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions cur, Collection<AutoCloseable> h) {
        return cur.setWriteBufferSize(64 * 1024 * 1024)      // 64 MB memtable
                  .setMaxWriteBufferNumber(3)
                  .setTargetFileSizeBase(64 * 1024 * 1024);  // target SSTable size
    }
});
env.setStateBackend(backend);
5.3 — Memory Configuration Quick Reference
| Memory Pool | Config Key | Role | Recommended |
|---|---|---|---|
| JVM Heap (user code) | taskmanager.memory.task.heap.size | User-defined functions, Java objects | 256 MB – 4 GB |
| Managed Memory | taskmanager.memory.managed.fraction | RocksDB block cache + write buffers | 0.4 (40% total) |
| Network Buffers | taskmanager.memory.network.fraction | Netty I/O buffers, credit pool | 0.1 (10% total) |
| Off-Heap Direct | taskmanager.memory.task.off-heap.size | User-code direct / native memory | 128 MB |
| JVM Overhead | taskmanager.memory.jvm-overhead.fraction | JVM metaspace, stacks, native | 0.1 |
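How the fractions in the table compose can be shown with back-of-envelope arithmetic. This is a deliberate simplification: real Flink first subtracts framework heap/off-heap defaults and applies per-pool min/max clamps before the fractions are evaluated.

```java
// Simplified TaskManager memory split: managed and network pools are
// fractions of total Flink memory; the remainder covers task heap and
// task off-heap. Ignores Flink's framework defaults and min/max clamps.
public class TmMemorySketch {
    public static long[] split(long totalFlinkMemoryMb,
                               double managedFraction, double networkFraction) {
        long managed = (long) (totalFlinkMemoryMb * managedFraction); // RocksDB cache + write buffers
        long network = (long) (totalFlinkMemoryMb * networkFraction); // Netty buffers, credit pool
        long rest = totalFlinkMemoryMb - managed - network;           // heap + off-heap for user code
        return new long[] { managed, network, rest };
    }
}
```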
All diagrams and analyses based on Apache Flink 1.19/2.0 documentation and primary research sources.