
Maintenance API

earthcatalog.maintenance.compact

Standalone compaction for the earthcatalog warehouse.

Purpose

The incremental pipeline (earthcatalog.pipelines.incremental.run) writes one new part file per (cell, year) bucket per run. After N incremental runs a bucket can therefore hold up to N part files, and the same id can appear in several of them if the same STAC item showed up in more than one inventory delta.
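To make the duplicate problem concrete, here is a minimal pure-Python sketch of what merging one bucket's part files involves, with each part file modelled as a list of row dicts. The keep-the-newest-row policy and sorting by id are assumptions for illustration; the actual _compact_group_impl may resolve duplicates and order rows differently.

```python
from collections.abc import Iterable
from typing import Any

Row = dict[str, Any]


def merge_parts(parts: Iterable[list[Row]], key: str = "id") -> list[Row]:
    """Merge part files (oldest first), keep one row per key, sort by key.

    Assumption: when an id appears in several parts, the row from the
    most recent part wins.
    """
    latest: dict[Any, Row] = {}
    for part in parts:  # parts are assumed to be ordered oldest -> newest
        for row in part:
            latest[row[key]] = row  # a later row overwrites an earlier one
    return sorted(latest.values(), key=lambda r: r[key])


# Three incremental runs wrote three part files into the same bucket;
# item "b" appeared in two inventory deltas.
run1 = [{"id": "a", "v": 1}, {"id": "b", "v": 1}]
run2 = [{"id": "c", "v": 1}]
run3 = [{"id": "b", "v": 2}]
merged = merge_parts([run1, run2, run3])
print([(r["id"], r["v"]) for r in merged])  # [('a', 1), ('b', 2), ('c', 1)]
```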

This tool:

  1. Scans the warehouse directory for all Parquet files, grouped by (grid_partition, year) bucket.
  2. For every bucket with ≥ threshold part files: merges them into one consolidated, deduplicated, sorted file (reusing earthcatalog.maintenance.compact._compact_group_impl).
  3. Rebuilds the Iceberg catalog from all files currently in the warehouse (a "repair table" operation) so stale and new paths are resolved in one clean snapshot.
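Steps 1 and 2 boil down to grouping files by bucket and keeping only the buckets that reach the threshold. The sketch below assumes a hypothetical Hive-style directory layout, <warehouse>/grid_partition=<cell>/year=<yyyy>/<part>.parquet; the real warehouse layout and the helper names find_buckets and over_threshold are illustrative, not part of the earthcatalog API.

```python
import tempfile
from collections import defaultdict
from pathlib import Path


def find_buckets(warehouse: Path) -> dict[tuple[str, str], list[Path]]:
    """Group Parquet files by (grid_partition, year).

    Assumes a hypothetical layout:
    <warehouse>/grid_partition=<cell>/year=<yyyy>/<part>.parquet
    """
    buckets: dict[tuple[str, str], list[Path]] = defaultdict(list)
    for f in warehouse.rglob("*.parquet"):
        # Parse "key=value" directory names between the root and the file.
        keys = dict(
            p.split("=", 1) for p in f.relative_to(warehouse).parts[:-1] if "=" in p
        )
        if "grid_partition" in keys and "year" in keys:
            buckets[(keys["grid_partition"], keys["year"])].append(f)
    return dict(buckets)


def over_threshold(
    buckets: dict[tuple[str, str], list[Path]], threshold: int = 2
) -> dict[tuple[str, str], list[Path]]:
    """Keep only buckets with at least `threshold` part files."""
    return {k: sorted(v) for k, v in buckets.items() if len(v) >= threshold}


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for rel in [
        "grid_partition=8a/year=2020/part-0.parquet",
        "grid_partition=8a/year=2020/part-1.parquet",
        "grid_partition=8b/year=2021/part-0.parquet",
    ]:
        p = root / rel
        p.parent.mkdir(parents=True, exist_ok=True)
        p.touch()
    scanned = find_buckets(root)
    todo = over_threshold(scanned, threshold=2)
    print(list(todo))  # [('8a', '2020')]
```

With the default threshold of 2, a bucket holding a single part file is left alone, which matches the "more than one part file" wording of the threshold parameter.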

since= / delta ingestion note

Currently, the incremental pipeline reads the full S3 Inventory file and filters rows client-side on last_modified_date >= since. This is correct and memory-efficient (it streams row-group batches), but the I/O cost scales with the total inventory size, not with the size of the delta.
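The trade-off can be seen in a small stdlib-only sketch: memory is bounded by one batch at a time, but every batch of the inventory is still read. Here read_batches is a hypothetical stand-in for row-group batches (a real reader might use something like pyarrow.parquet.ParquetFile.iter_batches), and the rows and column name key are invented for the example.

```python
from collections.abc import Iterable, Iterator
from datetime import date


def read_batches(rows: list[dict], batch_size: int) -> Iterator[list[dict]]:
    """Hypothetical stand-in for streaming Parquet row-group batches."""
    for i in range(0, len(rows), batch_size):
        yield rows[i : i + batch_size]


def filter_since(batches: Iterable[list[dict]], since: date) -> Iterator[dict]:
    """Yield rows with last_modified_date >= since, one batch at a time.

    Only one batch is held in memory, but every batch is still read,
    so I/O grows with the full inventory rather than with the delta.
    """
    for batch in batches:
        for row in batch:
            if row["last_modified_date"] >= since:
                yield row


inventory = [
    {"key": "a.json", "last_modified_date": date(2024, 1, 5)},
    {"key": "b.json", "last_modified_date": date(2024, 3, 1)},
    {"key": "c.json", "last_modified_date": date(2024, 2, 10)},
    {"key": "d.json", "last_modified_date": date(2023, 12, 31)},
]
delta = list(filter_since(read_batches(inventory, batch_size=2), since=date(2024, 2, 1)))
print([r["key"] for r in delta])  # ['b.json', 'c.json']
```

A pre-filtered delta file produces the same rows without the since filter, because the filtering has already happened upstream.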

Once an Athena cron job emits a pre-filtered delta Parquet file, pass that smaller file as --inventory and omit --since; no pipeline changes are needed.

Usage

As a module:

from earthcatalog.maintenance.compact import compact_warehouse
compact_warehouse(
    warehouse_path="/tmp/earthcatalog_warehouse",
    catalog_path="/tmp/earthcatalog.db",
    threshold=2,
)

As a CLI:

# --threshold is optional (default: 2)
python -m earthcatalog.maintenance.compact \
    --warehouse /tmp/earthcatalog_warehouse \
    --catalog   /tmp/earthcatalog.db \
    --threshold 2

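# With an S3Lock (recommended for production). The warehouse must currently
# be a local directory; S3 warehouse paths are not yet supported.
python -m earthcatalog.maintenance.compact \
    --warehouse /tmp/earthcatalog_warehouse \
    --catalog   /tmp/earthcatalog.db \
    --use-lock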
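For readers wiring the CLI into other tooling, the sketch below shows one plausible mapping from the documented flags onto compact_warehouse() keyword arguments using argparse. It is illustrative only: parse_cli is a hypothetical name, the module's real parser may differ, and no --dry-run flag is included because none is documented for the CLI.

```python
import argparse
from typing import Optional


def parse_cli(argv: Optional[list[str]] = None) -> dict:
    """Map the documented CLI flags onto compact_warehouse() kwargs.

    Hypothetical sketch; the real module's argument parser may differ.
    """
    parser = argparse.ArgumentParser(prog="python -m earthcatalog.maintenance.compact")
    parser.add_argument("--warehouse", required=True, help="local warehouse root")
    parser.add_argument("--catalog", required=True, help="SQLite catalog file")
    parser.add_argument("--threshold", type=int, default=2, help="min part files per bucket")
    parser.add_argument("--use-lock", action="store_true", help="wrap the run in an S3Lock")
    args = parser.parse_args(argv)
    return {
        "warehouse_path": args.warehouse,
        "catalog_path": args.catalog,
        "threshold": args.threshold,
        "use_lock": args.use_lock,  # argparse turns --use-lock into use_lock
    }


kwargs = parse_cli(
    ["--warehouse", "/tmp/earthcatalog_warehouse", "--catalog", "/tmp/earthcatalog.db", "--use-lock"]
)
print(kwargs)
# compact_warehouse(**kwargs) would then perform the compaction.
```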


Functions

compact_warehouse(warehouse_path, catalog_path, threshold=2, use_lock=False, dry_run=False)

Compact all over-threshold buckets in warehouse_path and rebuild the Iceberg catalog.

Parameters

warehouse_path: Local path to the warehouse root (e.g. /tmp/earthcatalog_warehouse). Must be a local directory; S3 warehouse support is planned.
catalog_path: Local path to the SQLite catalog file (e.g. /tmp/earthcatalog.db).
threshold: Minimum number of part files in a bucket before it is compacted. Default: 2 (compact any bucket with more than one part file).
use_lock: When True, wrap the entire operation in an earthcatalog.lock.S3Lock. Requires the lock store to be configured in earthcatalog.store_config.
dry_run: When True, report what would be compacted but make no changes.

Returns

A summary dict:

{
    "buckets_scanned": int,
    "buckets_compacted": int,
    "files_before": int,
    "files_after": int,
}
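If you want static type checking on the return value, the documented keys can be captured in a TypedDict. CompactSummary and describe are hypothetical names for illustration, not part of earthcatalog, and the numbers below are made up to show the formatting.

```python
from typing import TypedDict


class CompactSummary(TypedDict):
    """Shape of the dict returned by compact_warehouse() (hypothetical name)."""

    buckets_scanned: int
    buckets_compacted: int
    files_before: int
    files_after: int


def describe(summary: CompactSummary) -> str:
    """Render a one-line, human-readable report of a compaction run."""
    removed = summary["files_before"] - summary["files_after"]
    return (
        f"compacted {summary['buckets_compacted']}/{summary['buckets_scanned']} "
        f"buckets, {summary['files_before']} -> {summary['files_after']} files "
        f"({removed} fewer)"
    )


# Illustrative values only.
example: CompactSummary = {
    "buckets_scanned": 3,
    "buckets_compacted": 1,
    "files_before": 5,
    "files_after": 3,
}
print(describe(example))  # compacted 1/3 buckets, 5 -> 3 files (2 fewer)
```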
Source code in earthcatalog/maintenance/compact.py
def compact_warehouse(
    warehouse_path: str,
    catalog_path: str,
    threshold: int = 2,
    use_lock: bool = False,
    dry_run: bool = False,
) -> dict[str, int]:
    """
    Compact all over-threshold buckets in *warehouse_path* and rebuild the
    Iceberg catalog.

    Parameters
    ----------
    warehouse_path:
        Local path to the warehouse root (e.g. ``/tmp/earthcatalog_warehouse``).
        Must be a local directory — S3 warehouse support is planned.
    catalog_path:
        Local path to the SQLite catalog file (e.g. ``/tmp/earthcatalog.db``).
    threshold:
        Minimum number of part files in a bucket before it is compacted.
        Default: 2 (compact any bucket with more than one part file).
    use_lock:
        When ``True``, wrap the entire operation in an
        :class:`~earthcatalog.lock.S3Lock`.  Requires the lock store to
        be configured in :mod:`earthcatalog.store_config`.
    dry_run:
        When ``True``, report what *would* be compacted but make no changes.

    Returns
    -------
    A summary dict::

        {
            "buckets_scanned": int,
            "buckets_compacted": int,
            "files_before": int,
            "files_after": int,
        }
    """

    def _run() -> dict[str, int]:
        return _compact_warehouse_impl(
            warehouse_path=warehouse_path,
            catalog_path=catalog_path,
            threshold=threshold,
            dry_run=dry_run,
        )

    if use_lock:
        from earthcatalog.lock import S3Lock

        with S3Lock(owner="compact"):
            return _run()
    else:
        return _run()
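The if/else around use_lock above calls _run() in two branches. An equivalent pattern picks the context manager first and uses contextlib.nullcontext when no lock is wanted. In this sketch FakeLock is a hypothetical stand-in for S3Lock (it only records acquire/release), and run_maybe_locked is an illustrative helper, not earthcatalog code.

```python
from contextlib import nullcontext


class FakeLock:
    """Hypothetical stand-in for S3Lock that records enter/exit events."""

    def __init__(self, owner: str) -> None:
        self.owner = owner
        self.events: list[str] = []

    def __enter__(self) -> "FakeLock":
        self.events.append("acquire")
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.events.append("release")
        return False  # never swallow exceptions raised inside the block


def run_maybe_locked(work, use_lock: bool, lock_factory=FakeLock):
    """Run `work()` inside a lock only when use_lock is True.

    The factory is only called when a lock is requested, which mirrors
    the lazy S3Lock import in the original function.
    """
    ctx = lock_factory(owner="compact") if use_lock else nullcontext()
    with ctx:
        return work()


lock_log: list[FakeLock] = []


def factory(owner: str) -> FakeLock:
    lock = FakeLock(owner)
    lock_log.append(lock)
    return lock


result = run_maybe_locked(lambda: {"buckets_compacted": 1}, use_lock=True, lock_factory=factory)
print(result, lock_log[0].events)  # {'buckets_compacted': 1} ['acquire', 'release']
```

Both forms behave the same; the nullcontext version simply keeps a single call site for the work being protected.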