Pipelines API

earthcatalog.pipelines.incremental

Single-node incremental ingest from an AWS S3 Inventory file.

Supported inventory formats

  • CSV (plain or .gz) — AWS S3 Inventory default
  • Parquet — AWS S3 Inventory optional output; preferred at scale (typed, compressed, no quoting ambiguity). Read in row-group batches so memory is bounded regardless of file size.
  • manifest.json — AWS S3 Inventory manifest; references multiple Parquet data files in a private destination bucket. Pass the manifest URI as inventory_path; credentials for the destination bucket are read from ~/.aws/credentials (default profile) or from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.

The inventory must have at least two columns, named bucket and key. Column names are matched case-insensitively for CSV and exactly for Parquet.
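As a rough illustration of the CSV column rule, the sketch below resolves bucket and key case-insensitively from a header row. The helper name iter_bucket_key and the sample rows are made up for this example; the library's own reader is not shown on this page and may work differently.

```python
import csv
import io


def iter_bucket_key(csv_text: str):
    """Yield (bucket, key) pairs from CSV text that has a header row.

    Hypothetical helper: header names are matched case-insensitively.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    # Map lower-cased header names back to the spelling used in the file.
    lookup = {name.lower(): name for name in (reader.fieldnames or [])}
    missing = {"bucket", "key"} - lookup.keys()
    if missing:
        raise ValueError(f"inventory is missing columns: {sorted(missing)}")
    for row in reader:
        yield row[lookup["bucket"]], row[lookup["key"]]


# Sample data invented for the example; note the capitalised header names.
sample = "Bucket,Key,Size\nmy-bucket,a/item1.stac.json,120\nmy-bucket,a/readme.txt,5\n"
pairs = list(iter_bucket_key(sample))
```

Filtering to .stac.json keys happens later in the pipeline, so this helper deliberately yields every row.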

Delta ingestion

Pass since (a timezone-aware UTC datetime) to skip objects that were last modified before that cutoff:

  • Parquet: filters on the last_modified_date column; rows missing that column are passed through unchanged. The column may be either a string (ISO-8601) or a native Parquet TIMESTAMP — both are handled.
  • CSV: parses the last_modified_date column when a header row is present and the column is present; otherwise all rows are passed through (graceful degradation).

A typical 2-day delta run:

from datetime import datetime, timezone, timedelta
since = datetime.now(tz=timezone.utc) - timedelta(days=2)
run(inventory_path=..., since=since)
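The since rule can be sketched as a per-row predicate. This is only an illustration under stated assumptions: passes_since is a hypothetical name, rows are modelled as plain dicts, and naive datetimes are assumed to be UTC. The actual filtering code is not shown on this page.

```python
from datetime import datetime, timezone
from typing import Optional


def passes_since(row: dict, since: Optional[datetime]) -> bool:
    """Return True when the row should be processed (hypothetical helper)."""
    if since is None:
        return True  # no cutoff: full inventory
    value = row.get("last_modified_date")
    if value is None:
        return True  # column missing: pass the row through unchanged
    if isinstance(value, str):
        # ISO-8601 string; accept a trailing "Z" as UTC.
        value = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if value.tzinfo is None:
        # Assumption for this sketch: naive timestamps are UTC.
        value = value.replace(tzinfo=timezone.utc)
    return value >= since


cutoff = datetime(2026, 4, 21, tzinfo=timezone.utc)
newer = passes_since({"last_modified_date": "2026-04-22T10:00:00.000Z"}, cutoff)
older = passes_since({"last_modified_date": "2026-04-20T23:59:59.000Z"}, cutoff)
no_column = passes_since({"key": "a/item1.stac.json"}, cutoff)
```

The comparison is inclusive, which matches the last_modified_date >= since wording in the run() parameter description.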

Memory notes

All S3 I/O goes through obstore. For S3 inventory files the full object is downloaded once via obstore.get().bytes():

  • Parquet: pq.ParquetFile.iter_batches() then reads one row-group at a time — peak RAM is bounded by batch_size rows.
  • CSV.gz: gzip.open(BytesIO(compressed_bytes)) — only the compressed bytes are held; decompression is line-by-line.
  • CSV (plain): TextIOWrapper(BytesIO(raw_bytes)) — one copy of the raw bytes; no extra full-string decode. For very large plain-text inventories prefer CSV.gz or Parquet to halve the peak RAM.
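The CSV.gz case can be reproduced with the standard library alone. In the sketch below, the in-memory compressed_bytes stands in for the bytes returned by the object store, and the sample rows are invented.

```python
import csv
import gzip
import io

# Build a small gzip-compressed CSV in memory to stand in for the downloaded object.
raw_csv = "bucket,key\nmy-bucket,a/item1.stac.json\nmy-bucket,a/item2.stac.json\n"
compressed_bytes = gzip.compress(raw_csv.encode("utf-8"))

# Only the compressed bytes are held up front; gzip decompresses incrementally
# as the csv reader pulls lines from the text stream.
with gzip.open(io.BytesIO(compressed_bytes), mode="rt", encoding="utf-8", newline="") as fh:
    rows = [(row["bucket"], row["key"]) for row in csv.DictReader(fh)]
```

The list comprehension is only there to make the result easy to inspect; a real ingest loop would consume the reader row by row instead of collecting everything.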

True zero-copy streaming from the obstore async byte stream is a planned improvement (obstore.GetResult.stream()); it would eliminate the full-file download for all formats.

Usage

python -m earthcatalog.pipelines.incremental \
    --inventory /tmp/test_inventory.csv \
    --catalog /tmp/earthcatalog.db \
    --warehouse /tmp/earthcatalog_warehouse \
    --since 2026-04-21 \
    --limit 500

Using a real S3 Inventory manifest (requires AWS credentials):

python -m earthcatalog.pipelines.incremental \
    --inventory s3://my-log-bucket/inventory/.../manifest.json \
    --catalog /tmp/earthcatalog.db \
    --warehouse /tmp/earthcatalog_warehouse \
    --since 2026-04-21

Functions

run(inventory_path, catalog_path, warehouse_path, chunk_size=500, max_workers=16, limit=None, h3_resolution=1, partitioner=None, use_lock=True, batch_add_files=False, since=None, grid_config=None)

Read inventory → fetch STAC JSON from S3 in parallel → fan_out() → write_geoparquet() → add_files() to PyIceberg table.

Each (grid_partition, year) group is written as a separate GeoParquet file in hive-style layout. Files are registered in the Iceberg catalog via table.add_files().
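The hive-style layout can be illustrated with a small path builder. partition_file_path is a hypothetical helper written for this page, and the cell value in the usage lines is just an example string.

```python
import uuid
from pathlib import Path
from typing import Optional


def partition_file_path(warehouse: Path, cell: str, year: Optional[int], part_index: int) -> Path:
    """Build grid_partition=<cell>/year=<year>/part_<N>_<random>.parquet (illustrative)."""
    year_str = str(year) if year is not None else "unknown"
    # A short random suffix keeps file names unique across runs.
    suffix = uuid.uuid4().hex[:8]
    return (
        warehouse
        / f"grid_partition={cell}"
        / f"year={year_str}"
        / f"part_{part_index:06d}_{suffix}.parquet"
    )


example = partition_file_path(Path("/tmp/earthcatalog_warehouse"), "81283ffffffffff", 2024, 3)
undated = partition_file_path(Path("/tmp/earthcatalog_warehouse"), "81283ffffffffff", None, 0)
```

Because every file in a directory shares one grid_partition value and one year, each file carries a single partition value, which is what the grouping comment in the source below relies on.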

Parameters

since: When set (timezone-aware UTC), only inventory rows with last_modified_date >= since are processed. Pass datetime.now(tz=timezone.utc) - timedelta(days=2) for a 2-day delta run. None processes the full inventory.

batch_add_files: When False (default), table.add_files() is called after every chunk, producing one Iceberg snapshot per chunk. Safe for incremental daily runs: a mid-run crash leaves the catalog in a consistent partial state.

When True, all GeoParquet paths are collected and registered in a single table.add_files() call at the very end, producing exactly one Iceberg snapshot regardless of how many chunks were processed. Use this for initial backfills to prevent snapshot explosion. If the process crashes mid-run, no files are registered; re-run from scratch.

The full run is wrapped in an S3Lock (set use_lock=False for tests). download_catalog / upload_catalog are called inside the lock so the SQLite catalog.db is safely synchronised with the configured store.
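To make the two batch_add_files modes concrete, here is a toy model that counts snapshots. FakeTable and register are stand-ins invented for this sketch; they only mimic the property that each add_files() call produces one snapshot.

```python
class FakeTable:
    """Stand-in for an Iceberg table: every add_files() call is one snapshot."""

    def __init__(self):
        self.snapshots = []

    def add_files(self, paths):
        self.snapshots.append(list(paths))


def register(table, chunks, batch_add_files):
    """Register the files produced by each chunk, per chunk or all at the end."""
    pending = []
    for paths in chunks:
        if batch_add_files:
            pending.extend(paths)   # defer: nothing is registered yet
        else:
            table.add_files(paths)  # one snapshot per chunk
    if batch_add_files and pending:
        table.add_files(pending)    # single snapshot at the very end


chunks = [["a.parquet", "b.parquet"], ["c.parquet"], ["d.parquet"]]

per_chunk = FakeTable()
register(per_chunk, chunks, batch_add_files=False)

single = FakeTable()
register(single, chunks, batch_add_files=True)
```

With three chunks the per-chunk table ends up with three snapshots and the batched table with one, which is the trade-off between crash safety and snapshot count described above.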

Source code in earthcatalog/pipelines/incremental.py
def run(
    inventory_path: str,
    catalog_path: str,
    warehouse_path: str,
    chunk_size: int = 500,
    max_workers: int = 16,
    limit: int | None = None,
    h3_resolution: int = 1,
    partitioner: object = None,
    use_lock: bool = True,
    batch_add_files: bool = False,
    since: datetime | None = None,
    grid_config=None,
) -> None:
    """
    Read inventory → fetch STAC JSON from S3 in parallel →
    fan_out() → write_geoparquet() → add_files() to PyIceberg table.

    Each ``(grid_partition, year)`` group is written as a separate GeoParquet
    file in hive-style layout.  Files are registered in the Iceberg catalog via
    ``table.add_files()``.

    Parameters
    ----------
    since:
        When set (timezone-aware UTC), only inventory rows with
        ``last_modified_date >= since`` are processed.  Pass
        ``datetime.now(tz=timezone.utc) - timedelta(days=2)`` for a
        2-day delta run.  ``None`` processes the full inventory.
    batch_add_files:
        When ``False`` (default), ``table.add_files()`` is called after every
        chunk — one Iceberg snapshot per chunk.  Safe for incremental daily
        runs: a mid-run crash leaves the catalog in a consistent partial state.

        When ``True``, all GeoParquet paths are collected and registered in a
        **single** ``table.add_files()`` call at the very end — exactly one
        Iceberg snapshot regardless of how many chunks were processed.  Use
        this for initial backfills to prevent snapshot explosion.
        If the process crashes mid-run, no files are registered; re-run from
        scratch.

    The full run is wrapped in an S3Lock (set ``use_lock=False`` for tests).
    ``download_catalog`` / ``upload_catalog`` are called inside the lock so
    the SQLite catalog.db is safely synchronised with the configured store.
    """
    if partitioner is None:
        partitioner = H3Partitioner(resolution=h3_resolution)

    warehouse = Path(warehouse_path)
    warehouse.mkdir(parents=True, exist_ok=True)

    def _ingest() -> None:
        download_catalog(catalog_path)

        catalog = _open_sqlite(db_path=catalog_path, warehouse_path=warehouse_path)
        table = get_or_create(catalog, grid_config=grid_config)
        print(f"Catalog  : {catalog_path}")
        print(f"Table    : {table.name()}")

        chunk: list[tuple[str, str]] = []
        total_items = 0
        total_rows = 0
        part_index = 0
        # Populated only when batch_add_files=True; flushed once at the end.
        pending_paths: list[str] = []

        def flush(chunk: list[tuple[str, str]]) -> None:
            nonlocal total_rows, part_index

            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
                items = list(filter(None, pool.map(lambda bc: _fetch_item(*bc), chunk)))

            if not items:
                return

            fan_out_items = fan_out(items, partitioner)
            if not fan_out_items:
                return

            # Group by (grid_partition, year) — each group produces exactly one
            # Parquet file with a single partition value for both Iceberg transforms.
            groups = group_by_partition(fan_out_items)
            paths = []
            n_rows = 0

            for (cell, year), group_items in groups.items():
                year_str = str(year) if year is not None else "unknown"
                # Hive-style layout: grid_partition=<cell>/year=<year>/part_N.parquet
                out_dir = warehouse / f"grid_partition={cell}" / f"year={year_str}"
                out_dir.mkdir(parents=True, exist_ok=True)
                out_path = str(out_dir / f"part_{part_index:06d}_{uuid.uuid4().hex[:8]}.parquet")
                n = write_geoparquet(group_items, out_path)
                paths.append(out_path)
                n_rows += n

            if paths:
                if batch_add_files:
                    pending_paths.extend(paths)
                else:
                    table.add_files(paths)

            total_rows += n_rows
            part_index += 1
            print(
                f"  chunk {part_index}: {len(items)} items → "
                f"{n_rows} rows in {len(paths)} partition files"
            )

        print(f"Reading inventory: {inventory_path}")
        for bucket, key in _iter_inventory(inventory_path, since=since):
            if not key.endswith(".stac.json"):
                continue

            chunk.append((bucket, key))
            total_items += 1

            if len(chunk) >= chunk_size:
                flush(chunk)
                chunk.clear()

            if limit and total_items >= limit:
                break

        if chunk:
            flush(chunk)

        if batch_add_files and pending_paths:
            print(f"  registering {len(pending_paths)} files in a single snapshot …")
            table.add_files(pending_paths)

        upload_catalog(catalog_path)
        print(f"\nDone. {total_items} items → {total_rows} rows")
        print(f"Snapshots in catalog: {len(table.history())}")

    if use_lock:
        with S3Lock(owner="incremental"):
            _ingest()
    else:
        _ingest()

run_from_config(inventory_path, config, limit=None)

Drive the incremental pipeline from an AppConfig instance.

Parameters

inventory_path: Local path or s3:// URI to the S3 Inventory CSV, CSV.gz, or Parquet file.

config: An earthcatalog.config.AppConfig instance.

limit: Optional cap on the number of STAC items processed (for testing).

Source code in earthcatalog/pipelines/incremental.py
def run_from_config(inventory_path: str, config: object, limit: int | None = None) -> None:
    """
    Drive the incremental pipeline from an ``AppConfig`` instance.

    Parameters
    ----------
    inventory_path:
        Local path or ``s3://`` URI to the S3 Inventory CSV, CSV.gz, or
        Parquet file.
    config:
        An :class:`earthcatalog.config.AppConfig` instance.
    limit:
        Optional cap on the number of STAC items processed (for testing).
    """
    partitioner = build_partitioner(config.grid)
    run(
        inventory_path=inventory_path,
        catalog_path=config.catalog.db_path,
        warehouse_path=config.catalog.warehouse,
        chunk_size=config.ingest.chunk_size,
        max_workers=config.ingest.max_workers,
        batch_add_files=config.ingest.batch_add_files,
        limit=limit,
        partitioner=partitioner,
        grid_config=config.grid,
    )

earthcatalog.pipelines.backfill

Staging-based backfill pipeline for earthcatalog.

Architecture (four phases, fully idempotent)

Phase 1: Scheduler (head node)

Reads the inventory manifest/CSV/Parquet and writes fixed-size chunk files (Parquet, one row per .stac.json item) to a staging prefix. Each chunk contains up to chunk_size (bucket, key) pairs. Already-written chunks are skipped on restart (idempotent).

Phase 2: Ingest (one Dask task per chunk)

Each worker:

  1. Reads its chunk Parquet from the staging store.
  2. Async-fetches all STAC JSONs using obstore.get_async with a TaskGroup + Semaphore.
  3. Accumulates items in memory, fans out through the H3 partitioner.
  4. Writes each (cell, year) group as NDJSON to staging/{cell}/{year}/chunk_{id}.ndjson.

Phase 3: Compact (one Dask task per (cell, year))

Each worker:

  1. Scans the staging prefix for ALL .ndjson files in its bucket.
  2. Reads them into memory, deduplicates by id.
  3. Writes up to compact_rows rows per GeoParquet file.
  4. Does NOT delete NDJSON staging files; staging cleanup is left to Phase 4.
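The deduplicate-then-split steps of Phase 3 might look like the following. Both helpers are hypothetical, and keeping the first occurrence of each id is an assumption of this sketch; the page does not say which duplicate wins.

```python
def dedupe_by_id(items):
    """Drop repeated ids, keeping the first occurrence (assumption of this sketch)."""
    seen = {}
    for item in items:
        seen.setdefault(item["id"], item)
    return list(seen.values())


def split_batches(rows, compact_rows):
    """Split rows into consecutive batches of at most compact_rows rows."""
    return [rows[i : i + compact_rows] for i in range(0, len(rows), compact_rows)]


# Invented sample: item "a" appears in two NDJSON files.
staged = [{"id": "a", "v": 1}, {"id": "b", "v": 1}, {"id": "a", "v": 2}, {"id": "c", "v": 1}]
unique = dedupe_by_id(staged)
batches = split_batches(unique, compact_rows=2)
```

Each batch would then be written as one GeoParquet file for the (cell, year) group.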

Phase 4: Register (head node)

  1. Drops and recreates the Iceberg table.
  2. Registers all warehouse Parquet files via table.add_files().
  3. Uploads the catalog.
  4. Cleans up staging via cleanup_staging(), which deletes completion markers and pending files and keeps chunks and NDJSON.

Delta mode (delta=True)

Lightweight append-only path for small incremental ingests.

Phase 3 — Delta Compact Same as normal compact (NDJSON → GeoParquet) but output files are numbered starting from the next available index in the warehouse partition, so existing parquets are never overwritten.

Phase 4 — Delta Register Opens the existing Iceberg table (no drop) and calls table.add_files() with only the newly written parquets.
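Finding the next available index in a warehouse partition could work roughly as follows. next_part_index here is an illustrative reimplementation over a list of keys; the pipeline's own _next_part_index helper is not shown on this page and may differ.

```python
import re

# Matches the part_NNNNNN.parquet naming used by the compact phase.
_PART_RE = re.compile(r"part_(\d+)\.parquet$")


def next_part_index(existing_keys):
    """Return one more than the highest part index found, or 0 if none exist."""
    indices = [int(m.group(1)) for k in existing_keys if (m := _PART_RE.search(k))]
    return max(indices) + 1 if indices else 0


# Example keys for one partition (the cell value is made up).
existing = [
    "grid_partition=81283ffffffffff/year=2024/part_000000.parquet",
    "grid_partition=81283ffffffffff/year=2024/part_000001.parquet",
]
start_idx = next_part_index(existing)
new_key = f"grid_partition=81283ffffffffff/year=2024/part_{start_idx:06d}.parquet"
```

Starting from max + 1 rather than from the file count avoids collisions when earlier part files have been removed.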

Spot resilience

Every phase is individually idempotent. If a spot instance is killed:

  • Phase 1: chunks already written survive; scheduler skips them.
  • Phase 2: NDJSON files written by dead workers survive and are picked up by Phase 3 on the next run.
  • Phase 3: each (cell, year) task scans S3 fresh.
  • Phase 4: catalog is rebuilt from scratch (drop + recreate).

Functions

write_chunks(inventory_path, staging_store, staging_prefix, chunk_size=100000, limit=None, since=None, write_concurrency=8)

Read inventory and write chunk Parquet files to the staging store.

Streams from the inventory iterator into chunk-sized buffers and writes each chunk as soon as it's full — no need to materialize the entire inventory in memory.

Already-written chunks are detected via a single obstore.list call upfront. Items belonging to existing chunks are still consumed from the inventory iterator but not buffered or written.

Writes are parallelized with a thread pool.

Returns list of chunk keys (including pre-existing ones).
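The streaming and resume behaviour described above can be sketched as a generator. iter_chunks is a hypothetical helper, not the function documented here: it fast-forwards past the leading run of existing chunks, then reads and discards any later chunk whose id already exists.

```python
from itertools import islice


def iter_chunks(keys, chunk_size, existing_ids=frozenset()):
    """Yield (chunk_id, items) for chunks that still need writing (illustrative)."""
    # Count the leading chunks that already exist without gaps.
    contiguous = 0
    while contiguous in existing_ids:
        contiguous += 1

    it = iter(keys)
    # Fast-forward: consume the items of those chunks without buffering them.
    for _ in islice(it, contiguous * chunk_size):
        pass

    chunk_id = contiguous
    while True:
        buf = list(islice(it, chunk_size))
        if not buf:
            return
        if chunk_id not in existing_ids:
            yield chunk_id, buf
        chunk_id += 1


keys = [f"k{i}" for i in range(7)]
fresh = list(iter_chunks(keys, chunk_size=3))
resumed = list(iter_chunks(keys, chunk_size=3, existing_ids={0}))
```

Only one chunk-sized buffer is alive at a time, so memory stays bounded by chunk_size regardless of inventory length. Resuming this way is only safe if the inventory iterates in the same order on every run.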

Source code in earthcatalog/pipelines/backfill.py
def write_chunks(
    inventory_path: str,
    staging_store: object,
    staging_prefix: str,
    chunk_size: int = 100_000,
    limit: int | None = None,
    since: datetime | None = None,
    write_concurrency: int = 8,
) -> list[str]:
    """
    Read inventory and write chunk Parquet files to the staging store.

    Streams from the inventory iterator into chunk-sized buffers and writes
    each chunk as soon as it's full — no need to materialize the entire
    inventory in memory.

    Already-written chunks are detected via a single ``obstore.list`` call
    upfront. Items belonging to existing chunks are still consumed from the
    inventory iterator but not buffered or written.

    Writes are parallelized with a thread pool.

    Returns list of chunk keys (including pre-existing ones).
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    import tqdm

    existing_ids = _list_existing_chunks(staging_store, staging_prefix)
    if existing_ids:
        max_existing = max(existing_ids)
        print(f"Phase 1: {len(existing_ids)} existing chunks found (up to chunk {max_existing})")
    else:
        max_existing = -1

    chunk_keys: dict[int, str] = {}
    written = 0
    skipped = len(existing_ids)
    chunk_id = 0
    buf: list[tuple[str, str]] = []
    total_items = 0

    for cid in existing_ids:
        chunk_keys[cid] = f"{staging_prefix}/chunk_{cid:06d}.parquet"

    contiguous_max = -1
    for i in range(max_existing + 1):
        if i in existing_ids:
            contiguous_max = i
        else:
            break

    items_to_skip = (contiguous_max + 1) * chunk_size

    with ThreadPoolExecutor(max_workers=write_concurrency) as pool:
        pending: dict[object, int] = {}
        pbar = tqdm.tqdm(desc="Phase 1", unit="chunk")

        def _collect(done_futures):
            nonlocal written
            for f in done_futures:
                key = f.result()
                cid = pending[f]
                chunk_keys[cid] = key
                written += 1
                del pending[f]
                pbar.update(1)

        inv_iter = _iter_inventory(inventory_path, since=since)

        if items_to_skip > 0:
            pbar.set_postfix_str(f"fast-forwarding {items_to_skip:,} items")
            chunk_id = contiguous_max + 1
            for b, k in inv_iter:
                if not k.endswith(".stac.json"):
                    continue
                total_items += 1
                if total_items >= items_to_skip:
                    break
            pbar.set_postfix_str(f"fast-forward done, at item {total_items:,}")

        for b, k in inv_iter:
            if not k.endswith(".stac.json"):
                continue
            total_items += 1

            buf.append((b, k))

            if len(buf) >= chunk_size:
                future = pool.submit(_write_one_chunk, chunk_id, buf, staging_store, staging_prefix)
                pending[future] = chunk_id
                chunk_id += 1
                buf = []

                done = [f for f in list(pending) if f.done()]
                _collect(done)

                pbar.set_postfix(
                    items=f"{total_items:,}",
                    written=written,
                    skipped=skipped,
                    pending=len(pending),
                    refresh=False,
                )

            if limit is not None and total_items >= limit:
                break

        if buf and chunk_id not in existing_ids:
            future = pool.submit(_write_one_chunk, chunk_id, buf, staging_store, staging_prefix)
            pending[future] = chunk_id

        for f in as_completed(pending):
            _collect([f])

        pbar.close()

    max_id = max(chunk_keys) if chunk_keys else -1
    result = [chunk_keys[i] for i in range(max_id + 1)]

    print(
        f"Scheduler: {total_items:,} .stac.json items → "
        f"{written} chunks written, {skipped} skipped (already exist)"
    )
    return result

ingest_chunk(chunk_key, staging_store, staging_prefix, pending_prefix, partitioner, fetch_concurrency=_FETCH_CONCURRENCY)

Phase 2 worker: read chunk → fetch items → fan-out → write NDJSON groups.

Failed items are written to pending_prefix/chunk_{id}.parquet for retry on the next run. If all items succeed, no pending file is created.

Returns a report dict with counts and the list of NDJSON keys written.
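A caller collecting one report per chunk might aggregate them along these lines. The field names are taken from the report dict in the source below; summarize and the sample values are made up for the example.

```python
from collections import Counter


def summarize(reports):
    """Combine per-chunk report dicts into overall totals (hypothetical helper)."""
    totals = Counter()
    fan_out = Counter()
    ndjson_keys = []
    for report in reports:
        for field in ("source_items", "fetched_items", "fetch_failures", "groups_written"):
            totals[field] += report[field]
        # fan_out_counter maps "number of cells an item landed in" to an item count.
        fan_out.update(report["fan_out_counter"])
        ndjson_keys.extend(report["ndjson_keys"])
    return dict(totals), dict(fan_out), ndjson_keys


# Two invented reports with the same shape as the ingest_chunk return value.
reports = [
    {"chunk_key": "ingest/chunk_000000.parquet", "source_items": 3, "fetched_items": 3,
     "fetch_failures": 0, "groups_written": 1, "fan_out_counter": {1: 2, 2: 1},
     "ndjson_keys": ["ingest/81283ffffffffff/2024/chunk_000000.ndjson"]},
    {"chunk_key": "ingest/chunk_000001.parquet", "source_items": 2, "fetched_items": 1,
     "fetch_failures": 1, "groups_written": 1, "fan_out_counter": {1: 1},
     "ndjson_keys": ["ingest/81283ffffffffff/2024/chunk_000001.ndjson"]},
]
totals, fan_out, ndjson_keys = summarize(reports)
```

A non-zero fetch_failures total indicates that pending files were written and a retry run may be worthwhile.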

Source code in earthcatalog/pipelines/backfill.py
def ingest_chunk(
    chunk_key: str,
    staging_store: object,
    staging_prefix: str,
    pending_prefix: str,
    partitioner: object,
    fetch_concurrency: int = _FETCH_CONCURRENCY,
) -> dict[str, Any]:
    """
    Phase 2 worker: read chunk → fetch items → fan-out → write NDJSON groups.

    Failed items are written to ``pending_prefix/chunk_{id}.parquet`` for
    retry on the next run.  If all items succeed, no pending file is created.

    Returns a report dict with counts and the list of NDJSON keys written.
    """
    raw = memoryview(obstore.get(staging_store, chunk_key).bytes()).tobytes()
    tbl = pq.ParquetFile(io.BytesIO(raw)).read()
    pairs = [
        (str(b), str(k))
        for b, k in zip(tbl.column("bucket").to_pylist(), tbl.column("key").to_pylist())
    ]

    source_items = len(pairs)
    if not pairs:
        return {
            "chunk_key": chunk_key,
            "source_items": 0,
            "fetched_items": 0,
            "fetch_failures": 0,
            "groups_written": 0,
            "fan_out_counter": {},
            "ndjson_keys": [],
        }

    from obstore.store import S3Store as _S3Store

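    # NOTE: every pair in the chunk is assumed to live in the same bucket; the
    # store uses unsigned requests and a hard-coded us-west-2 region.
    bucket = pairs[0][0]
    source_store = _S3Store(bucket=bucket, region="us-west-2", skip_signature=True)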
    items, failed = asyncio.run(
        _fetch_all_async(pairs, concurrency=fetch_concurrency, store=source_store)
    )
    fetch_failures = len(failed)

    chunk_id = chunk_key.rsplit("/", 1)[-1].replace("chunk_", "").replace(".parquet", "")

    if failed:
        now = datetime.now(UTC).isoformat()
        pending_key = f"{pending_prefix}/chunk_{chunk_id}.parquet"
        fail_tbl = pa.table(
            {
                "bucket": [b for b, _, _ in failed],
                "key": [k for _, k, _ in failed],
                "error": [e for _, _, e in failed],
                "timestamp": [now] * len(failed),
            }
        )
        buf = io.BytesIO()
        pq.write_table(fail_tbl, buf)
        obstore.put(staging_store, pending_key, buf.getvalue())
    else:
        pending_key = f"{pending_prefix}/chunk_{chunk_id}.parquet"
        try:
            # Best-effort removal of a stale pending file left by an earlier run.
            obstore.delete(staging_store, pending_key)
        except Exception:
            pass

    empty_report = {
        "chunk_key": chunk_key,
        "source_items": source_items,
        "fetched_items": len(items),
        "fetch_failures": fetch_failures,
        "groups_written": 0,
        "fan_out_counter": {},
        "ndjson_keys": [],
    }

    if not items:
        empty_report["fetched_items"] = 0
        return empty_report

    fan_out_items = fan_out(items, partitioner)
    if not fan_out_items:
        return empty_report

    cells_per_id: dict[str, set[str]] = defaultdict(set)
    for item in fan_out_items:
        cells_per_id[item["id"]].add(item["properties"].get("grid_partition", "__none__"))
    fan_out_counter: dict[int, int] = dict(Counter(len(cells) for cells in cells_per_id.values()))

    groups = group_by_partition(fan_out_items)
    ndjson_keys: list[str] = []

    for (cell, year), group_items in groups.items():
        ndjson_key = f"{staging_prefix}/{cell}/{year}/chunk_{chunk_id}.ndjson"
        n = _write_ndjson(group_items, staging_store, ndjson_key)
        if n > 0:
            ndjson_keys.append(ndjson_key)

    marker_key = f"{staging_prefix}/_completed/chunk_{chunk_id}.done"
    obstore.put(staging_store, marker_key, b"ok")

    return {
        "chunk_key": chunk_key,
        "source_items": source_items,
        "fetched_items": len(items),
        "fetch_failures": fetch_failures,
        "groups_written": len(ndjson_keys),
        "fan_out_counter": fan_out_counter,
        "ndjson_keys": ndjson_keys,
    }

compact_cell_year(cell, year, staging_store, staging_prefix, warehouse_store, compact_rows=100000)

Phase 3 worker: stream NDJSON → write GeoParquet to warehouse_store.

Source code in earthcatalog/pipelines/backfill.py
def compact_cell_year(
    cell: str,
    year: str,
    staging_store: object,
    staging_prefix: str,
    warehouse_store: object,
    compact_rows: int = 100_000,
) -> dict[str, Any]:
    """Phase 3 worker: stream NDJSON → write GeoParquet to *warehouse_store*."""
    prefix = f"{staging_prefix}/{cell}/{year}/"
    ndjson_keys = _scan_ndjson(staging_store, prefix)

    if not ndjson_keys:
        return _empty_compact_report(cell, year)

    key_prefix = f"grid_partition={cell}/year={year}"

    def _write_fn(batch: list[dict], key: str) -> int:
        return _write_parquet_to_store(batch, warehouse_store, key)

    def _out_key(idx: int) -> str:
        return f"{key_prefix}/part_{idx:06d}.parquet"

    return _stream_compact(
        cell, year, staging_store, ndjson_keys, compact_rows, _write_fn, _out_key
    )

compact_cell_year_delta(cell, year, staging_store, staging_prefix, warehouse_store, compact_rows=100000)

Phase 3 delta worker: compact NDJSON → new GeoParquet (no overwrite).

Source code in earthcatalog/pipelines/backfill.py
def compact_cell_year_delta(
    cell: str,
    year: str,
    staging_store: object,
    staging_prefix: str,
    warehouse_store: object,
    compact_rows: int = 100_000,
) -> dict[str, Any]:
    """Phase 3 delta worker: compact NDJSON → new GeoParquet (no overwrite)."""
    prefix = f"{staging_prefix}/{cell}/{year}/"
    ndjson_keys = _scan_ndjson(staging_store, prefix)

    if not ndjson_keys:
        return _empty_compact_report(cell, year)

    start_idx = _next_part_index(warehouse_store, cell, year)
    key_prefix = f"grid_partition={cell}/year={year}"

    def _write_fn(batch: list[dict], key: str) -> int:
        return _write_parquet_to_store(batch, warehouse_store, key)

    def _out_key(idx: int) -> str:
        return f"{key_prefix}/part_{start_idx + idx:06d}.parquet"

    return _stream_compact(
        cell, year, staging_store, ndjson_keys, compact_rows, _write_fn, _out_key
    )

register_and_cleanup(catalog_path, warehouse_root, staging_store, staging_prefix, warehouse_store=None, upload=True, h3_resolution=None, hash_index_path=None)

Phase 4: rebuild Iceberg catalog from warehouse files, upload, cleanup staging.

  1. Drop and recreate the Iceberg table.
  2. Scan warehouse for all Parquet files.
  3. Register via table.add_files().
  4. Upload catalog.
  5. Clean up staging via cleanup_staging() (completion markers and pending files; chunks and NDJSON are kept).

hash_index_path defaults to {warehouse_root}_id_hashes.parquet.

When warehouse_store is provided it is used for store-based listing instead of local filesystem glob (which was the pre-Phase-C fallback).
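The file selection and batched registration can be sketched in isolation. The regular expression and the batch size of 2000 come from the source below; partition_files, batched, and the candidate paths are illustrative.

```python
import re

# Only files laid out as grid_partition=<cell>/year=<year>/<name>.parquet qualify.
_PARTITION_FILE = re.compile(r"grid_partition=[^/]+/year=[^/]+/[^/]+\.parquet$")


def partition_files(relative_paths):
    """Keep only Parquet files that sit directly inside a partition directory."""
    return [p for p in relative_paths if _PARTITION_FILE.search(p)]


def batched(paths, batch_size=2000):
    """Split paths into consecutive batches, one add_files() call per batch."""
    return [paths[i : i + batch_size] for i in range(0, len(paths), batch_size)]


candidates = [
    "grid_partition=81283ffffffffff/year=2024/part_000000.parquet",
    "grid_partition=81283ffffffffff/part_000000.parquet",          # no year level
    "grid_partition=81283ffffffffff/year=2024/tmp/part.parquet",   # nested too deep
    "scratch/tmp.parquet",
]
selected = partition_files(candidates)
sizes = [len(b) for b in batched(list(range(4500)))]
```

The filter keeps stray Parquet files that live under the warehouse root but outside the partition layout out of the table.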

Source code in earthcatalog/pipelines/backfill.py
def register_and_cleanup(
    catalog_path: str,
    warehouse_root: str,
    staging_store: object,
    staging_prefix: str,
    warehouse_store: object | None = None,
    upload: bool = True,
    h3_resolution: int | None = None,
    hash_index_path: str | None = None,
) -> None:
    """
    Phase 4: rebuild Iceberg catalog from warehouse files, upload, cleanup staging.

    1. Drop and recreate the Iceberg table.
    2. Scan warehouse for all Parquet files.
    3. Register via table.add_files().
    4. Upload catalog.
    5. Clean up staging via cleanup_staging() (completion markers and pending
       files; chunks and NDJSON are kept).

    ``hash_index_path`` defaults to ``{warehouse_root}_id_hashes.parquet``.

    When *warehouse_store* is provided it is used for store-based listing
    instead of local filesystem glob (which was the pre-Phase-C fallback).
    """
    from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

    from earthcatalog.catalog import FULL_NAME, ICEBERG_SCHEMA, NAMESPACE, PARTITION_SPEC

    if hash_index_path is None:
        hash_index_path = f"{warehouse_root.rstrip('/')}_id_hashes.parquet"

    catalog = _open_sqlite(db_path=catalog_path, warehouse_path=warehouse_root)

    try:
        catalog.create_namespace(NAMESPACE)
    except NamespaceAlreadyExistsError:
        pass

    try:
        catalog.drop_table(FULL_NAME)
        print("Dropped stale Iceberg table.")
    except NoSuchTableError:
        pass

    props: dict[str, str] = {PROP_GRID_TYPE: "h3", PROP_HASH_INDEX_PATH: hash_index_path}
    if h3_resolution is not None:
        props[PROP_GRID_RESOLUTION] = str(h3_resolution)

    table = catalog.create_table(
        identifier=FULL_NAME,
        schema=ICEBERG_SCHEMA,
        partition_spec=PARTITION_SPEC,
        properties=props,
    )

    if warehouse_store is not None:
        all_paths = _list_warehouse_keys(warehouse_store, warehouse_root)
    else:
        # Fallback: local filesystem glob (legacy path)
        import re as _re

        all_paths = [
            str(f)
            for f in Path(warehouse_root).glob("**/*.parquet")
            if _re.search(
                r"grid_partition=[^/]+/year=[^/]+/[^/]+\.parquet$",
                str(f.relative_to(warehouse_root)),
            )
        ]

    if all_paths:
        batch_size = 2000
        for i in range(0, len(all_paths), batch_size):
            table.add_files(all_paths[i : i + batch_size])
        print(f"Registered {len(all_paths):,} files in Iceberg catalog.")

    if upload:
        upload_catalog(catalog_path)

    cleanup_staging(staging_store, staging_prefix)

register_delta(catalog_path, warehouse_root, new_parquet_paths, staging_store, staging_prefix, upload=True, h3_resolution=None, hash_index_path=None, update_hash_index=False)

Phase 4 delta: add new parquet files to existing Iceberg table (no drop).

Opens (or creates) the Iceberg table, calls table.add_files() with only the newly written parquets, then uploads the catalog. Existing warehouse files are never touched.

When update_hash_index is True, the warehouse hash index is updated by reading the id column from each newly written parquet file, hashing each ID, and merging into the existing index. This is Plan B: read from the actual warehouse files that were just registered, so the index exactly reflects what is in the catalog.

hash_index_path defaults to {warehouse_root}_id_hashes.parquet.
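Before registration, relative keys returned by the compact phase are turned into full paths under warehouse_root. The standalone function below mirrors that logic as it appears in the source on this page; to_full_path and the example bucket name are invented for illustration.

```python
def to_full_path(path: str, warehouse_root: str) -> str:
    """Resolve a parquet path against warehouse_root unless it is already absolute."""
    if path.startswith("s3://"):
        return path  # already a full S3 URI
    if path.startswith("/") or (len(path) > 1 and path[1] == ":"):
        return path  # local absolute path (Unix or Windows drive letter)
    # Relative key: join onto the warehouse root.
    return f"{warehouse_root.rstrip('/')}/{path}"


root = "s3://example-bucket/warehouse/"  # hypothetical warehouse root
relative = to_full_path("grid_partition=81283ffffffffff/year=2024/part_000002.parquet", root)
absolute = to_full_path("/tmp/earthcatalog_warehouse/part_000000.parquet", root)
already_s3 = to_full_path("s3://other-bucket/x/part_000000.parquet", root)
```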

Source code in earthcatalog/pipelines/backfill.py
def register_delta(
    catalog_path: str,
    warehouse_root: str,
    new_parquet_paths: list[str],
    staging_store: object,
    staging_prefix: str,
    upload: bool = True,
    h3_resolution: int | None = None,
    hash_index_path: str | None = None,
    update_hash_index: bool = False,
) -> None:
    """
    Phase 4 delta: add new parquet files to existing Iceberg table (no drop).

    Opens (or creates) the Iceberg table, calls ``table.add_files()``
    with only the newly written parquets, then uploads the catalog.
    Existing warehouse files are never touched.

    When *update_hash_index* is True, the warehouse hash index is updated
    by reading the ``id`` column from each newly written parquet file,
    hashing each ID, and merging into the existing index.  This is Plan B:
    read from the actual warehouse files that were just registered, so the
    index exactly reflects what is in the catalog.

    ``hash_index_path`` defaults to ``{warehouse_root}_id_hashes.parquet``.
    """
    from pyiceberg.exceptions import (
        NamespaceAlreadyExistsError,
        NoSuchNamespaceError,
        NoSuchTableError,
    )

    from earthcatalog.catalog import FULL_NAME, ICEBERG_SCHEMA, NAMESPACE, PARTITION_SPEC

    if hash_index_path is None:
        hash_index_path = f"{warehouse_root.rstrip('/')}_id_hashes.parquet"

    catalog = _open_sqlite(db_path=catalog_path, warehouse_path=warehouse_root)

    try:
        catalog.create_namespace(NAMESPACE)
    except (NamespaceAlreadyExistsError, NoSuchNamespaceError):
        pass

    props: dict[str, str] = {PROP_GRID_TYPE: "h3"}
    if h3_resolution is not None:
        props[PROP_GRID_RESOLUTION] = str(h3_resolution)
    props[PROP_HASH_INDEX_PATH] = hash_index_path

    try:
        table = catalog.load_table(FULL_NAME)
        print(f"Opened existing Iceberg table {FULL_NAME}")
        missing = {k: v for k, v in props.items() if k not in table.properties}
        if missing:
            with table.transaction() as tx:
                tx.set_properties(**missing)
    except NoSuchTableError:
        table = catalog.create_table(
            identifier=FULL_NAME,
            schema=ICEBERG_SCHEMA,
            partition_spec=PARTITION_SPEC,
            properties=props,
        )
        print(f"Created new Iceberg table {FULL_NAME}")

    if new_parquet_paths:
        # Convert relative paths to full s3:// URIs for PyIceberg table.add_files().
        # Phase 3 compact returns relative keys like "grid_partition=.../part_N.parquet".
        # Local paths (absolute or in test scenarios) are left as-is.
        full_paths = []
        for p in new_parquet_paths:
            if p.startswith("s3://"):
                full_paths.append(p)  # already full S3 URI
            elif p.startswith("/") or (len(p) > 1 and p[1] == ":"):
                full_paths.append(p)  # local absolute path (Windows or Unix)
            else:
                # relative path — convert to full S3 URI
                full_paths.append(f"{warehouse_root.rstrip('/')}/{p}")
        new_parquet_paths = full_paths

        batch_size = 2000
        for i in range(0, len(new_parquet_paths), batch_size):
            table.add_files(new_parquet_paths[i : i + batch_size])
        print(f"Added {len(new_parquet_paths):,} new files to Iceberg catalog.")
    else:
        print("No new parquet files to register.")

    if update_hash_index and new_parquet_paths:
        _update_hash_index_from_parquets(
            new_parquet_paths=new_parquet_paths,
            hash_index_path=hash_index_path,
            warehouse_root=warehouse_root,
        )

    if upload:
        upload_catalog(catalog_path)

    cleanup_staging(staging_store, staging_prefix)

cleanup_staging(staging_store, staging_prefix)

Delete completion markers and pending files. Keeps chunks and NDJSON.

Source code in earthcatalog/pipelines/backfill.py
def cleanup_staging(staging_store: object, staging_prefix: str) -> int:
    """Delete completion markers and pending files. Keeps chunks and NDJSON."""
    deleted = 0
    # Only these two sub-prefixes are cleaned; chunk files and NDJSON output are kept.
    for sub in ("_completed", "pending_chunks"):
        for batch in obstore.list(staging_store, prefix=f"{staging_prefix}/{sub}/"):
            for obj in batch:
                try:
                    obstore.delete(staging_store, obj["path"])
                    deleted += 1
                except Exception:
                    # Best-effort cleanup: a failed delete is skipped and not counted.
                    pass
    print(f"Cleanup: deleted {deleted:,} staging files")
    return deleted
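
To make the "deleted versus kept" split concrete, here is a small sketch that applies the same two prefixes to a plain list of keys instead of an object store. `select_for_cleanup` is a hypothetical helper, and the file names under each prefix are illustrative assumptions; only the `_completed/` and `pending_chunks/` prefixes come from the listing above.

```python
from __future__ import annotations


def select_for_cleanup(keys: list[str], staging_prefix: str) -> list[str]:
    """Return the keys that fall under the two prefixes the cleanup step deletes."""
    doomed = (
        f"{staging_prefix}/_completed/",
        f"{staging_prefix}/pending_chunks/",
    )
    # str.startswith accepts a tuple and matches if any prefix matches.
    return [k for k in keys if k.startswith(doomed)]


# Hypothetical staging layout; the individual file names are assumptions.
keys = [
    "ingest/_completed/chunk_000001",
    "ingest/pending_chunks/chunk_000002.parquet",
    "ingest/chunks/chunk_000001.parquet",
    "ingest/staging/cell/2020/part_000001.ndjson",
]
to_delete = select_for_cleanup(keys, "ingest")
kept = [k for k in keys if k not in to_delete]
print(to_delete)
print(kept)
```

The trailing slash on each prefix matters: without it, a sibling prefix such as `ingest/_completed_old/` would also match.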

run_backfill(inventory_path, catalog_path, staging_store, staging_prefix, warehouse_store, warehouse_root, partitioner=None, h3_resolution=None, chunk_size=100000, compact_rows=100000, fetch_concurrency=256, limit=None, since=None, use_lock=True, upload=True, skip_inventory=False, skip_ingest=False, retry_pending=False, delta=False, create_client=None, hash_index_path=None, update_hash_index=False)

Four-phase staging-based backfill pipeline.

Parameters

  • inventory_path: Local path or s3:// URI to inventory (CSV, Parquet, or manifest.json).
  • catalog_path: Local path for SQLite catalog.
  • staging_store: obstore-compatible store for staging (chunks + NDJSON).
  • staging_prefix: Key prefix within staging_store (e.g. "ingest").
  • warehouse_store: obstore-compatible store for warehouse GeoParquet output. Unused for local filesystem (warehouse_root is used directly).
  • warehouse_root: Path or s3:// URI for the warehouse root (used by add_files).
  • partitioner: H3Partitioner or similar. When None, the partitioner is built from h3_resolution (see below).
  • h3_resolution: H3 resolution for the default partitioner. When None, the resolution is read from the existing Iceberg table's properties. If the table does not exist yet and no resolution is given, a ValueError is raised.
  • chunk_size: Items per chunk Parquet in Phase 1.
  • compact_rows: Max rows per output GeoParquet file in Phase 3.
  • skip_ingest: If True, skip Phase 2 entirely and go straight to Phase 3 (Compact). Phase 3 scans S3 for existing NDJSON files. Useful when Phase 2 already completed but Phase 3 needs to be re-run (e.g. with bigger instances).
  • retry_pending: If True, Phase 2 retries chunks that had fetch failures (stored in pending_chunks/). If False (default), pending chunks are logged but skipped; Phase 3 proceeds with whatever succeeded.
  • delta: If True, run in delta mode: Phase 3 writes new parquets without overwriting existing ones, and Phase 4 adds files to the existing Iceberg table instead of dropping and recreating it.
  • create_client: Optional callable that returns a Dask Client. Called lazily right before Phase 2 (after Phase 1 completes). Used for Coiled to avoid idle cluster timeout during long Phase 1 runs.
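
The interaction between partitioner and h3_resolution is easier to follow as a precedence rule. The sketch below mirrors the order used in the source listing further down, under simplifying assumptions: `resolve_resolution` is a hypothetical helper, the table is modelled as a plain dict of properties (None meaning "table does not exist"), and the property key `"grid_resolution"` is a placeholder for whatever `PROP_GRID_RESOLUTION` actually holds. When an explicit partitioner is passed, none of this logic runs.

```python
from __future__ import annotations


def resolve_resolution(
    h3_resolution: int | None,
    table_properties: dict[str, str] | None,
    prop_key: str = "grid_resolution",  # placeholder key name, an assumption
) -> int:
    """Pick the H3 resolution: explicit argument, then table property, then 1."""
    if h3_resolution is not None:
        return h3_resolution
    if table_properties is None:
        # No table yet and nothing given: the initial backfill must specify it.
        raise ValueError("No existing table and no h3_resolution given.")
    raw = table_properties.get(prop_key)
    # A table without the property falls back to resolution 1, as in the listing.
    return int(raw) if raw is not None else 1


explicit = resolve_resolution(3, None)
from_table = resolve_resolution(None, {"grid_resolution": "2"})
fallback = resolve_resolution(None, {})
print(explicit, from_table, fallback)
```

Note that the fallback to resolution 1 for a table that lacks the property appears in the source listing but not in the parameter description above.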

Source code in earthcatalog/pipelines/backfill.py
def run_backfill(
    inventory_path: str,
    catalog_path: str,
    staging_store: object,
    staging_prefix: str,
    warehouse_store: object,
    warehouse_root: str,
    partitioner: object | None = None,
    h3_resolution: int | None = None,
    chunk_size: int = 100_000,
    compact_rows: int = 100_000,
    fetch_concurrency: int = 256,
    limit: int | None = None,
    since: datetime | None = None,
    use_lock: bool = True,
    upload: bool = True,
    skip_inventory: bool = False,
    skip_ingest: bool = False,
    retry_pending: bool = False,
    delta: bool = False,
    create_client: Callable[[], object] | None = None,
    hash_index_path: str | None = None,
    update_hash_index: bool = False,
) -> None:
    """
    Four-phase staging-based backfill pipeline.

    Parameters
    ----------
    inventory_path:
        Local path or s3:// URI to inventory (CSV, Parquet, or manifest.json).
    catalog_path:
        Local path for SQLite catalog.
    staging_store:
        obstore-compatible store for staging (chunks + NDJSON).
    staging_prefix:
        Key prefix within staging_store (e.g. "ingest").
    warehouse_store:
        obstore-compatible store for warehouse GeoParquet output.
        Unused for local filesystem (warehouse_root is used directly).
    warehouse_root:
        Path or s3:// URI for the warehouse root (used by add_files).
    partitioner:
        H3Partitioner or similar.  When *None*, the partitioner is built from
        *h3_resolution* (see below).
    h3_resolution:
        H3 resolution for the default partitioner.  When *None* the resolution
        is read from the existing Iceberg table's properties.  If the table
        does not exist yet and no resolution is given, a ``ValueError`` is
        raised.
    chunk_size:
        Items per chunk Parquet in Phase 1.
    compact_rows:
        Max rows per output GeoParquet file in Phase 3.
    skip_ingest:
        If True, skip Phase 2 entirely and go straight to Phase 3 (Compact).
        Phase 3 scans S3 for existing NDJSON files. Useful when Phase 2 already
        completed but Phase 3 needs to be re-run (e.g. with bigger instances).
    retry_pending:
        If True, Phase 2 retries chunks that had fetch failures (stored in
        pending_chunks/). If False (default), pending chunks are logged but
        skipped — Phase 3 proceeds with whatever succeeded.
    delta:
        If True, run in delta mode: Phase 3 writes new parquets without
        overwriting existing ones, and Phase 4 adds files to the existing
        Iceberg table instead of dropping and recreating it.
    create_client:
        Optional callable that returns a Dask Client. Called lazily
        right before Phase 2 (after Phase 1 completes). Used for
        Coiled to avoid idle cluster timeout during long Phase 1 runs.
    """
    if partitioner is None:
        if h3_resolution is not None:
            partitioner = H3Partitioner(resolution=h3_resolution)
        else:
            from pyiceberg.exceptions import NoSuchTableError

            from earthcatalog.catalog import FULL_NAME

            catalog = _open_sqlite(db_path=catalog_path, warehouse_path=warehouse_root)
            try:
                table = catalog.load_table(FULL_NAME)
            except NoSuchTableError:
                raise ValueError(
                    "No existing table and no --h3-resolution given. "
                    "Specify --h3-resolution for the initial backfill."
                ) from None

            raw = table.properties.get(PROP_GRID_RESOLUTION)
            if raw is not None:
                partitioner = H3Partitioner(resolution=int(raw))
                print(f"Auto-detected H3 resolution {int(raw)} from catalog")
            else:
                partitioner = H3Partitioner(resolution=1)
                print("No resolution property on table, defaulting to H3 resolution 1")

    resolved_resolution = (
        partitioner.resolution if hasattr(partitioner, "resolution") else h3_resolution
    )

    def _run() -> None:
        # Phase 1 — Scheduler
        print("\n" + "=" * 64)
        print("Phase 1 — Scheduler: writing chunk files")
        print("=" * 64)

        if skip_inventory:
            existing_ids = _list_existing_chunks(staging_store, f"{staging_prefix}/chunks")
            chunk_keys = [
                f"{staging_prefix}/chunks/chunk_{cid:06d}.parquet" for cid in sorted(existing_ids)
            ]
            print(f"Phase 1: --skip-inventory, using {len(chunk_keys)} existing chunks")
        else:
            chunk_keys = write_chunks(
                inventory_path=inventory_path,
                staging_store=staging_store,
                staging_prefix=f"{staging_prefix}/chunks",
                chunk_size=chunk_size,
                limit=limit,
                since=since,
            )

        # Create Dask client once — shared by Phase 2 and Phase 3
        client = _get_client(create_client)

        # Phase 2 — Ingest
        all_ndjson_keys: list[str] = []

        if skip_ingest:
            print("\n" + "=" * 64)
            print("Phase 2 — Ingest: SKIPPED (--skip-ingest)")
            print("=" * 64)
        else:
            print("\n" + "=" * 64)
            print("Phase 2 — Ingest: fetching items + writing NDJSON")
            print("=" * 64)

            pending_prefix = f"{staging_prefix}/pending_chunks"
            ndjson_prefix = f"{staging_prefix}/staging"

            completed_ids = _list_completed_chunk_ids(staging_store, ndjson_prefix, pending_prefix)
            if completed_ids:
                print(f"Phase 2: {len(completed_ids)} chunks already completed, skipping")

            pending_ids = _list_existing_chunks(staging_store, pending_prefix)
            if pending_ids and not retry_pending:
                print(
                    f"Phase 2: {len(pending_ids)} pending chunks recorded (use --retry-pending to retry)"
                )

            if retry_pending and pending_ids:
                pending_keys = [
                    f"{pending_prefix}/chunk_{cid:06d}.parquet" for cid in sorted(pending_ids)
                ]
                print(f"Phase 2: retrying {len(pending_ids)} pending chunks")
                chunk_keys = pending_keys

            if completed_ids:

                def _chunk_id_from_key(key: str) -> int:
                    fname = key.rsplit("/", 1)[-1]
                    return int(fname.removeprefix("chunk_").removesuffix(".parquet"))

                chunk_keys = [
                    ck for ck in chunk_keys if _chunk_id_from_key(ck) not in completed_ids
                ]
                print(f"Phase 2: {len(chunk_keys)} chunks remaining after skip")

            n_tasks = len(chunk_keys)

            if n_tasks > 0:
                if client is not None:
                    from dask.distributed import as_completed as distributed_ac

                    print(f"Submitting {n_tasks} ingest tasks via client.map …")
                    futures = client.map(
                        ingest_chunk,
                        chunk_keys,
                        staging_store=staging_store,
                        staging_prefix=f"{staging_prefix}/staging",
                        pending_prefix=pending_prefix,
                        partitioner=partitioner,
                        fetch_concurrency=fetch_concurrency,
                    )
                    with tqdm.tqdm(total=n_tasks, desc="Phase 2", unit="chunk") as pbar:
                        total_items = total_fetched = total_failures = total_groups = 0
                        for future in distributed_ac(futures):
                            r = future.result()
                            total_items += r["source_items"]
                            total_fetched += r["fetched_items"]
                            total_failures += r["fetch_failures"]
                            total_groups += r["groups_written"]
                            all_ndjson_keys.extend(r.get("ndjson_keys", []))
                            pbar.set_postfix(
                                fetched=f"{total_fetched:,}",
                                failed=total_failures,
                                groups=total_groups,
                                refresh=False,
                            )
                            pbar.update(1)
                else:
                    print(f"Processing {n_tasks} ingest tasks sequentially …")
                    with tqdm.tqdm(total=n_tasks, desc="Phase 2", unit="chunk") as pbar:
                        for ck in chunk_keys:
                            r = ingest_chunk(
                                chunk_key=ck,
                                staging_store=staging_store,
                                staging_prefix=f"{staging_prefix}/staging",
                                pending_prefix=pending_prefix,
                                partitioner=partitioner,
                                fetch_concurrency=fetch_concurrency,
                            )
                            all_ndjson_keys.extend(r.get("ndjson_keys", []))
                            pbar.update(1)

        # Phase 3 — Compact
        print("\n" + "=" * 64)
        print("Phase 3 — Compact: NDJSON → GeoParquet")
        print("=" * 64)

        ndjson_base = f"{staging_prefix}/staging"
        buckets: dict[tuple[str, str], None] = {}

        for nk in all_ndjson_keys:
            parts = nk.split("/")
            if len(parts) >= 3:
                buckets[(parts[-3], parts[-2])] = None

        if not buckets:
            for batch in obstore.list(staging_store, prefix=f"{ndjson_base}/"):
                for obj in batch:
                    path: str = obj["path"]
                    parts = path.split("/")
                    if len(parts) >= 3 and parts[-1].endswith(".ndjson"):
                        buckets[(parts[-3], parts[-2])] = None

        compact_results: list[dict] = []

        if not buckets:
            print("No NDJSON groups to compact.")
        else:
            compact_fn = compact_cell_year_delta if delta else compact_cell_year
            bucket_list = list(buckets.keys())
            common_kwargs: dict[str, Any] = {
                "staging_store": staging_store,
                "staging_prefix": f"{staging_prefix}/staging",
                "compact_rows": compact_rows,
                "warehouse_store": warehouse_store,
            }

            print(f"Submitting {len(bucket_list)} compact tasks …")

            if client is not None:
                from dask.distributed import as_completed as distributed_ac

                def _compact_task(cell_year):
                    cell, year = cell_year
                    return compact_fn(cell=cell, year=year, **common_kwargs)

                futures = client.map(_compact_task, bucket_list)
                with tqdm.tqdm(total=len(bucket_list), desc="Phase 3", unit="partition") as pbar:
                    for future in distributed_ac(futures):
                        r = future.result()
                        compact_results.append(r)
                        pbar.set_postfix(
                            rows=f"{r['output_rows']:,}",
                            refresh=False,
                        )
                        pbar.update(1)
            else:
                for cell, year in tqdm.tqdm(bucket_list, desc="Phase 3", unit="partition"):
                    kwargs = {
                        "cell": cell,
                        "year": year,
                        **common_kwargs,
                    }
                    compact_results.append(compact_fn(**kwargs))

            total_input = sum(r["input_items"] for r in compact_results)
            total_unique = sum(r["unique_items"] for r in compact_results)
            total_output = sum(r["output_rows"] for r in compact_results)
            total_files = sum(len(r["output_files"]) for r in compact_results)
            print(
                f"Compact done: {total_input:,} items → {total_unique:,} unique "
                f"→ {total_output:,} rows in {total_files:,} files"
            )

        # Phase 4 — Register + cleanup
        print("\n" + "=" * 64)
        if delta:
            print("Phase 4 — Delta Register: add files to existing catalog")
        else:
            print("Phase 4 — Register: rebuild catalog + cleanup")
        print("=" * 64)

        if delta:
            new_paths = []
            for r in compact_results:
                new_paths.extend(r.get("output_files", []))
            register_delta(
                catalog_path=catalog_path,
                warehouse_root=warehouse_root,
                new_parquet_paths=new_paths,
                staging_store=staging_store,
                staging_prefix=staging_prefix,
                h3_resolution=resolved_resolution,
                upload=upload,
                hash_index_path=hash_index_path,
                update_hash_index=update_hash_index,
            )
        else:
            register_and_cleanup(
                catalog_path=catalog_path,
                warehouse_root=warehouse_root,
                staging_store=staging_store,
                staging_prefix=staging_prefix,
                warehouse_store=warehouse_store,
                h3_resolution=resolved_resolution,
                upload=upload,
                hash_index_path=hash_index_path,
            )

    if use_lock:
        from earthcatalog.lock import S3Lock

        with S3Lock(owner="backfill-v2"):
            _run()
    else:
        _run()
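
Two small pieces of bookkeeping in the listing above decide what Phase 2 and Phase 3 actually process: dropping chunks that already completed, and deriving the (cell, year) partitions from NDJSON keys. The sketch below reproduces both on plain lists. The helper names `remaining_chunks` and `cell_year_buckets` are hypothetical, and the example key layout `<prefix>/staging/<cell>/<year>/<file>.ndjson` is an assumption inferred from how the listing unpacks the last path segments.

```python
from __future__ import annotations


def chunk_id_from_key(key: str) -> int:
    """Extract the numeric id from a key ending in chunk_<id>.parquet."""
    fname = key.rsplit("/", 1)[-1]
    return int(fname.removeprefix("chunk_").removesuffix(".parquet"))


def remaining_chunks(chunk_keys: list[str], completed_ids: set[int]) -> list[str]:
    """Keep only the chunk keys whose id is not already completed."""
    return [ck for ck in chunk_keys if chunk_id_from_key(ck) not in completed_ids]


def cell_year_buckets(ndjson_keys: list[str]) -> list[tuple[str, str]]:
    """Collect unique (cell, year) pairs, preserving first-seen order."""
    buckets: dict[tuple[str, str], None] = {}  # dict used as an ordered set
    for key in ndjson_keys:
        parts = key.split("/")
        if len(parts) >= 3:
            buckets[(parts[-3], parts[-2])] = None
    return list(buckets)


chunk_keys = [f"ingest/chunks/chunk_{i:06d}.parquet" for i in range(4)]
todo = remaining_chunks(chunk_keys, completed_ids={0, 2})
print(todo)

# Hypothetical NDJSON keys; the cell ids and file names are made up.
ndjson_keys = [
    "ingest/staging/cellA/2020/part_000001.ndjson",
    "ingest/staging/cellA/2020/part_000002.ndjson",
    "ingest/staging/cellB/2021/part_000001.ndjson",
]
partitions = cell_year_buckets(ndjson_keys)
print(partitions)
```

Using a dict with None values rather than a set keeps the partitions in first-seen order, so repeated runs over the same keys submit compact tasks in a stable sequence.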