Backfill Pipeline¶
The backfill pipeline processes the full historical catalog (ITS_LIVE velocity pairs since 1982) using a three-level Dask DAG. It is designed to run on a Dask cluster (local or Coiled) and is spot-instance resilient via per-file checkpoint/resume.
When to use it¶
- Initial full ingest of the complete ITS_LIVE archive (~millions of items).
- Re-ingesting a large prefix after a schema change.
- Any time
earthcatalog incrementalwould take too long on a single node.
For day-to-day updates use the incremental pipeline instead.
Three-level architecture¶
Level 0 — ingest_chunk (one Dask task per inventory chunk)
┌──────────────────────────────────────────────────────────────┐
│ • fetch STAC JSON from S3 (obstore, parallel) │
│ • fan_out → group_by_partition │
│ • write one GeoParquet file per (cell, year) to warehouse │
│ • return list[FileMetadata] (~200 B per file) │
│ ↳ Parquet data never crosses the network to the head node │
└──────────────────────────────────────────────────────────────┘
Level 1 — compact_group (one Dask task per (cell, year) bucket)
┌──────────────────────────────────────────────────────────────┐
│ • download all part files for the bucket │
│ • merge → deduplicate on id → sort by (platform, datetime) │
│ • write one consolidated GeoParquet file │
│ • delete the input part files │
│ • return single FileMetadata │
└──────────────────────────────────────────────────────────────┘
Level 2 — catalog registration (head node only)
┌──────────────────────────────────────────────────────────────┐
│ • table.add_files(all_compacted_paths) │
│ • one Iceberg snapshot for the entire backfill run │
│ • upload catalog.db to S3 │
└──────────────────────────────────────────────────────────────┘
Key invariant: Parquet data never travels through the Dask scheduler or head node. Workers own all I/O; the head node only handles inventory streaming, metadata collection (~200 B/file), and the final catalog snapshot.
Spot-instance resilience¶
The pipeline runs in per-file mini-run mode by default. Each inventory
chunk is processed as an independent Dask task with its own ingest_stats.json
checkpoint file. If a spot instance is preempted mid-run:
- Completed tasks'
ingest_stats.jsonfiles remain in the warehouse. - Re-running the pipeline with the same inventory skips already-processed chunks.
- The final Level-2 catalog registration collects all surviving files.
Usage¶
Synchronous (single process — good for debugging)¶
python -m earthcatalog.pipelines.backfill \
--inventory /tmp/inventory.csv \
--catalog /tmp/catalog.db \
--warehouse /tmp/warehouse \
--scheduler synchronous \
--limit 200
Local Dask cluster¶
python -m earthcatalog.pipelines.backfill \
--inventory /tmp/inventory.parquet \
--catalog /tmp/catalog.db \
--warehouse /tmp/warehouse \
--scheduler local \
--workers 4
Coiled cloud cluster¶
python -m earthcatalog.pipelines.backfill \
--inventory s3://its-live-data/inventory/latest.parquet \
--catalog /tmp/catalog.db \
--warehouse /tmp/warehouse \
--scheduler coiled \
--coiled-n-workers 20 \
--coiled-software itslive-ingest \
--coiled-region us-west-2
Compaction scale¶
At H3 resolution 1, the ITS_LIVE glacier coverage occupies roughly
100–200 cells globally. Multiplied by 44 years (1982–2025) gives
approximately 4,400–8,800 (cell, year) buckets for a full backfill.
Each Level-1 task is independent and runs in parallel on the Dask cluster —
compact_group = dask.delayed(_compact_group_impl).
For the typical maintenance case (compacting only the most recent year after an incremental update), ~100–200 tasks are run by the standalone compaction tool, sequentially in a few minutes.