Ingest Guide¶
Operational reference for bulk backfill and daily delta ingest.
Quick reference¶
import earthcatalog as ec
from obstore.store import S3Store
store = S3Store(bucket="my-bucket", region="us-west-2")
catalog = ec.open(store=store, base="s3://my-bucket/catalog")
# Bulk backfill (Dask/Coiled, spot-resilient 4-phase pipeline)
catalog.bulk_ingest("s3://bucket/inventory/full.parquet", mode="full",
create_client=lambda: coiled.Client(n_workers=100))
# Single-node backfill (no Dask)
catalog.ingest("s3://bucket/inventory/full.parquet", mode="full")
# Daily delta
catalog.ingest("s3://bucket/delta/daily.parquet", mode="delta",
update_hash_index=True)
1. Bulk backfill¶
First-time ingest of the full catalog. Two paths:
With Dask/Coiled (ec.bulk_ingest)¶
Runs the 4-phase staging pipeline: chunk files → async S3 fetch → NDJSON intermediates → compact to GeoParquet → register with Iceberg. Spot-resilient — interrupted chunks are retried on restart, NDJSON survives, completion markers track progress.
catalog.bulk_ingest("s3://bucket/inventory/full.parquet", mode="full",
create_client=lambda: coiled.Client(
n_workers=20,
vm_type="m6i.xlarge",
))
Pass create_client=None (default) to use an existing Dask client or fall back
to sequential execution if no cluster is available.
Single-node (ec.ingest)¶
Simpler path using ThreadPoolExecutor. No intermediate files — fetch →
fan-out → write → register. Suitable for inventories under ~1M items.
catalog.ingest("s3://bucket/inventory/full.parquet", mode="full",
chunk_size=10000, update_hash_index=True)
2. Daily delta¶
Process new items since the last ingest. Appends files without overwriting, updates the hash index for duplicate detection.
Filter by modification date to skip old items:
from datetime import UTC, datetime, timedelta
catalog.ingest("s3://bucket/delta.parquet", mode="delta",
since=datetime.now(UTC) - timedelta(days=2),
update_hash_index=True)
3. Verification¶
catalog.stats() # per-partition row/file counts from Iceberg manifests
catalog.unique_item_count() # unique STAC items (hash index footer, no full scan)
catalog.info() # grid metadata and catalog info object
DuckDB query with spatial pruning:
from shapely.geometry import box
import duckdb
greenland = box(-60, 60, -20, 85)
paths = catalog.search_files(greenland, start_datetime="2020-01-01")
con = duckdb.connect()
con.execute("INSTALL spatial; LOAD spatial;")
df = con.execute(f"""
SELECT id, platform, datetime
FROM read_parquet({paths})
WHERE ST_Intersects(geometry, ST_GeomFromText('{greenland.wkt}'))
LIMIT 10
""").df()
CLI reference¶
The old python -m earthcatalog.pipelines.backfill CLI is still available for
backward compatibility but new deployments should use the Python API above.