class EarthCatalog:
    """Simplified facade for querying an EarthCatalog.

    Combines a PyIceberg catalog, table, and ``CatalogInfo`` into a single
    interface for spatial/temporal queries with automatic file pruning.

    Example::

        from earthcatalog import open as ec_open
        from obstore.store import S3Store
        from shapely.geometry import Point

        store = S3Store(bucket='my-bucket', region='us-west-2')
        ec = ec_open(store=store, base='s3://my-bucket/catalog')
        point = Point(-133.99, 58.74)
        paths = ec.search_files(point, start_datetime='2020-01-01')
    """

    # Accepted values for the ``mode`` argument of ingest() / bulk_ingest().
    _INGEST_MODES = ("auto", "full", "delta")

    def __init__(
        self,
        catalog: object,
        table: Table,
        info: CatalogInfo,
        store: object | None = None,
        *,
        catalog_key: str | None = None,
    ):
        """Initialize an EarthCatalog facade.

        Args:
            catalog: PyIceberg SqlCatalog instance.
            table: PyIceberg Table instance.
            info: CatalogInfo with grid metadata.
            store: obstore Store backing the catalog (used for the hash index,
                catalog.db download/upload, locking and ingest writes).
            catalog_key: Key within *store* where catalog.db is persisted.
                ingest()/bulk_ingest() only sync catalog.db with the store when
                both *store* and this key are set.
        """
        self._catalog = catalog
        self._table = table
        self._info = info
        self._store = store
        self._catalog_key = catalog_key

    # ------------------------------------------------------------------ search

    def search_files(
        self,
        geom,
        start_datetime: str | datetime | None = None,
        end_datetime: str | datetime | None = None,
    ) -> list[str]:
        """Return Parquet file paths for partitions intersecting *geom*.

        Pruning uses Iceberg partition metadata via ``CatalogInfo.file_paths``;
        the datetime bounds are forwarded unchanged.
        """
        return self._info.file_paths(
            self._table,
            geom,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )

    def search(self, **kwargs):
        """Search across the catalog, returning a deferred ``EarthCatalogItemSearch``.

        Accepts the same kwargs as :func:`rustac.search`:
        ``intersects``, ``bbox``, ``datetime``, ``filter`` (CQL2 JSON),
        ``ids``, ``collections``, ``max_items``, ``limit``, ``sortby``,
        ``include``, ``exclude``, ``query``, etc.

        Use the top-level ``datetime`` kwarg for temporal filtering. Do
        **not** reference ``datetime`` inside the CQL2 ``filter`` —
        rustac generates broken SQL when ``datetime`` appears in a CQL2
        expression.

        Performance
        -----------
        For fastest results use :meth:`duck_search` with ``format="native"``
        (DuckDB parallel I/O, ~2× faster across all query types).
        ``search()`` and ``search_to_arrow()`` use rustac (sequential per-file)
        and have comparable speed. See :doc:`/operations/search_performance`
        for detailed benchmarks.

        Returns
        -------
        EarthCatalogItemSearch
            A lazy, pystac_client-compatible search result. No I/O until
            ``items()``, ``item_collection()``, or ``pages()`` is called.
        """
        from .search import EarthCatalogItemSearch, _FileSearchEngine

        engine = _FileSearchEngine(prune_fn=self._search_prune)
        return EarthCatalogItemSearch(
            params=kwargs,
            engine=engine,
            table=self._table,
            # Passed uncalled: the search object enters it when it performs I/O.
            anonymous_ctx=self._cleared_env_s3,
        )

    def search_to_arrow(self, **kwargs):
        """Search across the catalog, returning a PyArrow table.

        Runs the rustac-backed engine inside :meth:`_cleared_env_s3` so that
        anonymous stores are read with unsigned requests.
        """
        from .search import _FileSearchEngine

        engine = _FileSearchEngine(prune_fn=self._search_prune)
        with self._cleared_env_s3():
            return engine.search_to_arrow(**kwargs)

    def search_uris(self, **kwargs):
        """Return asset URIs as a DataFrame with ``(id, uri)`` columns.

        Accepts the same kwargs as :meth:`search` (``intersects``, ``bbox``,
        ``datetime``, ``filter``, ``max_items``, etc.). ``bbox`` may be 2D
        (4 values) or 3D (6 values; the vertical extent is ignored).

        Prunes files via Iceberg partition metadata (like :meth:`search_files`),
        then uses DuckDB to read **only** the ``id`` and ``assets`` columns —
        fastest way to get download URLs for thousands of items.

        Returns
        -------
        pandas.DataFrame
            Columns ``id`` and ``uri``. ``uri`` is the ``href`` of each item's
            ``data`` asset, or ``None`` when it is missing or unparseable.

        Examples::

            import cql2
            df = catalog.search_uris(
                intersects={"type": "Point", "coordinates": [-45, 70]},
                datetime="2020-01-01/2020-12-31",
                filter=cql2.parse_text("percent_valid_pixels >= 80").to_json(),
                max_items=100,
            )
            # df has columns: id, uri
            for _, row in df.iterrows():
                print(row.id, row.uri)
        """
        import pandas as pd

        sql = self._pruned_duckdb_sql("id, assets", kwargs)
        if sql is None:
            return pd.DataFrame({"id": [], "uri": []})

        # Arrow -> Python lists is faster than pandas iterrows.
        table = self._duckdb_fetch(sql, lambda result: result.to_arrow_table())
        max_items = kwargs.get("max_items")
        if max_items is not None and table.num_rows > max_items:
            table = table.slice(0, max_items)

        ids = table.column("id").to_pylist()
        uris = [self._data_asset_href(a) for a in table.column("assets").to_pylist()]
        return pd.DataFrame({"id": ids, "uri": uris})

    def duck_search(self, **kwargs):
        """Search using DuckDB, returning results as a ``pandas.DataFrame``.

        Accepts the same kwargs as :meth:`search` (``intersects``, ``bbox``,
        ``datetime``, ``filter``, ``max_items``, etc.). ``bbox`` may be 2D
        (4 values) or 3D (6 values; the vertical extent is ignored).

        DuckDB reads Parquet files in parallel internally, making this
        **~2× faster** than :meth:`search` across all query types.

        Returns a DataFrame with flat columns — no pystac conversion
        overhead. For pystac Items use :meth:`search` (lazy iteration).
        When pruning leaves no files, an empty (column-less) DataFrame is
        returned.

        Examples::

            df = catalog.duck_search(
                intersects={"type": "Point", "coordinates": [-45, 70]},
                datetime="1980-01-01/2015-12-31",
                max_items=100,
            )
            # df is a pandas.DataFrame
            print(df.columns.tolist())
        """
        sql = self._pruned_duckdb_sql("*", kwargs)
        if sql is None:
            import pandas as pd

            return pd.DataFrame()

        df = self._duckdb_fetch(sql, lambda result: result.fetchdf())
        max_items = kwargs.get("max_items")
        if max_items is not None and len(df) > max_items:
            df = df.head(max_items)
        return df

    def _search_prune(self, geom, start_datetime=None, end_datetime=None):
        """Prune warehouse files via Iceberg partition metadata (zero I/O).

        Used as the ``prune_fn`` of the rustac search engine; identical to
        :meth:`search_files`.
        """
        return self.search_files(geom, start_datetime, end_datetime)

    # ------------------------------------------------- DuckDB search helpers

    @staticmethod
    def _sql_literal(value: object) -> str:
        """Render ``str(value)`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled. Unlike Python's ``repr``, this never
        switches to double quotes (an identifier in SQL) and never doubles
        backslashes (which SQL string literals keep verbatim).
        """
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _query_geometry(kwargs: dict):
        """Return a shapely geometry for the ``intersects``/``bbox`` kwargs, or None.

        ``intersects`` wins over ``bbox``. A 6-value (3D) STAC bbox
        ``[minx, miny, minz, maxx, maxy, maxz]`` is reduced to its 2D footprint.
        """
        if kwargs.get("intersects") is not None:
            from shapely.geometry import shape

            return shape(kwargs["intersects"])
        bbox = kwargs.get("bbox")
        if bbox is not None:
            from shapely.geometry import box

            if len(bbox) == 6:
                return box(bbox[0], bbox[1], bbox[3], bbox[4])
            return box(bbox[0], bbox[1], bbox[2], bbox[3])
        return None

    def _pruned_duckdb_sql(self, columns: str, kwargs: dict) -> str | None:
        """Build ``SELECT <columns>`` SQL over the Iceberg-pruned Parquet files.

        Returns ``None`` when pruning leaves no files to read. Spatial, temporal
        and CQL2 ``filter`` constraints become the WHERE clause.
        """
        from .search import _cql2_to_sql, _extract_datetime_range

        geom = self._query_geometry(kwargs)
        start_dt, end_dt = _extract_datetime_range(**kwargs)
        paths = self._info.file_paths(
            self._table, geom, start_datetime=start_dt, end_datetime=end_dt
        )
        if not paths:
            return None

        lit = self._sql_literal
        conditions: list[str] = []
        if geom is not None:
            conditions.append(f"ST_Intersects(geometry, ST_GeomFromText({lit(geom.wkt)}))")
        if start_dt is not None:
            conditions.append(f"datetime >= {lit(start_dt)}")
        if end_dt is not None:
            conditions.append(f"datetime <= {lit(end_dt)}")
        raw_filter = kwargs.get("filter")
        if raw_filter is not None:
            conditions.append(_cql2_to_sql(raw_filter))
        where = " AND ".join(conditions) if conditions else "TRUE"

        path_list = ", ".join(lit(p) for p in paths)
        # LIMIT omitted — triggers 7× slower plan for multi-file reads;
        # callers truncate to max_items after fetching instead.
        return f"SELECT {columns} FROM read_parquet([{path_list}]) WHERE {where}"

    @staticmethod
    def _duckdb_fetch(sql: str, fetch):
        """Run *sql* on a fresh in-memory DuckDB connection and return ``fetch(result)``.

        Loads the spatial extension and blanks the S3 credential settings before
        querying; the connection is always closed afterwards.
        """
        import duckdb

        con = duckdb.connect()
        try:
            con.execute("INSTALL spatial; LOAD spatial;")
            # NOTE(review): blank credentials presumably make DuckDB issue
            # unsigned S3 requests (public buckets) — confirm for private data.
            con.execute("SET s3_access_key_id='';")
            con.execute("SET s3_secret_access_key='';")
            con.execute("SET s3_session_token='';")
            return fetch(con.execute(sql))
        finally:
            con.close()

    @staticmethod
    def _data_asset_href(assets):
        """Return ``assets["data"]["href"]`` or ``None``.

        *assets* may be a JSON string or an already-decoded struct (dict);
        anything unparseable or missing yields ``None``.
        """
        import json

        if isinstance(assets, str):
            try:
                assets = json.loads(assets)
            except json.JSONDecodeError:
                return None
        if not isinstance(assets, dict):
            return None
        data = assets.get("data")
        return data.get("href") if isinstance(data, dict) else None

    def _cleared_env_s3(self):
        """Context manager: clear AWS cred env vars so rustac/DuckDB use unsigned requests.

        rustac and DuckDB read ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` from the
        environment rather than using the obstore store's auth. When the store was created
        as anonymous (``skip_signature``) or the environment has no credentials, this
        context manager temporarily removes them and sets ``AWS_NO_SIGN_REQUEST=yes``.
        Every variable it touches — including a pre-existing ``AWS_NO_SIGN_REQUEST`` —
        is restored on exit. Otherwise it is a no-op.

        The anonymous/non-anonymous decision is made when this method is called.
        """
        import os
        from contextlib import contextmanager

        anonymous = not os.environ.get("AWS_ACCESS_KEY_ID")
        if not anonymous and self._store is not None and hasattr(self._store, "config"):
            anonymous = self._store.config.get("skip_signature") in (True, "true")

        touched = (
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_SESSION_TOKEN",
            "AWS_NO_SIGN_REQUEST",
        )

        @contextmanager
        def _ctx():
            if not anonymous:
                yield
                return
            saved = {k: os.environ.pop(k, None) for k in touched}
            os.environ["AWS_NO_SIGN_REQUEST"] = "yes"
            try:
                yield
            finally:
                os.environ.pop("AWS_NO_SIGN_REQUEST", None)
                for k, v in saved.items():
                    if v is not None:
                        os.environ[k] = v

        return _ctx()

    # ------------------------------------------------------------- metadata

    def stats(self) -> list[dict]:
        """Return per-partition row counts and file sizes from Iceberg metadata."""
        return self._info.stats(self._table)

    def unique_item_count(self) -> int:
        """Return the count of unique STAC items from the hash index.

        The fallback hash-index path is ``<warehouse>_id_hashes.parquet`` (the
        warehouse with trailing ``/`` stripped), or ``None`` without a warehouse.
        """
        default_hash_index_path = None
        if self._catalog is not None:
            warehouse = self._catalog.properties.get("warehouse", "")
            if warehouse:
                default_hash_index_path = warehouse.rstrip("/") + "_id_hashes.parquet"
        return self._info.unique_item_count(self._table, self._store, default_hash_index_path)

    def info(self) -> CatalogInfo:
        """Return the grid metadata and catalog statistics object."""
        return self._info

    # --------------------------------------------------------------- ingest

    @classmethod
    def _validate_ingest_mode(cls, mode: str) -> None:
        """Raise ``ValueError`` unless *mode* is ``"auto"``, ``"full"`` or ``"delta"``."""
        if mode not in cls._INGEST_MODES:
            allowed = ", ".join(repr(m) for m in cls._INGEST_MODES)
            raise ValueError(f"mode must be one of {allowed}; got {mode!r}")

    def _table_has_rows(self) -> bool:
        """Return True when Iceberg metadata reports at least one row.

        Any error while reading the stats is treated as "empty", so
        ``mode="auto"`` falls back to a full rebuild in that case.
        """
        try:
            return sum(s["row_count"] for s in self._info.stats(self._table)) > 0
        except Exception:
            return False

    def ingest(
        self,
        inventory_path: str,
        *,
        mode: str = "auto",
        chunk_size: int = 10000,
        limit: int | None = None,
        since: datetime | None = None,
        update_hash_index: bool = False,
    ) -> dict:
        """Ingest STAC items from an S3 Inventory into the catalog.

        Unified entry point replacing both ``backfill.run_backfill`` and
        ``incremental.run``. Handles full backfill (drop+recreate table)
        and delta append (add files to existing table).

        The caller is responsible for holding an S3Lock around this call
        when running against a shared store (use ``self.lock()``).

        Args:
            inventory_path: Inventory location passed to ``_iter_inventory``;
                only keys ending in ``.stac.json`` are ingested.
            mode: ``"full"`` drops and recreates the table, ``"delta"`` adds
                files to it, ``"auto"`` picks ``"delta"`` when Iceberg metadata
                reports any rows and ``"full"`` otherwise.
            chunk_size: Number of inventory keys fetched and written per batch.
            limit: Stop after this many STAC keys (falsy means no limit).
            since: Forwarded to ``_iter_inventory``.
            update_hash_index: Merge hashes from the newly written files into
                the hash index (``s3://`` paths only).

        Returns:
            Dict with ``items_processed``, ``rows_written`` and
            ``files_registered``.

        Raises:
            ValueError: If *mode* is not ``"auto"``, ``"full"`` or ``"delta"``.
            RuntimeError: If ``AWS_ACCESS_KEY_ID`` is not set.
        """
        # Validate first: unknown modes previously fell through to the
        # destructive full-rebuild path and dropped the table.
        self._validate_ingest_mode(mode)

        import os
        import uuid
        from concurrent.futures import ThreadPoolExecutor
        from contextlib import suppress

        from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

        from earthcatalog.config import GridConfig
        from earthcatalog.grids import build_partitioner
        from earthcatalog.pipelines.incremental import _fetch_item, _iter_inventory

        from .hash_index import (
            merge_hashes_from_parquets,
            read_hashes,
            write_hashes,
        )
        from .transform import (
            fan_out,
            group_by_partition,
            write_geoparquet_s3,
        )

        # NOTE(review): credentials coming only from an instance profile / IAM
        # role are not exported as env vars, so this check may reject setups the
        # message suggests are valid — confirm.
        if not os.environ.get("AWS_ACCESS_KEY_ID"):
            raise RuntimeError(
                "No AWS credentials found in environment. "
                "ingest() requires write access to S3. "
                "Set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or use an IAM role."
            )

        if mode == "auto":
            mode = "delta" if self._table_has_rows() else "full"
        is_delta = mode == "delta"

        grid_cfg = GridConfig(
            type=self._info.grid_type,
            resolution=self._info.grid_resolution,
            boundaries_path=self._info.boundaries_path,
            id_field=self._info.id_field,
        )
        partitioner = build_partitioner(grid_cfg)

        warehouse_root = self._catalog.properties.get("warehouse", "").rstrip("/")
        uri = self._catalog.properties.get("uri", "")
        local_db = uri.removeprefix("sqlite:///") if uri else "/tmp/earthcatalog.db"
        sync_catalog = bool(self._store and self._catalog_key)
        if sync_catalog:
            self.download_catalog(local_db)

        if not is_delta:
            # Full backfill: start from an empty table.
            with suppress(NoSuchTableError):
                self._catalog.drop_table(FULL_NAME)
            with suppress(NamespaceAlreadyExistsError):
                self._catalog.create_namespace(NAMESPACE)
            self._table = get_or_create(self._catalog, grid_config=grid_cfg)

        total_items = 0
        total_rows = 0
        written_keys: list[str] = []
        batch: list[tuple[str, str]] = []

        def _flush(chunk: list[tuple[str, str]]) -> None:
            # Fetch the chunk's items concurrently, fan them out to grid
            # partitions, and write one new part file per (cell, year).
            nonlocal total_rows
            with ThreadPoolExecutor(max_workers=16) as pool:
                items = list(filter(None, pool.map(lambda bc: _fetch_item(*bc), chunk)))
            if not items:
                return
            fo = fan_out(items, partitioner)
            if not fo:
                return
            for (cell, year), group_items in group_by_partition(fo).items():
                year_str = str(year) if year is not None else "unknown"
                part_tag = uuid.uuid4().hex[:8]
                s3_key = f"grid_partition={cell}/year={year_str}/part_{part_tag}.parquet"
                n, _ = write_geoparquet_s3(group_items, self._store, s3_key)
                if n > 0:
                    written_keys.append(s3_key)
                    total_rows += n

        print(f"Ingesting from: {inventory_path}")
        for bucket, key in _iter_inventory(inventory_path, since=since):
            if not key.endswith(".stac.json"):
                continue
            batch.append((bucket, key))
            total_items += 1
            if len(batch) >= chunk_size:
                _flush(batch)
                batch.clear()
            if limit and total_items >= limit:
                break
        if batch:
            _flush(batch)

        full_paths = [f"{warehouse_root}/{k}" for k in written_keys]
        if full_paths:
            batch_sz = 2000  # register files with Iceberg in bounded batches
            for i in range(0, len(full_paths), batch_sz):
                self._table.add_files(full_paths[i : i + batch_sz])
            print(f"Registered {len(full_paths)} files in Iceberg catalog.")

        if update_hash_index and full_paths:
            hash_index_path = self._table.properties.get("earthcatalog.hash_index_path")
            if not hash_index_path:
                # Derive a default next to the warehouse and persist it as a
                # table property.
                hash_index_path = f"{warehouse_root}_id_hashes.parquet"
                with self._table.transaction() as tx:
                    tx.set_properties(**{"earthcatalog.hash_index_path": hash_index_path})
            hash_bucket, _, hash_key = hash_index_path.removeprefix("s3://").partition("/")
            if hash_index_path.startswith("s3://") and hash_bucket and hash_key:
                existing = read_hashes(self._store, hash_key)
                print(f" Existing hashes: {len(existing):,}")
                updated, n_new = merge_hashes_from_parquets(
                    full_paths, existing, store=self._store
                )
                print(f" New hashes: {n_new:,} from {len(full_paths)} files")
                write_hashes(updated, self._store, hash_key)
            else:
                print("WARN: hash index update skipped — only s3:// paths supported")

        if sync_catalog:
            self.upload_catalog(local_db)

        result = {
            "items_processed": total_items,
            "rows_written": total_rows,
            "files_registered": len(written_keys),
        }
        print(f"Done. {total_items} items -> {total_rows} rows in {len(written_keys)} files")
        return result

    def bulk_ingest(
        self,
        inventory_path: str,
        *,
        mode: str = "auto",
        chunk_size: int = 100_000,
        compact_rows: int = 100_000,
        limit: int | None = None,
        since: datetime | None = None,
        update_hash_index: bool = False,
        staging_prefix: str | None = None,
        create_client: Callable[[], object] | None = None,
        skip_inventory: bool = False,
        skip_ingest: bool = False,
        retry_pending: bool = False,
    ) -> None:
        """Ingest large inventories using a distributed Dask cluster.

        Delegates to ``earthcatalog.pipelines.backfill.run_backfill``. This
        catalog's store is used as both staging and warehouse store, and it is
        installed in ``store_config`` for the duration of the run. The previous
        ``store_config`` values are restored afterwards.

        Args:
            inventory_path: Inventory location forwarded to ``run_backfill``.
            mode: ``"full"`` runs with ``delta=False``, ``"delta"`` with
                ``delta=True``, ``"auto"`` uses delta when Iceberg metadata
                reports any rows.
            staging_prefix: Staging key prefix; defaults to
                ``bulk_ingest/YYYYMMDD`` (current UTC date).
            chunk_size, compact_rows, limit, since, update_hash_index,
            create_client, skip_inventory, skip_ingest, retry_pending:
                Forwarded unchanged to ``run_backfill``.

        Raises:
            ValueError: If *mode* is not ``"auto"``, ``"full"`` or ``"delta"``.
            RuntimeError: If ``AWS_ACCESS_KEY_ID`` is not set.
        """
        # Validate first; unknown modes used to be silently treated as delta.
        self._validate_ingest_mode(mode)

        import os
        from datetime import UTC
        from datetime import datetime as _dt

        from earthcatalog.config import GridConfig
        from earthcatalog.grids import build_partitioner
        from earthcatalog.pipelines.backfill import run_backfill

        from . import store_config

        if not os.environ.get("AWS_ACCESS_KEY_ID"):
            raise RuntimeError(
                "No AWS credentials found in environment. "
                "bulk_ingest() requires write access to S3. "
                "Set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or use an IAM role."
            )

        warehouse_root = self._catalog.properties.get("warehouse", "")
        uri = self._catalog.properties.get("uri", "")
        # Same fallback as ingest(); an empty URI used to give catalog_path="".
        local_db = uri.removeprefix("sqlite:///") if uri else "/tmp/earthcatalog.db"
        grid_cfg = GridConfig(
            type=self._info.grid_type,
            resolution=self._info.grid_resolution,
            boundaries_path=self._info.boundaries_path,
            id_field=self._info.id_field,
        )
        partitioner = build_partitioner(grid_cfg)

        if staging_prefix is None:
            staging_prefix = f"bulk_ingest/{_dt.now(UTC):%Y%m%d}"

        delta = self._table_has_rows() if mode == "auto" else mode == "delta"

        if self._store and self._catalog_key:
            self.download_catalog(local_db)

        old_store = store_config.get_store()
        old_key = store_config.get_catalog_key()
        try:
            store_config.set_store(self._store)
            if self._catalog_key:
                store_config.set_catalog_key(self._catalog_key)
            run_backfill(
                inventory_path=inventory_path,
                catalog_path=local_db,
                staging_store=self._store,
                staging_prefix=staging_prefix,
                warehouse_store=self._store,
                warehouse_root=warehouse_root,
                partitioner=partitioner,
                chunk_size=chunk_size,
                compact_rows=compact_rows,
                limit=limit,
                since=since,
                use_lock=False,
                upload=True,
                skip_inventory=skip_inventory,
                skip_ingest=skip_ingest,
                retry_pending=retry_pending,
                delta=delta,
                create_client=create_client,
                update_hash_index=update_hash_index,
                hash_index_path=self._table.properties.get("earthcatalog.hash_index_path"),
            )
        finally:
            store_config.set_store(old_store)
            store_config.set_catalog_key(old_key)

    # ---------------------------------------------------------- maintenance

    def download_catalog(self, local_path: str) -> None:
        """Download catalog.db from the backing store to *local_path*."""
        # Resolves to the module-level download_catalog function, not this method.
        download_catalog(local_path, store=self._store)

    def upload_catalog(self, local_path: str) -> None:
        """Upload catalog.db from *local_path* to the backing store."""
        # Resolves to the module-level upload_catalog function, not this method.
        upload_catalog(local_path, store=self._store)

    def compact(
        self,
        threshold: int = 2,
        dry_run: bool = False,
    ) -> dict[str, int]:
        """Compact over-threshold partition buckets and rebuild the Iceberg catalog.

        Wraps :func:`earthcatalog.maintenance.compact.compact_warehouse` using this
        catalog's warehouse path and local catalog database.

        Parameters
        ----------
        threshold:
            Minimum number of part files in a bucket before it is compacted.
            Default: 2 (compact any bucket with more than one part file).
        dry_run:
            When ``True``, report what *would* be compacted but make no changes.

        Returns
        -------
        Summary dict with keys ``buckets_scanned``, ``buckets_compacted``,
        ``files_before``, ``files_after``.
        """
        from earthcatalog.maintenance.compact import compact_warehouse

        warehouse_path = self._catalog.properties.get("warehouse", "")
        uri = self._catalog.properties.get("uri", "")
        local_db = uri.removeprefix("sqlite:///")
        return compact_warehouse(
            warehouse_path=warehouse_path,
            catalog_path=local_db,
            threshold=threshold,
            dry_run=dry_run,
        )

    def lock(self, owner: str, ttl_hours: int = 12):
        """Return an S3Lock that uses this EarthCatalog's store.

        The lock key is the catalog's ``_lock_key`` attribute when present and
        truthy, otherwise ``".lock"``.
        """
        from .lock import S3Lock

        lock_key = getattr(self._catalog, "_lock_key", None) or ".lock"
        return S3Lock(owner=owner, ttl_hours=ttl_hours, store=self._store, key=lock_key)

    # ------------------------------------------------------------ grid info

    def cells_for_geometry(self, geom) -> list[str]:
        """Return the partition keys that intersect *geom*."""
        return self._info.cells_for_geometry(geom)

    def cell_list_sql(self, geom) -> str:
        """Return a SQL fragment suitable for ``WHERE grid_partition IN (...)``."""
        return self._info.cell_list_sql(geom)

    @property
    def grid_type(self) -> str:
        """Return the grid partitioning system type."""
        return self._info.grid_type

    @property
    def grid_resolution(self) -> int | None:
        """Return the H3/S2 resolution (None for GeoJSON grids)."""
        return self._info.grid_resolution

    @property
    def table(self):
        """Return the underlying PyIceberg Table (for advanced use)."""
        return self._table

    # -------------------------------------------------------------- display

    def _is_resolution_grid(self) -> bool:
        """Return True for resolution-based grids (H3, or any grid reporting a resolution)."""
        return self._info.grid_type == "h3" or self._info.grid_resolution is not None

    def _repr_html_(self) -> str:
        """Return an HTML representation for Jupyter notebooks.

        Single-column layout with metadata table and collapsible top partitions.
        Reads only Iceberg manifests — no Parquet data is scanned. Paths and
        partition keys are HTML-escaped before being embedded.
        """
        import html

        def esc(value) -> str:
            return html.escape(str(value))

        def kv_rows(pairs, padding: str) -> str:
            # One <tr> per (label, value) pair, value shown in bold.
            return "".join(
                f"""
            <tr style='border-bottom: 1px solid currentColor;'>
                <td style='padding: {padding}; border: none; width: 180px;'>{esc(label)}</td>
                <td style='padding: {padding}; border: none;'><strong>{esc(value)}</strong></td>
            </tr>"""
                for label, value in pairs
            )

        info = self._info
        rows = [("Grid type", info.grid_type)]
        if self._is_resolution_grid():
            rows.append((f"{str(info.grid_type).upper()} resolution", str(info.grid_resolution)))
        else:
            rows.append(("Boundaries", info.boundaries_path or "N/A"))
        warehouse_path = self._catalog.properties.get("warehouse", "") if self._catalog else ""
        if warehouse_path:
            rows.append(("Warehouse", warehouse_path))
        hash_idx = self._table.properties.get("earthcatalog.hash_index_path")
        rows.append(("Hash index", "Available" if hash_idx else "Not available"))

        table_html = (
            "<table style='border-collapse: collapse; width: 100%; margin: 0;'>"
            + kv_rows(rows, "6px 10px")
            + "</table>"
        )

        stats = info.stats(self._table)
        bottom_html = ""
        if stats:
            total_files = info.total_files(self._table)
            total_rows = sum(s["row_count"] for s in stats)
            unique = self.unique_item_count()
            stat_rows = [
                ("Total files", f"{total_files:,}"),
                ("Total rows", f"{total_rows:,}"),
                ("Unique items", f"{unique:,}"),
                ("Partitions", f"{len(stats):,}"),
            ]
            stats_table = (
                "<table style='border-collapse: collapse; width: 100%; font-size: 13px; margin: 0;'>"
                + kv_rows(stat_rows, "4px 6px")
                + "</table>"
            )

            top_cells = info.top_cells(self._table, limit=3)
            top_html = ""
            if top_cells:
                top_rows = []
                for cell in top_cells:
                    name = str(cell["grid_partition"])
                    # Long keys are truncated to 12 characters for display.
                    short = name if len(name) <= 12 else f"{name[:12]}..."
                    top_rows.append(
                        f"""
            <tr style='border-bottom: 1px solid currentColor;'>
                <td style='padding: 4px 6px; border: none; width: 180px; font-family: monospace;'>{esc(short)}</td>
                <td style='padding: 4px 6px; border: none;'>{cell["row_count"]:,} rows</td>
            </tr>"""
                    )
                top_html = f"""
        <details style='margin-top: 12px;'>
            <summary style='font-weight: 600; cursor: pointer;'>Top partitions</summary>
            <table style='border-collapse: collapse; width: 100%; font-size: 13px; margin: 8px 0 0 0;'>{"".join(top_rows)}</table>
        </details>"""

            bottom_html = f"""
        <div style='font-weight: 600; margin-top: 12px;'>Statistics</div>
        {stats_table}
        {top_html}"""

        return f"""
<div style='border: 1px solid currentColor; padding: 15px; border-radius: 5px; font-family: var(--jp-code-font-family, monospace); opacity: 0.9; text-align: left; max-width: 800px;'>
    <div style='font-size: 16px; font-weight: 600; margin-bottom: 12px; display: flex; align-items: center; gap: 8px;'>
        <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" width="20" height="20" fill="none" stroke="currentColor" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round">
            <circle cx="12" cy="12" r="10"/>
            <path d="M2 12h20M12 2c-3.3 2-5.5 5.5-5.5 10s2.2 8 5.5 10c3.3-2 5.5-5.5 5.5-10S15.3 4 12 2z"/>
        </svg>
        <span>EarthCatalog</span>
    </div>
    {table_html}
    {bottom_html}
</div>"""

    def __repr__(self) -> str:
        """Return a compact description of the grid configuration.

        Resolution-based grids (H3, or any grid reporting a resolution such as
        S2) show their resolution. Other grids show boundaries path and id field.
        """
        info = self._info
        if self._is_resolution_grid():
            return f"EarthCatalog(grid_type={info.grid_type!r}, resolution={info.grid_resolution})"
        return (
            f"EarthCatalog(grid_type={info.grid_type!r}, "
            f"boundaries_path={info.boundaries_path!r}, "
            f"id_field={info.id_field!r})"
        )