Production Engineering Deep-Dive

Apache Flink Architecture
Technical Analysis

A comprehensive reference covering runtime internals, consistency models, performance tuning, and architectural trade-offs for production streaming stacks.

Flink 1.19 / 2.0 — 2025
5 Technical Sections
Production-Level Focus
Section 01
Core Architecture & Internals

1.1 — Runtime Stack: JobManager, TaskManager & Slot Allocation

The Flink runtime is a two-tier distributed system: a single (HA-enabled) JobManager acts as the cluster master, while one or more TaskManagers execute the actual stream processing workloads. A lightweight Client translates the user program into a serialized JobGraph and submits it — it is not part of the runtime execution path.

Flink Runtime Architecture — Process Interaction Model
[Diagram: the Client builds and submits the JobGraph to the JobManager (Dispatcher → per-job JobMaster, with the ResourceManager handling slot allocation); TaskManager JVM processes host task slots running operator chains, with Netty-backed network buffers and credit-based flow control between them.]
JobManager Internals

The JM process contains three components: the ResourceManager (slot lifecycle), the Dispatcher (REST API + WebUI; spawns one JobMaster per submitted job), and the JobMaster (one per job: schedules the ExecutionGraph, coordinates checkpoints, handles failures). In HA mode, multiple JM instances run with leader election via ZooKeeper or Kubernetes.

TaskManager & Slot Model

Each TM is a JVM process exposing one or more task slots. A slot is the minimum unit of resource scheduling. Slots split the TM's managed memory equally but do not provide CPU isolation. By default, all operators belong to the same SlotSharingGroup ("default"), so one subtask of every operator in the pipeline can co-locate in a single slot — maximising utilisation without over-provisioning.

💡

Production Insight: Set taskmanager.numberOfTaskSlots to match physical CPU cores. With fine-grained resource management (Flink 1.14+), you can request fractional slot profiles (e.g., 0.25 Core, 1 GB) per operator, eliminating wasteful uniform slot sizing for heterogeneous pipelines.
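A minimal sketch of such a fractional profile, assuming the Flink 1.14+ fine-grained resource API; the group name, resource figures, and the orders stream are illustrative:

// Declare a fine-grained slot profile and bind a heavy operator to it
SlotSharingGroup enrichProfile = SlotSharingGroup.newBuilder("enrich")
    .setCpuCores(0.25)                   // fraction of a CPU core for this group
    .setTaskHeapMemoryMB(1024)           // 1 GB of task heap per slot of this group
    .build();

orders.map(Order::enrich)
      .slotSharingGroup(enrichProfile);  // subtasks of this operator are scheduled into the fractional slots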

1.2 — Dataflow Graph Transformation Pipeline

Flink implements a four-stage compilation from user code to physical execution. Each stage adds progressively more physical detail — from logical semantics to actual parallelised tasks running on TaskManagers.

StreamGraph → JobGraph → ExecutionGraph → Physical Tasks
[Diagram: StreamGraph (StreamNodes + StreamEdges, no chaining) —chain→ JobGraph (JobVertices of chained ops, parallelism, SlotSharingGroups) —expand→ ExecutionGraph (ExecutionVertices ×p, IntermediateResultPartitions, CheckpointCoordinator, ExecutionAttemptIDs) —deploy→ Physical Tasks (StreamTask invoke() loop in slots, Netty I/O threads).]
1

StreamGraph (Client-side)

Built from the DataStream API transformation chain. Each DataStream.map(), .keyBy(), etc. creates a StreamNode. Directed edges (StreamEdge) capture data flow. The StreamGraph is a logical plan — no physical resources assigned yet.

2

JobGraph (Operator Chaining — Client-side optimisation)

The StreamingJobGraphGenerator traverses the DAG starting from the source nodes and identifies chainable operators (same parallelism, forwarding partitioner, no shuffle boundary). Chainable operators are merged into a single JobVertex, running in one thread with no serialization overhead between them. The result is submitted to the Dispatcher as a serialized blob with attached JARs.

3

ExecutionGraph (JobManager — parallel expansion)

The JobMaster inflates the JobGraph into a physical DAG. Each JobVertex with parallelism p spawns p ExecutionVertex instances. An operator with p=100 → 1 JobVertex, 100 ExecutionVertices. The CheckpointCoordinator, KvStateLocationRegistry, and failure recovery scaffolding are also initialised at this stage.

4

Physical Tasks (TaskManager — runtime)

The JobMaster maps each ExecutionVertex to an Execution attempt, which is deployed as a Task into a TM slot. The Task.invoke() method starts the operator loop (StreamTask). Each Execution carries a unique ExecutionAttemptID to support retries and recovery across failure events.

// Client-side — DataStream API → StreamGraph → JobGraph
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

DataStream<Order> orders = env
    .addSource(new FlinkKafkaConsumer<>("orders", schema, props))  // StreamNode 1
    .map(Order::enrich)                                         // chained → same JobVertex
    .keyBy(Order::getCustomerId)                               // shuffle boundary → new JobVertex
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(OrderAgg::merge);                                   // window reduce → WindowOperator in the keyed JobVertex

// Calling execute() triggers: StreamGraph → JobGraph → submit to JobManager
env.execute("Order Aggregation Pipeline");
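Chaining boundaries can also be forced by hand when one operator should run in its own thread or slot group — a brief sketch reusing the pipeline above (the slot-group name and isValid predicate are illustrative):

orders
    .map(Order::enrich)
        .startNewChain()               // enrich begins a new operator chain (own JobVertex)
    .filter(Order::isValid)
        .disableChaining()             // never chain this operator with its neighbours
    .keyBy(Order::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(OrderAgg::merge)
        .slotSharingGroup("windows");  // isolate the window operator into its own slot-sharing group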

1.3 — State Backends: HashMapStateBackend vs. EmbeddedRocksDB

Flink separates two orthogonal concerns: in-flight state storage (the State Backend) and durable checkpoint storage (the Checkpoint Storage). The former determines runtime access latency; the latter determines fault-tolerance durability.

Property          | HashMapStateBackend                 | EmbeddedRocksDBStateBackend
Storage location  | JVM heap (Java HashMap)             | Off-heap RocksDB on local disk
Max state size    | Limited by heap (GC pressure)       | Limited by local disk (TBs possible)
Read/Write cost   | Nanoseconds (direct object access)  | µs–ms (serialise + RocksDB lookup)
Incremental CP    | No (full snapshots only)*           | Yes (SSTable diffs)
GC sensitivity    | High — objects on heap              | Low — off-heap native memory
Object reuse      | Unsafe (direct refs)                | Safe (serialise/deserialise)
Best for          | Small state (<100 MB), low latency  | Large stateful jobs, long windows, HA

* Flink 1.18+ introduces generalised incremental CP via the Changelog State Backend (DSTL), enabling incremental snapshots for HashMapStateBackend.
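Both concerns are set on the execution environment; a minimal sketch (the bucket path is illustrative):

// In-flight state: pick ONE backend per job
env.setStateBackend(new HashMapStateBackend());              // on-heap objects, fastest access
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));  // off-heap RocksDB, incremental snapshots

// Durable checkpoint storage is configured independently of the backend choice
env.getCheckpointConfig().setCheckpointStorage("s3://prod-bucket/flink-checkpoints");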

🔍

RocksDB internals: State is stored in column families per Flink state descriptor. Keys are composite: keyGroup | key | namespace (serialised bytes). On incremental checkpoint, Flink leverages RocksDB's native checkpoint (hard-links to SSTables) to upload only new/changed SSTable files to durable storage (S3/HDFS), dramatically reducing checkpoint size for slowly-changing state.
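To make the descriptor-to-column-family mapping concrete, here is a minimal keyed-function sketch; the CustomerProfile type and enrichWith call are illustrative, and the pre-2.0 open(Configuration) signature is assumed:

public class EnrichFn extends KeyedProcessFunction<String, Order, Order> {
    private transient ValueState<CustomerProfile> profile;

    @Override
    public void open(Configuration parameters) {
        // "customer-profile" becomes its own RocksDB column family
        profile = getRuntimeContext().getState(
            new ValueStateDescriptor<>("customer-profile", CustomerProfile.class));
    }

    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) throws Exception {
        CustomerProfile p = profile.value();   // RocksDB get on serialised keyGroup | key | namespace
        out.collect(order.enrichWith(p));
    }
}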

1.4 — Chandy-Lamport Variant: Asynchronous Barrier Snapshotting (ABS)

Flink uses a variant of the classic Chandy-Lamport distributed snapshot algorithm, adapted for directed acyclic streaming topologies. The core innovation is the checkpoint barrier — a special control record injected into all source streams simultaneously by the CheckpointCoordinator.

ABS Algorithm — Barrier Flow & State Snapshot Phases
[Diagram: the CheckpointCoordinator (JobManager) triggers CP#N; sources record their offsets and inject barrier B#N; operators align barriers from all inputs, snapshot state, upload asynchronously to S3/HDFS, and ACK state handles back to the coordinator, which then commits. Sync phase: milliseconds; async phase: background thread.]
1

Trigger

CheckpointCoordinator sends a triggerCheckpoint(cpId) RPC to all source tasks. Sources record their input offsets (e.g., Kafka partition offsets) and inject barrier B#N into their output streams.

2

Barrier Alignment (Aligned mode) / Unaligned Mode

In aligned mode, an operator waits for barriers from all input channels before snapshotting. Records arriving after a barrier on channel X but before alignment completes are buffered. In unaligned mode (Flink 1.11+, closer to the original Chandy-Lamport), in-flight records are included in the checkpoint state, eliminating the alignment stall at the cost of larger checkpoints (see the configuration sketch after this step list).

3

Async Snapshot

Synchronous phase: For HashMap backend — copy-on-write hash table snapshot (ms). For RocksDB — flush MemTable to disk, hard-link to SSTables (ms). Stream processing resumes. Asynchronous phase: Background thread uploads state to durable storage. Upload does not block processing.

4

Commit

Each operator sends an ACK with a StateHandle (pointer to durable state) to the Coordinator. Once all ACKs arrive, the Coordinator marks the checkpoint as complete. Transactional sinks then commit their pending writes (2PC commit phase).
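Unaligned mode (step 2) is opt-in per job; a minimal sketch using the CheckpointConfig API, with the timeout value purely illustrative:

CheckpointConfig cpConfig = env.getCheckpointConfig();
cpConfig.enableUnalignedCheckpoints();                         // snapshot in-flight buffers instead of waiting for alignment
cpConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));  // start aligned, switch to unaligned if alignment exceeds 30 s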

# flink-conf.yaml — production checkpoint config
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: s3://prod-bucket/flink-checkpoints
execution.checkpointing.interval: 60000          # 60s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 300000           # 5min max
execution.checkpointing.min-pause: 30000
state.backend.rocksdb.memory.managed: true
taskmanager.memory.managed.fraction: 0.4          # 40% for RocksDB
Section 02
Time & Consistency Models

2.1 — Event Time vs. Processing Time

Flink historically distinguished three notions of time (event, ingestion, processing); ingestion time is deprecated, leaving the two that matter in practice. Only Event Time provides deterministic, reproducible results regardless of processing delays, network jitter, or replay scenarios.

⏱ Processing Time

Uses the wall-clock of the machine processing the event. Lowest latency, non-deterministic. Two identical jobs may produce different results if they process data at different wall-clock times. Suitable for monitoring dashboards tolerant of imprecision.

Lowest latency · Non-deterministic
📅 Event Time

Uses the timestamp embedded in the event payload (e.g., Kafka message timestamp or application-level field). Deterministic, reproducible, handles out-of-order events. Requires watermarks. Ideal for financial, IoT, fraud detection pipelines.

Deterministic · Requires watermarks
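The distinction shows up in code only through the window assigner; a side-by-side sketch reusing the orders stream and OrderAgg reducer from earlier sections:

// Processing time: wall-clock windows, lowest latency, non-deterministic on replay
orders.keyBy(Order::getCustomerId)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .reduce(OrderAgg::merge);

// Event time: timestamps from the payload, deterministic, requires a WatermarkStrategy on the source
orders.keyBy(Order::getCustomerId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .reduce(OrderAgg::merge);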

2.2 — Watermark Mechanics

A watermark W(t) is a monotonically increasing timestamp assertion: "no further events with event-time ≤ t are expected". Watermarks flow as special records through the operator DAG. Every window operator and time-based join uses the watermark to decide when to close a window and emit results.

Watermark propagation through parallel subtasks — min-watermark rule
[Diagram: per-partition watermark generators (BoundedOutOfOrderness, 10 s lag) feed the keyBy subtasks; each operator's output watermark is the minimum of all its input watermarks (min-watermark rule); the tumbling window fires once the watermark passes window_end + allowed lateness.]
⚠️

The idle partition problem: If one Kafka partition is idle, its watermark stays frozen at its last seen event. The global watermark (which is the minimum across all subtask watermarks) is pinned, blocking window triggering. Mitigation: use WatermarkStrategy.withIdleness(Duration.ofSeconds(30)) to mark idle sources and exclude them from the min computation.

// BoundedOutOfOrderness Watermark Strategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10)) // max expected lateness
    .withTimestampAssigner((order, ts) -> order.getEventTimestamp())
    .withIdleness(Duration.ofSeconds(30));            // unblock idle partitions

DataStream<Order> stream = env
    .fromSource(kafkaSource, strategy, "Kafka Orders")
    .keyBy(Order::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))       // re-fire window for late arrivals
    .sideOutputLateData(lateTag)              // route very-late to side output
    .reduce(OrderAgg::merge);

2.3 — Exactly-Once Semantics & Two-Phase Commit

Flink's internal checkpoint mechanism guarantees exactly-once state updates within the processing graph. To extend this guarantee to external sinks (Kafka, JDBC, filesystem), Flink uses the Two-Phase Commit (2PC) protocol, tightly coupled to checkpoint lifecycle events.

2PC Sink — Checkpoint-Aligned Transaction Lifecycle
[Diagram: transaction lifecycle aligned to checkpoints — beginTransaction() at the start of checkpoint N, records written via invoke(), preCommit() + snapshotState() when barrier #N reaches the sink (Kafka transaction flushed but not committed), commit() once the checkpoint-complete notification arrives, then beginTransaction() for N+1; on failure, recovery from the last checkpoint aborts or retries the commit.]
1

beginTransaction()

Called at the start of each checkpoint interval. The sink opens a new Kafka transactional producer (or DB transaction). Subsequent invoke() calls write records into this open transaction.

2

preCommit()

Called when the checkpoint barrier reaches the sink. The sink flushes buffered records and transitions the Kafka producer to the prepared state (no commit yet). The transaction handle is serialized into the checkpoint state via snapshotState().

3

commit()

Called only after the CheckpointCoordinator confirms the checkpoint is globally complete (all ACKs received). The Kafka transaction is committed, making records visible to consumers. If the job crashes between preCommit and commit, recovery replays from the last checkpoint and retries the commit (idempotent commit guarantee).

4

abort()

If the checkpoint fails for any reason, abort() rolls back the open Kafka transaction. No partially-processed records become visible.

// End-to-end exactly-once Kafka sink
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-enriched")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // Enables the 2PC protocol internally
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // Must be unique per sink to survive restarts; ties tx IDs to checkpoint IDs
    .setTransactionalIdPrefix("fraud-pipeline-sink-v2")
    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(10 * 60 * 1000)) // < broker transaction.max.timeout.ms
    .build();
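One operational corollary: downstream Kafka consumers see the transactional records only when they read with read_committed isolation — a brief consumer-side sketch (the group id is illustrative):

Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // hide uncommitted / aborted tx data
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "downstream-analytics");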
Section 04
Best Use Cases & Patterns

4.1 — Where Flink is the Clear Winner

🔍 Complex Event Processing

The FlinkCEP library enables pattern detection across event sequences with gap conditions, quantifiers, and time constraints — in a single stateful operator. Use for fraud rings, network intrusion, anomaly detection.

Pattern<Transaction, ?> pattern =
  Pattern.<Transaction>begin("high")
    .where(SimpleCondition.of(t -> t.amount > 5000))
  .followedBy("foreign")
    .where(SimpleCondition.of(t -> !t.isHomeCountry()))
  .within(Time.minutes(10));
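Applying the pattern to a keyed stream yields a PatternStream whose matches can be turned into alerts; a minimal sketch, where the transactions stream, card-id key, and Alert mapping are illustrative:

PatternStream<Transaction> matches =
    CEP.pattern(transactions.keyBy(Transaction::getCardId), pattern);

DataStream<Alert> alerts = matches.process(new PatternProcessFunction<Transaction, Alert>() {
    @Override
    public void processMatch(Map<String, List<Transaction>> match, Context ctx, Collector<Alert> out) {
        out.collect(Alert.fromPattern(match.get("high"), match.get("foreign")));
    }
});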
🗄 Large-Scale Stateful Lookups

Temporal table joins and async I/O allow Flink to enrich every event against multi-TB RocksDB state or external databases with millisecond latency. Avoids a full shuffle join at each batch boundary.

🚨 Low-Latency Alerting

Credit card fraud alerts, infrastructure health signals, and real-time bidding decisions all require sub-100ms end-to-end. Flink's pipelined, event-at-a-time model is the only production-ready choice at this latency tier.

1ms
Min E2E latency
TB
RocksDB state scale
1:1
Event time determinism
2PC
Sink guarantee

4.2 — Kappa Architecture & Table API/SQL

The Kappa Architecture (Jay Kreps, 2014) discards the Lambda architecture's separate batch and speed layers. Instead, a single streaming engine handles both real-time and historical reprocessing — by replaying events from the durable log (Kafka).

Kappa Architecture with Apache Flink
[Diagram: event sources (IoT / CDC / logs) → Apache Kafka as the durable, infinitely-retained log (replay from t=0) → a single Apache Flink engine (Table API / Flink SQL, DataStream, CEP / state) handling both the live stream and historical replay → serving layer (Iceberg/Paimon lakehouse, Elasticsearch/Druid, key-value stores such as Redis/Aerospike, Grafana dashboards).]

Flink's Table API and SQL are the primary enablers of Kappa in production, providing a unified relational abstraction that executes identically on bounded (batch replay) and unbounded (live stream) data. A single SQL query can backfill historical results and continue processing live events without code changes.

-- Flink SQL: unified stream + batch query (Kappa pattern)
-- Works on live Kafka stream AND bounded Iceberg snapshot

CREATE TABLE orders (
  order_id    BIGINT,
  customer_id STRING,
  amount      DECIMAL(10,2),
  event_time  TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka', 'topic' = 'orders', ...);

SELECT
  TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
  customer_id,
  SUM(amount) AS revenue,
  COUNT(*) AS order_count
FROM orders
GROUP BY
  TUMBLE(event_time, INTERVAL '5' MINUTE),
  customer_id;
-- Identical query runs on Kafka (streaming) or Iceberg (batch backfill)
Section 05
Performance Tuning

5.1 — Backpressure & Credit-Based Flow Control

Backpressure in Flink is a first-class signal, not a failure mode — it indicates that a downstream operator cannot consume data as fast as the upstream produces it. Rather than dropping data (like Storm) or relying on external throttling (like early Spark Streaming), Flink propagates backpressure naturally through its credit-based network layer.

Credit-Based Flow Control — Receiver-Driven Rate Limiting
[Diagram: the receiver (TM-B) announces free input buffers as credits over the Netty channel; the sender (TM-A) transmits data buffers only while credits > 0; when credits reach 0 the sender blocks, and backpressure propagates upstream through all connected TM channels until the source slows its Kafka consumer.]

Each network connection between TaskManagers uses Netty for transport. A receiver publishes credit announcements after recycling buffers. The sender only transmits data buffers equal to the announced credits, preventing a fast producer from overwhelming a slow consumer. This decouples Flink's application-level flow control from the underlying TCP window, so one slow task doesn't block unrelated tasks sharing the same TCP connection.

🔬

Diagnosing backpressure: In the Flink Web UI, identify the operator with high backpressure score closest to the sink — its downstream neighbour is the actual bottleneck (it is processing slowly, unable to recycle buffers fast enough). Key metrics: backPressuredTimeMsPerSecond, outPoolUsage, inPoolUsage. The bottleneck operator shows near-zero backpressure (it's working flat out) while all upstream operators show near-100% backpressure.

5.2 — Data Skew Strategies

Skew occurs when a keyBy() operation routes disproportionate event volumes to one or a few subtasks. One overloaded subtask becomes the global bottleneck, starving all upstream operators of buffer credits.

🔑 Key Salting / Partial Aggregation

Append a random salt [0, N) to hot keys for a two-stage aggregation. Stage 1 aggregates locally per salted key; Stage 2 merges across salts.

// Stage 1: assign the salt BEFORE keyBy — key selectors must be
// deterministic per record (withSalt is an illustrative helper)
stream.map(e -> e.withSalt(ThreadLocalRandom.current().nextInt(32)))
  .keyBy(e -> e.key + "_" + e.salt)
  .reduce(localAgg);
// Stage 2: merge partials on the original key
partials.keyBy(e -> e.originalKey)
  .reduce(globalMerge);
📊 MiniBatch Aggregation (Table API)

Buffer a configurable number of input records per key before flushing to state. Amortises the serialization cost of RocksDB state access across many events. Dramatically reduces state overhead under skew.

-- Enable mini-batch in Table API
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.exec.mini-batch.allow-latency' = '200 ms';
⚙️ Local-Global Aggregation

SQL optimiser automatically rewrites COUNT(DISTINCT user_id) to use a two-phase approach with bucket keys, distributing hotspot load. Enable via table.optimizer.distinct-agg.split.enabled=true.

🔀 Parallelism Increase

Increase the parallelism of the skewed operator alone via a per-operator .setParallelism(N). Combined with a good hash function, this spreads the key space more evenly. For source-side skew (one Kafka partition lagging), watermark alignment (Flink 1.15+) bounds how far fast partitions can run ahead, while withIdleness() handles partitions that go silent altogether.
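A brief sketch of both remedies; the parallelism figure, alignment group name, and drift/update intervals are illustrative:

// Scale only the hot operator
orders.keyBy(Order::getCustomerId)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .reduce(OrderAgg::merge)
      .setParallelism(64);                  // per-operator override of the job default

// Bound the drift between fast and slow source splits (Flink 1.15+)
WatermarkStrategy<Order> aligned = WatermarkStrategy
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withWatermarkAlignment("orders-group", Duration.ofSeconds(5), Duration.ofSeconds(1));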

// Async I/O for slow external lookups — prevents blocking the stream
DataStream<EnrichedOrder> enriched = AsyncDataStream.unorderedWait(
    orders,
    new AsyncCustomerLookup(redisClient),
    500, TimeUnit.MILLISECONDS,  // timeout per async request
    100                           // max concurrent in-flight requests
);

// RocksDB tuning for write-heavy workloads (per-column-family options via RocksDBOptionsFactory)
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setRocksDBOptions(new RocksDBOptionsFactory() {
    @Override public DBOptions createDBOptions(DBOptions current, Collection<AutoCloseable> handles) {
        return current;
    }
    @Override public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions current, Collection<AutoCloseable> handles) {
        return current.setWriteBufferSize(64 * 1024 * 1024)       // 64 MB memtable
                      .setMaxWriteBufferNumber(3)
                      .setTargetFileSizeBase(64 * 1024 * 1024);   // target SSTable size
    }
});
env.setStateBackend(backend);

5.3 — Memory Configuration Quick Reference

Memory Pool           | Config Key                                 | Role                                    | Recommended
JVM Heap (user code)  | taskmanager.memory.task.heap.size          | User-defined functions, Java objects    | 256 MB – 4 GB
Managed Memory        | taskmanager.memory.managed.fraction        | RocksDB block cache + write buffers     | 0.4 (40% of total)
Network Buffers       | taskmanager.memory.network.fraction        | Netty I/O buffers, credit pool          | 0.1 (10% of total)
Off-Heap Direct       | taskmanager.memory.task.off-heap.size      | Off-heap memory used by task/user code  | 128 MB
JVM Overhead          | taskmanager.memory.jvm-overhead.fraction   | Thread stacks, code cache, native       | 0.1
References
Sources & Further Reading
1
Apache Flink — Official Architecture Documentation
nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/
Apache Software Foundation, 2024–2025
2
Jobs and Scheduling — ExecutionGraph internals
nightlies.apache.org/flink/flink-docs-master/docs/internals/job_scheduling/
Apache Software Foundation
3
State Backends — HashMapStateBackend & EmbeddedRocksDB
nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
Apache Software Foundation
4
Stateful Stream Processing — Chandy-Lamport ABS
nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/
Apache Software Foundation
5
Lightweight Asynchronous Snapshots for Distributed Dataflows (original ABS paper)
arxiv.org/abs/1506.08603
Carbone, Fóra, Ewen, Haridi, Tzoumas — arXiv, 2015
6
Using RocksDB State Backend in Apache Flink: When and How
flink.apache.org/2021/01/18/using-rocksdb-state-backend...
Apache Flink Blog, 2021
7
Generating Watermarks — WatermarkStrategy API
nightlies.apache.org/flink/.../generating_watermarks/
Apache Software Foundation
8
Flink State Management and Checkpointing — Production Guide
conduktor.io/glossary/flink-state-management-and-checkpointing
Conduktor, 2025
9
Apache Spark vs Apache Flink — Streaming Framework Comparison
estuary.dev/blog/apache-spark-vs-flink
Estuary, January 2026
10
Breaking the Microbatch Barrier: Spark RTM Architecture
databricks.com/blog/breaking-microbatch-barrier...
Databricks Engineering Blog, 2024
11
Identifying Backpressure — Credit-Based Flow Control
apxml.com/courses/...identifying-backpressure
APXML Flink Course, 2024
12
Disaggregated State Management in Apache Flink® 2.0
vldb.org/pvldb/vol18/p4846-mei.pdf
Mei, Xia — VLDB 2025
13
In-depth Analysis of Flink Job Execution — StreamGraph → ExecutionGraph
alibabacloud.com/blog/...flink-job-execution...
Alibaba Cloud Community
14
Chandy-Lamport Algorithm — Distributed Snapshots
doi.org/10.1145/214451.214456
Chandy, Lamport — ACM TOCS, 1985
15
Stream Processing with Apache Flink (Book)
Fabian Hueske, Vasia Kalavri — O'Reilly Media, 2019
Apache Flink Technical Reference — Compiled March 2025 — Big Data Engineering Series
All diagrams and analyses based on Apache Flink 1.19/2.0 documentation and primary research sources.