Shared & Utilities

Shared Config

Shared infrastructure configuration — single source of truth for all layers.

Contains only the four env vars that every layer needs:

- MLflow tracking URI
- MinIO endpoint URL + credentials

Both src.pipelines._config.PipelineConfig and src.app.config sub-classes extend or re-declare these same env-var aliases, so the env-var contract is defined once here and referenced everywhere.

SharedInfraConfig

Bases: BaseSettings

Minimum infrastructure settings shared by all layers.

Subclass this in layer-specific settings classes; do not instantiate directly except via get_shared_config().

Field naming uses mlflow_* / minio_* prefixes to avoid name collisions when composed into larger settings objects. src.app.config classes use shorter field names (tracking_uri, access_key, ...) for ergonomics; they share only the env-var aliases.

Source code in src/shared/config.py
class SharedInfraConfig(BaseSettings):
    """Minimum infrastructure settings shared by all layers.

    Subclass this in layer-specific settings classes; do not instantiate
    directly except via ``get_shared_config()``.

    Field naming uses ``mlflow_*`` / ``minio_*`` prefixes to avoid name
    collisions when composed into larger settings objects.
    ``src.app.config`` classes use shorter field names (``tracking_uri``,
    ``access_key``, ...) for ergonomics; they share only the env-var aliases.
    """

    mlflow_tracking_uri: str = Field(..., validation_alias="MLFLOW_TRACKING_URL")
    minio_endpoint_url: str = Field(..., validation_alias="MINIO_ENDPOINT_URL")
    minio_access_key: str = Field(..., validation_alias="MINIO_USER")
    minio_secret_key: str = Field(..., validation_alias="MINIO_PASSWORD")

    model_config = {
        "env_file": _PACKAGE_ROOT / ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

get_shared_config() cached

Return the shared infrastructure config singleton (lazy, cached).

Use get_shared_config.cache_clear() in tests to force re-instantiation after changing env vars with monkeypatch.

Returns:

Type Description
SharedInfraConfig

Singleton SharedInfraConfig instance.

Source code in src/shared/config.py
@lru_cache(maxsize=1)
def get_shared_config() -> SharedInfraConfig:
    """Return the shared infrastructure config singleton (lazy, cached).

    Use ``get_shared_config.cache_clear()`` in tests to force re-instantiation
    after changing env vars with ``monkeypatch``.

    Returns:
        Singleton :class:`SharedInfraConfig` instance.
    """
    return SharedInfraConfig()
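
The caching behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the project's code: `InfraSettings` and `get_settings` are hypothetical stand-ins for `SharedInfraConfig` and `get_shared_config()`, and a plain dataclass replaces pydantic's `BaseSettings` so the snippet has no third-party dependency. The URLs are placeholders.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class InfraSettings:
    """Hypothetical stdlib stand-in for SharedInfraConfig."""

    mlflow_tracking_uri: str


@lru_cache(maxsize=1)
def get_settings() -> InfraSettings:
    # The environment is read on the first call only; later calls
    # return the cached instance.
    return InfraSettings(mlflow_tracking_uri=os.environ["MLFLOW_TRACKING_URL"])


os.environ["MLFLOW_TRACKING_URL"] = "http://localhost:5000"
first = get_settings()

os.environ["MLFLOW_TRACKING_URL"] = "http://mlflow.test:5000"
stale = get_settings()      # still the instance built from the old value
get_settings.cache_clear()  # drop the cached instance
fresh = get_settings()      # rebuilt from the current environment
```

In a pytest test the same sequence would typically be `monkeypatch.setenv(...)` followed by `cache_clear()`, so that the next call observes the patched value.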

MLflow Metadata

MLflow metadata utilities: data lineage, pipeline context, model config.

Provides helpers to build consistent MLflow tag/param dicts so every run in the same experiment is unambiguously identifiable by its data snapshot, run mode, and model configuration.

Naming convention

Tags (context / lineage / run identity, not affecting model training):

data.version, data.hash, data.source_*, data.ingested_at, data.train_start, data.train_end, data.test_start, data.test_end, data.train_rows, data.test_rows, pipeline.git_sha, pipeline.dvc_exp_name, pipeline.params_hash

Params (configuration that directly affects the trained model):

model.name, model.target, model.feature_count, model.hyperparams_source

Experiment structure

Only two experiments are used:

- matches_clf: production runs (train_eval, ablation, tuning, final_train)
- matches_clf_smoke: smoke / fast-dev runs

Note on data.source_created_at

The S3/MinIO HEAD-object response does NOT expose a distinct creation timestamp — it only provides LastModified (the time the object was last PUT/replaced). Therefore data.source_created_at cannot be determined reliably and is intentionally NOT logged. data.source_last_modified is the closest available proxy.

derive_features_profile(feat_params)

Derive a human-readable feature-profile label from a params dict.

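Logic mirrors the ablation variant naming convention:

- include_elo is false: "stats_only" (include_h2h is ignored)
- include_elo is true and include_h2h is false: "no_h2h"
- both are true: "full"

Missing keys default to True.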

Parameters:

Name Type Description Default
feat_params dict

Feature configuration dict with boolean keys include_elo and include_h2h.

required

Returns:

Type Description
str

One of "stats_only", "no_h2h", or "full".

Source code in src/utils/mlflow_meta.py
def derive_features_profile(feat_params: dict) -> str:
    """Derive a human-readable feature-profile label from a params dict.

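    Logic mirrors the ablation variant naming convention:

    - ``include_elo`` false: ``"stats_only"`` (``include_h2h`` is ignored);
    - ``include_elo`` true, ``include_h2h`` false: ``"no_h2h"``;
    - both true: ``"full"``.

    Missing keys default to ``True``.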

    Args:
        feat_params: Feature configuration dict with boolean keys
            ``include_elo`` and ``include_h2h``.

    Returns:
        One of ``"stats_only"``, ``"no_h2h"``, or ``"full"``.
    """
    include_elo = feat_params.get("include_elo", True)
    include_h2h = feat_params.get("include_h2h", True)
    if not include_elo:
        return "stats_only"
    if not include_h2h:
        return "no_h2h"
    return "full"
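
A short usage sketch may help make the precedence explicit. The function body is repeated from the listing above so the snippet runs on its own; the input dicts are illustrative.

```python
def derive_features_profile(feat_params: dict) -> str:
    # Copied from the source listing so the example is self-contained.
    include_elo = feat_params.get("include_elo", True)
    include_h2h = feat_params.get("include_h2h", True)
    if not include_elo:
        return "stats_only"
    if not include_h2h:
        return "no_h2h"
    return "full"


# Missing keys default to True, so an empty dict is the full profile.
empty = derive_features_profile({})
# Elo kept, head-to-head dropped.
no_h2h = derive_features_profile({"include_h2h": False})
# include_elo is checked first, so include_h2h does not matter here.
stats_a = derive_features_profile({"include_elo": False, "include_h2h": True})
stats_b = derive_features_profile({"include_elo": False, "include_h2h": False})
```

Because `include_elo` is tested first, there is no label for "H2H without Elo"; that combination is reported as `"stats_only"`.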

set_experiment_active(experiment_name)

Set the active MLflow experiment, restoring it first if it was deleted.

MLflow raises MlflowException when set_experiment is called on a soft-deleted experiment. This helper detects that state and calls MlflowClient.restore_experiment before delegating to the standard mlflow.set_experiment.

Parameters

experiment_name: The experiment name to activate.

Returns

mlflow.entities.Experiment: The active experiment object.

Source code in src/utils/mlflow_meta.py
def set_experiment_active(experiment_name: str) -> mlflow.entities.Experiment:
    """Set the active MLflow experiment, restoring it first if it was deleted.

    MLflow raises ``MlflowException`` when ``set_experiment`` is called on a
    soft-deleted experiment.  This helper detects that state and calls
    ``MlflowClient.restore_experiment`` before delegating to the standard
    ``mlflow.set_experiment``.

    Parameters
    ----------
    experiment_name:
        The experiment name to activate.

    Returns
    -------
    mlflow.entities.Experiment
        The active experiment object.
    """
    client = MlflowClient()
    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is not None and experiment.lifecycle_stage == "deleted":
        client.restore_experiment(experiment.experiment_id)
        logger.info(
            "Restored deleted MLflow experiment '%s' (id=%s)",
            experiment_name,
            experiment.experiment_id,
        )
    return mlflow.set_experiment(experiment_name)

build_pipeline_context_tags()

Build MLflow tags describing the git/pipeline context of a run.

Returns:

Type Description
dict[str, str]

Dict with string tag keys and values. A tag is included only when its value could be determined:

- pipeline.git_sha: HEAD short commit hash
- pipeline.git_commit_message: HEAD commit subject line
- pipeline.params_hash: 16-char SHA-256 of params.yaml
- pipeline.dvc_rev: 16-char SHA-256 of dvc.lock

Source code in src/utils/mlflow_meta.py
def build_pipeline_context_tags() -> dict[str, str]:
    """Build MLflow tags describing the git/pipeline context of a run.

    Returns:
        Dict with string tag keys and values:
        ``pipeline.git_sha``            — HEAD short commit hash;
        ``pipeline.git_commit_message`` — HEAD commit subject line;
        ``pipeline.params_hash``        — 16-char SHA-256 of params.yaml;
        ``pipeline.dvc_rev``            — 16-char SHA-256 of dvc.lock.
    """
    tags: dict[str, str] = {}
    sha = _get_git_sha()
    if sha:
        tags["pipeline.git_sha"] = sha
    msg = _get_git_commit_message()
    if msg:
        tags["pipeline.git_commit_message"] = msg
    params_hash = _hash_params_file()
    if params_hash:
        tags["pipeline.params_hash"] = params_hash
    dvc_rev = _get_dvc_rev()
    if dvc_rev:
        tags["pipeline.dvc_rev"] = dvc_rev
    return tags
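
The private helpers (`_hash_params_file`, `_get_dvc_rev`) are not shown on this page. As a rough sketch of what a "16-char SHA-256" of a file could look like, the hypothetical `short_sha256` below takes the first 16 hex characters of the digest and returns `None` for a missing file, which would cause the corresponding tag to be omitted. Both the truncation strategy and the `None` convention are assumptions, not confirmed behaviour of the project.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional


def short_sha256(path: Path, length: int = 16) -> Optional[str]:
    """Hypothetical helper: first `length` hex chars of the file's SHA-256."""
    if not path.is_file():
        # A falsy result lets the caller skip the tag entirely.
        return None
    return hashlib.sha256(path.read_bytes()).hexdigest()[:length]


with tempfile.TemporaryDirectory() as tmp:
    params = Path(tmp) / "params.yaml"
    params.write_bytes(b"seed: 42\n")  # illustrative content
    digest = short_sha256(params)
    missing = short_sha256(Path(tmp) / "dvc.lock")  # file was never created

tags = {}
if digest:
    tags["pipeline.params_hash"] = digest
if missing:
    tags["pipeline.dvc_rev"] = missing
```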

build_data_lineage_tags(dataset_path, df_train=None, df_test=None, dvc_hash=None, raw_source_path=None)

Build MLflow tags describing dataset lineage for a training run.

Parameters

dataset_path: Path to the processed dataset parquet. Documented as the fallback for locating the MinIO sidecar, although the listing below resolves the sidecar from raw_source_path (or its default) only.

df_train: Training split DataFrame. If it contains startTimeUtc, temporal bounds and row count are logged.

df_test: Test/holdout split DataFrame. Same conditions as df_train.

dvc_hash: DVC MD5 hash of the dataset artifact (from get_dvc_hash). Logged as both data.version and data.hash.

raw_source_path: Explicit path to the raw source parquet whose .minio.json sidecar contains the MinIO object metadata. Defaults to data/raw/match_raw.parquet relative to the project root.

Source code in src/utils/mlflow_meta.py
def build_data_lineage_tags(
    dataset_path: str | Path,
    df_train: pd.DataFrame | None = None,
    df_test: pd.DataFrame | None = None,
    dvc_hash: str | None = None,
    raw_source_path: str | Path | None = None,
) -> dict[str, str]:
    """Build MLflow tags describing dataset lineage for a training run.

    Parameters
    ----------
    dataset_path:
        Path to the processed dataset parquet (used to locate minio sidecar
        if ``raw_source_path`` is not given).
    df_train:
        Training split DataFrame.  If it contains ``startTimeUtc``, temporal
        bounds and row count are logged.
    df_test:
        Test/holdout split DataFrame.  Same conditions as ``df_train``.
    dvc_hash:
        DVC MD5 hash of the dataset artifact (from ``get_dvc_hash``).  Logged
        as both ``data.version`` and ``data.hash``.
    raw_source_path:
        Explicit path to the raw source parquet whose ``.minio.json`` sidecar
        contains the MinIO object metadata.  Defaults to
        ``data/raw/match_raw.parquet`` relative to the project root.
    """
    tags: dict[str, str] = {}

    if dvc_hash:
        tags["data.version"] = dvc_hash
        tags["data.hash"] = dvc_hash

    # MinIO source metadata from sidecar file written by load_data_from_sources.
    raw_path = raw_source_path or (_PROJECT_ROOT / "data" / "raw" / "match_raw.parquet")
    minio = _load_minio_meta(raw_path)

    if minio.get("bucket"):
        tags["data.source_bucket"] = str(minio["bucket"])
    if minio.get("key"):
        tags["data.source_key"] = str(minio["key"])
    if minio.get("etag"):
        # Strip surrounding quotes that S3/MinIO adds to ETags.
        tags["data.source_etag"] = str(minio["etag"]).strip('"')
    # S3/MinIO does not expose a distinct object creation timestamp.
    # last_modified is the time the object was last PUT/replaced.
    if minio.get("last_modified"):
        tags["data.source_last_modified"] = str(minio["last_modified"])
    if minio.get("ingested_at"):
        tags["data.ingested_at"] = str(minio["ingested_at"])

    # Temporal split boundaries and row counts.
    _time_col = "startTimeUtc"
    if df_train is not None and _time_col in df_train.columns:
        tags["data.train_start"] = str(df_train[_time_col].min())
        tags["data.train_end"] = str(df_train[_time_col].max())
        tags["data.train_rows"] = str(len(df_train))

    if df_test is not None and _time_col in df_test.columns:
        tags["data.test_start"] = str(df_test[_time_col].min())
        tags["data.test_end"] = str(df_test[_time_col].max())
        tags["data.test_rows"] = str(len(df_test))

    return tags
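
The sidecar handling can be illustrated with a small, self-contained sketch. `_load_minio_meta` is not shown on this page, so `load_sidecar` below is a hypothetical replacement that assumes the sidecar is named `<raw file name>.minio.json`; the real naming rule may differ. The metadata values are made up for the example.

```python
import json
import tempfile
from pathlib import Path


def load_sidecar(raw_path: Path) -> dict:
    """Hypothetical loader; assumes the sidecar is '<raw file name>.minio.json'."""
    sidecar = raw_path.with_name(raw_path.name + ".minio.json")
    if not sidecar.is_file():
        return {}  # no sidecar means no data.source_* tags
    return json.loads(sidecar.read_text(encoding="utf-8"))


def source_tags(minio: dict) -> dict[str, str]:
    """Map sidecar keys to data.* tags, skipping missing or empty values."""
    key_to_tag = {
        "bucket": "data.source_bucket",
        "key": "data.source_key",
        "etag": "data.source_etag",
        "last_modified": "data.source_last_modified",
        "ingested_at": "data.ingested_at",
    }
    tags = {tag: str(minio[key]) for key, tag in key_to_tag.items() if minio.get(key)}
    if "data.source_etag" in tags:
        # S3/MinIO wraps ETags in double quotes; strip them as the source does.
        tags["data.source_etag"] = tags["data.source_etag"].strip('"')
    return tags


with tempfile.TemporaryDirectory() as tmp:
    raw = Path(tmp) / "match_raw.parquet"
    # Illustrative metadata only; not taken from a real bucket.
    meta = {"bucket": "raw-data", "key": "matches/match_raw.parquet", "etag": '"abc123"'}
    raw.with_name(raw.name + ".minio.json").write_text(json.dumps(meta), encoding="utf-8")
    tags = source_tags(load_sidecar(raw))
    no_sidecar = load_sidecar(Path(tmp) / "other.parquet")
```

Keys absent from the sidecar (here `last_modified` and `ingested_at`) simply produce no tag, which matches the `if minio.get(...)` guards in the listing above.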

build_features_selected_params(feat_params)

Build MLflow params describing the features_selected configuration.

Logs the exact feature set used for tuning, final_train, and inference so any downstream run can be reproduced without inspecting params.yaml.

Parameters

feat_params: Dict from params["features_selected"] (or params["classification"] as a fallback). Expected keys: side, window_sizes, include_elo, include_rest_days, include_h2h, cat_cols. The optional key class_weight is also read and logged as features.class_weight.

Returns

dict[str, Any]: MLflow param dict keyed by features.* names.

Source code in src/utils/mlflow_meta.py
def build_features_selected_params(feat_params: dict) -> dict[str, Any]:
    """Build MLflow params describing the features_selected configuration.

    Logs the exact feature set used for tuning, final_train, and inference
    so any downstream run can be reproduced without inspecting params.yaml.

    Parameters
    ----------
    feat_params:
        Dict from ``params["features_selected"]`` (or ``params["classification"]``
        as a fallback).  Expected keys: ``side``, ``window_sizes``,
        ``include_elo``, ``include_rest_days``, ``include_h2h``, ``cat_cols``.

    Returns
    -------
    dict[str, Any]
        MLflow param dict keyed by ``features.*`` names.
    """
    return {
        "features.side": str(feat_params.get("side", [])),
        "features.window_sizes": str(feat_params.get("window_sizes", [])),
        "features.cat_cols": str(feat_params.get("cat_cols", [])),
        "features.include_elo": str(feat_params.get("include_elo", True)),
        "features.include_rest_days": str(feat_params.get("include_rest_days", True)),
        "features.include_h2h": str(feat_params.get("include_h2h", True)),
        "features.class_weight": str(feat_params.get("class_weight", None)),
    }
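
Every value is passed through `str()` before logging, so a consumer that reads these params back gets strings such as `"[3, 5, 10]"` or `"True"`. One possible way to recover the original Python values is `ast.literal_eval`, sketched below. The helper name `parse_param` and the sample values are hypothetical, and the fallback for plain strings (for example a `class_weight` of `balanced`) is an assumption about how one might want to handle them.

```python
import ast
from typing import Any


def parse_param(value: str) -> Any:
    """Hypothetical helper: turn a stringified param back into a Python value."""
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        # Bare words such as "balanced" are not Python literals; keep them as-is.
        return value


# Values as they would appear after str(...) in the listing above.
logged = {
    "features.window_sizes": str([3, 5, 10]),
    "features.include_elo": str(True),
    "features.class_weight": str(None),
}
restored = {key: parse_param(value) for key, value in logged.items()}
kept_as_text = parse_param("balanced")
```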

build_model_metadata_params(model_name, target, num_feature_count, cat_feature_count, best_params=None)

Build MLflow params describing model configuration.

Parameters

model_name: Short algorithm identifier (e.g. 'xgb', 'logreg').

target: Target column name.

num_feature_count: Number of numeric feature columns.

cat_feature_count: Number of categorical feature columns.

best_params: Tuned hyperparameter dict. When non-empty, model.hyperparams_source is 'tuned'; otherwise 'default'.

Returns

dict[str, Any]: MLflow param dict keyed by model.* names.

Note: model.family is intentionally NOT included here — it is a contextual tag, not a reproducibility param. Use build_run_scope_tags to log it.

Source code in src/utils/mlflow_meta.py
def build_model_metadata_params(
    model_name: str,
    target: str,
    num_feature_count: int,
    cat_feature_count: int,
    best_params: dict | None = None,
) -> dict[str, Any]:
    """Build MLflow params describing model configuration.

    Parameters
    ----------
    model_name:
        Short algorithm identifier (e.g. ``'xgb'``, ``'logreg'``).
    target:
        Target column name.
    num_feature_count:
        Number of numeric feature columns.
    cat_feature_count:
        Number of categorical feature columns.
    best_params:
        Tuned hyperparameter dict.  When non-empty, ``model.hyperparams_source``
        is ``'tuned'``; otherwise ``'default'``.

    Returns
    -------
    dict[str, Any]
        MLflow param dict keyed by ``model.*`` names.

    Note: ``model.family`` is intentionally NOT included here — it is a
    contextual tag, not a reproducibility param.  Use ``build_run_scope_tags``
    to log it.
    """
    return {
        "model.name": model_name,
        "model.target": target,
        "model.feature_count": num_feature_count + cat_feature_count,
        "model.hyperparams_source": "tuned" if best_params else "default",
    }
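
A brief usage sketch shows how `best_params` drives `model.hyperparams_source`. The function is repeated from the listing above so the snippet is self-contained; the target name `"result"` and the feature counts are illustrative, not taken from the project.

```python
from typing import Any, Optional


def build_model_metadata_params(
    model_name: str,
    target: str,
    num_feature_count: int,
    cat_feature_count: int,
    best_params: Optional[dict] = None,
) -> dict[str, Any]:
    # Copied from the source listing so the example runs on its own.
    return {
        "model.name": model_name,
        "model.target": target,
        "model.feature_count": num_feature_count + cat_feature_count,
        "model.hyperparams_source": "tuned" if best_params else "default",
    }


default_run = build_model_metadata_params("xgb", "result", 12, 3)
# An empty dict is falsy, so it is treated the same as None.
empty_run = build_model_metadata_params("xgb", "result", 12, 3, best_params={})
tuned_run = build_model_metadata_params("xgb", "result", 12, 3, best_params={"max_depth": 4})
```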

get_features_profile(variant)

Return the features.profile tag value for a pipeline variant name.

Parameters:

Name Type Description Default
variant str

Pipeline variant name (e.g. "baseline", "elo_only").

required

Returns:

Type Description
str

Canonical features.profile tag string, or the variant name itself if not in the known map.

Source code in src/utils/mlflow_meta.py
def get_features_profile(variant: str) -> str:
    """Return the ``features.profile`` tag value for a pipeline variant name.

    Args:
        variant: Pipeline variant name (e.g. ``"baseline"``, ``"elo_only"``).

    Returns:
        Canonical ``features.profile`` tag string, or the variant name itself
        if not in the known map.
    """
    return _FEATURES_PROFILE_MAP.get(variant, variant)

infer_run_kind(experiment_name, stage_kind)

Derive pipeline.run_kind from experiment name and stage.

If 'smoke' appears in the experiment name, returns 'smoke' regardless of stage_kind. This reflects that smoke runs are a reduced-scale variant of any stage, not a separate stage type.

Parameters

experiment_name: MLflow experiment name (e.g. 'matches_clf_smoke').

stage_kind: Default kind when not a smoke run: 'train_eval', 'ablation', 'tuning', or 'final_train'.

Returns

str: 'smoke' when the experiment name contains 'smoke', otherwise stage_kind unchanged.

Source code in src/utils/mlflow_meta.py
def infer_run_kind(experiment_name: str, stage_kind: str) -> str:
    """Derive ``pipeline.run_kind`` from experiment name and stage.

    If ``'smoke'`` appears in the experiment name, returns ``'smoke'``
    regardless of ``stage_kind``.  This reflects that smoke runs are a
    reduced-scale variant of any stage, not a separate stage type.

    Parameters
    ----------
    experiment_name:
        MLflow experiment name (e.g. ``'matches_clf_smoke'``).
    stage_kind:
        Default kind when not a smoke run: ``'train_eval'``, ``'ablation'``,
        ``'tuning'``, or ``'final_train'``.

    Returns
    -------
    str
        ``'smoke'`` when the experiment name contains ``'smoke'``, otherwise
        ``stage_kind`` unchanged.
    """
    if "smoke" in experiment_name.lower():
        return "smoke"
    return stage_kind
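
The following sketch exercises the rule against the two experiment names used by the project. The function is repeated from the listing above so the snippet runs on its own; the mixed-case and `smoker_study` names are made-up inputs that illustrate the case-insensitive substring match.

```python
def infer_run_kind(experiment_name: str, stage_kind: str) -> str:
    # Copied from the source listing so the example is self-contained.
    if "smoke" in experiment_name.lower():
        return "smoke"
    return stage_kind


production = infer_run_kind("matches_clf", "tuning")
smoke = infer_run_kind("matches_clf_smoke", "tuning")
# The check lower-cases the name first, so casing does not matter.
mixed_case = infer_run_kind("Matches_CLF_SMOKE", "final_train")
# It is a plain substring test: any name containing "smoke" matches.
substring = infer_run_kind("smoker_study", "ablation")
```

Since the match is a substring test, experiment names that contain "smoke" for unrelated reasons would also be classified as smoke runs; with only the two experiments listed under "Experiment structure" this does not arise.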