Skip to content

StorageRegistry

The storage registry exposes the four production backends (CSV, JSON, Parquet, DuckDB) plus the auto-detect open(uri) convenience method. The backend is picked from the file extension; the SDK round-trips CanonicalDataset byte-for-byte across all four.

API reference

The full reference is generated from the SDK's docstrings via mkdocstrings.

StorageRegistry

Registry mapping StorageBackend → Storage.

The SDK ships with five built-in registrations:

  • LOCAL_FILES → LocalFilesStorage
  • JSON → JSONStorage
  • CSV → CSVStorage
  • PARQUET → ParquetStorage
  • DUCKDB → DuckDBStorage

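All five start out as placeholders that raise NotImplementedError. When a registry is constructed, the JSON, CSV, PARQUET and DUCKDB placeholders are replaced by the concrete writers whenever the corresponding engine can be imported; LOCAL_FILES remains a placeholder. Consumers can: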

  • register(backend, storage) to install a concrete storage over a placeholder.
  • get(backend) to look up the registered storage.

The registry is a plain object; callers can hold their own registry and pass it to StorageStage.

Source code in un_comtrade/storage/_base.py
class StorageRegistry:
    """Registry mapping `StorageBackend` → `Storage`.

    Every new registry starts with five built-in
    registrations:

    - `LOCAL_FILES` → `LocalFilesStorage`
    - `JSON` → `JSONStorage`
    - `CSV` → `CSVStorage`
    - `PARQUET` → `ParquetStorage`
    - `DUCKDB` → `DuckDBStorage`

    These five classes are placeholders that raise
    `NotImplementedError`. While the registry is being
    constructed, the JSON, CSV, PARQUET and DUCKDB
    entries are swapped for the concrete writers whenever
    the matching engine module can be imported and
    instantiated; `LOCAL_FILES` stays a placeholder.
    Consumers can:

    - `register(backend, storage)` to install a
      concrete storage over whatever is registered.
    - `get(backend)` to look up the registered
      storage.
    - `open(uri)` to load a stored dataset, with the
      backend picked from the file extension.

    The registry is a plain object; callers can hold
    their own registry and pass it to
    `StorageStage`.
    """

    def __init__(
        self,
        *,
        storages: Mapping[StorageBackend, "Storage"] | None = None,
    ) -> None:
        """Create a registry pre-populated with the defaults.

        Parameters
        ----------
        storages
            Optional overrides. Each entry is installed
            through `register` after the defaults, so it
            is validated and replaces the default for
            that backend.
        """
        self._storages: dict[StorageBackend, Storage] = {}
        self._register_defaults()
        if storages:
            for backend, storage in storages.items():
                self.register(backend, storage)

    def _register_defaults(self) -> None:
        """Register the SDK's built-in storages.

        All five placeholders are installed first. Then:

        - PARQUET is replaced by `ParquetWriter` (from
          the sibling `parquet` module),
        - DUCKDB is replaced by `DuckDBWriter` (from the
          sibling `duckdb` module),
        - JSON and CSV are replaced by `JSONWriter` /
          `CSVWriter` (from the sibling `file` module),

        each only if the module imports and the writer
        can be constructed. LocalFiles remains a
        placeholder.

        Promotion is best-effort: a failure never
        propagates, the placeholder simply stays in
        place. Failures are logged (missing optional
        dependency at DEBUG, anything else at WARNING)
        instead of being discarded silently.
        """
        import logging

        log = logging.getLogger(__name__)

        self._storages[StorageBackend.LOCAL_FILES] = LocalFilesStorage()
        self._storages[StorageBackend.JSON] = JSONStorage()
        self._storages[StorageBackend.CSV] = CSVStorage()
        self._storages[StorageBackend.PARQUET] = ParquetStorage()
        self._storages[StorageBackend.DUCKDB] = DuckDBStorage()
        # Auto-promote placeholders when concrete
        # engines are importable. The imports are
        # wrapped in try/except so the storage
        # framework remains importable without
        # optional dependencies.
        try:
            from . import parquet as _parquet  # type: ignore[import-not-found]

            self._storages[StorageBackend.PARQUET] = (
                _parquet.ParquetWriter()
            )
        except ImportError:
            # Expected when the optional engine is absent.
            log.debug(
                "Parquet engine unavailable; keeping placeholder",
                exc_info=True,
            )
        except Exception:  # pragma: no cover - defensive
            log.warning(
                "Parquet engine failed to initialise; "
                "keeping placeholder",
                exc_info=True,
            )
        try:
            from . import duckdb as _duckdb  # type: ignore[import-not-found]

            self._storages[StorageBackend.DUCKDB] = (
                _duckdb.DuckDBWriter()
            )
        except ImportError:
            # Expected when the optional engine is absent.
            log.debug(
                "DuckDB engine unavailable; keeping placeholder",
                exc_info=True,
            )
        except Exception:  # pragma: no cover - defensive
            log.warning(
                "DuckDB engine failed to initialise; "
                "keeping placeholder",
                exc_info=True,
            )
        try:
            from . import file as _file  # type: ignore[import-not-found]

            # JSON is assigned before CSV is built, so a
            # CSV failure still leaves JSON promoted.
            self._storages[StorageBackend.JSON] = (
                _file.JSONWriter()
            )
            self._storages[StorageBackend.CSV] = (
                _file.CSVWriter()
            )
        except ImportError:
            log.debug(
                "JSON/CSV file engine unavailable; "
                "keeping placeholder(s)",
                exc_info=True,
            )
        except Exception:  # pragma: no cover - defensive
            log.warning(
                "JSON/CSV file engine failed to initialise; "
                "keeping placeholder(s)",
                exc_info=True,
            )

    def register(
        self,
        backend: StorageBackend,
        storage: "Storage",
    ) -> None:
        """Install `storage` as the handler for `backend`.

        Any storage already registered for the same
        backend is replaced.

        Raises `TypeError` if `backend` is not a
        `StorageBackend`, or if `storage` lacks a
        `backend` attribute or a callable `store`.
        """
        if not isinstance(backend, StorageBackend):
            kind = type(backend).__name__
            raise TypeError(
                f"backend must be StorageBackend; got {kind}"
            )
        # Duck-typed check: `store` is only inspected when
        # the `backend` attribute is present.
        looks_like_storage = hasattr(storage, "backend") and callable(
            getattr(storage, "store", None)
        )
        if not looks_like_storage:
            kind = type(storage).__name__
            raise TypeError(
                f"storage must have a 'backend' attribute and a "
                f"'store' callable; got {kind}"
            )
        self._storages[backend] = storage

    def get(self, backend: StorageBackend) -> "Storage":
        """Return the registered storage for `backend`.

        Raises `StorageError` if no storage is
        registered for the requested backend. This also
        holds for hashable keys that are not
        `StorageBackend` members (e.g. a plain string).
        """
        try:
            return self._storages[backend]
        except KeyError as exc:
            # Non-enum keys have no `.value`; fall back to
            # the key itself so the documented StorageError
            # is raised instead of an AttributeError.
            label = getattr(backend, "value", backend)
            raise StorageError(
                f"No storage registered for backend {label!r}"
            ) from exc

    def supported_backends(self) -> tuple[StorageBackend, ...]:
        """List every backend that currently has a storage.

        The tuple follows the registry's insertion order.
        """
        return tuple(self._storages)

    def unregister(self, backend: StorageBackend) -> None:
        """Remove the storage for `backend`.

        Raises `StorageError` if no storage is
        registered. This also holds for hashable keys
        that are not `StorageBackend` members.
        """
        if backend not in self._storages:
            # Non-enum keys have no `.value`; fall back to
            # the key itself so the documented StorageError
            # is raised instead of an AttributeError.
            label = getattr(backend, "value", backend)
            raise StorageError(
                f"No storage registered for backend {label!r}"
            )
        del self._storages[backend]

    # ----- File-system convenience ---------------------------------------

    #: Mapping of file-suffix → ``StorageBackend`` used by
    #: :meth:`open`. Keys are lowercased suffixes (including
    #: the leading dot). Multiple suffixes map to the same
    #: backend (``.pq`` and ``.parquet``, ``.ddb`` and ``.duckdb``).
    _EXTENSION_BACKEND: Mapping[str, "StorageBackend"] = {
        ".csv": StorageBackend.CSV,
        ".json": StorageBackend.JSON,
        ".parquet": StorageBackend.PARQUET,
        ".pq": StorageBackend.PARQUET,
        ".duckdb": StorageBackend.DUCKDB,
        ".ddb": StorageBackend.DUCKDB,
    }

    @staticmethod
    def _detect_backend(path: "Path") -> "StorageBackend":
        """Map ``path`` to a :class:`StorageBackend`.

        Resolution order:

        1. If ``path`` is a directory, walk its entries in
           sorted order and use the first one whose suffix
           is known; a directory with no such entry maps to
           ``DUCKDB``.
        2. If ``path`` has a known suffix, use the suffix
           (compared case-insensitively).
        3. If ``path`` has no suffix at all, fall back to
           ``DUCKDB``.

        Raises
        ------
        StorageError
            When ``path`` is not a directory and carries a
            suffix that is not in ``_EXTENSION_BACKEND``.
        """
        from pathlib import Path as _Path

        known = StorageRegistry._EXTENSION_BACKEND
        p = _Path(path)
        if p.is_dir():
            # Sorted so the pick is deterministic regardless
            # of the order the OS lists entries in.
            for child in sorted(p.iterdir()):
                match = known.get(child.suffix.lower())
                if match is not None:
                    return match
            return StorageBackend.DUCKDB
        suffix = p.suffix.lower()
        if not suffix:
            return StorageBackend.DUCKDB
        match = known.get(suffix)
        if match is not None:
            return match
        raise StorageError(
            f"unsupported dataset extension {suffix!r}; "
            f"expected one of "
            f"{sorted(known)}"
        )

    def open(
        self,
        uri: str | "Path",
        *,
        table_name: str = "trade_records",
        overwrite: bool = False,
        compression: str = "none",
    ) -> "CanonicalDataset":
        """Load a :class:`CanonicalDataset` from a previously-
        persisted file via the public Storage API.

        The backend is auto-detected from the file extension
        (``.csv`` / ``.json`` / ``.parquet`` or ``.pq`` /
        ``.duckdb`` or ``.ddb``). A path without any
        extension is treated as DuckDB. A directory path is
        supported (scanned for the first known file type —
        the convention used by :class:`ParquetWriter`).

        Parameters
        ----------
        uri
            Path to a stored dataset. The extension
            determines the backend.
        table_name
            For DuckDB: the table to read from.
            Default ``"trade_records"``.
        overwrite
            Forwarded to ``StorageConfig`` (currently unused
            by reads).
        compression
            Forwarded to ``StorageConfig`` (currently unused
            by reads).

        Returns
        -------
        CanonicalDataset
            The deserialised dataset.

        Raises
        ------
        StorageError
            When ``uri`` does not exist, the extension is
            unsupported, no storage is registered for the
            detected backend, or the registered storage has
            no callable ``read``.
        """
        from pathlib import Path as _Path

        p = _Path(uri)
        if not p.exists():
            raise StorageError(
                f"dataset path does not exist: {p}"
            )
        backend = self._detect_backend(p)
        storage = self.get(backend)
        # `register` only validates `backend`/`store`, so a
        # custom storage may legitimately lack `read`; report
        # that as StorageError rather than AttributeError.
        reader = getattr(storage, "read", None)
        if not callable(reader):
            raise StorageError(
                f"storage registered for backend "
                f"{backend.value!r} "
                f"({type(storage).__name__}) cannot read datasets"
            )
        config = StorageConfig(
            root=str(p),
            overwrite=overwrite,
            compression=compression,
            table_name=table_name,
        )
        return reader(config)

register

register(
    backend: StorageBackend, storage: "Storage"
) -> None

Register (or replace) the storage for backend.

Source code in un_comtrade/storage/_base.py
def register(
    self,
    backend: StorageBackend,
    storage: "Storage",
) -> None:
    """Install `storage` as the handler for `backend`.

    Any storage already registered for the same
    backend is replaced.

    Raises `TypeError` if `backend` is not a
    `StorageBackend`, or if `storage` lacks a
    `backend` attribute or a callable `store`.
    """
    if not isinstance(backend, StorageBackend):
        kind = type(backend).__name__
        raise TypeError(
            f"backend must be StorageBackend; got {kind}"
        )
    # Duck-typed check: `store` is only inspected when
    # the `backend` attribute is present.
    looks_like_storage = hasattr(storage, "backend") and callable(
        getattr(storage, "store", None)
    )
    if not looks_like_storage:
        kind = type(storage).__name__
        raise TypeError(
            f"storage must have a 'backend' attribute and a "
            f"'store' callable; got {kind}"
        )
    self._storages[backend] = storage

get

get(backend: StorageBackend) -> 'Storage'

Return the registered storage for backend.

Raises StorageError if no storage is registered for the requested backend.

Source code in un_comtrade/storage/_base.py
def get(self, backend: StorageBackend) -> "Storage":
    """Return the registered storage for `backend`.

    Raises `StorageError` if no storage is
    registered for the requested backend. This also
    holds for hashable keys that are not
    `StorageBackend` members (e.g. a plain string).
    """
    try:
        return self._storages[backend]
    except KeyError as exc:
        # Non-enum keys have no `.value`; fall back to
        # the key itself so the documented StorageError
        # is raised instead of an AttributeError.
        label = getattr(backend, "value", backend)
        raise StorageError(
            f"No storage registered for backend {label!r}"
        ) from exc

supported_backends

supported_backends() -> tuple[StorageBackend, ...]

Return the backends this registry knows about.

Source code in un_comtrade/storage/_base.py
def supported_backends(self) -> tuple[StorageBackend, ...]:
    """List every backend that currently has a storage.

    The tuple follows the registry's insertion order.
    """
    return tuple(self._storages)

unregister

unregister(backend: StorageBackend) -> None

Remove the storage for backend.

Raises StorageError if no storage is registered.

Source code in un_comtrade/storage/_base.py
def unregister(self, backend: StorageBackend) -> None:
    """Remove the storage for `backend`.

    Raises `StorageError` if no storage is
    registered. This also holds for hashable keys
    that are not `StorageBackend` members.
    """
    if backend not in self._storages:
        # Non-enum keys have no `.value`; fall back to
        # the key itself so the documented StorageError
        # is raised instead of an AttributeError.
        label = getattr(backend, "value", backend)
        raise StorageError(
            f"No storage registered for backend {label!r}"
        )
    del self._storages[backend]

open

open(
    uri: str | "Path",
    *,
    table_name: str = "trade_records",
    overwrite: bool = False,
    compression: str = "none",
) -> "CanonicalDataset"

Load a CanonicalDataset from a previously-persisted file via the public Storage API.

The backend is auto-detected from the file extension (.csv / .json / .parquet or .pq / .duckdb or .ddb). A directory path is supported (scanned for the first known file type — the convention used by ParquetWriter).

Parameters

  • uri — Path to a stored dataset. The extension determines the backend.
  • table_name — For DuckDB: the table to read from. Default "trade_records".
  • overwrite — Forwarded to StorageConfig (currently unused by reads).
  • compression — Forwarded to StorageConfig (currently unused by reads).

Returns

CanonicalDataset The deserialised dataset.

Raises

StorageError When uri does not exist, the extension is unsupported, or the backend cannot read the file.

Source code in un_comtrade/storage/_base.py
def open(
    self,
    uri: str | "Path",
    *,
    table_name: str = "trade_records",
    overwrite: bool = False,
    compression: str = "none",
) -> "CanonicalDataset":
    """Load a :class:`CanonicalDataset` from a previously-
    persisted file via the public Storage API.

    The backend is auto-detected from the file extension
    (``.csv`` / ``.json`` / ``.parquet`` or ``.pq`` /
    ``.duckdb`` or ``.ddb``). A directory path is
    supported (scanned for the first known file type —
    the convention used by :class:`ParquetWriter`).

    Parameters
    ----------
    uri
        Path to a stored dataset. The extension
        determines the backend.
    table_name
        For DuckDB: the table to read from.
        Default ``"trade_records"``.
    overwrite
        Forwarded to ``StorageConfig`` (currently unused
        by reads).
    compression
        Forwarded to ``StorageConfig`` (currently unused
        by reads).

    Returns
    -------
    CanonicalDataset
        The deserialised dataset.

    Raises
    ------
    StorageError
        When ``uri`` does not exist, the extension is
        unsupported, no storage is registered for the
        detected backend, or the registered storage has
        no callable ``read``.
    """
    from pathlib import Path as _Path

    p = _Path(uri)
    if not p.exists():
        raise StorageError(
            f"dataset path does not exist: {p}"
        )
    backend = self._detect_backend(p)
    storage = self.get(backend)
    # `register` only validates `backend`/`store`, so a
    # custom storage may legitimately lack `read`; report
    # that as StorageError rather than AttributeError.
    reader = getattr(storage, "read", None)
    if not callable(reader):
        raise StorageError(
            f"storage registered for backend "
            f"{backend.value!r} "
            f"({type(storage).__name__}) cannot read datasets"
        )
    config = StorageConfig(
        root=str(p),
        overwrite=overwrite,
        compression=compression,
        table_name=table_name,
    )
    return reader(config)

Examples

from un_comtrade import ComtradeClient

with ComtradeClient() as client:
    exports = client.trade.get_exports(reporter_code=699, period="2022")
    # NOTE(review): StorageRegistry.open() as listed on this page loads an
    # existing dataset, raises StorageError when the path does not exist,
    # and returns a CanonicalDataset. Confirm that `client.storage` is a
    # StorageRegistry and that the returned object really offers `.write()`;
    # otherwise this example should use the storage write API instead.
    client.storage.open("india_exports_2022.parquet").write(exports)