A four-phase strategy, built on existing hardware, for a 20 PB data lake with 330 million files — targeting NameNode GC relief, compaction pipelines, cold-data offload, and modern open table formats.
Switch from the legacy CMS collector (long stop-the-world pauses) to G1GC (region-based, with a tunable pause-time target; not pauseless, but pauses become short and predictable). Then right-size the heap using the formula above. Cloudera's baseline guidance is roughly 1 GB of heap per 1 million blocks, adjusted upward for heavy RPC workloads.
```bash
# /etc/hadoop/conf/hadoop-env.sh — NAMENODE JVM TUNING
# Target: 330M files × 1.5 blocks × ~200B overhead → recommend 256–384 GB
export HADOOP_NAMENODE_OPTS="\
  -Xms256g -Xmx256g \
  -XX:+UseG1GC \
  -XX:G1HeapRegionSize=32m \
  -XX:MaxGCPauseMillis=200 \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:+ParallelRefProcEnabled \
  -XX:ConcGCThreads=8 \
  -XX:ParallelGCThreads=16 \
  -XX:+ExplicitGCInvokesConcurrent \
  -XX:+HeapDumpOnOutOfMemoryError \
  -XX:HeapDumpPath=/var/log/hadoop/nn-heapdump.hprof \
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps \
  -Xloggc:/var/log/hadoop/nn-gc.log \
  -Dhadoop.security.logger=INFO,RFAS"

# Increase handler threads to reduce RPC queue depth
# Add to hdfs-site.xml:
#   dfs.namenode.handler.count = 3 × √(number_of_datanodes)
#   For 100 DataNodes → handler.count = 30 (minimum)
#   For 500 DataNodes → handler.count = 70
```
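A quick way to confirm the pause target is holding after restart (a sketch; assumes the JDK 8 `-XX:+PrintGCDetails` log format configured above):

```bash
# Pull G1 pause lines from the GC log and eyeball the distribution;
# pauses should cluster at or below the 200 ms MaxGCPauseMillis target
grep "GC pause" /var/log/hadoop/nn-gc.log | tail -n 20
```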
| hdfs-site.xml Property | Recommended Value | Effect |
|---|---|---|
| `dfs.namenode.handler.count` | 3 × √(DataNodes) | Reduces RPC queue saturation |
| `dfs.namenode.service.handler.count` | 50–100 | Dedicated service RPC threads |
| `dfs.namenode.checkpoint.period` | 3600 (1 hr) | Reduces fsimage save overhead |
| `dfs.namenode.checkpoint.txns` | 1000000 | Triggers checkpoint by edit count |
| `dfs.namenode.avoid.read.stale.datanode` | true | Avoids reads from stale (non-heartbeating) DataNodes |
| `ipc.server.max.response.size` | 5242880 | Prevents large RPC response OOM |
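The same values as an `hdfs-site.xml` fragment (a sketch; the handler counts assume roughly 100 DataNodes, so recompute them with the formula above for your cluster):

```xml
<property>
  <name>dfs.namenode.handler.count</name>
  <value>30</value>  <!-- 3 × √100 DataNodes -->
</property>
<property>
  <name>dfs.namenode.service.handler.count</name>
  <value>50</value>
</property>
<property>
  <name>dfs.namenode.checkpoint.period</name>
  <value>3600</value>
</property>
<property>
  <name>dfs.namenode.checkpoint.txns</name>
  <value>1000000</value>
</property>
<property>
  <name>dfs.namenode.avoid.read.stale.datanode</name>
  <value>true</value>
</property>
<property>
  <name>ipc.server.max.response.size</name>
  <value>5242880</value>
</property>
```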
HAR creates a filesystem layer on top of HDFS, packaging many small files into a small number of HDFS files while preserving the logical namespace. It is the fastest path to reducing inode count without re-encoding data. Best applied to immutable, cold partitions (e.g., logs older than 90 days).
```bash
# Step 1: Identify cold partitions (no writes in 90+ days; %Y is mtime in ms)
hdfs dfs -stat "%n %Y" /data/logs/2023/* | \
  awk -v cutoff=$(date -d '-90 days' +%s000) '$2 < cutoff {print $1}'

# Step 2: Create HAR archive (runs a MapReduce job)
hadoop archive \
  -archiveName logs_2023_q1.har \
  -p /data/logs/2023 q1 \
  /archives/logs/2023/

# Step 3: Verify archive is readable
hadoop fs -ls har:///archives/logs/2023/logs_2023_q1.har

# Step 4: Delete originals ONLY after validation
hadoop fs -rm -r /data/logs/2023/q1

# Expected: 1 HAR = 2 index files + N part files (vs. thousands of originals)
# Inode reduction ratio: typically 100:1 to 1000:1 for log workloads
```
Archived data stays readable through the `har://` URI, so downstream jobs need only a path change.

HDFS Federation allows multiple independent NameNodes, each managing a portion of the namespace (via a mount table) while sharing the same pool of DataNodes. This spreads heap pressure across JVM instances and confines a NameNode failure to its own namespace volume (pair each NameNode with a standby for full HA).
```xml
<configuration>
  <!-- ViewFS: client-side namespace federation routing -->
  <property>
    <name>fs.defaultFS</name>
    <value>viewfs://ClusterX</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.ClusterX.link./data/raw</name>
    <value>hdfs://nn1:9000/data/raw</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.ClusterX.link./data/processed</name>
    <value>hdfs://nn2:9000/data/processed</value>
  </property>
  <property>
    <name>fs.viewfs.mounttable.ClusterX.link./data/archive</name>
    <value>hdfs://nn3:9000/data/archive</value>
  </property>
</configuration>
```
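Client paths then resolve through the mount table transparently; nothing changes for applications beyond the `fs.defaultFS` switch (a sketch of the behavior):

```bash
# With fs.defaultFS = viewfs://ClusterX, namespace paths are routed
# to the owning NameNode by the client-side mount table:
hdfs dfs -ls /data/raw        # served by nn1
hdfs dfs -ls /data/processed  # served by nn2
hdfs dfs -ls /data/archive    # served by nn3
```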
The core idea: read all of a partition's small files, repartition by the target file size, and write a small number of optimally sized Parquet or ORC files in their place. The pipeline is idempotent — safe to re-run. Target output file size: 256–512 MB, aligned with the HDFS block size.
```python
from pyspark.sql import SparkSession
import subprocess
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("CompactionPipeline")

TARGET_FILE_BYTES = 268_435_456   # 256 MB
MIN_FILE_THRESHOLD = 134_217_728  # compact partitions with avg file < 128 MB

def build_spark():
    return (SparkSession.builder
        .appName("SmallFileCompaction")
        .config("spark.sql.files.maxPartitionBytes", "268435456")
        .config("spark.sql.shuffle.partitions", "400")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate())

def needs_compaction(spark, path):
    """Return True if avg file size < 128 MB or file count > 500."""
    fs_result = subprocess.run(
        ["hdfs", "dfs", "-count", path],
        capture_output=True, text=True)
    # `hdfs dfs -count` prints: DIR_COUNT FILE_COUNT CONTENT_SIZE PATH
    parts = fs_result.stdout.split()
    dirs, files, size_bytes = int(parts[0]), int(parts[1]), int(parts[2])
    avg = size_bytes // max(files, 1)
    log.info(f" → {path}: {files} files, avg {avg / 1e6:.1f} MB")
    return files > 500 or avg < MIN_FILE_THRESHOLD

def compact_partition(spark, src_path, tgt_path, fmt="parquet"):
    """Read, repartition to the optimal count, write compressed output."""
    df = spark.read.format(fmt).load(src_path)
    total_bytes = (spark.sparkContext._jvm.org.apache.hadoop
        .fs.FileSystem.get(spark._jsc.hadoopConfiguration())
        .getContentSummary(
            spark.sparkContext._jvm.org.apache.hadoop.fs.Path(src_path))
        .getLength())
    n_partitions = max(1, int(total_bytes / TARGET_FILE_BYTES))
    log.info(f" → Writing {n_partitions} partitions to {tgt_path}")
    (df.coalesce(n_partitions)
        .write
        .mode("overwrite")
        .option("compression", "snappy")
        .format(fmt)
        .save(tgt_path))
    return n_partitions

def run_pipeline(root_path, partitions):
    spark = build_spark()
    for part in partitions:
        src = f"{root_path}/{part}"
        tmp = f"{root_path}/_compact_tmp/{part}"
        if not needs_compaction(spark, src):
            log.info(f"SKIP {part} — already optimal")
            continue
        log.info(f"COMPACT {part}")
        n = compact_partition(spark, src, tmp)
        # Swap tmp → src. NOTE: rm + mv is not truly atomic; schedule this
        # when no readers are active, or route readers through a table layer.
        subprocess.run(["hdfs", "dfs", "-rm", "-r", src])
        subprocess.run(["hdfs", "dfs", "-mv", tmp, src])
        log.info(f" ✓ Done. Output: {n} file(s).")
    spark.stop()

if __name__ == "__main__":
    # Drive from Airflow / cron; supply partition list from a metadata scan
    run_pipeline("/data/events", ["dt=2024-01", "dt=2024-02", "dt=2024-03"])
```
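Since the `__main__` block suggests driving this from Airflow or cron, here is a minimal Airflow wrapper (a sketch: the DAG id, schedule, module name `compaction_pipeline`, and partition list are all assumptions; for heavy jobs you would more likely submit via `SparkSubmitOperator` than run Spark inside the scheduler's Python worker):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from compaction_pipeline import run_pipeline  # hypothetical module holding the code above

with DAG(
    dag_id="small_file_compaction",   # assumed name
    schedule="@daily",                # assumes Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(
        task_id="compact_events",
        python_callable=run_pipeline,
        op_kwargs={
            "root_path": "/data/events",
            "partitions": ["dt=2024-01", "dt=2024-02", "dt=2024-03"],
        },
    )
```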
Hive ACID tables generate delta files for every INSERT, UPDATE, or DELETE. Enable the Hive Compactor to automatically merge these into base files on a schedule.
```sql
-- Enable ACID and auto-compaction (hive-site.xml equivalents)
SET hive.support.concurrency = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 4;

-- Create an optimally bucketed ORC table (avoids over-partitioning)
CREATE TABLE events_optimized (
  event_id   STRING,
  user_id    BIGINT,
  event_type STRING,
  payload    STRING,
  event_ts   TIMESTAMP
)
PARTITIONED BY (event_date STRING)  -- coarse: date only, not hour
CLUSTERED BY (user_id) INTO 64 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional' = 'true',
  'orc.compress' = 'SNAPPY',
  'orc.stripe.size' = '268435456',  -- 256 MB stripe
  -- Per-table compaction triggers (note the compactorthreshold. prefix)
  'compactorthreshold.hive.compactor.delta.num.threshold' = '10',
  'compactorthreshold.hive.compactor.delta.pct.threshold' = '0.5'
);

-- Manually trigger compaction on a specific partition
ALTER TABLE events_optimized PARTITION (event_date='2024-01-01')
  COMPACT 'MAJOR';

-- Check compaction queue status
SHOW COMPACTIONS;
```
| Format | Best For | Compression | Predicate Pushdown | HDFS Inode Efficiency | Recommended Codec |
|---|---|---|---|---|---|
| Parquet | Spark analytics, wide tables, ML feature stores | 60–80% vs CSV | ✅ Column + row-group stats | ⭐⭐⭐⭐⭐ (large files) | Snappy / ZSTD |
| ORC | Hive ACID, updates/deletes, Presto | 65–85% vs CSV | ✅ Bloom filter + stripe stats | ⭐⭐⭐⭐⭐ (large files) | ZLIB / Snappy |
| Avro | Streaming ingestion, schema evolution | 30–50% vs CSV | ❌ Row-based | ⭐⭐⭐ (many small avro = bad) | Deflate / Snappy |
| SequenceFile | HAR replacement, MR compatibility | 20–40% vs raw | ❌ | ⭐⭐⭐⭐ (good for consolidation) | Block-level Snappy |
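For reference, the recommended format/codec pairings from the table as PySpark writes (a sketch; the input path and source format are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FormatCodecDemo").getOrCreate()
df = spark.read.json("/data/raw/events")  # hypothetical source

# Parquet + ZSTD: Spark analytics, wide tables
df.write.mode("overwrite").option("compression", "zstd") \
    .parquet("/data/out/events_parquet")

# ORC + Snappy: Hive ACID / Presto
df.write.mode("overwrite").option("compression", "snappy") \
    .orc("/data/out/events_orc")
```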
- Set `trigger(processingTime='15 minutes')` so each micro-batch accumulates more data before writing; for file sources, `maxFilesPerTrigger` controls how many input files each batch consumes. Both cut the number of small output files per trigger (see the sketch after this list).
- Partition by `event_date` (daily), never by `event_hour` or `user_id`. Over-partitioning is the #1 cause of new small files in streaming pipelines.
- After every ETL job, trigger a compaction-check DAG that scans output partitions and fires the Spark compaction pipeline if the average file size is below 128 MB.
- Set namespace quotas (`hdfs dfsadmin -setQuota 5000 /data/team_x`) to force teams to compact before adding new files, creating a structural incentive.
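A minimal Structured Streaming sink following the first two rules (a sketch; the source path, schema, and checkpoint location are assumptions):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

spark = SparkSession.builder.appName("GovernedStreamingSink").getOrCreate()

# File-source stream; maxFilesPerTrigger bounds input files per micro-batch
events = (spark.readStream
    .schema("event_id STRING, user_id BIGINT, event_ts TIMESTAMP")
    .option("maxFilesPerTrigger", 1000)
    .json("/data/landing/events"))

query = (events
    .withColumn("event_date", to_date(col("event_ts")))  # daily partition column
    .writeStream
    .trigger(processingTime="15 minutes")  # fewer, larger files per commit
    .partitionBy("event_date")             # date only, never hour or user_id
    .format("parquet")
    .option("path", "/data/events")
    .option("checkpointLocation", "/checkpoints/events")
    .start())

query.awaitTermination()
```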
| Dimension | Apache Ozone | MinIO | HDFS (status quo) |
|---|---|---|---|
| Metadata Architecture | Ozone Manager (OM) + Storage Container Manager (SCM) — decoupled | No separate metadata service; object metadata stored inline with objects | Single NameNode heap (bottleneck) |
| Scalability | 10B+ objects proven (Preferred Networks case) | Petabyte-scale, proven in cloud-native | ~500M objects practical limit |
| Small File Handling | ⭐⭐⭐⭐⭐ — SCM manages blocks independently of namespace | ⭐⭐⭐⭐ — Object-per-file, no block fragmentation | ⭐⭐ — 150 B/inode in NN heap |
| Hadoop Ecosystem Compatibility | Native HCFS connectors (o3fs/ofs); originated in Hadoop, now a separate Apache project | S3A connector (gateway mode) | Native |
| Iceberg Integration | Native atomic rename + locking | S3-compatible (needs external lock) | Native (for table operations) |
| License | Apache 2.0 (free) | AGPL-3.0 / commercial | Apache 2.0 (free) |
| Erasure Coding | ✅ RS(6,3) reduces storage to 1.5× vs 3× replication | ✅ EC supported | ✅ EC available in HDFS 3.x |
| TCO (on existing hw) | 💰 Zero license + 35–50% storage reduction via EC | 💰 AGPL free tier; commercial for enterprise | 💰 Current — no new cost but hitting limits |
Assume ~60–70% of the 20 PB is cold data (not accessed in 90+ days). Migrating 12–14 PB of it to Ozone with erasure coding (RS 6+3 = 1.5× overhead vs. 3× replication) halves the raw footprint of that data on the same hardware.
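Back-of-the-envelope raw-capacity math, assuming 14 PB of logical cold data:

```
14 PB logical × 3.0 (3-way replication) = 42 PB raw
14 PB logical × 1.5 (EC RS-6-3)         = 21 PB raw
raw capacity freed on the same disks    ≈ 21 PB
```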
```bash
# Ozone runs alongside HDFS on the same nodes (it began as an HDFS
# subproject and is now a separate Apache project; deploy its daemons first)

# Step 1: Initialize Ozone volume/bucket for the cold archive
ozone sh volume create /vol-archive
ozone sh bucket create --type EC --replication rs-6-3-1024k /vol-archive/cold-data

# Step 2: DistCp cold partition to Ozone (runs as MapReduce, no extra HW)
hadoop distcp \
  -Dmapreduce.job.queuename=compaction \
  -bandwidth 100 \
  -m 50 \
  hdfs://nn1:9000/data/archive/logs/2022/ \
  o3fs://cold-data.vol-archive.ozone-service/logs/2022/

# Step 3: Verify: rerun with -update, which compares file size (and checksums
# where the filesystems' checksum types are comparable) and recopies only
# mismatches; an empty copy list is a cheap integrity check
hadoop distcp \
  -update \
  hdfs://nn1:9000/data/archive/logs/2022/ \
  o3fs://cold-data.vol-archive.ozone-service/logs/2022/

# Step 4: Delete HDFS original after validation (reduces NN inodes)
hdfs dfs -rm -r /data/archive/logs/2022/

# Throughput: 50 mappers × 100 MB/s ≈ 5 GB/s aggregate → ~432 TB/day,
# so 14 PB takes roughly a month; run parallel jobs across queues to go faster
```
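A quick spot-check of the migrated bucket with the Ozone shell (a sketch; the volume/bucket names match the commands above):

```bash
# Confirm the bucket's replication config and spot-check migrated keys
ozone sh bucket info /vol-archive/cold-data
ozone sh key list /vol-archive/cold-data | head
```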
* TCO model includes storage hardware cost (with EC savings), NameNode server upgrades, operator hours for maintenance, and licensing. All options assume the existing hardware cluster.
The traditional Hive approach requires the NameNode to LIST directories for every query plan —
at 330M files this produces tens of thousands of RPC calls per job. Iceberg replaces directory listing
with a self-contained metadata tree stored as regular files.
| Iceberg Layer | What It Stores | Format | NameNode Impact |
|---|---|---|---|
| Catalog | Table → current metadata.json pointer | Hive Metastore / REST / Nessie | 🟢 Zero NN calls |
| Metadata File | Schema, partition spec, current snapshot | JSON (1 file per commit) | 🟢 1 file read per query plan |
| Manifest List | List of manifest files for a snapshot | Avro (1 per snapshot) | 🟢 1 file read |
| Manifest Files | File paths + column-level min/max stats | Avro (N per snapshot) | 🟡 N reads (N ≪ file count) |
| Data Files | Actual records | Parquet / ORC / Avro | 🟡 Only files matching predicates |
```sql
-- Option A: Migrate in place (Spark 3.x with Iceberg Spark extensions)
CALL catalog.system.migrate('default.events_legacy');

-- Option B: Create a new Iceberg table and insert-overwrite
CREATE TABLE catalog.default.events_iceberg (
  event_id   STRING,
  user_id    BIGINT,
  event_type STRING,
  event_ts   TIMESTAMP
)
USING ICEBERG
PARTITIONED BY (days(event_ts))  -- hidden partitioning — no partition cols in data
TBLPROPERTIES (
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'zstd',
  'write.target-file-size-bytes' = '268435456',  -- 256 MB target
  'write.distribution-mode' = 'hash',
  'history.expire.min-snapshots-to-keep' = '5'
);

-- Compact the Iceberg table (rewrite small files into optimal sizes)
CALL catalog.system.rewrite_data_files(
  table => 'default.events_iceberg',
  strategy => 'binpack',
  options => map(
    'target-file-size-bytes', '268435456',
    'min-input-files', '5'
  )
);

-- Expire old snapshots to clean manifest bloat
CALL catalog.system.expire_snapshots(
  table => 'default.events_iceberg',
  older_than => TIMESTAMP '2024-01-01 00:00:00',
  retain_last => 7
);
```
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.catalog.type", "hive")
    .config("spark.sql.catalog.catalog.uri", "thrift://hive-metastore:9083")
    .config("spark.sql.catalog.catalog.warehouse", "hdfs://nn1:9000/warehouse/iceberg")
    # Tune Iceberg planning/writes for large files
    .config("spark.sql.iceberg.planning.preserve-data-grouping", "true")
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
    .getOrCreate())
```
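Usage with that session (a sketch; `events_iceberg` is the table created earlier, and `new_events` stands in for any DataFrame with a matching schema):

```python
# Reads plan through Iceberg manifests, not NameNode directory listings
df = spark.table("catalog.default.events_iceberg")
df.where("event_ts >= TIMESTAMP '2024-01-01 00:00:00'") \
  .groupBy("event_type").count().show()

# Appends commit through Iceberg's snapshot protocol (DataFrameWriterV2)
new_events.writeTo("catalog.default.events_iceberg").append()
```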
Apache Doris can serve as a unified query layer over HDFS/Ozone/Iceberg through its external-catalog (Multi-Catalog) feature. Queries run in Doris's vectorized engine directly against external tables, with no load step into Doris; and because Iceberg manifests replace directory listings, NameNode read amplification drops as well.
```sql
-- Register an Iceberg catalog in Doris (Multi-Catalog)
CREATE CATALOG iceberg_hdfs PROPERTIES (
  'type' = 'iceberg',
  'iceberg.catalog.type' = 'hms',
  'hive.metastore.uris' = 'thrift://hive-metastore:9083',
  'hadoop.fs.defaultFS' = 'hdfs://nn1:9000',
  'dfs.nameservices' = 'mycluster'
);

-- Query without touching NameNode LIST — metadata served from manifests
SELECT event_type, COUNT(*) AS cnt
FROM iceberg_hdfs.default.events_iceberg
WHERE event_ts >= '2024-01-01'
GROUP BY event_type
ORDER BY cnt DESC;
```
| Phase | Timeline | Key Actions | Expected NN Inode Reduction | Cost |
|---|---|---|---|---|
| Phase 1A | Day 1–3 | JVM G1GC tuning, increase heap to 256 GB, raise handler.count | 0% (stabilization only) | $0 |
| Phase 1B | Week 1–2 | HAR on cold log archives (> 90 days) | 15–25% (archive namespace) | $0 |
| Phase 1C | Week 2–4 | HDFS Federation (3-NameNode split) + ViewFS | Distributes load 3× | Staff time only |
| Phase 2 | Month 1–3 | Spark compaction pipeline, migrate to Parquet/ORC, ingestion governance | 50–80% on processed data | Compute time only |
| Phase 3 | Month 3–6 | Deploy Ozone, DistCp cold 12–14 PB, enable EC RS(6,3) | 60% of total inodes migrated off HDFS NN | Staff time + existing HW |
| Phase 4 | Month 6–12 | Migrate hot tables to Iceberg, schedule rewrite_data_files, optionally add Doris | Eliminates future NN LIST RPC storms | Staff time only |