
ETLFacade

The ETL facade composes pipelines of stages (fetch, transform, filter, aggregate, export) with shared configuration injection. Pipelines are declared as a list of stage definitions. The SDK executes them sequentially with structured error reporting: a failing stage stops the run, and its error is recorded in the returned result instead of being raised.

API reference

The full reference is generated from the SDK's docstrings via mkdocstrings.

ETLFacade

Public facade for the ETL pipeline layer.

Exposed via un_comtrade.client.ComtradeClient.etl so callers can build pipelines that share the client's un_comtrade.config.Configuration without re-supplying it each time.

Construction:

client = ComtradeClient()
pipeline = client.etl.pipeline(
    name="trade_ingest",
    stages=(stage_spec_a, stage_spec_b),
)
result = pipeline.run(source=...)

The facade does not duplicate the ETLPipeline runner; it is a thin factory that injects the client's configuration.
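A minimal, self-contained sketch of the factory pattern described above. `ToyFacade` and `ToyPipeline` are hypothetical stand-ins, not SDK classes, so the snippet runs without the SDK installed. Every pipeline the facade builds receives the same configuration object, so callers never pass it explicitly.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyPipeline:
    # Hypothetical stand-in for ETLPipeline: just records what it was given.
    name: str
    stages: tuple
    config: Any


class ToyFacade:
    # Hypothetical stand-in for ETLFacade: holds one configuration object
    # and injects it into every pipeline it builds.
    def __init__(self, configuration: Any) -> None:
        self._configuration = configuration

    def pipeline(self, name: str, stages) -> ToyPipeline:
        # Lists and tuples are both accepted; the result always holds a tuple.
        return ToyPipeline(name=name, stages=tuple(stages), config=self._configuration)


shared_config = {"api_key": "placeholder"}
facade = ToyFacade(shared_config)
a = facade.pipeline("ingest", ["extract"])
b = facade.pipeline("audit", ("extract", "validate"))
print(a.config is b.config is shared_config)  # True
print(a.stages)  # ('extract',)
```

Because the facade hands out the same object rather than a copy, a change to the shared configuration is visible to every pipeline built from it.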

Source code in un_comtrade/etl.py
class ETLFacade:
    """Public facade for the ETL pipeline layer.

    Exposed via :attr:`un_comtrade.client.ComtradeClient.etl` so
    callers can build pipelines that share the client's
    :class:`un_comtrade.config.Configuration` without re-supplying
    it each time.

    Construction::

        client = ComtradeClient()
        pipeline = client.etl.pipeline(
            name="trade_ingest",
            stages=(stage_spec_a, stage_spec_b),
        )
        result = pipeline.run(source=...)

    The facade does not duplicate the ``ETLPipeline`` runner; it
    is a thin factory that injects the client's configuration.
    """

    def __init__(self, configuration: "Any") -> None:
        self._configuration = configuration

    @property
    def configuration(self) -> "Any":
        """The :class:`un_comtrade.config.Configuration` this
        facade injects into new pipelines."""
        return self._configuration

    def pipeline(
        self,
        name: str,
        stages: "tuple[StageSpec, ...] | list[StageSpec]",
    ) -> "ETLPipeline":
        """Build an :class:`ETLPipeline` that inherits the
        client's configuration.

        Parameters
        ----------
        name
            Pipeline identifier (mirrored on the resulting
            :class:`PipelineResult.pipeline_name`).
        stages
            Ordered tuple / list of :class:`StageSpec`
            entries.

        Returns
        -------
        ETLPipeline
            A ready-to-run pipeline. Call ``pipeline.run(source)``
            with the input dataset / payload.
        """
        return ETLPipeline(
            name=name,
            stages=tuple(stages),
            config=self._configuration,
        )

configuration property

configuration: 'Any'

The un_comtrade.config.Configuration this facade injects into new pipelines.

pipeline

pipeline(
    name: str,
    stages: "tuple[StageSpec, ...] | list[StageSpec]",
) -> "ETLPipeline"

Build an ETLPipeline that inherits the client's configuration.

Parameters

  • name: pipeline identifier (mirrored on the resulting PipelineResult.pipeline_name).
  • stages: ordered tuple / list of StageSpec entries.

Returns

  • ETLPipeline: a ready-to-run pipeline. Call pipeline.run(source) with the input dataset / payload.


ETLPipeline dataclass

Declarative orchestrator for an ETL pipeline.

The pipeline is composed of a tuple of StageSpec entries. Stages run in the order declared; each stage receives the previous stage's output (or the source, for the first stage) and the shared PipelineContext.

This is orchestration only: the pipeline does NOT implement any concrete stage. Stages are supplied by the caller (built by the StageSpec.factory callables). The MVP supports a sequential pipeline; parallel / streaming is reserved for future versions per OQ-ETL-002.
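Concretely, run() calls spec.factory(context) once per stage to build the stage, then calls the result as stage(data, context). The sketch below shows two caller-supplied stages written to that contract. `ToyContext`, `DropZeroValues` and `ScaleValues` are hypothetical names. The short loop at the end only imitates the sequential hand-off; it has none of the SDK's error handling or timing.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Protocol


class Stage(Protocol):
    # The shape run() expects from whatever a factory returns.
    def __call__(self, data: Any, context: Any) -> Any: ...


@dataclass
class ToyContext:
    # Hypothetical stand-in for PipelineContext.
    config: dict = field(default_factory=dict)


class DropZeroValues:
    def __call__(self, data: list, context: ToyContext) -> list:
        return [r for r in data if r["value"] != 0]


class ScaleValues:
    def __init__(self, factor: float) -> None:
        self.factor = factor

    def __call__(self, data: list, context: ToyContext) -> list:
        return [{**r, "value": r["value"] * self.factor} for r in data]


# Factories receive the context, so they can read run-time config.
factories: list[Callable[[ToyContext], Stage]] = [
    lambda ctx: DropZeroValues(),
    lambda ctx: ScaleValues(ctx.config.get("factor", 1.0)),
]

ctx = ToyContext(config={"factor": 0.5})
data: Any = [{"value": 5000}, {"value": 0}]
for factory in factories:
    data = factory(ctx)(data, ctx)  # build the stage, then feed it the previous output
print(data)  # [{'value': 2500.0}]
```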

Construction:

pipeline = ETLPipeline(
    name="trade_ingest",
    stages=(
        StageSpec(
            name="extract",
            kind=StageKind.EXTRACT,
            factory=lambda ctx: MyExtractor(...),
        ),
        StageSpec(
            name="validate",
            kind=StageKind.VALIDATE,
            factory=lambda ctx: MyValidator(...),
        ),
        ...
    ),
    config={"batch_size": 1000},
)

Execution:

result = pipeline.run(source=response_envelope)
if result.status is PipelineStatus.SUCCESS:
    for record in result.output:
        ...
else:
    for error in result.errors:
        ...

Mutability:

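  • name is required and must be a non-empty, non-whitespace string; otherwise the constructor raises ValueError.
  • stages is required and MUST be a sequence of StageSpec; any other element raises TypeError. The constructor accepts lists / tuples but freezes them to a tuple. Stage names must be unique because per-stage timings are keyed by name, and a duplicate raises ValueError.
  • config is a free-form mapping; a non-Mapping raises TypeError. The pipeline shallow-copies it into the PipelineContext at run time, so stages cannot add, remove, or rebind keys in the pipeline's own config. Nested mutable values are still shared.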
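Because the copy is shallow, only the top level is isolated. The sketch below uses plain dicts. `dict(pipeline_config)` stands in for the `config=dict(self.config)` copy that run() makes, and the column names are illustrative only.

```python
pipeline_config = {"batch_size": 1000, "columns": ["reporterCode", "period"]}

# What run() hands to the context: a new top-level dict.
stage_view = dict(pipeline_config)

# Rebinding or adding keys only touches the copy...
stage_view["batch_size"] = 10
stage_view["debug"] = True

# ...but nested mutable values are the same objects in both dicts.
stage_view["columns"].append("primaryValue")

print(pipeline_config["batch_size"])  # 1000
print("debug" in pipeline_config)     # False
print(pipeline_config["columns"])     # ['reporterCode', 'period', 'primaryValue']
```

If stages must not share nested state, keep config values immutable (tuples, frozensets), or have each stage copy whatever it intends to change.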

Composition:

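  • with_stage(spec) returns a NEW pipeline with the stage appended, and with_config(**overrides) does the same for configuration. The original is unchanged: the pipeline is logically immutable once constructed, matching the frozen semantics of the SDK's other orchestrators. The class is declared with a plain @dataclass, though, so attribute reassignment is not blocked at runtime.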
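The copy-on-append pattern behind with_stage can be sketched on a hypothetical `ToyPipeline` that holds stage names only. Unlike ETLPipeline, the sketch declares `frozen=True`, so it also shows what runtime-enforced immutability would add.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ToyPipeline:
    # Hypothetical stand-in; stage names only, to keep the sketch small.
    name: str
    stages: tuple[str, ...] = ()

    def with_stage(self, stage: str) -> "ToyPipeline":
        # Build a new instance; tuple concatenation leaves self.stages untouched.
        return ToyPipeline(name=self.name, stages=self.stages + (stage,))


base = ToyPipeline("trade_ingest", ("extract",))
extended = base.with_stage("validate")
print(base.stages)      # ('extract',)
print(extended.stages)  # ('extract', 'validate')

try:
    base.name = "renamed"
except FrozenInstanceError:
    print("reassignment blocked")
```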
Source code in un_comtrade/etl.py
@dataclass
class ETLPipeline:
    """Declarative orchestrator for an ETL pipeline.

    The pipeline is composed of a tuple of `StageSpec`
    entries. Stages run in the order declared; each
    stage receives the previous stage's output (or
    the source, for the first stage) and the shared
    `PipelineContext`.

    This is **orchestration only**: the pipeline does
    NOT implement any concrete stage. Stages are
    supplied by the caller (built by the
    `StageSpec.factory` callables). The MVP supports
    a sequential pipeline; parallel / streaming is
    reserved for future versions per OQ-ETL-002.

    Construction::

        pipeline = ETLPipeline(
            name="trade_ingest",
            stages=(
                StageSpec(
                    name="extract",
                    kind=StageKind.EXTRACT,
                    factory=lambda ctx: MyExtractor(...),
                ),
                StageSpec(
                    name="validate",
                    kind=StageKind.VALIDATE,
                    factory=lambda ctx: MyValidator(...),
                ),
                ...
            ),
            config={"batch_size": 1000},
        )

    Execution::

        result = pipeline.run(source=response_envelope)
        if result.status is PipelineStatus.SUCCESS:
            for record in result.output:
                ...
        else:
            for error in result.errors:
                ...

    Mutability:

    - `name` is required and non-empty.
    - `stages` is required and MUST be a sequence of
      `StageSpec`. The constructor accepts lists /
      tuples but freezes to a tuple.
    - `config` is a free-form mapping; the pipeline
      shallow-copies it into the `PipelineContext` at
      run time so stages cannot mutate the pipeline's
      config.

    Composition:

    - `with_stage(spec)` returns a NEW pipeline with
      the stage appended. The original is unchanged
      (the pipeline is logically immutable once
      constructed; this matches the frozen semantics
      of the SDK's other orchestrators).
    """

    name: str
    stages: tuple[StageSpec, ...]
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not isinstance(self.name, str) or not self.name.strip():
            raise ValueError(
                f"ETLPipeline.name must be a non-empty string; "
                f"got {self.name!r}"
            )
        # Accept lists / sequences but force tuple.
        if not isinstance(self.stages, tuple):
            object.__setattr__(self, "stages", tuple(self.stages))
        for i, spec in enumerate(self.stages):
            if not isinstance(spec, StageSpec):
                raise TypeError(
                    f"stages[{i}] must be StageSpec; got "
                    f"{type(spec).__name__}"
                )
        # Reject duplicate stage names (timings map is
        # keyed by name; duplicates would collide).
        seen: set[str] = set()
        for spec in self.stages:
            if spec.name in seen:
                raise ValueError(
                    f"Duplicate stage name {spec.name!r} in "
                    f"pipeline {self.name!r}"
                )
            seen.add(spec.name)
        if not isinstance(self.config, Mapping):
            raise TypeError(
                f"ETLPipeline.config must be a Mapping; got "
                f"{type(self.config).__name__}"
            )

    # ----- Composition ----------------------------------------------------

    def with_stage(self, spec: StageSpec) -> "ETLPipeline":
        """Return a new pipeline with `spec` appended.

        The original pipeline is unchanged. Used to
        build pipelines incrementally.
        """
        if not isinstance(spec, StageSpec):
            raise TypeError(
                f"spec must be StageSpec; got "
                f"{type(spec).__name__}"
            )
        return ETLPipeline(
            name=self.name,
            stages=self.stages + (spec,),
            config=self.config,
        )

    def with_config(self, **overrides: Any) -> "ETLPipeline":
        """Return a new pipeline with `config` overridden.

        Original pipeline is unchanged.
        """
        new_config: dict[str, Any] = dict(self.config)
        new_config.update(overrides)
        return ETLPipeline(
            name=self.name,
            stages=self.stages,
            config=new_config,
        )

    # ----- Inspection -----------------------------------------------------

    @property
    def stage_names(self) -> tuple[str, ...]:
        """Tuple of stage names in execution order."""
        return tuple(spec.name for spec in self.stages)

    @property
    def stage_kinds(self) -> tuple[StageKind, ...]:
        """Tuple of stage kinds in execution order."""
        return tuple(spec.kind for spec in self.stages)

    # ----- Execution ------------------------------------------------------

    def run(self, source: Any) -> PipelineResult:
        """Execute the pipeline against `source`.

        Stages run in declared order. The first stage
        receives `source` as its input; subsequent
        stages receive the previous stage's output. A
        stage failure (any `Exception`) short-circuits
        the pipeline and records `FAILED` status; the
        exception is NOT re-raised so the caller can
        inspect the partial output.

        Returns a `PipelineResult` capturing output,
        warnings, errors, and per-stage timings. Even
        on failure the result is returned (with
        `status=FAILED`); the pipeline never raises
        from `run()` itself.
        """
        context = PipelineContext(
            pipeline_name=self.name,
            config=dict(self.config),
        )
        context.started_at = context.now()
        output: Any = source
        status: PipelineStatus = PipelineStatus.SUCCESS

        try:
            for spec in self.stages:
                stage_started = context.now()
                try:
                    stage = spec.factory(context)
                except Exception as exc:
                    # Factory itself failed; treat as a
                    # stage failure (the stage was never
                    # invoked).
                    context.error(
                        f"stage {spec.name!r} factory raised "
                        f"{type(exc).__name__}: {exc}"
                    )
                    status = PipelineStatus.FAILED
                    break
                try:
                    output = stage(output, context)
                except PipelineError as exc:
                    context.error(
                        f"stage {spec.name!r} raised "
                        f"PipelineError: {exc}"
                    )
                    status = PipelineStatus.FAILED
                    break
                except Exception as exc:
                    context.error(
                        f"stage {spec.name!r} raised "
                        f"{type(exc).__name__}: {exc}"
                    )
                    status = PipelineStatus.FAILED
                    break
                finally:
                    stage_finished = context.now()
                    duration = (
                        stage_finished - stage_started
                    ).total_seconds()
                    context.stage_durations[spec.name] = duration
        finally:
            context.finished_at = context.now()

        return PipelineResult(
            pipeline_name=self.name,
            status=status,
            output=output,
            warnings=list(context.warnings),
            errors=list(context.errors),
            records_in=context.records_in,
            records_out=context.records_out,
            started_at=context.started_at,
            finished_at=context.finished_at,
            stage_durations=dict(context.stage_durations),
        )

stage_names property

stage_names: tuple[str, ...]

Tuple of stage names in execution order.

stage_kinds property

stage_kinds: tuple[StageKind, ...]

Tuple of stage kinds in execution order.

with_stage

with_stage(spec: StageSpec) -> 'ETLPipeline'

Return a new pipeline with spec appended.

The original pipeline is unchanged. Used to build pipelines incrementally.


with_config

with_config(**overrides: Any) -> 'ETLPipeline'

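Return a new pipeline whose config is a copy of the current one with overrides applied on top: existing keys are replaced and new keys are added.

The original pipeline is unchanged.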
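with_config merges rather than replaces: it copies the existing mapping and applies the keyword overrides on top. The standalone function below reproduces those two steps. `merge_config` is a hypothetical name, not an SDK function.

```python
from typing import Any, Mapping


def merge_config(config: Mapping[str, Any], **overrides: Any) -> dict[str, Any]:
    # Same steps as with_config: copy, then let overrides win key by key.
    merged = dict(config)
    merged.update(overrides)
    return merged


base = {"batch_size": 1000, "retries": 3}
tuned = merge_config(base, batch_size=250, timeout=30)
print(tuned)  # {'batch_size': 250, 'retries': 3, 'timeout': 30}
print(base)   # {'batch_size': 1000, 'retries': 3}

# Keys that are not valid identifiers must be passed by unpacking a dict.
odd = merge_config(base, **{"max-rows": 50})
print(odd["max-rows"])  # 50
```

The last call applies to with_config as well. Because overrides arrive as keyword arguments, a key such as "max-rows" can only be supplied through dict unpacking.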


run

run(source: Any) -> PipelineResult

Execute the pipeline against source.

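Stages run in declared order. The first stage receives source as its input; subsequent stages receive the previous stage's output. A failure (any Exception raised by a stage or by its StageSpec.factory) short-circuits the pipeline and records FAILED status. The exception is NOT re-raised, so the caller can inspect the partial output. That output is the result of the last stage that succeeded, or source itself if the first stage fails.

Returns a PipelineResult capturing output, warnings, errors, and per-stage timings. A stage whose factory fails gets no timing entry. Even on failure the result is returned (with status=FAILED), so run() does not raise for stage failures. Only Exception subclasses are caught, however; BaseException subclasses such as KeyboardInterrupt still propagate.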
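The failure semantics can be reproduced in a few lines. The sketch below is a simplified, hypothetical model of run(), not the SDK code. Its stages take only the data (no context) and there are no factories, no PipelineError branch, no record counters, and statuses are plain strings. It keeps the essential behaviour: catch Exception per stage, stop at the first failure, and return the last successful output with the collected errors and per-stage durations.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyResult:
    status: str
    output: Any
    errors: list[str] = field(default_factory=list)
    durations: dict[str, float] = field(default_factory=dict)


def toy_run(stages: list[tuple[str, Callable[[Any], Any]]], source: Any) -> ToyResult:
    output, status, errors, durations = source, "SUCCESS", [], {}
    for name, stage in stages:
        started = time.perf_counter()
        try:
            output = stage(output)  # on failure, output keeps the previous value
        except Exception as exc:  # BaseException (e.g. KeyboardInterrupt) still propagates
            errors.append(f"stage {name!r} raised {type(exc).__name__}: {exc}")
            status = "FAILED"
            break
        finally:
            # Runs before the break takes effect, so the failing stage is timed too.
            durations[name] = time.perf_counter() - started
    return ToyResult(status, output, errors, durations)


def reject_negative(xs: list) -> list:
    if any(x < 0 for x in xs):
        raise ValueError("negative value in input")
    return xs


result = toy_run(
    [
        ("double", lambda xs: [x * 2 for x in xs]),
        ("reject_negative", reject_negative),
        ("never_reached", lambda xs: xs),
    ],
    source=[1, -2, 3],
)
print(result.status)             # FAILED
print(result.output)             # [2, -4, 6]
print(result.errors)             # ["stage 'reject_negative' raised ValueError: negative value in input"]
print(sorted(result.durations))  # ['double', 'reject_negative']
```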


Examples

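from un_comtrade import ComtradeClient
from un_comtrade.etl import PipelineStatus, StageKind, StageSpec

# MyFetcher / MyParquetExporter are placeholders for caller-supplied stages,
# each called as stage(data, context). StageKind.EXPORT is an assumed member name.
with ComtradeClient() as client:
    pipeline = client.etl.pipeline(
        name="india_exports",
        stages=[
            StageSpec(name="fetch", kind=StageKind.EXTRACT,
                      factory=lambda ctx: MyFetcher(reporter_code=699, period="2022")),
            StageSpec(name="export", kind=StageKind.EXPORT,
                      factory=lambda ctx: MyParquetExporter(path="india_exports.parquet")),
        ],
    )
    result = pipeline.run(source=None)  # source is required; this fetch stage ignores it
    if result.status is not PipelineStatus.SUCCESS:
        for error in result.errors:
            print(error)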