Skip to content

Pipelines (CLI)

Each module defines one or more Click commands registered under the src.pipelines group. DVC calls these entrypoints via python -m src.pipelines <command> [args...].

Source

cli_load_data_from_source(output_path_matches, output_path_matches_raw)

Run the load-data-from-source DVC stage.

Source code in src/pipelines/source.py
@cli.command()
@click.argument("output_path_matches", type=click.Path(path_type=Path, dir_okay=False))
@click.argument(
    "output_path_matches_raw", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_load_data_from_source(output_path_matches: Path, output_path_matches_raw: Path):
    """Run the load-data-from-source DVC stage.

    Thin Click wrapper: both positional arguments are converted to ``Path``
    by Click (directories are rejected) and forwarded unchanged to
    ``load_data_from_source``.

    Args:
        output_path_matches: First path handed to ``load_data_from_source``;
            presumably the destination for the processed matches — confirm
            against that function.
        output_path_matches_raw: Second path handed to
            ``load_data_from_source``; presumably the destination for the
            raw matches — confirm against that function.
    """
    load_data_from_source(output_path_matches, output_path_matches_raw)

Preprocess

cli_export_metadata(input_path)

Export match metadata (tournamentId, regionId maps) to DATA_METADATA_PATH.

Source code in src/pipelines/preprocess.py
@cli.command()
@click.argument(
    "input_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
def cli_export_metadata(input_path: Path) -> None:
    """Export match metadata (tournamentId, regionId maps) to ``DATA_METADATA_PATH``.

    Args:
        input_path: Existing parquet file (validated by Click) that is loaded
            with ``pd.read_parquet`` and handed to
            ``export_matches_metadata``.
    """
    df_match_raw = pd.read_parquet(input_path)
    # The destination is fixed by the module-level constant, not by a CLI argument.
    export_matches_metadata(df_match_raw, metadata_path=DATA_METADATA_PATH)

cli_preprocessing(input_path, output_finished_path, output_future_path, reference_date)

Run the preprocessing DVC stage.

Parameters:

Name Type Description Default
input_path Path

Parquet of raw match records.

required
output_finished_path Path

Destination for finished-match parquet.

required
output_future_path Path

Destination for future-match feature parquet.

required
reference_date datetime | None

Cutoff datetime separating finished from future matches. Overrides params.yaml when provided.

None
Source code in src/pipelines/preprocess.py
@cli.command()
@click.argument(
    "input_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_finished_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.argument(
    "output_future_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.option(
    "--reference-date",
    type=click.DateTime(formats=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"]),
    default=None,
    help=(
        "Reference datetime used as 'now' when partitioning finished vs future matches. "
        "If omitted, value from params.yaml (preprocessing.reference_date) is used. "
        "If that is also null, wall-clock time minus 3 h is used (production ETL default). "
        "Pass an explicit value for reproducible DVC runs."
    ),
)
def cli_preprocessing(
    input_path: Path,
    output_finished_path: Path,
    output_future_path: Path,
    reference_date,
):
    """Run the preprocessing DVC stage.

    Loads the raw matches, delegates to ``preprocess_and_split`` and writes
    the two resulting frames as parquet files, creating parent directories
    as needed.

    Args:
        input_path: Parquet of raw match records.
        output_finished_path: Destination for finished-match parquet.
        output_future_path: Destination for future-match feature parquet.
        reference_date: Cutoff datetime separating finished from future
            matches.  Overrides ``params.yaml`` when provided; ``None``
            means "look in ``params.yaml`` next".
    """
    preprocessing_cfg = load_params()["preprocessing"]
    outlier_pct: float = preprocessing_cfg["score_outlier_pct"]

    # Resolution order: CLI flag -> preprocessing.reference_date in params ->
    # None, in which case preprocess_and_split picks its own default.
    if reference_date is None:
        configured = preprocessing_cfg.get("reference_date")
        if configured is not None:
            from datetime import datetime

            # str() lets the params value be either a string or an already
            # parsed date object.
            reference_date = datetime.fromisoformat(str(configured))

    finished, future_features = preprocess_and_split(
        pd.read_parquet(input_path),
        score_outlier_pct=outlier_pct,
        reference_date=reference_date,
    )

    # Create both target directories before writing anything.
    for destination in (output_finished_path, output_future_path):
        destination.parent.mkdir(parents=True, exist_ok=True)

    finished.to_parquet(output_finished_path)
    future_features.to_parquet(output_future_path)

Features

cli_feature_engineering(input_path, output_path_features)

Run the feature-engineering DVC stage.

Source code in src/pipelines/features.py
@cli.command()
@click.argument(
    "input_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_path_features",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_feature_engineering(input_path: Path, output_path_features: Path) -> None:
    """Run the feature-engineering DVC stage.

    Builds the team-match table from the finished matches, applies
    ``add_rolling_features`` for four groupings (overall, season, tournament,
    home/away), converts the result back with ``to_match_level`` and,
    when ``features.elo.include`` is truthy, left-joins the ELO columns.

    Args:
        input_path: Existing parquet file with the finished matches.
        output_path_features: Destination parquet for the feature table; its
            parent directory is created if missing.
    """
    finished = pd.read_parquet(input_path)

    # Team-match level table.
    team_rows = build_team_match_table(finished)
    # Snapshot of every column that exists before rolling features are added;
    # handed to to_match_level below (presumably so those raw columns can be
    # excluded — confirm against to_match_level).
    leaky_cols = set(finished.columns.to_list()) | set(team_rows.columns.to_list())

    params = load_params()
    feature_cfg = params["features"]
    windows = feature_cfg["window_sizes"]
    stats_cols = feature_cfg["stats_cols"]

    # (prefix, grouping keys) — applied in this exact order.
    rolling_specs = (
        ("all", ["teamId"]),  # overall
        ("season", ["teamId", "seasonId"]),  # per season
        ("tournament", ["teamId", "tournamentId"]),  # per tournament
        ("ha", ["teamId", "is_home"]),  # home/away
    )
    for prefix, group_keys in rolling_specs:
        team_rows = add_rolling_features(
            team_rows,
            group_keys=group_keys,
            windows=windows,
            stats_cols=stats_cols,
            prefix=prefix,
        )

    features = to_match_level(team_rows, leaky_cols)

    # ELO features — computed on the finished-match timeline (pre-match
    # snapshot, no leakage).  Reading the parameters from features.elo means
    # a change to k_factor / initial_rating / home_advantage makes DVC re-run
    # this stage.
    elo_cfg = feature_cfg.get("elo", {})
    if elo_cfg.get("include", False):
        elo = compute_elo_ratings(
            finished,
            k_factor=float(elo_cfg.get("k_factor", 32.0)),
            initial_rating=float(elo_cfg.get("initial_rating", 1500.0)),
            home_advantage=float(elo_cfg.get("home_advantage", 50.0)),
            group_col=str(elo_cfg.get("group_col", "tournamentId")),
        )
        elo_by_id = elo.set_index("id")[
            ["home_elo_pre", "away_elo_pre", "diff_elo_pre"]
        ]
        # Index-on-index left join: every feature row is kept.
        features = features.join(elo_by_id, how="left")

    output_path_features.parent.mkdir(parents=True, exist_ok=True)
    features.to_parquet(output_path_features)

Validation (split)

cli_time_based_split(input_finished_path, input_features_path, output_dataset_path, output_train_ids_path, output_test_ids_path, output_folds_path)

Run the time-based-split DVC stage.

Source code in src/pipelines/validation.py
@cli.command()
@click.argument(
    "input_finished_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument("output_dataset_path", type=click.Path(path_type=Path, dir_okay=False))
@click.argument(
    "output_train_ids_path", type=click.Path(path_type=Path, dir_okay=False)
)
@click.argument("output_test_ids_path", type=click.Path(path_type=Path, dir_okay=False))
@click.argument("output_folds_path", type=click.Path(path_type=Path, dir_okay=False))
def cli_time_based_split(
    input_finished_path: Path,
    input_features_path: Path,
    output_dataset_path: Path,
    output_train_ids_path: Path,
    output_test_ids_path: Path,
    output_folds_path: Path,
):
    """Run the time-based-split DVC stage.

    Splits the finished matches into train/test IDs at
    ``temporal.test_start``, builds year folds from the train IDs and writes
    the inner join of finished matches and features as the model dataset.

    Args:
        input_finished_path: Existing parquet with finished matches.
        input_features_path: Existing parquet with engineered features.  It
            is always read, so Click now validates its existence up front
            (same as the other input) instead of failing later in pandas.
        output_dataset_path: Destination for the joined dataset parquet.
        output_train_ids_path: Destination for the train-ID parquet.
        output_test_ids_path: Destination for the test-ID parquet.
        output_folds_path: Destination for the fold-definition parquet.
    """
    temporal = load_params()["temporal"]

    date_test_start = pd.Timestamp(temporal["test_start"], tz="UTC")
    # range() excludes its stop value: folds_end_year itself is NOT a fold year.
    valid_years_for_folds = list(
        range(temporal["folds_start_year"], temporal["folds_end_year"])
    )

    df_finished = pd.read_parquet(input_finished_path)
    df_features = pd.read_parquet(input_features_path)

    df_train_ids, df_test_ids = split_time_based_on(df_finished, date_test_start)
    df_folds = make_year_folds(df_train_ids, valid_years_for_folds)
    # Index-on-index join; only matches present in both frames are kept.
    df_dataset = df_finished.join(df_features, how="inner")

    # Create every target directory before writing anything.
    for destination in (
        output_dataset_path,
        output_train_ids_path,
        output_test_ids_path,
        output_folds_path,
    ):
        destination.parent.mkdir(parents=True, exist_ok=True)

    df_dataset.to_parquet(output_dataset_path)
    df_train_ids.to_parquet(output_train_ids_path)
    df_test_ids.to_parquet(output_test_ids_path)
    df_folds.to_parquet(output_folds_path)

Classification

cli_classification_models(input_dataset_path, input_train_ids_path, input_test_ids_path, input_folds_path, input_features_meta_path, output_run_id_path)

Run the classification-models DVC stage.

Source code in src/pipelines/classification.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_train_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_test_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_folds_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument("output_run_id_path", type=click.Path(path_type=Path, dir_okay=False))
def cli_classification_models(
    input_dataset_path: Path,
    input_train_ids_path: Path,
    input_test_ids_path: Path,
    input_folds_path: Path,
    input_features_meta_path: Path,
    output_run_id_path: Path,
) -> None:
    """Run the classification-models DVC stage.

    Selects the feature columns from ``params["classification"]``, builds the
    model pipelines, runs ``make_classification_runs`` and stores the best
    run's identifiers as JSON.

    Args:
        input_dataset_path: Existing parquet with the model dataset.
        input_train_ids_path: Existing parquet with training match IDs.
        input_test_ids_path: Existing parquet with test match IDs.
        input_folds_path: Existing parquet with CV fold definitions.
        input_features_meta_path: Existing parquet features-meta table.
        output_run_id_path: Destination JSON holding ``run_id``,
            ``experiment_name``, ``model_uri`` and ``best_model_name``.

    All five inputs are read unconditionally, so Click validates their
    existence up front and a missing file is reported as a usage error.
    """
    params = load_params()
    clf_params = params["classification"]

    df_dataset = pd.read_parquet(input_dataset_path)
    df_train_ids = pd.read_parquet(input_train_ids_path)
    df_test_ids = pd.read_parquet(input_test_ids_path)
    df_folds = pd.read_parquet(input_folds_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    all_cols = df_dataset.columns.tolist()

    # NOTE(review): include_h2h defaults to True here, while the tuning and
    # final-train commands default it to False — with the key absent from
    # params the stages would use different feature sets.  Confirm intent.
    num_cols = select_model_features(
        df_features_meta,
        side=clf_params["side"],
        window_sizes=clf_params["window_sizes"],
        include_elo=clf_params.get("include_elo", True),
        include_rest_days=clf_params.get("include_rest_days", True),
        include_h2h=clf_params.get("include_h2h", True),
    )
    cat_cols = clf_params["cat_cols"]

    y_cols = clf_params["target_col"]
    experiment_name = clf_params["experiment_name"]
    experiment_hypothesis: str | None = clf_params.get("experiment_hypothesis")
    # Empty string / other falsy values are normalised to None.
    run_name: str | None = clf_params.get("run_name") or None
    frac = float(clf_params["frac"])

    # Build the lookup once instead of re-concatenating both lists for every
    # dataset column; iterating all_cols keeps the dataset's column order.
    selected_cols = set(cat_cols + num_cols)
    X_cols = [col for col in all_cols if col in selected_cols]

    class_weight = clf_params.get("class_weight") or None

    models = get_models_with_pipeline_for_clf(
        num_cols=num_cols,
        cat_cols=cat_cols,
        enabled=clf_params.get("models"),
        class_weight=class_weight,
    )

    # Provide MinIO credentials to boto3 / MLflow artifact store.
    # setdefault: respect credentials already in the environment (production),
    # fall back to settings values in development.
    _cfg = get_pipeline_config()
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)

    # Only keys that are actually configured are forwarded as DVC params.
    temporal_cfg = params.get("temporal", {})
    dvc_params = {
        "classification": {
            k: clf_params[k]
            for k in (
                "side",
                "window_sizes",
                "cat_cols",
                "frac",
                "include_elo",
                "include_rest_days",
                "include_h2h",
                "class_weight",
            )
            if k in clf_params
        },
        "temporal": {
            k: temporal_cfg[k]
            for k in ("folds_start_year", "folds_end_year", "test_start")
            if temporal_cfg.get(k) is not None
        },
    }

    best_run_id, best_model_uri, best_model_name = make_classification_runs(
        experiment_name=experiment_name,
        tracking_uri=_cfg.mlflow_tracking_uri,
        dataset_path=str(input_dataset_path),
        df_dataset=df_dataset,
        df_train_ids=df_train_ids,
        df_test_ids=df_test_ids,
        df_folds=df_folds,
        X_cols=X_cols,
        y_cols=y_cols,
        models=models,
        frac=frac,
        cat_cols=cat_cols,
        num_cols=num_cols,
        experiment_hypothesis=experiment_hypothesis,
        run_name=run_name,
        dvc_params=dvc_params,
    )

    output_run_id_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_run_id_path, "w") as f:
        json.dump(
            {
                "run_id": best_run_id,
                "experiment_name": experiment_name,
                "model_uri": best_model_uri,
                "best_model_name": best_model_name,
            },
            f,
            indent=2,
        )

Tuning

CLI entrypoints for hyperparameter tuning stages (XGBoost, LogReg, HGB).

cli_tune_xgb(input_dataset_path, input_train_ids_path, input_folds_path, input_features_meta_path, output_best_params_path)

Run Optuna hyperparameter search for XGBoost and save best params.

Parameters:

Name Type Description Default
input_dataset_path Path

Parquet dataset with features and train IDs.

required
input_train_ids_path Path

Parquet with training match IDs.

required
input_folds_path Path

Parquet with CV fold definitions.

required
input_features_meta_path Path

Parquet features-meta table.

required
output_best_params_path Path

Destination for the best-params JSON.

required
Source code in src/pipelines/tune.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_train_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_folds_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_best_params_path", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_tune_xgb(
    input_dataset_path: Path,
    input_train_ids_path: Path,
    input_folds_path: Path,
    input_features_meta_path: Path,
    output_best_params_path: Path,
) -> None:
    """Run Optuna hyperparameter search for XGBoost and save best params.

    Tuning settings come from ``params["tuning"]`` (defaults: 20 trials,
    ``frac`` 0.3).  Feature selection uses ``params["features_selected"]``
    when present and falls back to ``params["classification"]``.

    Args:
        input_dataset_path: Parquet dataset with features and train IDs.
        input_train_ids_path: Parquet with training match IDs.
        input_folds_path: Parquet with CV fold definitions.
        input_features_meta_path: Parquet features-meta table.
        output_best_params_path: Destination for the best-params JSON.
    """
    params = load_params()
    clf_params = params["classification"]
    tuning_params = params.get("tuning", {})

    df_dataset = pd.read_parquet(input_dataset_path)
    df_train_ids = pd.read_parquet(input_train_ids_path)
    df_folds = pd.read_parquet(input_folds_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    feat_params = params.get("features_selected", clf_params)
    num_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    cat_cols = feat_params["cat_cols"]
    # Build the lookup once instead of re-concatenating both lists for every
    # dataset column; iterating the dataset columns keeps their order.
    selected_cols = set(cat_cols + num_cols)
    X_cols = [c for c in df_dataset.columns.tolist() if c in selected_cols]

    # setdefault keeps credentials already present in the environment and
    # only fills in the configured values when they are missing.
    _cfg = get_pipeline_config()
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)

    # Resolved once; used both as the MLflow experiment and to infer run_kind.
    experiment_name = tuning_params.get(
        "experiment_name", clf_params["experiment_name"]
    )

    best_params = run_xgb_tuning(
        experiment_name=experiment_name,
        tracking_uri=_cfg.mlflow_tracking_uri,
        df_dataset=df_dataset,
        df_train_ids=df_train_ids,
        df_folds=df_folds,
        X_cols=X_cols,
        y_col=clf_params["target_col"],
        num_cols=num_cols,
        cat_cols=cat_cols,
        n_trials=int(tuning_params.get("n_trials", 20)),
        frac=float(tuning_params.get("frac", 0.3)),
        feat_params=feat_params,
        run_kind=infer_run_kind(experiment_name, "tuning"),
    )

    output_best_params_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_best_params_path, "w") as f:
        json.dump(best_params, f, indent=2)

cli_tune_logreg(input_dataset_path, input_train_ids_path, input_folds_path, input_features_meta_path, output_best_params_path)

Run Optuna hyperparameter search for LogisticRegression and save best params.

Parameters:

Name Type Description Default
input_dataset_path Path

Parquet dataset with features and train IDs.

required
input_train_ids_path Path

Parquet with training match IDs.

required
input_folds_path Path

Parquet with CV fold definitions.

required
input_features_meta_path Path

Parquet features-meta table.

required
output_best_params_path Path

Destination for the best-params JSON.

required
Source code in src/pipelines/tune.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_train_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_folds_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_best_params_path", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_tune_logreg(
    input_dataset_path: Path,
    input_train_ids_path: Path,
    input_folds_path: Path,
    input_features_meta_path: Path,
    output_best_params_path: Path,
) -> None:
    """Run Optuna hyperparameter search for LogisticRegression and save best params.

    Tuning settings come from ``params["tuning_logreg"]``, falling back to
    ``params["tuning"]``.  When ``enabled`` is falsy the search is skipped
    and a sentinel result (infinite ``cv_logloss``, empty ``best_params``)
    is written instead, so the output file always exists.

    Args:
        input_dataset_path: Parquet dataset with features and train IDs.
        input_train_ids_path: Parquet with training match IDs.
        input_folds_path: Parquet with CV fold definitions.
        input_features_meta_path: Parquet features-meta table.
        output_best_params_path: Destination for the best-params JSON.
    """
    params = load_params()
    clf_params = params["classification"]
    tuning_params = params.get("tuning_logreg", params.get("tuning", {}))

    if not tuning_params.get("enabled", True):
        output_best_params_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_best_params_path, "w") as f:
            # float("inf") is serialised as the non-standard token `Infinity`;
            # Python's json module reads it back, strict JSON parsers do not.
            json.dump({"cv_logloss": float("inf"), "best_params": {}}, f)
        return

    df_dataset = pd.read_parquet(input_dataset_path)
    df_train_ids = pd.read_parquet(input_train_ids_path)
    df_folds = pd.read_parquet(input_folds_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    feat_params = params.get("features_selected", clf_params)
    num_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    cat_cols = feat_params["cat_cols"]
    # Build the lookup once instead of re-concatenating both lists for every
    # dataset column; iterating the dataset columns keeps their order.
    selected_cols = set(cat_cols + num_cols)
    X_cols = [c for c in df_dataset.columns.tolist() if c in selected_cols]

    # setdefault keeps credentials already present in the environment and
    # only fills in the configured values when they are missing.
    _cfg = get_pipeline_config()
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)

    # Resolved once; used both as the MLflow experiment and to infer run_kind.
    experiment_name = tuning_params.get(
        "experiment_name", clf_params["experiment_name"]
    )

    best_params = run_logreg_tuning(
        experiment_name=experiment_name,
        tracking_uri=_cfg.mlflow_tracking_uri,
        df_dataset=df_dataset,
        df_train_ids=df_train_ids,
        df_folds=df_folds,
        X_cols=X_cols,
        y_col=clf_params["target_col"],
        num_cols=num_cols,
        cat_cols=cat_cols,
        n_trials=int(tuning_params.get("n_trials", 20)),
        frac=float(tuning_params.get("frac", 0.3)),
        feat_params=feat_params,
        run_kind=infer_run_kind(experiment_name, "tuning"),
    )

    output_best_params_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_best_params_path, "w") as f:
        json.dump(best_params, f, indent=2)

cli_tune_hgb(input_dataset_path, input_train_ids_path, input_folds_path, input_features_meta_path, output_best_params_path)

Run Optuna hyperparameter search for HistGradientBoosting and save best params.

Parameters:

Name Type Description Default
input_dataset_path Path

Parquet dataset with features and train IDs.

required
input_train_ids_path Path

Parquet with training match IDs.

required
input_folds_path Path

Parquet with CV fold definitions.

required
input_features_meta_path Path

Parquet features-meta table.

required
output_best_params_path Path

Destination for the best-params JSON.

required
Source code in src/pipelines/tune.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_train_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_folds_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_best_params_path", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_tune_hgb(
    input_dataset_path: Path,
    input_train_ids_path: Path,
    input_folds_path: Path,
    input_features_meta_path: Path,
    output_best_params_path: Path,
) -> None:
    """Run Optuna hyperparameter search for HistGradientBoosting and save best params.

    Tuning settings come from ``params["tuning_hgb"]``, falling back to
    ``params["tuning"]`` (defaults: 20 trials, ``frac`` 0.3).  Feature
    selection uses ``params["features_selected"]`` when present and falls
    back to ``params["classification"]``.

    Args:
        input_dataset_path: Parquet dataset with features and train IDs.
        input_train_ids_path: Parquet with training match IDs.
        input_folds_path: Parquet with CV fold definitions.
        input_features_meta_path: Parquet features-meta table.
        output_best_params_path: Destination for the best-params JSON.
    """
    params = load_params()
    clf_params = params["classification"]
    tuning_params = params.get("tuning_hgb", params.get("tuning", {}))

    df_dataset = pd.read_parquet(input_dataset_path)
    df_train_ids = pd.read_parquet(input_train_ids_path)
    df_folds = pd.read_parquet(input_folds_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    feat_params = params.get("features_selected", clf_params)
    num_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    cat_cols = feat_params["cat_cols"]
    # Build the lookup once instead of re-concatenating both lists for every
    # dataset column; iterating the dataset columns keeps their order.
    selected_cols = set(cat_cols + num_cols)
    X_cols = [c for c in df_dataset.columns.tolist() if c in selected_cols]

    # setdefault keeps credentials already present in the environment and
    # only fills in the configured values when they are missing.
    _cfg = get_pipeline_config()
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)

    # Resolved once; used both as the MLflow experiment and to infer run_kind.
    experiment_name = tuning_params.get(
        "experiment_name", clf_params["experiment_name"]
    )

    best_params = run_hgb_tuning(
        experiment_name=experiment_name,
        tracking_uri=_cfg.mlflow_tracking_uri,
        df_dataset=df_dataset,
        df_train_ids=df_train_ids,
        df_folds=df_folds,
        X_cols=X_cols,
        y_col=clf_params["target_col"],
        num_cols=num_cols,
        cat_cols=cat_cols,
        n_trials=int(tuning_params.get("n_trials", 20)),
        frac=float(tuning_params.get("frac", 0.3)),
        feat_params=feat_params,
        run_kind=infer_run_kind(experiment_name, "tuning"),
    )

    output_best_params_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_best_params_path, "w") as f:
        json.dump(best_params, f, indent=2)

Final Training

CLI entrypoint for the final training stage.

Reads the winner from select_model stage (best_model.json) which contains the best model name, its tuned hyperparameters, and the CV log-loss score from fair Optuna competition across all candidates.

Retrains the winner on the full training set and evaluates once on the holdout. The holdout set is intentionally NOT touched before this stage.

cli_final_train(input_dataset_path, input_train_ids_path, input_test_ids_path, input_features_meta_path, input_best_model_path, output_final_run_id_path)

Retrain the winning model on the full training set and evaluate on holdout.

Parameters:

Name Type Description Default
input_dataset_path Path

Parquet dataset with features and split IDs.

required
input_train_ids_path Path

Parquet with training match IDs.

required
input_test_ids_path Path

Parquet with holdout match IDs.

required
input_features_meta_path Path

Parquet features-meta table.

required
input_best_model_path Path

JSON written by the select_model stage with model_name, best_params, and cv_logloss.

required
output_final_run_id_path Path

Destination for the final MLflow run ID JSON.

required
Source code in src/pipelines/final_train.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_train_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_test_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_best_model_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_final_run_id_path", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_final_train(
    input_dataset_path: Path,
    input_train_ids_path: Path,
    input_test_ids_path: Path,
    input_features_meta_path: Path,
    input_best_model_path: Path,
    output_final_run_id_path: Path,
) -> None:
    """Retrain the winning model on the full training set and evaluate on holdout.

    Args:
        input_dataset_path: Parquet dataset with features and split IDs.
        input_train_ids_path: Parquet with training match IDs.
        input_test_ids_path: Parquet with holdout match IDs.
        input_features_meta_path: Parquet features-meta table.
        input_best_model_path: JSON written by the ``select_model`` stage
            with ``model_name``, ``best_params``, and ``cv_logloss``.
        output_final_run_id_path: Destination for the final MLflow run ID JSON.

    Raises:
        ValueError: If the selection file carries empty ``best_params``.
        KeyError: If the selection file lacks one of the expected keys.
    """
    params = load_params()
    clf_params = params["classification"]
    final_cfg = params.get("final_train", {})

    # Read selection result from select_model stage
    selection = json.loads(input_best_model_path.read_text())
    best_model_name: str = selection["model_name"]
    best_params: dict = selection["best_params"]
    cv_logloss: float = selection["cv_logloss"]

    # Fail before loading any data: an empty dict means no real tuning result
    # reached this stage.
    if not best_params:
        raise ValueError(
            f"select_model produced empty best_params for model={best_model_name!r}. "
            "Ensure all tuning stages ran successfully before final_train."
        )

    # Calibration config (optional; an empty dict is passed on if the key is absent)
    calibration_config: dict = final_cfg.get("calibration", {})

    df_dataset = pd.read_parquet(input_dataset_path)
    df_train_ids = pd.read_parquet(input_train_ids_path)
    df_test_ids = pd.read_parquet(input_test_ids_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    all_cols = df_dataset.columns.tolist()
    feat_params = params.get("features_selected", clf_params)
    num_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    cat_cols = feat_params["cat_cols"]
    # Build the lookup once instead of re-concatenating both lists for every
    # dataset column; iterating all_cols keeps the dataset's column order.
    selected_cols = set(cat_cols + num_cols)
    X_cols = [col for col in all_cols if col in selected_cols]

    # setdefault keeps credentials already present in the environment and
    # only fills in the configured values when they are missing.
    _cfg = get_pipeline_config()
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)

    # Resolved once; used for the MLflow experiment, run_kind inference and
    # the output JSON so the three can never disagree.
    experiment_name = final_cfg.get("experiment_name", clf_params["experiment_name"])

    final_run_id, final_model_uri = make_final_train_run(
        experiment_name=experiment_name,
        tracking_uri=_cfg.mlflow_tracking_uri,
        dataset_path=str(input_dataset_path),
        df_dataset=df_dataset,
        df_train_ids=df_train_ids,
        df_test_ids=df_test_ids,
        X_cols=X_cols,
        y_col=clf_params["target_col"],
        model_name=best_model_name,
        best_params=best_params,
        num_cols=num_cols,
        cat_cols=cat_cols,
        calibration_config=calibration_config,
        feat_params=feat_params,
        run_kind=infer_run_kind(experiment_name, "final_train"),
    )

    output_final_run_id_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_final_run_id_path, "w") as f:
        json.dump(
            {
                "run_id": final_run_id,
                "experiment_name": experiment_name,
                "model_uri": final_model_uri,
                "model_name": best_model_name,
                "selection_cv_logloss": cv_logloss,
            },
            f,
            indent=2,
        )

Inference

CLI entrypoint for the batch inference (feature pre-computation) stage.

Computes ELO ratings and rolling statistics for all matches — both upcoming (future) and completed (finished) — and writes the result to data/predictions/match_features.parquet for the serving layer.

This stage is an INDEPENDENT BRANCH of the pipeline. It depends only on preprocessing outputs (finished.parquet, future.parquet) and the feature metadata produced by feature_engineering. It does NOT depend on classification_models, tune_xgb, final_train, or register_model, and can run in parallel with the training branch.

compute_all_match_features(df_finished, df_future, feature_columns, windows, stats_cols)

Compute rolling features for ALL matches — upcoming and finished.

Finished matches carry actual outcome columns (outcome_1x2, homeScore, awayScore) so the serving layer can compare predictions against real results. Future matches have NaN for those columns. An is_future boolean column distinguishes the two sets.

Parameters:

Name Type Description Default
df_finished DataFrame

Completed matches with full stats and outcomes.

required
df_future DataFrame

Upcoming matches without scores.

required
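feature_columns list[str]

Ordered list of feature names expected by the model.

required
windows list[int]

Rolling window sizes passed to add_rolling_features.

required
stats_cols list[str]

Statistic columns passed to add_rolling_features.

required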

Returns:

Type Description
DataFrame

DataFrame indexed by match id containing feature columns, metadata columns, outcome columns, and is_future.

Source code in src/pipelines/inference.py
def compute_all_match_features(
    df_finished: pd.DataFrame,
    df_future: pd.DataFrame,
    feature_columns: list[str],
    windows: list[int],
    stats_cols: list[str],
) -> pd.DataFrame:
    """Build one feature table covering both finished and upcoming matches.

    Finished rows get their real ``outcome_1x2`` / ``homeScore`` /
    ``awayScore`` values attached so downstream consumers can compare
    predictions with results; future rows hold NaN there.  The boolean
    ``is_future`` column tells the two groups apart.

    Args:
        df_finished: Completed matches with full stats and outcomes.
        df_future: Upcoming matches without scores.
        feature_columns: Ordered list of feature names expected by the model.
        windows: Rolling window sizes forwarded to ``add_rolling_features``.
        stats_cols: Stat columns forwarded to ``add_rolling_features``.

    Returns:
        DataFrame indexed by match ``id``.  Columns are ordered as: the
        requested ``feature_columns``, then any other computed columns, then
        display metadata, outcomes and ``is_future``.
    """
    future_ids = set(df_future["id"].tolist())

    # Stack finished rows first, then the prepared future rows.
    df_combined = pd.concat(
        [df_finished, _prepare_future_rows(df_future)], axis=0, ignore_index=True
    )

    # Team-level view of the combined matches.
    team_rows = build_team_match_table(df_combined)
    # Every raw column seen so far is handed to ``to_match_level`` as "leaky"
    # (presumably so only derived rolling columns survive as features).
    leaky_cols = set(df_combined.columns.tolist()) | set(team_rows.columns.tolist())

    # One rolling pass per grouping; order and prefixes match feature_engineering.
    rolling_groups = (
        ("all", ["teamId"]),
        ("season", ["teamId", "seasonId"]),
        ("tournament", ["teamId", "tournamentId"]),
        ("ha", ["teamId", "is_home"]),
    )
    for prefix, keys in rolling_groups:
        team_rows = add_rolling_features(
            team_rows,
            group_keys=keys,
            windows=windows,
            stats_cols=stats_cols,
            prefix=prefix,
        )

    match_features = to_match_level(team_rows, leaky_cols)

    # Retain every match that came from either input frame.
    wanted_ids = future_ids | set(df_finished["id"].tolist())
    keep_mask = match_features.index.isin(wanted_ids)
    df_all = match_features[keep_mask].copy()

    # Display metadata is looked up in the original inputs; the caller is
    # expected to have resolved the *Name columns already.
    meta_cols = [
        "sex",
        "startTimeUtc",
        "homeTeamId",
        "awayTeamId",
        "homeTeamName",
        "awayTeamName",
        "tournamentName",
        "regionName",
    ]
    df_src = pd.concat([df_future.set_index("id"), df_finished.set_index("id")], axis=0)
    for col in meta_cols:
        if col not in df_src.columns or col in df_all.columns:
            continue
        df_all[col] = df_src[col].reindex(df_all.index)

    # Real outcomes exist only for finished matches; reindexing leaves NaN
    # for ids that are absent from df_finished (i.e. future matches).
    outcome_cols = ["outcome_1x2", "homeScore", "awayScore"]
    finished_by_id = df_finished.set_index("id")
    for col in outcome_cols:
        if col in finished_by_id.columns:
            df_all[col] = finished_by_id[col].reindex(df_all.index)

    # Marker separating upcoming from finished rows.
    df_all["is_future"] = df_all.index.isin(future_ids)

    # Any requested feature that was not produced becomes an all-NaN column.
    missing = [c for c in feature_columns if c not in df_all.columns]
    if missing:
        logger.warning("Missing feature columns filled with NaN: %s", missing)
        for col in missing:
            df_all[col] = np.nan

    # Column layout: requested features (in the given order), then remaining
    # computed columns (so models trained on a superset still find their
    # inputs), then metadata / outcomes / is_future not already emitted.
    trailing = [c for c in meta_cols if c in df_all.columns]
    trailing += [c for c in outcome_cols + ["is_future"] if c in df_all.columns]
    trailing_set = set(trailing)

    leading = [c for c in feature_columns if c in df_all.columns]
    leading_set = set(leading)

    middle = [
        c for c in df_all.columns if c not in trailing_set and c not in leading_set
    ]
    tail = [c for c in trailing if c not in leading_set]
    return df_all[leading + middle + tail]

cli_batch_inference(input_future_path, input_finished_path, input_features_meta_path, output_path, output_predictions_path)

Compute model features and predictions for all matches and save to parquet.

Parameters:

Name Type Description Default
input_future_path Path

Parquet of future (upcoming) match records.

required
input_finished_path Path

Parquet of finished match records.

required
input_features_meta_path Path

Parquet features-meta table.

required
output_path Path

Destination for the combined features parquet.

required
output_predictions_path Path

Destination for the predictions parquet.

required
Source code in src/pipelines/inference.py
@cli.command()
@click.argument(
    "input_future_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_finished_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.argument(
    "output_predictions_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_batch_inference(
    input_future_path: Path,
    input_finished_path: Path,
    input_features_meta_path: Path,
    output_path: Path,
    output_predictions_path: Path,
) -> None:
    """Compute model features and predictions for all matches and save to parquet.

    Args:
        input_future_path: Parquet of future (upcoming) match records.
        input_finished_path: Parquet of finished match records.
        input_features_meta_path: Parquet features-meta table.
        output_path: Destination for the combined features parquet.
        output_predictions_path: Destination for the predictions parquet.
            An empty parquet is written when no predictions are produced so
            the declared output always exists.
    """
    logger.info("Loading data…")
    df_future = pd.read_parquet(input_future_path)
    df_finished = pd.read_parquet(input_finished_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    # Resolve display names from metadata JSONs (dropped during preprocessing to
    # save space; kept here for display in the serving layer).
    # Applied to both future and finished so historical view shows team names.
    metadata_path = input_features_meta_path.parent.parent / "metadata"
    name_sources = (
        ("homeTeamId", "homeTeamName", "Team name metadata not found: %s"),
        ("awayTeamId", "awayTeamName", "Team name metadata not found: %s"),
        ("tournamentId", "tournamentName", "Name metadata not found: %s"),
        ("regionId", "regionName", "Name metadata not found: %s"),
    )
    for id_col, name_col, missing_msg in name_sources:
        meta_file = metadata_path / f"{id_col}.json"
        if not meta_file.exists():
            logger.warning(missing_msg, meta_file)
            continue
        name_map: dict[str, str] = json.loads(meta_file.read_text(encoding="utf-8"))
        for df in (df_future, df_finished):
            if id_col in df.columns:
                # JSON keys are strings, so ids are stringified before lookup.
                df[name_col] = df[id_col].astype(str).map(name_map)

    params = load_params()
    clf_params = params["classification"]
    feat_params = params.get("features_selected", clf_params)
    # Read once; used for history trimming, uploads and the prediction step.
    inference_params = params.get("inference", {})
    windows = feat_params["window_sizes"]
    stats_cols = params["features"]["stats_cols"]

    num_feature_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    feature_columns = num_feature_cols + (["sex"] if "sex" in df_future.columns else [])

    # ELO features are computed on the combined timeline and joined separately.
    # Strip them from feature_columns before calling compute_all_match_features
    # so the function doesn't fill them with NaN placeholders and emit a warning.
    _ELO_COLS = ["home_elo_pre", "away_elo_pre", "diff_elo_pre"]
    elo_params = params["features"].get("elo", {})
    elo_include = elo_params.get("include", False)
    feature_columns_no_elo = (
        [c for c in feature_columns if c not in _ELO_COLS]
        if elo_include
        else feature_columns
    )

    # Optionally trim finished matches used for rolling-feature computation.
    # ELO must use the full timeline, so keep df_finished intact for that step.
    history_years = int(inference_params.get("history_years", 0))
    if history_years > 0 and "startTimeUtc" in df_finished.columns:
        cutoff = pd.Timestamp.now(tz="UTC") - pd.DateOffset(years=history_years)
        df_finished_rolling = df_finished[df_finished["startTimeUtc"] >= cutoff].copy()
        # Separator between the two counts keeps "before → after" readable.
        logger.info(
            "history_years=%d: trimmed finished matches for rolling features: %d → %d",
            history_years,
            len(df_finished),
            len(df_finished_rolling),
        )
    else:
        df_finished_rolling = df_finished

    logger.info(
        "Computing features for %d future + %d finished matches using %d feature columns…",
        len(df_future),
        len(df_finished_rolling),
        len(feature_columns),
    )
    df_result = compute_all_match_features(
        df_finished_rolling,
        df_future,
        feature_columns_no_elo,
        windows=windows,
        stats_cols=stats_cols,
    )

    # ELO features — computed on the FULL timeline (not trimmed), joined by match id.
    if elo_include:
        df_combined_for_elo = pd.concat(
            [df_finished, df_future], axis=0, ignore_index=True
        )
        df_elo = compute_elo_ratings(
            df_combined_for_elo,
            k_factor=float(elo_params.get("k_factor", 32.0)),
            initial_rating=float(elo_params.get("initial_rating", 1500.0)),
            home_advantage=float(elo_params.get("home_advantage", 50.0)),
            group_col=str(elo_params.get("group_col", "tournamentId")),
        )
        # Always join all three ELO columns regardless of which ones appear in
        # feature_columns: the model may require home/away ELO even when
        # features_meta only tracks diff_elo_pre (e.g. stale cache scenario).
        df_result = df_result.join(df_elo.set_index("id")[_ELO_COLS], how="left")
        logger.info("ELO features joined to inference output.")

    output_path.parent.mkdir(parents=True, exist_ok=True)
    df_result.to_parquet(output_path)
    n_future = df_result["is_future"].sum()
    n_finished = (~df_result["is_future"]).sum()
    logger.info(
        "Saved match features: %d rows × %d cols → %s (%d future, %d finished)",
        len(df_result),
        df_result.shape[1],
        output_path,
        n_future,
        n_finished,
    )
    if inference_params.get("upload_match_features", True):
        _upload_future_features_to_minio(output_path)
    else:
        logger.info("upload_match_features=false — skipping MinIO upload.")

    # --- Batch predictions --------------------------------------------------
    model_name = inference_params.get("model_name", "soccer-match-outcome")
    model_stage = inference_params.get("model_stage", "champion")
    df_predictions = _compute_predictions(
        df_result, feature_columns, model_name, model_stage
    )
    if not df_predictions.empty:
        output_predictions_path.parent.mkdir(parents=True, exist_ok=True)
        df_predictions.to_parquet(output_predictions_path)
        logger.info(
            "Saved predictions: %d rows → %s",
            len(df_predictions),
            output_predictions_path,
        )
        if inference_params.get("upload_predictions", True):
            _upload_predictions_to_minio(output_predictions_path)
        else:
            logger.info("upload_predictions=false — skipping MinIO upload.")
    else:
        logger.warning("Predictions not computed — skipping save and upload.")
        # Write empty file so DVC outs are satisfied
        output_predictions_path.parent.mkdir(parents=True, exist_ok=True)
        pd.DataFrame().to_parquet(output_predictions_path)

Register Model

DVC pipeline stage: register the best MLflow run in the Model Registry.

Reads run_id.json produced by classification_models, creates/updates a registered model version, and transitions it to the configured stage.

The operation is idempotent: re-running with the same run_id is safe.

cli_register_model(input_run_id_path, output_registered_model_path, model_name, stage)

Register the best training run in the MLflow Model Registry.

Parameters:

Name Type Description Default
input_run_id_path Path

JSON with the run_id from the classification stage to register.

required
output_registered_model_path Path

Destination for the registered model metadata JSON.

required
model_name str | None

MLflow registered model name.

required
stage str | None

Initial alias or stage to assign (e.g. "challenger").

required
Source code in src/pipelines/register_model.py
@cli.command()
@click.argument(
    "input_run_id_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_registered_model_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.option(
    "--model-name",
    default=None,
    help="Registered model name (default: from MLFLOW_MODEL_NAME env var)",
)
@click.option(
    "--stage",
    default=None,
    help="Target model stage (default: from MLFLOW_MODEL_STAGE env var)",
)
def cli_register_model(
    input_run_id_path: Path,
    output_registered_model_path: Path,
    model_name: str | None,
    stage: str | None,
) -> None:
    """Register a training run as a model version and point an alias at it.

    If the alias already resolves to a version created from the same
    ``run_id``, nothing is registered and only the metadata JSON is rewritten.

    Args:
        input_run_id_path: JSON file holding ``run_id`` and ``model_uri``.
        output_registered_model_path: Where the registration metadata JSON
            (run id, model name, version, alias, model URI) is written.
        model_name: Registered model name; falls back to the
            ``register_model`` params section, then the pipeline config.
        stage: Alias to assign; same fallback chain as ``model_name``.

    Raises:
        KeyError: If a new version must be registered but the input JSON has
            no ``model_uri``.
    """
    with open(input_run_id_path) as src:
        payload = json.load(src)

    run_id: str = payload["run_id"]

    # Precedence: explicit CLI flag, then params.yaml, then pipeline config.
    _cfg = get_pipeline_config()
    _reg_params = load_params().get("register_model", {})
    model_name = model_name or _reg_params.get("model_name") or _cfg.mlflow_model_name
    stage = stage or _reg_params.get("model_stage") or _cfg.mlflow_model_stage

    def _write_registration(version_value, uri_value) -> None:
        # Persist the registration metadata for downstream stages.
        output_registered_model_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_registered_model_path, "w") as out:
            json.dump(
                {
                    "run_id": run_id,
                    "model_name": model_name,
                    "version": version_value,
                    "alias": stage,
                    "model_uri": uri_value,
                },
                out,
                indent=2,
            )

    mlflow.set_tracking_uri(_cfg.mlflow_tracking_uri)
    client = MlflowClient()

    # Ensure the registered model exists; an "already exists" error is fine.
    try:
        client.create_registered_model(model_name)
        logger.info("Created registered model: %s", model_name)
    except mlflow.exceptions.MlflowException as exc:
        message = str(exc)
        if (
            "already exists" not in message.lower()
            and "RESOURCE_ALREADY_EXISTS" not in message
        ):
            raise
        logger.info("Registered model already exists: %s", model_name)

    # Short-circuit when the alias already targets a version of this run.
    try:
        existing = client.get_model_version_by_alias(model_name, stage)
        if existing.run_id == run_id:
            logger.info(
                "Alias '%s' already points to run_id=%s (version=%s), skipping.",
                stage,
                run_id,
                existing.version,
            )
            _write_registration(existing.version, payload.get("model_uri", ""))
            return
    except mlflow.exceptions.MlflowException:
        # Treated as "alias not set yet": fall through to a fresh registration.
        pass

    # The training stage records the artifact URI; its format differs between
    # MLflow 2.x ("runs:/<id>/model") and 3.x ("models:/m-<uuid>").
    if "model_uri" not in payload:
        raise KeyError(
            "'model_uri' missing from run_id.json. Re-run the classification_models "
            "stage to regenerate it with the current pipeline."
        )
    model_uri: str = payload["model_uri"]
    logger.info("Registering model version from %s under '%s'", model_uri, model_name)
    registered = mlflow.register_model(model_uri=model_uri, name=model_name)
    version = registered.version
    logger.info(
        "Registered model version %s for run_id=%s under '%s'",
        version,
        run_id,
        model_name,
    )

    # Aliases replace the deprecated stage mechanism in MLflow 3.x.
    client.set_registered_model_alias(
        name=model_name,
        alias=stage,
        version=version,
    )
    logger.info(
        "Set alias '%s' on '%s' v%s (run_id=%s)",
        stage,
        model_name,
        version,
        run_id,
    )

    _write_registration(version, model_uri)
    logger.info("Registered model info written to %s", output_registered_model_path)

Data Validation (Great Expectations)

DVC pipeline stage: validate raw match data with Great Expectations.

Validates match_raw.parquet against the raw_match_suite expectation suite. Writes a JSON report to the output path and exits with code 1 on failure.

cli_validate_raw(input_path, output_report_path)

Validate raw match data against Great Expectations suite.

Parameters:

Name Type Description Default
input_path Path

Parquet of raw match records.

required
output_report_path Path

Destination for the validation report JSON.

required

Exits with code 1 if any expectation fails.

Source code in src/pipelines/validate_raw.py
@cli.command()
@click.argument(
    "input_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_report_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_validate_raw(input_path: Path, output_report_path: Path) -> None:
    """Check the raw match parquet against the raw-data expectation suite.

    The JSON report is always written before the outcome is evaluated.

    Args:
        input_path: Parquet of raw match records.
        output_report_path: Destination for the validation report JSON.

    Exits with code 1 if any expectation fails.
    """
    raw_df = pd.read_parquet(input_path)
    logger.info(
        "Loaded raw data: shape=%s, columns=%s", raw_df.shape, raw_df.columns.tolist()
    )

    # In-memory GX context: nothing is persisted besides the report below.
    context = gx.get_context(mode="ephemeral")
    suite = build_raw_suite(context)

    data_source = context.data_sources.add_pandas("raw_source")
    asset = data_source.add_dataframe_asset("raw_asset")
    batch_definition = asset.add_batch_definition_whole_dataframe("raw_batch_def")
    batch = batch_definition.get_batch(batch_parameters={"dataframe": raw_df})
    results = batch.validate(suite)

    report = results.to_json_dict()
    output_report_path.parent.mkdir(parents=True, exist_ok=True)
    output_report_path.write_text(json.dumps(report, indent=2, default=str))
    logger.info("Validation report written to %s", output_report_path)

    if results.success:
        logger.info(
            "Raw data validation PASSED (%d expectations).",
            len(report.get("results", [])),
        )
        return

    # Collect the expectation types that did not hold, then abort the stage.
    failed = [
        entry["expectation_config"]["type"]
        for entry in report.get("results", [])
        if not entry["success"]
    ]
    logger.error("Raw data validation FAILED. Failed expectations: %s", failed)
    sys.exit(1)

DVC pipeline stage: validate preprocessed finished matches with Great Expectations.

Validates interim/finished.parquet against the finished_suite. Writes a JSON report and exits with code 1 on any expectation failure.

cli_validate_finished(input_path, output_report_path)

Validate preprocessed finished matches against Great Expectations suite.

Parameters:

Name Type Description Default
input_path Path

Parquet of preprocessed finished match records.

required
output_report_path Path

Destination for the validation report JSON.

required

Exits with code 1 if any expectation fails.

Source code in src/pipelines/validate_finished.py
@cli.command()
@click.argument(
    "input_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_report_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_validate_finished(input_path: Path, output_report_path: Path) -> None:
    """Check the finished-matches parquet against its expectation suite.

    The JSON report is always written before the outcome is evaluated.

    Args:
        input_path: Parquet of preprocessed finished match records.
        output_report_path: Destination for the validation report JSON.

    Exits with code 1 if any expectation fails.
    """
    finished_df = pd.read_parquet(input_path)
    logger.info("Loaded finished data: shape=%s", finished_df.shape)

    # In-memory GX context: nothing is persisted besides the report below.
    context = gx.get_context(mode="ephemeral")
    suite = build_finished_suite(context)

    data_source = context.data_sources.add_pandas("finished_source")
    asset = data_source.add_dataframe_asset("finished_asset")
    batch_definition = asset.add_batch_definition_whole_dataframe("finished_batch_def")
    batch = batch_definition.get_batch(batch_parameters={"dataframe": finished_df})
    results = batch.validate(suite)

    report = results.to_json_dict()
    output_report_path.parent.mkdir(parents=True, exist_ok=True)
    output_report_path.write_text(json.dumps(report, indent=2, default=str))
    logger.info("Validation report written to %s", output_report_path)

    if results.success:
        logger.info(
            "Finished data validation PASSED (%d expectations).",
            len(report.get("results", [])),
        )
        return

    # Collect the expectation types that did not hold, then abort the stage.
    failed = [
        entry["expectation_config"]["type"]
        for entry in report.get("results", [])
        if not entry["success"]
    ]
    logger.error("Finished data validation FAILED. Failed expectations: %s", failed)
    sys.exit(1)

DVC pipeline stage: validate preprocessed future matches with Great Expectations.

Validates interim/future.parquet against the future_match_suite. Writes a JSON report and exits with code 1 on any expectation failure, including the anti-leakage check (score columns must be absent).

cli_validate_future(input_path, output_report_path)

Validate preprocessed future matches against Great Expectations suite.

Parameters:

Name Type Description Default
input_path Path

Parquet of preprocessed future match records.

required
output_report_path Path

Destination for the validation report JSON.

required

Also asserts that score and target columns are absent (anti-leakage). Exits with code 1 if any expectation fails.

Source code in src/pipelines/validate_future.py
@cli.command()
@click.argument(
    "input_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_report_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_validate_future(input_path: Path, output_report_path: Path) -> None:
    """Validate preprocessed future matches against Great Expectations suite.

    Args:
        input_path: Parquet of preprocessed future match records.
        output_report_path: Destination for the validation report JSON.

    Also asserts that score and target columns are absent (anti-leakage).
    Exits with code 1 if any expectation fails.
    """
    df = pd.read_parquet(input_path)
    logger.info("Loaded future data: shape=%s", df.shape)

    context = gx.get_context(mode="ephemeral")
    suite = build_future_suite(context)

    batch_definition = (
        context.data_sources.add_pandas("future_source")
        .add_dataframe_asset("future_asset")
        .add_batch_definition_whole_dataframe("future_batch_def")
    )
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
    results = batch.validate(suite)

    # The report is written before the outcome check so it exists on failure too.
    report = results.to_json_dict()
    output_report_path.parent.mkdir(parents=True, exist_ok=True)
    output_report_path.write_text(json.dumps(report, indent=2, default=str))
    logger.info("Validation report written to %s", output_report_path)

    if not results.success:
        failed = [
            r["expectation_config"]["type"]
            for r in report.get("results", [])
            if not r["success"]
        ]
        # Name the stage in the message, matching the raw/finished validators,
        # so the failing dataset is identifiable in shared pipeline logs.
        logger.error("Future data validation FAILED. Failed expectations: %s", failed)
        sys.exit(1)

    total = len(report.get("results", []))
    logger.info("Future data validation PASSED (%d expectations).", total)

DVC pipeline stage: validate engineered features with Great Expectations.

Validates features.parquet against two suites:

1. Static schema suite (row count)
2. Dynamic per-column suite (completeness + value ranges, built from features_meta)

Writes a combined JSON report and exits with code 1 on any failure.

cli_validate_features(input_features_path, input_meta_path, output_report_path)

Validate engineered features against Great Expectations suites.

Parameters:

Name Type Description Default
input_features_path Path

Parquet with engineered feature columns.

required
input_meta_path Path

Parquet features-meta table.

required
output_report_path Path

Destination for the validation report JSON.

required

Exits with code 1 if any expectation fails.

Source code in src/pipelines/validate_features.py
@cli.command()
@click.argument(
    "input_features_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_report_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_validate_features(
    input_features_path: Path,
    input_meta_path: Path,
    output_report_path: Path,
) -> None:
    """Run the schema and per-column expectation suites on engineered features.

    The schema suite runs on the full frame; the per-column suite may run on
    a fixed-seed sample when ``data_quality.ge_sample_rows`` is set.  Both
    results are merged into a single JSON report.

    Args:
        input_features_path: Parquet with engineered feature columns.
        input_meta_path: Parquet features-meta table.
        output_report_path: Destination for the validation report JSON.

    Exits with code 1 if any expectation fails.
    """
    df_features = pd.read_parquet(input_features_path)
    df_meta = pd.read_parquet(input_meta_path)
    logger.info(
        "Loaded features: shape=%s, meta rows=%d",
        df_features.shape,
        len(df_meta),
    )

    feature_columns: list[str] = df_meta["name"].tolist()
    logger.info("Feature columns from meta: %d columns", len(feature_columns))

    # Per-column checks may run on a sample for speed; sampling only happens
    # when the configured size is positive and smaller than the frame.
    sample_size = int(load_params().get("data_quality", {}).get("ge_sample_rows", 0))
    if 0 < sample_size < len(df_features):
        column_frame = df_features.sample(n=sample_size, random_state=42)
    else:
        column_frame = df_features
    if sample_size > 0:
        logger.info(
            "Column validation will use %d sampled rows (full shape: %s).",
            len(column_frame),
            df_features.shape,
        )

    context = gx.get_context(mode="ephemeral")
    suite_schema = build_features_suite(context)
    suite_columns = build_features_column_expectations(context, feature_columns)

    results_schema = _run_suite(context, suite_schema, df_features, "schema")
    results_columns = _run_suite(context, suite_columns, column_frame, "columns")

    combined_success = results_schema.success and results_columns.success
    combined_report = {
        "overall_success": combined_success,
        "schema_validation": results_schema.to_json_dict(),
        "column_validation": results_columns.to_json_dict(),
    }

    output_report_path.parent.mkdir(parents=True, exist_ok=True)
    output_report_path.write_text(json.dumps(combined_report, indent=2, default=str))
    logger.info("Validation report written to %s", output_report_path)

    schema_entries = combined_report["schema_validation"].get("results", [])
    column_entries = combined_report["column_validation"].get("results", [])

    if combined_success:
        logger.info(
            "Features validation PASSED (%d expectations).",
            len(schema_entries) + len(column_entries),
        )
        return

    # Schema failures are reported by expectation type, column failures by
    # the column they target ("unknown" when the expectation names none).
    failed_schema = [
        entry["expectation_config"]["type"]
        for entry in schema_entries
        if not entry["success"]
    ]
    failed_cols = [
        entry["expectation_config"].get("kwargs", {}).get("column", "unknown")
        for entry in column_entries
        if not entry["success"]
    ]
    logger.error(
        "Features validation FAILED. Schema failures: %s. Column failures: %s",
        failed_schema,
        failed_cols,
    )
    sys.exit(1)

Feature Metadata

Generate features_meta.parquet from configuration parameters only.

This stage is lightweight — it does not load any match data. Feature column names are derived deterministically from:

- features.stats_cols
- features.window_sizes
- features.elo

This allows batch_inference to resolve its feature contract without depending on the heavy feature_engineering stage.

build_features_meta(stats_cols, window_sizes, elo_cfg)

Build features_meta DataFrame from config, without loading match data.

Parameters:

Name Type Description Default
stats_cols list[str]

Stat column names to generate windowed features for.

required
window_sizes list[int]

Rolling window sizes to enumerate.

required
elo_cfg dict

ELO configuration dict (expects include bool key).

required

Returns:

Type Description
DataFrame

DataFrame with columns name, side, scope, metric, agg, window — one row per feature.

Source code in src/pipelines/features_meta.py
def build_features_meta(
    stats_cols: list[str],
    window_sizes: list[int],
    elo_cfg: dict,
) -> pd.DataFrame:
    """Derive the features_meta table purely from configuration values.

    Feature names are enumerated per scope and window: first the
    ``{side}_{scope}_{stat}_mean_w{window}`` columns for every stat and side,
    then the ``{side}_{scope}_coverage_w{window}`` columns for every side.
    Each name is converted to a metadata row with ``parse_feature``.

    Args:
        stats_cols: Stat column names to generate windowed features for.
        window_sizes: Rolling window sizes to enumerate.
        elo_cfg: ELO configuration dict; when its ``include`` key is truthy,
            one ``{side}_elo_pre`` row per side is appended.

    Returns:
        DataFrame built from one row per feature. The ELO rows carry the keys
        ``name``, ``side``, ``scope``, ``metric``, ``agg`` and ``window``.
    """
    feature_names: list[str] = []
    for scope in _SCOPES:
        for win in window_sizes:
            # Windowed means: stats are the outer level, sides the inner one.
            feature_names.extend(
                f"{side}_{scope}_{stat}_mean_w{win}"
                for stat in stats_cols
                for side in _SIDES
            )
            # Coverage columns follow the means of the same scope/window.
            feature_names.extend(f"{side}_{scope}_coverage_w{win}" for side in _SIDES)

    records = [parse_feature(feature_name) for feature_name in feature_names]

    if elo_cfg.get("include", False):
        # ELO features are not windowed, so their rows are written by hand.
        records.extend(
            {
                "name": f"{side}_elo_pre",
                "side": side,
                "scope": "tournament",
                "metric": "elo_pre",
                "agg": "elo",
                "window": 0,
            }
            for side in _SIDES
        )

    return pd.DataFrame(records)

cli_generate_features_meta(output_path)

Run the generate-features-meta DVC stage.

Source code in src/pipelines/features_meta.py
@cli.command()
@click.argument("output_path", type=click.Path(path_type=Path, dir_okay=False))
def cli_generate_features_meta(output_path: Path) -> None:
    """Run the generate-features-meta DVC stage.

    Reads the ``features`` section of the pipeline params, builds the
    features_meta table from it and writes the table as parquet.

    Args:
        output_path: Destination parquet file; parent directories are created
            when missing.
    """
    features_cfg = load_params()["features"]

    meta_table = build_features_meta(
        stats_cols=features_cfg["stats_cols"],
        window_sizes=features_cfg["window_sizes"],
        elo_cfg=features_cfg.get("elo", {}),
    )

    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    meta_table.to_parquet(output_path)

Model Selection & Promotion

DVC pipeline stage: select the best tuned model from all candidates.

Reads the tuning result files produced by tune_xgb, tune_logreg, and tune_hgb. Each file contains a best_params dict and a cv_logloss metric written by the corresponding tuning stage. This stage picks the candidate with the lowest mean CV log-loss and writes data/models/best_model.json, which final_train reads to know which model to retrain on the full training set.

Design decisions

  • Decision is based purely on CV log-loss (same objective used in all three Optuna studies), so the comparison is fully apples-to-apples.
  • The output best_model.json contains both model_name and best_params so final_train needs only a single input file instead of three.
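  • This stage performs no data reads: it only reads the three tuning JSON files and the pipeline params. Its side effects are writing the output file and logging the per-candidate scores and the selected model to an MLflow run for traceability.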

cli_select_model(input_xgb_params_path, input_logreg_params_path, input_hgb_params_path, output_best_model_path)

Compare tuned model CV scores and write best_model.json for the final_train stage.

Parameters:

Name Type Description Default
input_xgb_params_path Path

JSON with best XGBoost tuning params.

required
input_logreg_params_path Path

JSON with best LogisticRegression tuning params.

required
input_hgb_params_path Path

JSON with best HGB tuning params.

required
output_best_model_path Path

Destination for the best-model selection JSON.

required
Source code in src/pipelines/select_model.py
@cli.command()
@click.argument(
    "input_xgb_params_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_logreg_params_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_hgb_params_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_best_model_path", type=click.Path(path_type=Path, dir_okay=False)
)
def cli_select_model(
    input_xgb_params_path: Path,
    input_logreg_params_path: Path,
    input_hgb_params_path: Path,
    output_best_model_path: Path,
) -> None:
    """Compare tuned model CV scores and write best_model.json
    for the final_train stage.

    The candidate with the lowest ``cv_logloss`` wins. All scores and the
    winner are logged to an MLflow run before the selection JSON is written.

    Args:
        input_xgb_params_path: JSON with best XGBoost tuning params.
        input_logreg_params_path: JSON with best LogisticRegression tuning params.
        input_hgb_params_path: JSON with best HGB tuning params.
        output_best_model_path: Destination for the best-model selection JSON.
    """
    params = load_params()
    clf_params = params["classification"]

    # Candidate label -> tuning output file, in the order they are compared.
    tuning_files = (
        ("xgb", input_xgb_params_path),
        ("logreg", input_logreg_params_path),
        ("hgb_numonly", input_hgb_params_path),
    )
    candidates: dict[str, dict] = {}
    for label, tuning_path in tuning_files:
        candidates[label] = json.loads(tuning_path.read_text())

    # Each tuning output has shape: {"best_params": {...}, "cv_logloss": float}
    scores = {}
    for label, payload in candidates.items():
        scores[label] = payload["cv_logloss"]

    # Lowest log-loss wins; on a tie the first candidate in dict order is kept.
    best_name, best_score = min(scores.items(), key=lambda item: item[1])
    best_params = candidates[best_name]["best_params"]

    ranked = sorted(scores.items(), key=lambda item: item[1])
    logger.info(
        "Model selection — CV log-loss scores: %s",
        {label: f"{value:.4f}" for label, value in ranked},
    )
    logger.info("Winner: %s (cv_logloss=%.4f)", best_name, best_score)

    # Log selection result to MLflow for traceability
    mlflow.set_tracking_uri(get_pipeline_config().mlflow_tracking_uri)
    # A dedicated select_model experiment name overrides the classification one.
    select_cfg = params.get("select_model", {})
    experiment_name = select_cfg.get("experiment_name", clf_params["experiment_name"])
    set_experiment_active(experiment_name)

    run_kind = infer_run_kind(experiment_name, "select_model")
    # Only smoke runs get a visible prefix in the run name.
    name_prefix = ""
    if run_kind == "smoke":
        name_prefix = f"{run_kind} | "

    with mlflow.start_run(
        run_name=f"{name_prefix}select_model",
        description="Model selection: compare CV log-loss across all tuned candidates",
    ):
        for label, value in scores.items():
            mlflow.log_metric(f"cv_logloss.{label}", value)
        mlflow.log_param("selected_model", best_name)
        mlflow.log_metric("cv_logloss.winner", best_score)

    selection = {
        "model_name": best_name,
        "best_params": best_params,
        "cv_logloss": best_score,
        "all_scores": scores,
        "experiment_name": experiment_name,
    }
    output_best_model_path.parent.mkdir(parents=True, exist_ok=True)
    output_best_model_path.write_text(json.dumps(selection, indent=2))

DVC pipeline stage: promote a registered model to the 'candidate' alias.

Quality gate

The new model's promote_model.metric (default: final.logloss) must not exceed the current candidate's value by more than promote_model.tolerance (default: 0.002).

Alias scheme

  • smoke — always assigned by register_model; used for CI smoke tests.
  • candidate — assigned here when the gate passes; safe for further manual review.
  • champion — reserved for manual promotion (human sign-off); never touched here.

If no current candidate exists the gate is skipped and promotion always proceeds (first model registered to a fresh registry).

Failure semantics

A gate failure is an expected outcome — the stage exits successfully and writes promoted: false to the output JSON. The DVC pipeline does not fail. Downstream stages that depend on a specific alias will find that the alias still points to the previous candidate version.

cli_promote_model(input_registered_model_path, output_promoted_model_path)

Promote a registered model version to the candidate alias if quality gate passes.

Parameters:

Name Type Description Default
input_registered_model_path Path

JSON written by the register_model stage, containing the run_id, model_name and version of the model to promote.

required
output_promoted_model_path Path

Destination for the promotion result JSON.

required
Source code in src/pipelines/promote_model.py
@cli.command()
@click.argument(
    "input_registered_model_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_promoted_model_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
def cli_promote_model(
    input_registered_model_path: Path,
    output_promoted_model_path: Path,
) -> None:
    """Promote a registered model version to the ``candidate`` alias
    if quality gate passes.

    The gate compares the configured metric of the new run against the run
    behind the current candidate alias: promotion happens when
    ``new <= current + tolerance``. When no current metric can be obtained
    the gate is skipped and the new version is promoted; that situation is
    logged as a warning so it does not go unnoticed.

    Args:
        input_registered_model_path: JSON written by the ``register_model``
            stage, containing the ``run_id``, ``model_name`` and ``version``
            of the model to promote.
        output_promoted_model_path: Destination for the promotion result JSON.

    Raises:
        KeyError: If the input JSON lacks a required field, or the new run
            has no value for the configured metric.
    """
    with open(input_registered_model_path, encoding="utf-8") as f:
        payload = json.load(f)

    run_id: str = payload["run_id"]
    model_name: str = payload["model_name"]
    version: str = str(payload["version"])

    _cfg = get_pipeline_config()
    _params = load_params().get("promote_model", {})

    metric: str = _params.get("metric", "final.logloss")
    tolerance: float = float(_params.get("tolerance", 0.002))
    candidate_alias: str = _params.get("candidate_alias", "candidate")

    mlflow.set_tracking_uri(_cfg.mlflow_tracking_uri)
    client = MlflowClient()

    # ── Fetch new model's metric ───────────────────────────────────────────
    new_run = client.get_run(run_id)
    new_metric_raw = new_run.data.metrics.get(metric)
    if new_metric_raw is None:
        raise KeyError(
            f"Metric '{metric}' not found in run {run_id}. "
            "Ensure final_train logs this metric before running promote_model."
        )
    new_metric = float(new_metric_raw)

    # ── Fetch current candidate's metric (if any) ─────────────────────────
    # +inf means "no comparable candidate": the gate below then always passes.
    current_metric: float = float("inf")
    current_version: str | None = None
    try:
        current = client.get_model_version_by_alias(model_name, candidate_alias)
        current_version = current.version
        current_run = client.get_run(current.run_id)
        m = current_run.data.metrics.get(metric)
        if m is not None:
            current_metric = float(m)
        else:
            logger.warning(
                "Current '%s' version %s has no metric '%s'; quality gate skipped",
                candidate_alias,
                current_version,
                metric,
            )
    except mlflow.exceptions.MlflowException as exc:
        # Expected on the first registration (alias not set yet). Any other
        # MLflow failure lands here too and also bypasses the gate, so the
        # reason is logged instead of being swallowed silently.
        logger.warning(
            "Could not resolve current '%s' candidate for model '%s' (%s); "
            "quality gate skipped",
            candidate_alias,
            model_name,
            exc,
        )

    # ── Quality gate ──────────────────────────────────────────────────────
    promoted = new_metric <= current_metric + tolerance

    if promoted:
        client.set_registered_model_alias(
            name=model_name,
            alias=candidate_alias,
            version=version,
        )
        logger.info(
            "Promoted version %s to alias '%s': %s=%.4f ≤ current=%.4f + tol=%.4f",
            version,
            candidate_alias,
            metric,
            new_metric,
            current_metric,
            tolerance,
        )
    else:
        # A failed gate is an expected outcome: report it and exit normally.
        logger.warning(
            "Promotion skipped: new %s=%.4f > current=%.4f + tol=%.4f "
            "(new version=%s vs current candidate version=%s)",
            metric,
            new_metric,
            current_metric,
            tolerance,
            version,
            current_version,
        )

    output_promoted_model_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_promoted_model_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "run_id": run_id,
                "model_name": model_name,
                "version": version,
                "promoted": promoted,
                "candidate_alias": candidate_alias if promoted else None,
                "metric": metric,
                "new_metric": new_metric,
                # +inf is not valid JSON; report "no current metric" as null.
                "current_metric": current_metric
                if current_metric != float("inf")
                else None,
                "tolerance": tolerance,
            },
            f,
            indent=2,
        )
    logger.info("Promotion result written to %s", output_promoted_model_path)

Live Scraping

Pipeline entrypoint: scrape livescores and persist to PostgreSQL.

Runs in a KubernetesPodOperator pod using the API image (Selenoid remote Chrome). Replaces the former update_livescores Celery task and PATCH /sources/livescores/.

Usage::

python -m src.pipelines.scrape_livescores         --date-end 2026-05-12         --count-days 3         [--no-update-db]         [--save-raw]

main(date_end, count_days, update_db, save_raw, timeout)

Scrape livescores from 1xbet/WhoScored for a window of past dates.

Parameters:

Name Type Description Default
date_end str

Last date to scrape, formatted YYYY-MM-DD.

required
count_days int

Number of days to scrape backwards from date_end.

required
update_db bool

Write parsed match records to PostgreSQL when True.

required
save_raw bool

Upload raw HTML responses to MinIO when True.

required
timeout int

Per-date Selenium page-load timeout in seconds.

required
Source code in src/pipelines/scrape_livescores.py
@click.command()
@click.option("--date-end", required=True, help="Last date to scrape (YYYY-MM-DD).")
@click.option(
    "--count-days",
    default=1,
    type=int,
    help="Days to scrape backwards from --date-end.",
)
@click.option(
    "--update-db/--no-update-db", default=True, help="Write records to PostgreSQL."
)
@click.option("--save-raw/--no-save-raw", default=False, help="Save raw HTML to MinIO.")
@click.option(
    "--timeout", default=60, type=int, help="Per-date Selenium timeout in seconds."
)
def main(
    date_end: str,
    count_days: int,
    update_db: bool,
    save_raw: bool,
    timeout: int,
) -> None:
    """Scrape livescores from 1xbet/WhoScored for a window of past dates.

    For each date the page is requested repeatedly (5 s pause between
    attempts) until it parses or ``timeout`` seconds have elapsed.

    Args:
        date_end: Last date to scrape, formatted ``YYYY-MM-DD``.
        count_days: Number of days to scrape backwards from ``date_end``.
        update_db: When ``True``, insert matches not yet stored, merge
            updates for stored matches whose status is not ``6``, and commit.
            When ``False``, nothing is written to PostgreSQL.
        save_raw: Upload raw HTML responses to MinIO when ``True``.
        timeout: Per-date Selenium page-load timeout in seconds.

    Raises:
        ValueError: If ``date_end`` is not a valid ``YYYY-MM-DD`` date.
        RuntimeError: If a date could not be loaded and parsed within
            ``timeout`` seconds; the last scraping error is chained as cause.
    """
    # Validate the date before starting a browser session.
    base = datetime.strptime(date_end, "%Y-%m-%d").date()

    driver = create_webdriver()
    try:
        with Session(engine) as session:
            for offset in range(count_days):
                date_livescores = base - timedelta(days=offset)
                date_livescores_str = date_livescores.strftime("%Y%m%d")
                logger.info("Scraping livescores for %s", date_livescores_str)
                url = f"https://1xbet.whoscored.com/livescores/data?d={date_livescores_str}"

                matches = None
                matches_raw: list = []
                last_err: Exception | None = None

                # Monotonic clock: the deadline must not move with wall-clock
                # adjustments.
                start = time.monotonic()
                while (time.monotonic() - start) < timeout:
                    try:
                        driver.get(url)
                        livescores_raw = driver.page_source
                        matches, matches_raw = get_list_livescore_matches(
                            livescores_raw
                        )
                        break
                    except Exception as exc:
                        # Any failure (navigation or parsing) is retried until
                        # the deadline; the last one is reported on timeout.
                        last_err = exc
                        time.sleep(5)

                if matches is None:
                    raise RuntimeError(
                        f"Failed to load livescores for {date_livescores_str} "
                        f"within {timeout}s: {last_err}"
                    ) from last_err

                if update_db:
                    new_matches: list[Match | MatchRaw] = []
                    update_matches: list[Match | MatchRaw] = []

                    for index, match in enumerate(matches):
                        current_match = session.get(Match, match.id)
                        if current_match is None:
                            new_matches.append(match)
                            new_matches.append(matches_raw[index])
                        # NOTE(review): status 6 presumably marks a finished
                        # match that must not be overwritten — confirm.
                        elif current_match.status != 6:
                            update_matches.append(match)
                            update_matches.append(matches_raw[index])

                    session.add_all(new_matches)
                    for entry in update_matches:
                        session.merge(entry)

                    session.commit()
                    logger.info("DB updated for %s", date_livescores_str)
                else:
                    # --no-update-db: honour the documented contract and leave
                    # PostgreSQL untouched (previously every scraped record was
                    # inserted and committed here).
                    logger.info(
                        "Skipping DB update for %s (--no-update-db)",
                        date_livescores_str,
                    )

                if save_raw:
                    storage.save_text_to_minio(
                        livescores_raw,
                        get_settings().minio.bucket_livescores_raw,
                        date_livescores_str,
                    )
                    logger.info("Raw HTML saved to MinIO for %s", date_livescores_str)
    finally:
        driver.quit()

Odds Collection (Fonbet)

Pipeline entrypoint: collect fon.bet odds snapshot via Selenium and save to MinIO.

Launches a headless Chrome via Selenoid, injects a CDP fetch/XHR interceptor, navigates to the fon.bet football page, and saves all captured JSON responses as a gzip-compressed snapshot.

Usage::

python -m src.pipelines.collect_fonbet_odds

main()

Collect daily fon.bet odds snapshot via Selenium and save to MinIO.

Source code in src/pipelines/collect_fonbet_odds.py
def main() -> None:
    """Collect daily fon.bet odds snapshot via Selenium and save to MinIO.

    Exits the process with status 1 when ``save_daily_snapshot`` raises
    ``RuntimeError``; otherwise logs the value it returned.
    """
    logger.info("Collecting fon.bet daily odds snapshot via Selenium...")
    try:
        snapshot_path = save_daily_snapshot()
    except RuntimeError as err:
        logger.error("fon.bet snapshot failed: %s", err)
        sys.exit(1)
    else:
        logger.info("fon.bet snapshot saved: %s", snapshot_path)

Pipeline: extract 1X2 odds from latest Fonbet snapshot for linked matches.

Reads match_links/fonbet_links.parquet to get the set of matched fonbet_event_id values, then loads the latest Fonbet snapshot from MinIO and extracts 1X2 coefficients (factors 921/922/923) for those events.

Output (upsert by fonbet_event_id): s3://{MINIO_BUCKET_DATA_RAW}/match_links/fonbet_odds.parquet

Columns:
    match_id          int64     – site match ID
    fonbet_event_id   int64     – Fonbet event ID
    odd_home          float64   – factor 921 (win for home)
    odd_draw          float64   – factor 922 (draw)
    odd_away          float64   – factor 923 (win for away)
    markets_count     int64     – total number of markets for this event
    snapshot_key      str       – MinIO key of the source snapshot
    fetched_at        datetime  – UTC timestamp of this pipeline run
Run

python -m src.pipelines.fetch_fonbet_odds

main()

Fetch Fonbet 1X2 odds for linked matches and write to MinIO.

Source code in src/pipelines/fetch_fonbet_odds.py
def main() -> None:
    """Fetch Fonbet 1X2 odds for linked matches and write to MinIO.

    Steps:
        1. Load the links store and collect the linked ``fonbet_event_id`` values.
        2. Load the latest ``.json.gz`` snapshot (greatest key under the prefix).
        3. Locate the ``events/list`` response inside the snapshot.
        4. Extract 1X2 odds for the linked events.
        5. Attach ``match_id`` (and ``fonbet_sport_id`` when present) from the links.
        6. Upsert into the odds store by ``fonbet_event_id``.

    Exits with status 1 when the links store cannot be loaded, no snapshot
    exists, or the snapshot has no ``events/list`` payload. Returns without
    writing when there are no linked events or no odds were extracted.
    """
    settings = get_minio_settings()
    s3 = boto3.client(
        "s3",
        endpoint_url=settings.endpoint_url,
        aws_access_key_id=settings.access_key,
        aws_secret_access_key=settings.secret_key,
        config=Config(signature_version="s3v4"),
    )

    # ── 1. Load links → matched fonbet_event_ids ──────────────────────
    logger.info("Loading fonbet_links.parquet from MinIO...")
    try:
        obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=_LINKS_KEY)
        df_links = pd.read_parquet(io.BytesIO(obj["Body"].read()))
    except Exception as exc:
        logger.error("Failed to load fonbet_links.parquet: %s", exc)
        sys.exit(1)

    df_matched = df_links[df_links["fonbet_event_id"].notna()].copy()
    if df_matched.empty:
        logger.warning("No matched Fonbet event IDs in links store — nothing to do")
        return

    target_ids: set[int] = set(df_matched["fonbet_event_id"].astype(int).tolist())
    logger.info(
        "Links store: %d total rows, %d with Fonbet ID",
        len(df_links),
        len(target_ids),
    )

    # ── 2. Load latest Fonbet snapshot ────────────────────────────────
    logger.info("Listing Fonbet snapshots from MinIO...")
    # list_objects_v2 returns at most 1000 keys per call, in ascending key
    # order. Without pagination the newest snapshots would be cut off once
    # the prefix holds more objects than one page.
    paginator = s3.get_paginator("list_objects_v2")
    keys = [
        o["Key"]
        for page in paginator.paginate(
            Bucket=settings.bucket_data_raw, Prefix=_FONBET_PREFIX
        )
        for o in page.get("Contents", [])
        if o["Key"].endswith(".json.gz")
    ]
    if not keys:
        logger.error("No Fonbet snapshots found under %s", _FONBET_PREFIX)
        sys.exit(1)

    # NOTE(review): "latest" is the lexicographically greatest key — this
    # presumably relies on sortable timestamps in the key names; confirm.
    latest_key = max(keys)
    logger.info("Loading snapshot: %s", latest_key)
    obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=latest_key)
    raw_fonbet: list[dict] = _json.loads(gzip.decompress(obj["Body"].read()))

    # ── 3. Parse snapshot → raw_events dict ──────────────────────────
    url_map: dict[str, Any] = {}
    for entry in raw_fonbet:
        url = entry.get("url", "")
        body = _body_of(entry)
        if body is not None:
            url_map[url] = body

    raw_events = _find_url(url_map, "events/list")
    if not isinstance(raw_events, dict):
        logger.error("events/list not found in Fonbet snapshot")
        sys.exit(1)

    n_factors = len(raw_events.get("customFactors", []))
    logger.info("customFactors in snapshot: %d entries", n_factors)

    # ── 4. Extract 1X2 odds ───────────────────────────────────────────
    df_odds = _extract_odds(raw_events, target_ids)
    n_full_1x2 = df_odds[["odd_home", "odd_draw", "odd_away"]].notna().all(axis=1).sum()
    logger.info(
        "Extracted odds: %d/%d events found in snapshot (%d with full 1X2)",
        len(df_odds),
        len(target_ids),
        n_full_1x2,
    )

    if df_odds.empty:
        logger.warning("No odds found for any matched event — nothing to save")
        return

    # ── 5. Join match_id (and fonbet_sport_id if available) from links ─
    link_cols = ["match_id", "fonbet_event_id"]
    if "fonbet_sport_id" in df_matched.columns:
        link_cols.append("fonbet_sport_id")
    id_map = df_matched[link_cols].copy()
    # Cast so the join key dtype matches the integer IDs used for extraction.
    id_map["fonbet_event_id"] = id_map["fonbet_event_id"].astype(int)
    df_odds = df_odds.merge(id_map, on="fonbet_event_id", how="left")

    df_odds["snapshot_key"] = latest_key
    # Naive UTC timestamp; Timestamp.utcnow() is deprecated in recent pandas.
    df_odds["fetched_at"] = pd.Timestamp.now(tz="UTC").tz_localize(None)

    # Reorder columns: match_id first; fonbet_sport_id optional
    col_order = [
        "match_id",
        "fonbet_event_id",
        *(["fonbet_sport_id"] if "fonbet_sport_id" in df_odds.columns else []),
        "odd_home",
        "odd_draw",
        "odd_away",
        "markets_count",
        "snapshot_key",
        "fetched_at",
    ]
    df_odds = df_odds[col_order]

    # ── 6. Upsert to MinIO ────────────────────────────────────────────
    odds_url = f"s3://{settings.bucket_data_raw}/{_ODDS_KEY}"
    try:
        obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=_ODDS_KEY)
        df_existing = pd.read_parquet(io.BytesIO(obj["Body"].read()))
        # Always overwrite existing rows — odds change over time
        df_existing = df_existing[
            ~df_existing["fonbet_event_id"].isin(df_odds["fonbet_event_id"])
        ]
        df_store = pd.concat([df_existing, df_odds], ignore_index=True)
        # The first count is the number of stored rows kept after dropping
        # the ones being replaced.
        logger.info(
            "Existing odds: %d rows — upserted %d → %d total",
            len(df_existing),
            len(df_odds),
            len(df_store),
        )
    except s3.exceptions.NoSuchKey:
        df_store = df_odds
        logger.info(
            "No existing odds store — creating fresh with %d rows", len(df_store)
        )
    except Exception as exc:
        # Deliberate best-effort: an unreadable store is replaced, not fatal.
        logger.warning("Could not read existing odds store (%s) — overwriting", exc)
        df_store = df_odds

    df_store.to_parquet(odds_url, index=False, storage_options=settings.storage_options)

    n_with_odds = (
        df_store[["odd_home", "odd_draw", "odd_away"]].notna().all(axis=1).sum()
    )
    logger.info(
        "Saved %s — %d/%d events with full 1X2 odds",
        odds_url,
        n_with_odds,
        len(df_store),
    )

Pipeline entrypoint: link Fonbet odds snapshots to upcoming site matches.

For every match in the upcoming MATCH_WINDOW_DAYS window (sourced from match_raw.parquet), finds the corresponding Fonbet event using a three-layer fuzzy-matching strategy (country → league → teams) and records the link in MinIO:

s3://{MINIO_BUCKET_DATA_RAW}/match_links/fonbet_links.parquet

An existing fonbet_links.parquet is loaded first.

  • Rows where fonbet_event_id is not null (already matched) are kept as-is — no re-matching is done.
  • Rows where fonbet_event_id is null (previously unmatched) and any new match_id values not yet in the store are re-attempted against the latest Fonbet snapshot.

This makes the pipeline safely re-runnable and idempotent.

Usage::

python -m src.pipelines.link_fonbet_odds

Environment variables:

  • MATCH_WINDOW_DAYS : int (default 3) — how many days ahead to include in the matching window.
  • FONBET_TIME_WINDOW_MIN : int (default 90) — ±tolerance in minutes when comparing kick-off times.
  • FONBET_COUNTRY_THR : int (default 80) — minimum fuzzy score to accept a site region → Fonbet country mapping.
  • FONBET_LEAGUE_THR : int (default 65) — minimum fuzzy score to accept a site tournament → Fonbet league mapping.
  • FONBET_COMBINED_THR : int (default 75) — minimum combined (home+away)/2 team score to accept a match.
  • FONBET_PER_TEAM_THR : int (default 40) — minimum per-team score required alongside the combined threshold.

Match every row in df_site against df_fonbet_idx.

Parameters:

Name Type Description Default
df_site DataFrame

DataFrame of upcoming site matches with columns id, regionName, tournamentName, homeTeamName, awayTeamName, startTimeUtc.

required
df_fonbet_idx DataFrame

Fonbet event index with columns event_id, country, league, team_home, team_away, start_utc, and optionally sport_id.

required
time_window Timedelta

Maximum allowed kick-off time difference.

required
country_thr int

Fuzzy-score threshold for region → country mapping.

required
league_thr int

Fuzzy-score threshold for tournament → league mapping.

required
combined_thr int

Minimum (home+away)/2 combined team score.

required
per_team_thr int

Minimum per-team score alongside combined_thr.

required

Returns:

Type Description
DataFrame

DataFrame with one row per site match and columns match_id, fonbet_event_id, fonbet_sport_id, match_score, site_region, site_tourn, site_home, site_away, site_start, fb_country, fb_league.

Source code in src/pipelines/link_fonbet_odds.py
def run_matching(
    df_site: pd.DataFrame,
    df_fonbet_idx: pd.DataFrame,
    *,
    time_window: pd.Timedelta,
    country_thr: int,
    league_thr: int,
    combined_thr: int,
    per_team_thr: int,
) -> pd.DataFrame:
    """Match every row in *df_site* against *df_fonbet_idx*.

    Matching runs in three layers: site region → Fonbet country, site
    tournament → Fonbet league (within the mapped country), then team-name
    scoring of Fonbet events inside the kick-off time window. When a layer
    yields no mapping, the candidate events are filtered by the remaining
    criteria only.

    Args:
        df_site: DataFrame of upcoming site matches with columns
            ``id``, ``regionName``, ``tournamentName``,
            ``homeTeamName``, ``awayTeamName``, ``startTimeUtc``.
        df_fonbet_idx: Fonbet event index with columns ``event_id``,
            ``country``, ``league``, ``team_home``, ``team_away``,
            ``start_utc`` and, optionally, ``sport_id``.
        time_window: Maximum allowed kick-off time difference.
        country_thr: Fuzzy-score threshold for region → country mapping.
        league_thr: Fuzzy-score threshold for tournament → league mapping.
        combined_thr: Minimum (home+away)/2 combined team score.
        per_team_thr: Minimum per-team score alongside ``combined_thr``.

    Returns:
        DataFrame with one row per site match and columns:
        ``match_id``, ``fonbet_event_id``, ``fonbet_sport_id``,
        ``match_score``, ``site_region``, ``site_tourn``, ``site_home``,
        ``site_away``, ``site_start``, ``fb_country``, ``fb_league``.
        ``fonbet_event_id``, ``fonbet_sport_id`` and ``match_score`` are
        ``None`` for site matches without an accepted Fonbet event.
    """
    from rapidfuzz import fuzz
    from rapidfuzz import process as fuzz_process

    # tz-naive start for comparison
    df_fonbet_idx = df_fonbet_idx.copy()
    df_fonbet_idx["start_naive"] = df_fonbet_idx["start_utc"].dt.tz_localize(None)

    if df_site["startTimeUtc"].dt.tz is not None:
        # Copy before stripping the timezone so the caller's frame is untouched.
        df_site = df_site.copy()
        df_site["startTimeUtc"] = df_site["startTimeUtc"].dt.tz_localize(None)

    # Layer 1: region → country
    fb_countries = [c for c in df_fonbet_idx["country"].dropna().unique() if c]
    site_regions = df_site["regionName"].dropna().unique().tolist()
    region_map: dict[str, str | None] = {}
    for sr in site_regions:
        best = fuzz_process.extractOne(sr, fb_countries, scorer=fuzz.token_sort_ratio)
        region_map[sr] = best[0] if best and best[1] >= country_thr else None

    mapped = sum(1 for v in region_map.values() if v)
    logger.info("Region mapping: %d/%d site regions mapped", mapped, len(site_regions))

    # Layer 2: tournament → league (per country)
    tourn_pairs = df_site[["regionName", "tournamentName"]].drop_duplicates()
    tourn_map: dict[tuple[str, str], str | None] = {}
    for _, row in tourn_pairs.iterrows():
        sr, st = row["regionName"], row["tournamentName"]
        fb_country = region_map.get(sr)
        if fb_country:
            # Only leagues of the mapped country are candidates.
            fb_leagues = (
                df_fonbet_idx.loc[df_fonbet_idx["country"] == fb_country, "league"]
                .dropna()
                .unique()
                .tolist()
            )
            best = fuzz_process.extractOne(st, fb_leagues, scorer=_lscore)
            tourn_map[(sr, st)] = best[0] if best and best[1] >= league_thr else None
        else:
            tourn_map[(sr, st)] = None

    mapped_t = sum(1 for v in tourn_map.values() if v)
    logger.info(
        "Tournament mapping: %d/%d site tournaments mapped", mapped_t, len(tourn_map)
    )

    # Layer 3: event-level team matching
    results = []
    for _, site_row in df_site.iterrows():
        sr = site_row["regionName"]
        st = site_row["tournamentName"]
        site_start = site_row["startTimeUtc"]
        site_home = str(site_row.get("homeTeamName") or "")
        site_away = str(site_row.get("awayTeamName") or "")
        match_id = site_row["id"]

        fb_country = region_map.get(sr)
        fb_league = tourn_map.get((sr, st))

        mask_time = (df_fonbet_idx["start_naive"] >= site_start - time_window) & (
            df_fonbet_idx["start_naive"] <= site_start + time_window
        )

        # Narrow the candidates as far as the mappings allow; fall back to a
        # time-only window when the region could not be mapped.
        if fb_country and fb_league:
            window = df_fonbet_idx[
                mask_time
                & (df_fonbet_idx["country"] == fb_country)
                & (df_fonbet_idx["league"] == fb_league)
            ]
        elif fb_country:
            window = df_fonbet_idx[mask_time & (df_fonbet_idx["country"] == fb_country)]
        else:
            window = df_fonbet_idx[mask_time]

        best_event_id, best_score, best_sport_id = None, 0.0, None
        for _, fb_row in window.iterrows():
            sh = _bscore(site_home, str(fb_row["team_home"]))
            sa = _bscore(site_away, str(fb_row["team_away"]))
            comb = (sh + sa) / 2
            if comb >= combined_thr and sh >= per_team_thr and sa >= per_team_thr:
                # Strict ">" keeps the first event on equal combined scores.
                if comb > best_score:
                    best_score, best_event_id = comb, fb_row["event_id"]
                    best_sport_id = fb_row.get("sport_id")

        results.append(
            {
                "match_id": match_id,
                "fonbet_event_id": best_event_id,
                "fonbet_sport_id": best_sport_id,
                # "is not None" rather than truthiness, so a falsy event id
                # (e.g. 0) still reports its score.
                "match_score": (
                    round(best_score, 1) if best_event_id is not None else None
                ),
                "site_region": sr,
                "site_tourn": st,
                "site_home": site_home,
                "site_away": site_away,
                "site_start": site_start,
                "fb_country": fb_country,
                "fb_league": fb_league,
            }
        )

    return pd.DataFrame(results)

Run the Fonbet match-linking pipeline.

Source code in src/pipelines/link_fonbet_odds.py
def main() -> None:
    """Run the Fonbet match-linking pipeline.

    Steps:
        1. Pick the Fonbet snapshot (``*.json.gz`` under ``_FONBET_PREFIX``)
           whose key sorts last and load it from MinIO.
        2. Parse the snapshot into a table of upcoming football events.
        3. Load ``match_raw.parquet`` and keep site matches starting within
           the matching window.
        4. Load the existing link store and skip matches already linked.
        5. Fuzzy-match the remaining site matches against Fonbet events.
        6. Merge the batch with the previously matched rows and save.

    Tunables are read from environment variables (defaults in parentheses):
    ``MATCH_WINDOW_DAYS`` (3), ``FONBET_TIME_WINDOW_MIN`` (90),
    ``FONBET_COUNTRY_THR`` (80), ``FONBET_LEAGUE_THR`` (65),
    ``FONBET_COMBINED_THR`` (75), ``FONBET_PER_TEAM_THR`` (40).

    Exits with status 1 when no snapshot is found, when the snapshot has no
    ``events/list`` payload, or when ``match_raw.parquet`` cannot be loaded.
    """
    settings = get_minio_settings()
    s3 = boto3.client(
        "s3",
        endpoint_url=settings.endpoint_url,
        aws_access_key_id=settings.access_key,
        aws_secret_access_key=settings.secret_key,
        config=Config(signature_version="s3v4"),
    )
    match_window_days = _int_env("MATCH_WINDOW_DAYS", 3)
    time_window = pd.Timedelta(minutes=_int_env("FONBET_TIME_WINDOW_MIN", 90))
    country_thr = _int_env("FONBET_COUNTRY_THR", 80)
    league_thr = _int_env("FONBET_LEAGUE_THR", 65)
    combined_thr = _int_env("FONBET_COMBINED_THR", 75)
    per_team_thr = _int_env("FONBET_PER_TEAM_THR", 40)

    # ── 1. Load latest Fonbet snapshot ─────────────────────────────────
    logger.info("Listing Fonbet snapshots from MinIO...")
    # list_objects_v2 returns at most 1000 keys per call; paginate so that
    # the newest snapshot is still found once the prefix grows beyond that.
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=settings.bucket_data_raw, Prefix=_FONBET_PREFIX)
    keys = [
        o["Key"]
        for page in pages
        for o in page.get("Contents", [])
        if o["Key"].endswith(".json.gz")
    ]
    if not keys:
        logger.error("No Fonbet snapshots found under %s", _FONBET_PREFIX)
        sys.exit(1)

    # NOTE(review): "latest" is the lexicographically greatest key — this
    # presumably relies on a sortable timestamp in the key name; confirm.
    latest_key = max(keys)
    logger.info("Loading snapshot: %s", latest_key)
    obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=latest_key)
    raw_fonbet: list[dict] = _json.loads(gzip.decompress(obj["Body"].read()))

    # ── 2. Parse snapshot ──────────────────────────────────────────────
    # Index captured response bodies by request URL; entries without a body
    # are dropped.
    url_map: dict[str, Any] = {}
    for entry in raw_fonbet:
        url = entry.get("url", "")
        body = _body_of(entry)
        if body is not None:
            url_map[url] = body

    raw_events = _find_url(url_map, "events/list")
    raw_geo = _find_url(url_map, "geoCategories")

    if not isinstance(raw_events, dict):
        logger.error("events/list not found in Fonbet snapshot")
        sys.exit(1)

    geo_id_to_country = _build_geo_lookup(
        raw_geo if isinstance(raw_geo, dict) else None
    )
    sport_lookup = _build_sport_lookup(raw_events, geo_id_to_country)
    df_fonbet_all = _parse_events(raw_events, sport_lookup)

    # One naive-UTC reference instant shared by the Fonbet and site windows,
    # so both sides are filtered against exactly the same time range.
    now_naive = pd.Timestamp.now(tz="UTC").tz_localize(None)
    cutoff_naive = now_naive + pd.Timedelta(days=match_window_days)

    fonbet_start_naive = df_fonbet_all["start_utc"].dt.tz_localize(None)
    # NOTE(review): only team_home is checked against _PLACEHOLDER_TEAMS;
    # confirm whether team_away should be filtered the same way.
    df_fb_fonbet = (
        df_fonbet_all[
            df_fonbet_all["is_football"]
            & ~df_fonbet_all["is_esports"]
            & (df_fonbet_all["level"] == 1)
            & df_fonbet_all["team_home"].notna()
            & ~df_fonbet_all["team_home"].isin(_PLACEHOLDER_TEAMS)
            & df_fonbet_all["team_away"].notna()
            & df_fonbet_all["start_utc"].notna()
            & (fonbet_start_naive >= now_naive)
            & (fonbet_start_naive <= cutoff_naive)
        ]
        .copy()
        .reset_index(drop=True)
    )
    logger.info(
        "Fonbet football events in window: %d  (countries: %d)",
        len(df_fb_fonbet),
        df_fb_fonbet["country"].nunique(),
    )

    # ── 3. Load match_raw → future candidates ──────────────────────────
    logger.info("Loading match_raw.parquet from MinIO...")
    try:
        obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=_MATCH_RAW_KEY)
        df_matches_raw = pd.read_parquet(io.BytesIO(obj["Body"].read()))
    except Exception as exc:
        logger.error("Failed to load match_raw.parquet: %s", exc)
        sys.exit(1)

    # NOTE(review): the comparison assumes startTimeUtc is stored as naive
    # UTC datetimes — confirm against the exporter.
    df_matches_future = df_matches_raw[
        (df_matches_raw["startTimeUtc"] >= now_naive)
        & (df_matches_raw["startTimeUtc"] <= cutoff_naive)
    ].copy()
    logger.info("Future site matches: %d", len(df_matches_future))

    if df_matches_future.empty:
        logger.warning("No future site matches — nothing to match")
        return

    # ── 4. Load existing link store (upsert: skip already matched) ─────
    links_url = f"s3://{settings.bucket_data_raw}/{_LINKS_KEY}"
    already_matched_ids: set[int] = set()
    df_existing: pd.DataFrame | None = None

    try:
        obj = s3.get_object(Bucket=settings.bucket_data_raw, Key=_LINKS_KEY)
        df_existing = pd.read_parquet(io.BytesIO(obj["Body"].read()))
        already_matched_ids = set(
            df_existing.loc[df_existing["fonbet_event_id"].notna(), "match_id"].tolist()
        )
        logger.info(
            "Existing store: %d rows, %d already matched",
            len(df_existing),
            len(already_matched_ids),
        )
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "NoSuchKey":
            logger.info("No existing link store — starting fresh")
        else:
            logger.warning(
                "Could not read existing link store (%s) — starting fresh", exc
            )
    except Exception as exc:
        logger.warning("Could not read existing link store (%s) — starting fresh", exc)
        # The parquet may have loaded but be unusable (e.g. a missing
        # column); discard it so "starting fresh" really holds in step 6.
        df_existing = None
        already_matched_ids = set()

    df_to_match = df_matches_future[
        ~df_matches_future["id"].isin(already_matched_ids)
    ].copy()
    logger.info(
        "Matches to process: %d  (skipping %d already matched)",
        len(df_to_match),
        len(already_matched_ids),
    )

    if df_to_match.empty:
        logger.info("All future matches already linked — nothing to do")
        return

    # ── 5. Run matching ────────────────────────────────────────────────
    logger.info("Running fuzzy matching...")
    df_batch = run_matching(
        df_to_match,
        df_fb_fonbet,
        time_window=time_window,
        country_thr=country_thr,
        league_thr=league_thr,
        combined_thr=combined_thr,
        per_team_thr=per_team_thr,
    )
    df_batch["linked_at"] = pd.Timestamp.now(tz="UTC").tz_localize(None)

    n_new_matched = df_batch["fonbet_event_id"].notna().sum()
    logger.info(
        "Batch result: %d/%d matched  (%.1f%%)",
        n_new_matched,
        len(df_batch),
        n_new_matched / len(df_batch) * 100 if df_batch.shape[0] else 0,
    )

    # ── 6. Upsert and save ─────────────────────────────────────────────
    _save_cols = [
        "match_id",
        "fonbet_event_id",
        "fonbet_sport_id",
        "match_score",
        "site_region",
        "site_tourn",
        "site_home",
        "site_away",
        "site_start",
        "fb_country",
        "fb_league",
        "linked_at",
    ]

    if df_existing is not None:
        # Back-fill any column absent from older parquet files so the
        # column selection below cannot raise KeyError.
        for col in _save_cols:
            if col not in df_existing.columns:
                df_existing[col] = None
        # Keep only rows that already carry a Fonbet link; previously
        # unmatched rows are dropped and replaced by the fresh batch.
        keep_existing = df_existing[df_existing["match_id"].isin(already_matched_ids)]
        store = pd.concat(
            [keep_existing[_save_cols], df_batch[_save_cols]], ignore_index=True
        )
    else:
        store = df_batch[_save_cols].copy()

    buf = io.BytesIO()
    store.to_parquet(buf, index=False)
    buf.seek(0)
    s3.put_object(
        Bucket=settings.bucket_data_raw,
        Key=_LINKS_KEY,
        Body=buf.getvalue(),
        ContentType="application/octet-stream",
    )

    total_matched = store["fonbet_event_id"].notna().sum()
    logger.info(
        "Saved %s: %d/%d rows with Fonbet ID (%.1f%%)",
        links_url,
        total_matched,
        len(store),
        total_matched / len(store) * 100 if len(store) else 0,
    )

Odds Loading (Football-Data.co.uk)

CLI entrypoint: download Bet365 closing odds from football-data.co.uk.

cli_load_odds_fdco(output_path)

Download Bet365 closing odds from football-data.co.uk and save to parquet.

Parameters:

Name Type Description Default
output_path Path

Destination parquet file path.

required

Reads leagues and seasons from params.yaml odds_fdco block.

Source code in src/pipelines/load_odds_fdco.py
@cli.command()
@click.argument("output_path", type=click.Path(path_type=Path, dir_okay=False))
def cli_load_odds_fdco(output_path: Path) -> None:
    """Download Bet365 closing odds from football-data.co.uk and save to parquet.

    The seasons, leagues and optional extra leagues to download are taken
    from the ``odds_fdco`` block of params.yaml.

    Args:
        output_path: Destination parquet file path.
    """
    odds_cfg = load_params()["odds_fdco"]
    # "seasons" and "leagues" are mandatory keys; "extra_leagues" is optional
    # and is forwarded as None when absent.
    load_odds_fdco(
        output_path=output_path,
        seasons=odds_cfg["seasons"],
        leagues=odds_cfg["leagues"],
        extra_leagues=odds_cfg.get("extra_leagues"),
    )

Odds Export / Upload

Pipeline entrypoint: export a PostgreSQL table to MinIO as Parquet.

Replaces the former export_data_raw Celery task and GET /sources/export/{name_table}.

Usage::

python -m src.pipelines.export_to_storage --table match
python -m src.pipelines.export_to_storage --table match_raw

main(table)

Export a PostgreSQL table to MinIO as a parquet file.

Parameters:

Name Type Description Default
table str

Name of the PostgreSQL table to export. Must be one of the tables listed in _ALLOWED_TABLES.

required
Source code in src/pipelines/export_to_storage.py
@click.command()
@click.option(
    "--table",
    required=True,
    type=click.Choice(sorted(_ALLOWED_TABLES), case_sensitive=False),
    help="PostgreSQL table to export.",
)
def main(table: str) -> None:
    """Export a PostgreSQL table to MinIO as a parquet file.

    The whole table is read into memory and written to
    ``s3://<bucket_data_raw>/<table>.parquet``.

    Args:
        table: Name of the PostgreSQL table to export. Must be one of
            the tables listed in ``_ALLOWED_TABLES``.
    """
    logger.info("Exporting table '%s' to MinIO...", table)

    # The table name is interpolated into SQL; this is only safe because the
    # --table option restricts it to the _ALLOWED_TABLES choices.
    query = text(f"SELECT * FROM {table}")
    with engine.connect() as connection:
        frame = pd.read_sql_query(query, connection)
    logger.info("Loaded %d rows from '%s'", len(frame), table)

    app_settings = get_settings()
    target_url = f"s3://{app_settings.minio.bucket_data_raw}/{table}.parquet"
    frame.to_parquet(target_url, storage_options=app_settings.minio.storage_options)
    logger.info("Exported '%s' → %s", table, target_url)

CLI command: upload inference artifacts to MinIO after dvc repro.

Run immediately after dvc repro batch_inference in the inference pod so that downstream pods (e.g. ml_live_betting_01) can download the artifacts without re-running the full pipeline.

Artifacts uploaded

  • data/interim/finished.parquet → artifacts/finished.parquet
  • data/predictions/predictions.parquet → artifacts/predictions.parquet

Both are stored in MINIO_BUCKET_PREDICTIONS.

upload_artifacts()

Upload finished.parquet and predictions.parquet to MinIO after dvc repro.

Source code in src/pipelines/upload_artifacts.py
@cli.command()
def upload_artifacts() -> None:
    """Upload finished.parquet and predictions.parquet to MinIO after dvc repro.

    Every ``(local path, remote key)`` pair in ``_ARTIFACTS`` is read from
    disk and written to the predictions bucket. All local files are checked
    before the first upload so that a missing artifact cannot leave the
    bucket holding only part of a fresh artifact set.

    Raises:
        SystemExit: With status 1 when MinIO settings cannot be loaded, the
            predictions bucket is not configured, or a local artifact is
            missing.
    """
    try:
        # Imported lazily so that an import/configuration failure is reported
        # as a clean CLI error instead of a traceback.
        from src.app.config.storage import get_minio_settings

        settings = get_minio_settings()
    except Exception as exc:
        logger.error("MinIO settings unavailable: %s", exc)
        raise SystemExit(1) from exc

    bucket = settings.bucket_predictions
    if not bucket:
        logger.error("MINIO_BUCKET_PREDICTIONS not set — cannot upload artifacts.")
        raise SystemExit(1)

    artifacts = list(_ARTIFACTS)

    # Fail before touching the bucket if any artifact is missing.
    for local_rel, _remote_key in artifacts:
        local_path = Path(local_rel)
        if not local_path.exists():
            logger.error("Artifact not found: %s", local_path.resolve())
            raise SystemExit(1)

    for local_rel, remote_key in artifacts:
        s3_url = f"s3://{bucket}/{remote_key}"
        # Round-trip through pandas: this also yields the row count logged below.
        df = pd.read_parquet(Path(local_rel))
        df.to_parquet(s3_url, storage_options=settings.storage_options)
        logger.info("Uploaded %s → %s (%d rows)", local_rel, s3_url, len(df))

Analysis & Reporting

CLI entrypoint for the error analysis stage.

Loads the registered champion model from MLflow, runs inference on the holdout set, slices errors by league (tournamentId), region (regionId), Elo gap bins, season, and home/away. Writes a Markdown report and figures.

This stage is OPTIONAL in the pipeline — it does not gate registration.

cli_error_analysis(input_dataset_path, input_test_ids_path, input_features_meta_path, output_predictions_path, output_report_dir, data_dir, metadata_dir, roi_analysis_dir)

Run error analysis on holdout set and write Markdown report + figures.

Parameters:

Name Type Description Default
input_dataset_path Path

Parquet dataset with features and holdout IDs.

required
input_test_ids_path Path

Parquet with holdout match IDs.

required
input_features_meta_path Path

Parquet features-meta table.

required
output_predictions_path Path

Destination for holdout predictions parquet.

required
output_report_dir Path

Directory for the Markdown report and figures.

required
data_dir Path

Directory for CSV data outputs (segment CSVs, roi_simulation.csv). Defaults to data/analysis/error.

required
metadata_dir Path

Directory with regionId.json / tournamentId.json lookup files.

required
roi_analysis_dir Path | None

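Optional roi_analysis data directory (e.g. data/analysis/roi). If given, roi_pct per region is read from its roi_by_region.csv and joined into the "Error by region" table.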

required
Source code in src/pipelines/error_analysis.py
@cli.command()
@click.argument(
    "input_dataset_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_test_ids_path", type=click.Path(path_type=Path, exists=True, dir_okay=False)
)
@click.argument(
    "input_features_meta_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_predictions_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.argument(
    "output_report_dir", type=click.Path(path_type=Path, dir_okay=True, file_okay=False)
)
@click.option(
    "--data-dir",
    default="data/analysis/error",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Directory for CSV data outputs (segment CSVs, roi_simulation.csv). Defaults to data/analysis/error.",
)
@click.option(
    "--metadata-dir",
    default="data/metadata",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Path to data/metadata/ directory for team name index.",
)
@click.option(
    "--roi-analysis-dir",
    default=None,
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Path to roi_analysis data dir (e.g. data/analysis/roi). "
    "If given, roi_pct per region is joined into the Error by region table.",
)
def cli_error_analysis(
    input_dataset_path: Path,
    input_test_ids_path: Path,
    input_features_meta_path: Path,
    output_predictions_path: Path,
    output_report_dir: Path,
    data_dir: Path,
    metadata_dir: Path,
    roi_analysis_dir: Path | None,
) -> None:
    """Run error analysis on holdout set and write Markdown report + figures.

    Loads the registered model from MLflow, predicts on the holdout rows,
    saves the holdout predictions, computes metrics overall and per
    tournament / region / Elo-gap bin / season, and writes CSVs, figures and
    ``index.md``.

    Args:
        input_dataset_path: Parquet dataset with features and holdout IDs.
        input_test_ids_path: Parquet with holdout match IDs.
        input_features_meta_path: Parquet features-meta table.
        output_predictions_path: Destination for holdout predictions parquet.
        output_report_dir: Directory for the Markdown report and figures.
        data_dir: Directory for CSV data outputs (segment CSVs,
            ``roi_simulation.csv``).
        metadata_dir: Directory with ``regionId.json`` / ``tournamentId.json``
            lookup files.
        roi_analysis_dir: Optional roi_analysis data directory; when given and
            it contains ``roi_by_region.csv``, ``n_bets`` and ``roi_pct`` are
            joined into the region table.
    """
    params = load_params()
    clf_params = params["classification"]

    _cfg = get_pipeline_config()
    # setdefault: credentials already present in the environment win.
    os.environ.setdefault("MLFLOW_S3_ENDPOINT_URL", _cfg.minio_endpoint_url)
    os.environ.setdefault("AWS_ACCESS_KEY_ID", _cfg.minio_access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", _cfg.minio_secret_key)
    mlflow.set_tracking_uri(_cfg.mlflow_tracking_uri)

    # Load id→name mappings from metadata/ (values are strings keyed by str id)
    def _load_id_map(name: str) -> dict[str, str]:
        p = Path(metadata_dir) / f"{name}.json"
        if not p.exists():
            return {}
        # Explicit UTF-8 so non-ASCII names load the same on every locale.
        with open(p, encoding="utf-8") as _f:
            raw = json.load(_f)
        return {str(k): str(v) for k, v in raw.items()}

    region_map = _load_id_map("regionId")
    tournament_map = _load_id_map("tournamentId")

    df_dataset = pd.read_parquet(input_dataset_path)
    df_test_ids = pd.read_parquet(input_test_ids_path)
    df_features_meta = pd.read_parquet(input_features_meta_path)

    register_params = params["register_model"]
    model_name: str = register_params["model_name"]
    model_stage: str = register_params["model_stage"]
    model_uri = f"models:/{model_name}@{model_stage}"

    # Feature selection falls back to the classification block when there is
    # no dedicated "features_selected" block in params.
    feat_params = params.get("features_selected", clf_params)
    num_cols = select_model_features(
        df_features_meta,
        side=feat_params["side"],
        window_sizes=feat_params["window_sizes"],
        include_elo=feat_params.get("include_elo", True),
        include_rest_days=feat_params.get("include_rest_days", True),
        include_h2h=feat_params.get("include_h2h", False),
    )
    cat_cols: list[str] = feat_params["cat_cols"]
    target_col: str = clf_params["target_col"]

    df_holdout = df_dataset[df_dataset["id"].isin(df_test_ids["id"])].copy()
    # Keep dataset column order; the set makes the membership test O(1).
    feature_names = set(cat_cols) | set(num_cols)
    X_cols = [c for c in df_dataset.columns if c in feature_names]
    X_holdout = df_holdout[X_cols].copy()
    y_holdout = df_holdout[target_col].copy()
    target_labels = sorted(df_holdout[target_col].unique())

    logger.info("Loading model from %s", model_uri)
    model = mlflow.sklearn.load_model(model_uri)
    proba = model.predict_proba(X_holdout)

    overall = evaluate_clf(y=y_holdout, proba=proba, label_order=target_labels)

    # Save holdout predictions for downstream roi_analysis stage.
    # Includes match metadata (homeTeamId, awayTeamId, startTimeUtc) needed for odds join.
    # NOTE(review): assumes predict_proba columns follow sorted(target_labels)
    # and that every class occurs in the holdout — confirm.
    proba_cols = [f"proba_{lbl}" for lbl in target_labels]
    df_preds = pd.DataFrame(proba, columns=proba_cols)
    df_preds["y_true"] = y_holdout.values
    _meta_cols = [
        c
        for c in ["id", "homeTeamId", "awayTeamId", "startTimeUtc"]
        if c in df_holdout.columns
    ]
    for col in _meta_cols:
        df_preds[col] = df_holdout[col].values
    Path(output_predictions_path).parent.mkdir(parents=True, exist_ok=True)
    df_preds.to_parquet(output_predictions_path, index=False)
    logger.info(
        "Holdout predictions saved to %s (%d rows)",
        output_predictions_path,
        len(df_preds),
    )

    # No odds are available in this stage, so no reference probabilities are
    # passed to the ROI simulation; the label below describes that baseline.
    reference_proba: np.ndarray | None = None
    odds_baseline_label = (
        "uniform prior (1/3 per class) — run roi_analysis stage for real Bet365 odds"
    )

    roi_overall = compute_flat_stake_roi(
        y_true=y_holdout.to_numpy(),
        model_proba=proba,
        label_order=target_labels,
        reference_proba=reference_proba,
    )

    # Per-tournament and per-region metrics; segments with fewer than 30
    # holdout rows are skipped.
    _col_map = {"regionId": region_map, "tournamentId": tournament_map}
    segment_results: dict[str, pd.DataFrame] = {}
    for col in ["tournamentId", "regionId"]:
        if col not in df_holdout.columns:
            continue
        id_map = _col_map[col]
        rows = []
        for val in df_holdout[col].dropna().unique():
            mask = df_holdout[col] == val
            if mask.sum() < 30:
                continue
            m = evaluate_clf(
                y=y_holdout[mask], proba=proba[mask], label_order=target_labels
            )
            label = id_map.get(str(int(val)), str(val)) if id_map else str(val)
            rows.append({"segment": label, "id": val, "n": int(mask.sum()), **m})
        if rows:
            segment_results[col] = pd.DataFrame(rows).sort_values("logloss")

    # Elo-gap bins use the first column whose name contains both "elo" and
    # "diff"; bins with fewer than 20 rows are skipped.
    elo_gap_rows: list[dict] = []
    elo_col = next(
        (c for c in df_holdout.columns if "elo" in c.lower() and "diff" in c.lower()),
        None,
    )
    if elo_col:
        df_holdout = df_holdout.copy()
        df_holdout["_elo_bin"] = df_holdout[elo_col].apply(_elo_gap_label)
        for bin_label in df_holdout["_elo_bin"].unique():
            mask = df_holdout["_elo_bin"] == bin_label
            if mask.sum() < 20:
                continue
            m = evaluate_clf(
                y=y_holdout[mask], proba=proba[mask], label_order=target_labels
            )
            elo_gap_rows.append({"elo_gap_bin": bin_label, "n": int(mask.sum()), **m})
    df_elo_bins = pd.DataFrame(elo_gap_rows) if elo_gap_rows else pd.DataFrame()

    season_rows: list[dict] = []
    if "seasonId" in df_holdout.columns:
        for season in df_holdout["seasonId"].dropna().unique():
            mask = df_holdout["seasonId"] == season
            if mask.sum() < 30:
                continue
            m = evaluate_clf(
                y=y_holdout[mask], proba=proba[mask], label_order=target_labels
            )
            season_rows.append({"seasonId": season, "n": int(mask.sum()), **m})
    df_seasons = (
        pd.DataFrame(season_rows).sort_values("seasonId")
        if season_rows
        else pd.DataFrame()
    )

    output_report_dir = Path(output_report_dir)
    data_dir = Path(data_dir)
    figures_dir = output_report_dir / "figures"
    output_report_dir.mkdir(parents=True, exist_ok=True)
    data_dir.mkdir(parents=True, exist_ok=True)
    figures_dir.mkdir(parents=True, exist_ok=True)

    for col, df_seg in segment_results.items():
        df_seg.to_csv(data_dir / f"segment_{col}.csv", index=False)
    if not df_elo_bins.empty:
        df_elo_bins.to_csv(data_dir / "elo_gap_bins.csv", index=False)
    if not df_seasons.empty:
        df_seasons.to_csv(data_dir / "season_metrics.csv", index=False)
    pd.DataFrame([roi_overall]).to_csv(data_dir / "roi_simulation.csv", index=False)

    if "regionId" in segment_results:
        # Rows are sorted by ascending log-loss, so head(20) is the 20
        # regions with the lowest log-loss.
        df_r = segment_results["regionId"].head(20)
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.barh(df_r["segment"], df_r["logloss"])
        ax.set_xlabel("Log-loss")
        ax.set_title("Log-loss by region (top-20, holdout set)")
        ax.invert_yaxis()
        plt.tight_layout()
        fig.savefig(figures_dir / "logloss_by_region.png", dpi=120)
        plt.close(fig)

    if not df_elo_bins.empty:
        fig, ax = plt.subplots(figsize=(8, 4))
        ax.bar(df_elo_bins["elo_gap_bin"], df_elo_bins["logloss"])
        ax.set_ylabel("Log-loss")
        ax.set_title("Log-loss by Elo gap bin (holdout set)")
        plt.tight_layout()
        fig.savefig(figures_dir / "logloss_by_elo_gap.png", dpi=120)
        plt.close(fig)

    lines = [
        "# Error Analysis — v1 Champion Model",
        "",
        f"> **Model**: `{model_name}` @ `{model_stage}`",
        f"> **Holdout slice**: 2024+ ({len(y_holdout)} matches)",
        f"> **ROI baseline**: {odds_baseline_label}",
        "",
        "## Overall holdout metrics",
        "",
        "| Metric | Value |",
        "|--------|-------|",
    ]
    for k, v in overall.items():
        lines.append(f"| {k} | {v:.4f} |")

    lines += [
        "",
        "## ROI simulation (flat-stake vs uniform prior)",
        "",
        "> Warning: simulation only — no live betting implied.",
        "",
        "| Metric | Value |",
        "|--------|-------|",
    ]
    for k, v in roi_overall.items():
        val_str = f"{v:.2f}" if isinstance(v, float) and not np.isnan(v) else str(v)
        lines.append(f"| {k} | {val_str} |")

    if not df_elo_bins.empty:
        lines += [
            "",
            "## Error by Elo gap bin",
            "",
            "![Elo gap](figures/logloss_by_elo_gap.png)",
            "",
        ]
        lines += [
            "| elo_gap_bin | n | logloss | brier | ece |",
            "|---|---|---|---|---|",
        ]
        # itertuples keeps each column's dtype; iterrows would upcast an
        # all-numeric row to float and print counts as "120.0".
        for row in df_elo_bins.itertuples(index=False):
            lines.append(
                f"| {row.elo_gap_bin} | {int(row.n)} | {row.logloss:.4f} | {row.brier:.4f} | {row.ece:.4f} |"
            )

    if "regionId" in segment_results:
        df_region_report = segment_results["regionId"].head(20).copy()
        roi_joined = False
        if roi_analysis_dir is not None:
            roi_region_path = Path(roi_analysis_dir) / "roi_by_region.csv"
            if roi_region_path.exists():
                df_roi_reg = pd.read_csv(roi_region_path)[
                    ["regionId", "n_bets", "roi_pct"]
                ]
                # Left join: regions without ROI data stay in the table and
                # are rendered with "—" below.
                df_region_report = df_region_report.merge(
                    df_roi_reg, left_on="id", right_on="regionId", how="left"
                )
                roi_joined = True
            else:
                logger.warning(
                    "roi_by_region.csv not found in %s — skipping roi_pct join",
                    roi_analysis_dir,
                )
        lines += [
            "",
            "## Error by region (top-20)",
            "",
            "![Region](figures/logloss_by_region.png)",
            "",
        ]
        if roi_joined:
            lines += [
                "| region | n | logloss | brier | ece | n_bets | roi_pct |",
                "|---|---|---|---|---|---|---|",
            ]
            for _, row in df_region_report.iterrows():
                n_bets = int(row["n_bets"]) if pd.notna(row.get("n_bets")) else "—"
                roi = f"{row['roi_pct']:.2f}%" if pd.notna(row.get("roi_pct")) else "—"
                lines.append(
                    f"| {row['segment']} | {int(row['n'])} | {row['logloss']:.4f}"
                    f" | {row['brier']:.4f} | {row['ece']:.4f} | {n_bets} | {roi} |"
                )
        else:
            lines += ["| region | n | logloss | brier | ece |", "|---|---|---|---|---|"]
            for _, row in df_region_report.iterrows():
                lines.append(
                    f"| {row['segment']} | {int(row['n'])} | {row['logloss']:.4f} | {row['brier']:.4f} | {row['ece']:.4f} |"
                )

    if not df_seasons.empty:
        lines += ["", "## Error by season", ""]
        lines += ["| seasonId | n | logloss | brier | ece |", "|---|---|---|---|---|"]
        # itertuples keeps integer season ids and counts from being printed
        # as floats (see the Elo table above).
        for row in df_seasons.itertuples(index=False):
            lines.append(
                f"| {row.seasonId} | {int(row.n)} | {row.logloss:.4f} | {row.brier:.4f} | {row.ece:.4f} |"
            )

    report_path = output_report_dir / "index.md"
    report_path.write_text("\n".join(lines), encoding="utf-8")
    logger.info("Error analysis report written to %s", report_path)

CLI entrypoint for the ROI analysis stage.

Depends only on the holdout predictions saved by error_analysis and the Bet365 closing odds from football-data.co.uk (load_odds_fdco stage).

Separation rationale

Changing odds configuration (seasons, leagues) should not force a full model pipeline rerun. This stage runs independently of all ML stages — it only needs the holdout_predictions.parquet output of error_analysis plus the odds file.

Usage (standalone): dvc repro roi_analysis

cli_roi_analysis(input_predictions_path, input_odds_path, input_dataset_path, output_report_dir, data_dir, metadata_dir, error_analysis_dir)

Compute ROI simulation against Bet365 closing odds on the holdout set.

Parameters:

Name Type Description Default
input_predictions_path Path

Parquet with holdout predictions.

required
input_odds_path Path

Parquet with Bet365 closing odds.

required
input_dataset_path Path

Full dataset parquet (for region/ELO enrichment).

required
output_report_dir Path

Directory for report artifacts.

required
data_dir Path

Directory containing DVC-versioned data artifacts.

required
metadata_dir Path

Directory with segment lookup JSON files.

required
error_analysis_dir Path | None

Optional directory for error-analysis sub-reports.

required
Source code in src/pipelines/roi_analysis.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
@cli.command()
@click.argument(
    "input_predictions_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_odds_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "input_dataset_path",
    type=click.Path(path_type=Path, exists=True, dir_okay=False),
)
@click.argument(
    "output_report_dir",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
)
@click.option(
    "--data-dir",
    default="data/analysis/roi",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Directory for CSV data outputs (roi_by_region.csv, etc.). Defaults to data/analysis/roi.",
)
@click.option(
    "--metadata-dir",
    default="data/metadata",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Path to data/metadata/ for team name lookup.",
)
@click.option(
    "--error-analysis-dir",
    default=None,
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    help="Path to error_analysis data dir (e.g. data/analysis/error). "
    "If given, logloss per region is joined and low-logloss regions are marked.",
)
def cli_roi_analysis(
    input_predictions_path: Path,
    input_odds_path: Path,
    input_dataset_path: Path,
    output_report_dir: Path,
    data_dir: Path,
    metadata_dir: Path,
    error_analysis_dir: Path | None,
) -> None:
    """Compute ROI simulation against Bet365 closing odds on the holdout set.

    The command joins holdout predictions with bookmaker odds, computes the
    overall flat-stake ROI plus ROI segmented by league, region, ELO-gap bin
    and minimum-edge threshold, then writes CSVs, PNG charts and a Markdown
    report.

    Args:
        input_predictions_path: Parquet with holdout predictions. Must contain
            ``id``, ``y_true`` and one ``proba_<label>`` column per class.
        input_odds_path: Parquet with Bet365 closing odds.
        input_dataset_path: Full dataset parquet; ``regionId`` and
            ``diff_elo_pre`` are joined onto the predictions by ``id`` when the
            predictions do not already carry them.
        output_report_dir: Directory that receives ``index.md`` and the
            ``figures/`` sub-directory.
        data_dir: Directory that receives the CSV outputs
            (``roi_simulation.csv``, ``roi_by_region.csv``,
            ``roi_by_league.csv``, ``roi_by_elo_bin.csv``,
            ``roi_by_threshold.csv``).
        metadata_dir: Metadata directory handed to ``join_odds_to_holdout``
            and used to load the ``regionId`` id-to-name map.
        error_analysis_dir: Optional directory expected to contain
            ``segment_regionId.csv``. When the file exists, its ``logloss``
            column is joined onto the regional ROI table and regions below the
            median logloss are flagged.
    """
    df_preds = pd.read_parquet(input_predictions_path).reset_index(drop=True)

    # Enrich predictions with regionId and ELO from the full dataset (join on id).
    # Only pull columns the predictions lack: merging a column that exists on
    # both sides would make pandas suffix it (regionId_x / regionId_y), and the
    # "regionId" lookup further down would then silently skip the region segment.
    enrich_cols = [
        c for c in ("regionId", "diff_elo_pre") if c not in df_preds.columns
    ]
    if enrich_cols:
        df_dataset = pd.read_parquet(
            input_dataset_path,
            columns=["id", *enrich_cols],
        )
        df_preds = df_preds.merge(
            df_dataset.reset_index(drop=True), on="id", how="left"
        )

    # Class order is derived from the sorted proba_<label> column names.
    proba_cols = sorted(c for c in df_preds.columns if c.startswith("proba_"))
    label_order = [int(c.replace("proba_", "")) for c in proba_cols]
    proba = df_preds[proba_cols].to_numpy()
    y_true = df_preds["y_true"].to_numpy()

    df_odds = pd.read_parquet(input_odds_path)
    metadata_dir = Path(metadata_dir)

    # join_odds_to_holdout expects homeTeamId, awayTeamId, startTimeUtc columns
    reference_proba, actual_odds, league_codes = join_odds_to_holdout(
        df_holdout=df_preds,
        df_odds=df_odds,
        metadata_dir=metadata_dir,
    )

    # Rows without a bookmaker match carry NaN in reference_proba; only rows
    # with real odds are meaningful for the segmented ROI vs bookmaker.
    matched_mask = ~np.isnan(reference_proba[:, 0])
    n_matched = int(matched_mask.sum())
    n_total = len(df_preds)
    n_uniform = n_total - n_matched
    odds_label = f"Bet365 closing odds ({n_matched}/{n_total} matched, {n_uniform} uniform fallback)"
    logger.info("Odds baseline: %s", odds_label)

    # NOTE(review): the overall ROI is computed on ALL holdout rows, including
    # unmatched ones — presumably compute_flat_stake_roi applies the "uniform
    # fallback" mentioned in odds_label; confirm against its implementation.
    roi = compute_flat_stake_roi(
        y_true=y_true,
        model_proba=proba,
        label_order=label_order,
        reference_proba=reference_proba,
        actual_odds=actual_odds,
    )

    # --- Segmented ROI (only matched rows) ---
    region_map = _load_id_map(metadata_dir, "regionId")

    df_matched = df_preds[matched_mask].reset_index(drop=True)
    proba_matched = proba[matched_mask]
    y_true_matched = y_true[matched_mask]
    ref_matched = reference_proba[matched_mask]
    odds_matched = actual_odds[matched_mask]
    leagues_matched = league_codes[matched_mask]

    # ROI by league_code (from FDCO join)
    seg_col_league = "league_code"
    df_matched[seg_col_league] = leagues_matched
    seg_col_region = "regionId" if "regionId" in df_matched.columns else None

    df_roi_league: pd.DataFrame = pd.DataFrame()
    df_roi_region: pd.DataFrame = pd.DataFrame()
    df_roi_elo: pd.DataFrame = pd.DataFrame()

    if df_matched[seg_col_league].notna().any():
        df_roi_league = compute_roi_by_segment(
            y_true=y_true_matched,
            model_proba=proba_matched,
            label_order=label_order,
            segments=df_matched[[seg_col_league]],
            segment_col=seg_col_league,
            reference_proba=ref_matched,
            actual_odds=odds_matched,
            min_bets=10,
        )

    if seg_col_region:
        df_roi_region = compute_roi_by_segment(
            y_true=y_true_matched,
            model_proba=proba_matched,
            label_order=label_order,
            segments=df_matched[[seg_col_region]],
            segment_col=seg_col_region,
            reference_proba=ref_matched,
            actual_odds=odds_matched,
            min_bets=10,
        )
        if not df_roi_region.empty and region_map:
            # The id map is keyed by stringified integer ids; fall back to the
            # raw value when the id is missing or unknown.
            df_roi_region["region_name"] = df_roi_region[seg_col_region].apply(
                lambda x: region_map.get(str(int(x)), str(x)) if pd.notna(x) else str(x)
            )

    # --- Join logloss from error_analysis (optional) ---
    logloss_threshold: float | None = None
    if error_analysis_dir is not None and not df_roi_region.empty:
        err_seg_path = Path(error_analysis_dir) / "segment_regionId.csv"
        if err_seg_path.exists():
            df_err = pd.read_csv(err_seg_path)
            # The threshold is the median over every region in the error CSV,
            # not only the regions that survived the min_bets filter.
            logloss_threshold = float(df_err["logloss"].median())
            df_roi_region = df_roi_region.merge(
                df_err[["id", "logloss"]].rename(columns={"id": seg_col_region}),
                on=seg_col_region,
                how="left",
            )
            # Regions without a logloss (NaN) compare False, i.e. are not flagged.
            df_roi_region["low_logloss"] = df_roi_region["logloss"] < logloss_threshold
            logger.info(
                "Joined logloss for %d regions; low-logloss threshold (median): %.4f",
                df_roi_region["logloss"].notna().sum(),
                logloss_threshold,
            )
        else:
            logger.warning(
                "segment_regionId.csv not found in %s — skipping logloss join",
                error_analysis_dir,
            )

    # ROI by ELO gap bin: use the first column whose name mentions both
    # "elo" and "diff" (e.g. diff_elo_pre).
    elo_col = next(
        (c for c in df_matched.columns if "elo" in c.lower() and "diff" in c.lower()),
        None,
    )
    if elo_col:
        elo_bins = df_matched[elo_col].apply(_elo_gap_label)
        df_roi_elo = compute_roi_by_segment(
            y_true=y_true_matched,
            model_proba=proba_matched,
            label_order=label_order,
            segments=elo_bins.rename("elo_gap_bin").to_frame(),
            segment_col="elo_gap_bin",
            reference_proba=ref_matched,
            actual_odds=odds_matched,
            min_bets=10,
        )

    # --- Threshold analysis ---
    df_roi_threshold = compute_roi_by_threshold(
        y_true=y_true_matched,
        model_proba=proba_matched,
        label_order=label_order,
        reference_proba=ref_matched,
        actual_odds=odds_matched,
        min_bets=20,
    )

    output_report_dir = Path(output_report_dir)
    data_dir = Path(data_dir)
    output_report_dir.mkdir(parents=True, exist_ok=True)
    data_dir.mkdir(parents=True, exist_ok=True)

    # Segment CSVs are only written when the corresponding table has rows.
    pd.DataFrame([roi]).to_csv(data_dir / "roi_simulation.csv", index=False)
    if not df_roi_region.empty:
        df_roi_region.to_csv(data_dir / "roi_by_region.csv", index=False)
    if not df_roi_league.empty:
        df_roi_league.to_csv(data_dir / "roi_by_league.csv", index=False)
    if not df_roi_elo.empty:
        df_roi_elo.to_csv(data_dir / "roi_by_elo_bin.csv", index=False)
    if not df_roi_threshold.empty:
        df_roi_threshold.to_csv(data_dir / "roi_by_threshold.csv", index=False)

    # --- Charts ---
    figures_dir = output_report_dir / "figures"
    figures_dir.mkdir(exist_ok=True)

    if not df_roi_region.empty:
        label_col = (
            "region_name" if "region_name" in df_roi_region.columns else seg_col_region
        )
        # Pick the 25 regions with the most bets (as the chart title states),
        # then order the bars by ROI for readability.
        df_plot = (
            df_roi_region.sort_values("n_bets", ascending=False)
            .head(25)
            .sort_values("roi_pct", ascending=False)
        )
        has_logloss = "low_logloss" in df_plot.columns
        fig, ax = plt.subplots(figsize=(10, 7))

        bar_colors = []
        y_labels = []
        for _, r in df_plot.iterrows():
            is_low_logloss = bool(has_logloss and r.get("low_logloss", False))
            if r["roi_pct"] >= 0:
                bar_colors.append("#2ecc71")
            elif is_low_logloss:
                bar_colors.append("#e67e22")  # orange for low-logloss regions
            else:
                bar_colors.append("#e74c3c")

            name = str(r[label_col])
            if is_low_logloss:
                name = f"★ {name}"
            y_labels.append(name)

        ax.barh(y_labels, df_plot["roi_pct"].tolist(), color=bar_colors)
        ax.axvline(0, color="black", linewidth=0.8, linestyle="--")
        ax.set_xlabel("ROI %")
        title = "ROI vs Bet365 by region (top-25 by bets, matched only)"
        if has_logloss and logloss_threshold is not None:
            title += (
                f"\n★ = logloss < {logloss_threshold:.3f} (median across all regions)"
            )
        ax.set_title(title)
        ax.invert_yaxis()
        if has_logloss:
            from matplotlib.patches import Patch

            legend_elements = [
                Patch(
                    facecolor="#e67e22",
                    label=f"Low logloss (< {logloss_threshold:.3f})",
                ),
                Patch(facecolor="#e74c3c", label="Other"),
            ]
            ax.legend(handles=legend_elements, loc="lower right", fontsize=8)
        plt.tight_layout()
        fig.savefig(figures_dir / "roi_by_region.png", dpi=120)
        plt.close(fig)

    if not df_roi_elo.empty:
        fig, ax = plt.subplots(figsize=(8, 4))
        colors = ["#2ecc71" if v >= 0 else "#e74c3c" for v in df_roi_elo["roi_pct"]]
        ax.bar(df_roi_elo["elo_gap_bin"], df_roi_elo["roi_pct"], color=colors)
        ax.axhline(0, color="black", linewidth=0.8, linestyle="--")
        ax.set_ylabel("ROI %")
        ax.set_title("ROI vs Bet365 by ELO gap bin (matched only)")
        plt.tight_layout()
        fig.savefig(figures_dir / "roi_by_elo_bin.png", dpi=120)
        plt.close(fig)

    if not df_roi_threshold.empty:
        fig, ax = plt.subplots(figsize=(9, 4))
        ax.plot(
            df_roi_threshold["min_edge"],
            df_roi_threshold["roi_pct"],
            marker="o",
            color="#3498db",
        )
        ax.axhline(0, color="black", linewidth=0.8, linestyle="--")
        ax.set_xlabel("Min edge threshold")
        ax.set_ylabel("ROI %")
        ax.set_title("ROI vs Bet365 by minimum edge threshold (matched only)")
        # Annotate n_bets
        for _, r in df_roi_threshold.iterrows():
            ax.annotate(
                f"n={int(r['n_bets'])}",
                (r["min_edge"], r["roi_pct"]),
                textcoords="offset points",
                xytext=(0, 8),
                ha="center",
                fontsize=8,
            )
        plt.tight_layout()
        fig.savefig(figures_dir / "roi_by_threshold.png", dpi=120)
        plt.close(fig)

    # --- Report ---
    lines = [
        "# ROI Analysis — Bet365 Closing Odds Benchmark",
        "",
        f"> **Odds baseline**: {odds_label}",
        f"> **Holdout size**: {n_total} matches | **Matched**: {n_matched}",
        "",
        "## Overall ROI (flat-stake vs Bet365 closing odds)",
        "",
        "> Simulation only — no live betting implied.",
        "",
        "| Metric | Value |",
        "|--------|-------|",
    ]
    for k, v in roi.items():
        # Finite floats get 4 decimals; everything else (ints, NaN) is printed as-is.
        val_str = f"{v:.4f}" if isinstance(v, float) and not np.isnan(v) else str(v)
        lines.append(f"| {k} | {val_str} |")

    if not df_roi_elo.empty:
        lines += [
            "",
            "## ROI by ELO gap bin (matched rows only)",
            "",
            "![ROI by ELO](figures/roi_by_elo_bin.png)",
            "",
            "| elo_gap_bin | n_bets | hit_rate | roi_pct |",
            "|---|---|---|---|",
        ]
        for _, row in df_roi_elo.sort_values("roi_pct", ascending=False).iterrows():
            lines.append(
                f"| {row['elo_gap_bin']} | {int(row['n_bets'])} | {row['hit_rate']:.3f} | {row['roi_pct']:.2f}% |"
            )

    if not df_roi_region.empty:
        label_col = (
            "region_name" if "region_name" in df_roi_region.columns else seg_col_region
        )
        has_logloss = "logloss" in df_roi_region.columns
        header = "| region | n_bets | hit_rate | roi_pct |"
        sep = "|---|---|---|---|"
        if has_logloss:
            header = "| region | n_bets | hit_rate | roi_pct | logloss |"
            sep = "|---|---|---|---|---|"
        lines += [
            "",
            "## ROI by region (matched rows only, ≥10 bets)",
            "",
            "![ROI by region](figures/roi_by_region.png)",
            "",
        ]
        if has_logloss and logloss_threshold is not None:
            # The trailing newline leaves a blank line between the note and the table.
            lines.append(
                f"> ★ = logloss below median across all regions ({logloss_threshold:.3f})\n"
            )
        lines += [header, sep]
        for _, row in df_roi_region.sort_values("roi_pct", ascending=False).iterrows():
            name = str(row.get(label_col, row[seg_col_region]))
            if has_logloss and row.get("low_logloss", False):
                name = f"★ {name}"
            if has_logloss:
                ll = row["logloss"]
                ll_str = f"{ll:.4f}" if pd.notna(ll) else "—"
                lines.append(
                    f"| {name} | {int(row['n_bets'])} | {row['hit_rate']:.3f} | {row['roi_pct']:.2f}% | {ll_str} |"
                )
            else:
                lines.append(
                    f"| {name} | {int(row['n_bets'])} | {row['hit_rate']:.3f} | {row['roi_pct']:.2f}% |"
                )

    if not df_roi_league.empty:
        lines += [
            "",
            "## ROI by league (matched rows only, ≥10 bets)",
            "",
            "| league | n_bets | hit_rate | roi_pct |",
            "|---|---|---|---|",
        ]
        for _, row in df_roi_league.sort_values("roi_pct", ascending=False).iterrows():
            lines.append(
                f"| {row[seg_col_league]} | {int(row['n_bets'])} | {row['hit_rate']:.3f} | {row['roi_pct']:.2f}% |"
            )

    if not df_roi_threshold.empty:
        lines += [
            "",
            "## ROI vs minimum edge threshold (matched rows only)",
            "",
            "> Real B365 payout used. `min_edge = 0` means all matched bets placed.",
            "",
            "![ROI by threshold](figures/roi_by_threshold.png)",
            "",
            "| min_edge | n_bets | bet_rate | hit_rate | roi_pct |",
            "|---|---|---|---|---|",
        ]
        for _, row in df_roi_threshold.iterrows():
            lines.append(
                f"| {row['min_edge']:.2f} | {int(row['n_bets'])} | {row['bet_rate']:.3f}"
                f" | {row['hit_rate']:.3f} | {row['roi_pct']:.2f}% |"
            )

    (output_report_dir / "index.md").write_text("\n".join(lines), encoding="utf-8")
    logger.info("ROI analysis report written to %s", output_report_dir / "index.md")

    # Only the regional table is uploaded, and only when it was actually written.
    if not df_roi_region.empty:
        _upload_roi_to_minio(data_dir / "roi_by_region.csv")

CLI entrypoint for live betting strategy simulation on Fonbet odds.

Joins model predictions (batch_inference) with actual Fonbet live odds to simulate flat-stake and fractional-Kelly betting strategies on finished matches.

This stage is intentionally NOT part of the DVC graph: Fonbet odds accumulate continuously, and the simulation can be re-run at any time as new match results arrive. Run it via make live-betting or directly:

python -m src.pipelines.live_betting \
    --fonbet-odds-path data/predictions/fonbet_odds.parquet

All analysis CSVs are written to data/analysis/live_betting/ by default.

cli_live_betting(predictions_path, finished_path, fonbet_odds_path, output_dir, metadata_dir, min_edge, min_bets, kelly_fraction, initial_bankroll, error_analysis_dir)

Simulate flat-stake and Kelly betting strategies on live Fonbet odds.

Parameters:

Name Type Description Default
predictions_path Path

Parquet with batch-inference predictions.

required
finished_path Path

Parquet of finished match records.

required
fonbet_odds_path Path

Parquet with Fonbet 1X2 odds.

required
output_dir Path

Directory for output CSVs and reports.

required
metadata_dir Path

Directory with segment lookup files.

required
min_edge float

Minimum model-implied edge to place a bet.

required
min_bets int

Minimum bets required to include a segment in reports.

required
kelly_fraction float

Fractional Kelly coefficient.

required
initial_bankroll float

Starting bankroll for Kelly simulation.

required
error_analysis_dir Path | None

Optional directory with per-segment error CSVs.

required
Source code in src/pipelines/live_betting.py
@cli.command("live-betting")
@click.option(
    "--predictions-path",
    default="data/predictions/predictions.parquet",
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path to batch_inference predictions.parquet.",
)
@click.option(
    "--finished-path",
    default="data/interim/finished.parquet",
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path to interim/finished.parquet (contains outcome_1x2).",
)
@click.option(
    "--fonbet-odds-path",
    default="data/predictions/fonbet_odds.parquet",
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path to fonbet_odds.parquet. Downloaded from MinIO if absent.",
)
@click.option(
    "--output-dir",
    default="data/analysis/live_betting",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    show_default=True,
    help="Directory for output CSV files.",
)
@click.option(
    "--metadata-dir",
    default="data/metadata",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    show_default=True,
    help="Directory with id-to-name JSON mappings.",
)
@click.option(
    "--min-edge",
    default=0.02,
    type=float,
    show_default=True,
    help="Minimum edge (model_proba - market_proba) to place a bet.",
)
@click.option(
    "--min-bets",
    default=10,
    type=int,
    show_default=True,
    help="Minimum bets per segment to include in segment analysis.",
)
@click.option(
    "--kelly-fraction",
    default=0.25,
    # Enforce the range the help text documents instead of silently
    # simulating with a zero, negative or over-leveraged Kelly multiplier.
    type=click.FloatRange(min=0.0, max=1.0, min_open=True),
    show_default=True,
    help="Fractional Kelly multiplier (0 < f <= 1).",
)
@click.option(
    "--initial-bankroll",
    default=100.0,
    type=float,
    show_default=True,
    help="Starting bankroll for Kelly simulation.",
)
@click.option(
    "--error-analysis-dir",
    default=None,
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    show_default=True,
    help=(
        "Directory containing segment_regionId.csv from holdout error analysis. "
        "Used to join historical logloss onto regional ROI. "
        "Downloaded from MinIO if absent."
    ),
)
def cli_live_betting(
    predictions_path: Path,
    finished_path: Path,
    fonbet_odds_path: Path,
    output_dir: Path,
    metadata_dir: Path,
    min_edge: float,
    min_bets: int,
    kelly_fraction: float,
    initial_bankroll: float,
    error_analysis_dir: Path | None,
) -> None:
    """Simulate flat-stake and Kelly betting strategies on live Fonbet odds.

    Loads predictions, finished-match outcomes and Fonbet odds (each is
    downloaded from MinIO when the local file is missing), builds the
    simulation frame, computes overall / per-threshold / per-tournament /
    per-region ROI plus a ROI time series, writes the CSVs to ``output_dir``
    and uploads the results.

    Args:
        predictions_path: Parquet with batch-inference predictions.
        finished_path: Parquet of finished match records; only ``id``,
            ``outcome_1x2``, ``startTimeUtc``, ``tournamentId`` and
            ``regionId`` are kept (those that exist).
        fonbet_odds_path: Parquet with Fonbet 1X2 odds.
        output_dir: Directory for the output CSVs (``overall_roi.csv``,
            ``roi_by_threshold.csv``, ``roi_by_segment.csv``,
            ``roi_by_region.csv``, ``roi_timeseries.csv``).
        metadata_dir: Directory with the ``tournamentId`` / ``regionId``
            id-to-name maps.
        min_edge: Minimum edge passed to the Kelly simulation and the ROI
            time series.
        min_bets: Minimum bets required to include a segment or threshold.
        kelly_fraction: Fractional Kelly multiplier, ``0 < f <= 1``
            (validated by Click).
        initial_bankroll: Starting bankroll for the Kelly simulation.
        error_analysis_dir: Optional directory expected to contain
            ``segment_regionId.csv``; when present its ``logloss`` is joined
            onto the regional ROI table.

    Raises:
        click.ClickException: If a required input is neither present locally
            nor downloadable from MinIO.
    """

    def _ensure_local(path, label, description, fetch, hint) -> Path:
        """Return *path*, downloading it via *fetch* first when it is missing."""
        path = Path(path)
        if not path.exists():
            logger.info("%s not at %s — attempting MinIO download", label, path)
            # A truthy return from fetch is not trusted on its own: the file
            # must actually exist afterwards.
            if not fetch(path) or not path.exists():
                raise click.ClickException(
                    f"{description} not available at {path} and could not be "
                    f"downloaded from MinIO.  {hint}"
                )
        return path

    def _id_to_name(ids: pd.Series, id_map) -> pd.Series:
        """Map ids to names; id maps are keyed by stringified integer ids."""
        return ids.apply(
            lambda x: id_map.get(str(int(x)), str(x)) if pd.notna(x) else str(x)
        )

    metadata_dir = Path(metadata_dir)
    _download_metadata(metadata_dir)

    if error_analysis_dir is not None:
        _download_segment_errors(Path(error_analysis_dir))

    predictions_path = _ensure_local(
        predictions_path,
        "predictions.parquet",
        "Predictions",
        lambda p: _download_artifact(p, _MINIO_PREDICTIONS_KEY),
        "Run upload-artifacts after dvc repro batch_inference.",
    )
    df_preds = pd.read_parquet(predictions_path)
    logger.info("Loaded predictions: %d rows", len(df_preds))

    finished_path = _ensure_local(
        finished_path,
        "finished.parquet",
        "Finished data",
        lambda p: _download_artifact(p, _MINIO_FINISHED_KEY),
        "Run upload-artifacts after dvc repro batch_inference.",
    )
    finished_cols = ["id", "outcome_1x2", "startTimeUtc", "tournamentId", "regionId"]
    df_finished = pd.read_parquet(finished_path)
    # If 'id' is stored as the parquet index, bring it back as a plain column;
    # if it is both index and column, drop the redundant index.
    if df_finished.index.name == "id":
        df_finished = df_finished.reset_index(drop="id" in df_finished.columns)
    df_finished = df_finished[[c for c in finished_cols if c in df_finished.columns]]
    logger.info("Loaded finished matches: %d rows", len(df_finished))

    fonbet_odds_path = _ensure_local(
        fonbet_odds_path,
        "fonbet_odds.parquet",
        "Fonbet odds",
        _download_fonbet_odds,
        "Run link_fonbet_odds first.",
    )
    df_odds = pd.read_parquet(fonbet_odds_path)
    logger.info("Loaded Fonbet odds: %d rows", len(df_odds))

    df_sim = _build_simulation_df(df_preds, df_finished, df_odds)
    if df_sim.empty:
        logger.warning(
            "No matched finished matches with Fonbet odds. Nothing to simulate."
        )
        return

    n_sim = len(df_sim)
    logger.info("Running simulation on %d matched finished matches.", n_sim)

    # Column order here (home, draw, away) must line up with _LABEL_ORDER.
    proba = df_sim[["proba_home", "proba_draw", "proba_away"]].to_numpy()
    actual_odds = df_sim[["odd_home", "odd_draw", "odd_away"]].to_numpy()
    y_true = df_sim["outcome_1x2"].to_numpy()
    dates = (
        df_sim["startTimeUtc"].to_numpy() if "startTimeUtc" in df_sim.columns else None
    )

    # NOTE(review): min_edge is NOT passed here, yet the final log line reports
    # the flat-stake bet count as "edge>min_edge" — confirm that
    # compute_flat_stake_roi's own default matches, or pass min_edge explicitly.
    flat_roi = compute_flat_stake_roi(
        y_true=y_true,
        model_proba=proba,
        label_order=_LABEL_ORDER,
        actual_odds=actual_odds,
    )

    kelly_roi = compute_kelly_roi(
        y_true=y_true,
        model_proba=proba,
        label_order=_LABEL_ORDER,
        actual_odds=actual_odds,
        fraction=kelly_fraction,
        initial_bankroll=initial_bankroll,
        min_edge=min_edge,
    )

    df_roi_threshold = compute_roi_by_threshold(
        y_true=y_true,
        model_proba=proba,
        label_order=_LABEL_ORDER,
        actual_odds=actual_odds,
        min_bets=min_bets,
    )

    tournament_map = _load_id_map(metadata_dir, "tournamentId")
    region_map = _load_id_map(metadata_dir, "regionId")

    # ── ROI by tournament ─────────────────────────────────────────────────────
    df_roi_segment: pd.DataFrame = pd.DataFrame()
    if "tournamentId" in df_sim.columns and df_sim["tournamentId"].notna().any():
        df_roi_segment = compute_roi_by_segment(
            y_true=y_true,
            model_proba=proba,
            label_order=_LABEL_ORDER,
            segments=df_sim[["tournamentId"]],
            segment_col="tournamentId",
            actual_odds=actual_odds,
            min_bets=min_bets,
        )
        if not df_roi_segment.empty:
            if tournament_map:
                df_roi_segment["tournament_name"] = _id_to_name(
                    df_roi_segment["tournamentId"], tournament_map
                )
            if "regionId" in df_sim.columns and region_map:
                # Attach each tournament's region, taken from the first match
                # seen for that tournament.
                region_per_tourn = df_sim.drop_duplicates("tournamentId")[
                    ["tournamentId", "regionId"]
                ].set_index("tournamentId")["regionId"]
                df_roi_segment["regionId"] = df_roi_segment["tournamentId"].map(
                    region_per_tourn
                )
                df_roi_segment["region_name"] = _id_to_name(
                    df_roi_segment["regionId"], region_map
                )

    # ── ROI by region (mirrors roi_analysis.py logic) ─────────────────────────
    df_roi_region: pd.DataFrame = pd.DataFrame()
    if "regionId" in df_sim.columns and df_sim["regionId"].notna().any():
        df_roi_region = compute_roi_by_segment(
            y_true=y_true,
            model_proba=proba,
            label_order=_LABEL_ORDER,
            segments=df_sim[["regionId"]],
            segment_col="regionId",
            actual_odds=actual_odds,
            min_bets=min_bets,
        )
        if not df_roi_region.empty:
            if region_map:
                df_roi_region["region_name"] = _id_to_name(
                    df_roi_region["regionId"], region_map
                )
            # Join historical logloss from segment_regionId.csv when available.
            if error_analysis_dir is not None:
                err_seg_path = Path(error_analysis_dir) / "segment_regionId.csv"
                if err_seg_path.exists():
                    df_err = pd.read_csv(err_seg_path)
                    # Median over every region in the error CSV, not only the
                    # regions present in the ROI table.
                    logloss_threshold = float(df_err["logloss"].median())
                    df_roi_region = df_roi_region.merge(
                        df_err[["id", "logloss"]].rename(columns={"id": "regionId"}),
                        on="regionId",
                        how="left",
                    )
                    # Regions without a logloss (NaN) compare False.
                    df_roi_region["low_logloss"] = (
                        df_roi_region["logloss"] < logloss_threshold
                    )
                    logger.info(
                        "Joined logloss for %d regions; low-logloss threshold (median): %.4f",
                        df_roi_region["logloss"].notna().sum(),
                        logloss_threshold,
                    )
                else:
                    logger.warning(
                        "segment_regionId.csv not found in %s — logloss join skipped.",
                        error_analysis_dir,
                    )

    df_timeseries = compute_roi_timeseries(
        y_true=y_true,
        model_proba=proba,
        label_order=_LABEL_ORDER,
        actual_odds=actual_odds,
        dates=dates,
        min_edge=min_edge,
    )

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    df_overall = pd.DataFrame(
        [
            {"strategy": "flat_stake", **flat_roi},
            {"strategy": f"kelly_{kelly_fraction:.2f}", **kelly_roi},
        ]
    )
    df_overall.to_csv(output_dir / "overall_roi.csv", index=False)
    logger.info("Wrote overall_roi.csv")

    # The threshold table is written unconditionally (even when empty); the
    # segment, region and time-series tables only when they have rows.
    df_roi_threshold.to_csv(output_dir / "roi_by_threshold.csv", index=False)
    logger.info("Wrote roi_by_threshold.csv (%d thresholds)", len(df_roi_threshold))

    if not df_roi_segment.empty:
        df_roi_segment.to_csv(output_dir / "roi_by_segment.csv", index=False)
        logger.info("Wrote roi_by_segment.csv (%d segments)", len(df_roi_segment))

    if not df_roi_region.empty:
        df_roi_region.to_csv(output_dir / "roi_by_region.csv", index=False)
        logger.info("Wrote roi_by_region.csv (%d regions)", len(df_roi_region))

    if not df_timeseries.empty:
        df_timeseries.to_csv(output_dir / "roi_timeseries.csv", index=False)
        logger.info("Wrote roi_timeseries.csv (%d bets)", len(df_timeseries))

    _upload_results_to_minio(output_dir)

    # Predicted label per match = the class with the highest model probability.
    _write_betting_textfile(
        output_dir,
        flat_roi,
        n_sim,
        y_true=y_true,
        y_pred_labels=np.array([_LABEL_ORDER[i] for i in proba.argmax(axis=1)]),
    )

    logger.info(
        "Simulation complete — %d matches, %d bets (flat, edge>%.2f). "
        "Flat ROI: %.1f%%.  Kelly bankroll growth: %.1f%%.",
        n_sim,
        int(flat_roi.get("n_bets", 0)),
        min_edge,
        float(flat_roi.get("roi_pct", float("nan"))),
        float(kelly_roi.get("bankroll_growth_pct", float("nan"))),
    )

Monitoring (Pipeline Stages)

DVC pipeline stage: monitor feature drift using Evidently.

Resolves the champion model's run_id from the MLflow Model Registry, loads recent production feature records from data/predictions/match_features.parquet, runs Evidently drift analysis, writes results to reports/, and updates the model_feature_drift_score Prometheus metric via a textfile that the node_exporter textfile collector can scrape.

Usage (DVC-managed, see dvc.yaml): python -m src.pipelines cli-monitor-drift <input_predictions_features_path> <output_drift_json_path>

The stage is also triggered after batch inference by airflow/dags/ml_monitor_drift_01.py (KubernetesPodOperator, same CLI entrypoint).

cli_monitor_drift(input_predictions_features_path, output_drift_json_path, output_report_dir, metrics_textfile, stattest_threshold)

Compute feature drift between reference and recent production features.

The champion model's run_id is resolved live from the MLflow Model Registry (model name and alias from config / env vars), so no local artifact file is required. This makes the command safe to run inside ephemeral Kubernetes pods that start with an empty data volume.

Parameters:

Name Type Description Default
input_predictions_features_path Path

Parquet of recent production features (batch_inference output).

required
output_drift_json_path Path

Destination for the drift summary JSON.

required
output_report_dir Path

Directory for the full Evidently HTML report.

required
metrics_textfile Path | None

Optional path for a Prometheus textfile (for the node_exporter textfile collector).

required
stattest_threshold float

Evidently p-value threshold for drift detection.

required
Source code in src/pipelines/monitor_drift.py
@cli.command()
@click.argument(
    "input_predictions_features_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.argument(
    "output_drift_json_path",
    type=click.Path(path_type=Path, dir_okay=False),
)
@click.option(
    "--output-report-dir",
    type=click.Path(path_type=Path, file_okay=False),
    default=Path("reports/evidently"),
    show_default=True,
    help="Directory for Evidently HTML reports.",
)
@click.option(
    "--metrics-textfile",
    type=click.Path(path_type=Path, dir_okay=False),
    default=None,
    help=(
        "Optional: write Prometheus textfile to this path "
        "(for node_exporter textfile collector)."
    ),
)
@click.option(
    "--stattest-threshold",
    type=float,
    default=0.05,
    show_default=True,
    help="p-value threshold for per-feature drift tests.",
)
def cli_monitor_drift(
    input_predictions_features_path: Path,
    output_drift_json_path: Path,
    output_report_dir: Path,
    metrics_textfile: Path | None,
    stattest_threshold: float,
) -> None:
    """Compute feature drift between reference and recent production features.

    The champion model's run_id is resolved live from the MLflow Model Registry
    (model name and alias from config / env vars), so no local artifact file is
    required.  This makes the command safe to run inside ephemeral Kubernetes
    pods that start with an empty data volume.

    Args:
        input_predictions_features_path: Parquet of recent production
            features (batch_inference output). Downloaded from MinIO when it
            does not exist locally.
        output_drift_json_path: Destination for the drift summary JSON.
        output_report_dir: Directory for the full Evidently HTML report.
        metrics_textfile: Optional path for a Prometheus textfile (for the
            node_exporter textfile collector). When omitted, no Prometheus
            metric is written or pushed.
        stattest_threshold: Evidently p-value threshold for drift detection.

    Raises:
        click.ClickException: If the production features are neither available
            locally nor downloadable from MinIO, or if the reference snapshot
            and the production features have no column in common.
    """
    # -----------------------------------------------------------------------
    # 0. Ensure input data is available (download from MinIO if needed).
    # -----------------------------------------------------------------------
    features_path = Path(input_predictions_features_path)
    if not features_path.exists():
        logger.info("%s not found locally — attempting MinIO download.", features_path)
        ok = _download_artifact(features_path, _MINIO_MATCH_FEATURES_KEY)
        if not ok or not features_path.exists():
            raise click.ClickException(
                f"{features_path} not available locally and could not be "
                "downloaded from MinIO."
            )

    cfg = get_pipeline_config()
    mlflow.set_tracking_uri(cfg.mlflow_tracking_uri)

    # -----------------------------------------------------------------------
    # 1. Resolve the champion model's run_id from the MLflow Model Registry.
    # -----------------------------------------------------------------------
    client = MlflowClient()
    # NOTE: the config field is named ``mlflow_model_stage`` but is used here
    # as a registry *alias*.
    mv = client.get_model_version_by_alias(
        cfg.mlflow_model_name, cfg.mlflow_model_stage
    )
    run_id: str = mv.run_id
    logger.info(
        "Champion model: name=%s alias=%s version=%s run_id=%s",
        cfg.mlflow_model_name,
        cfg.mlflow_model_stage,
        mv.version,
        run_id,
    )

    # The parquet is fully read into memory inside the ``with`` block, so the
    # temporary download directory can be discarded right after.
    with tempfile.TemporaryDirectory() as tmp:
        ref_local = mlflow.artifacts.download_artifacts(
            artifact_uri=f"runs:/{run_id}/reference/reference_features.parquet",
            dst_path=tmp,
        )
        reference_df = pd.read_parquet(ref_local)

    logger.info("Reference snapshot: %d rows, %d features", *reference_df.shape)

    # -----------------------------------------------------------------------
    # 2. Load recent production features.
    # -----------------------------------------------------------------------
    current_df = pd.read_parquet(features_path)
    logger.info("Current production features: %d rows, %d features", *current_df.shape)

    if len(current_df) < _MIN_ROWS:
        logger.warning(
            "Only %d rows of production features available (minimum %d). "
            "Skipping drift analysis and writing drift_score=0.",
            len(current_df),
            _MIN_ROWS,
        )
        drift_score = 0.0
        n_features = len(reference_df.columns)
        n_drifted = 0
        html_report = None
    else:
        # Align columns: only compare features present in both DataFrames.
        reference_cols = set(reference_df.columns)
        common_cols = sorted(reference_cols & set(current_df.columns))
        if not common_cols:
            # Fail with an actionable message instead of handing an empty
            # frame to the drift computation.
            raise click.ClickException(
                "Reference snapshot and production features share no columns; "
                "cannot compute drift."
            )
        missing_cols = sorted(reference_cols.difference(common_cols))
        if missing_cols:
            # Surface silently dropped features: they are excluded from the
            # drift score and would otherwise go unnoticed.
            logger.warning(
                "%d reference feature(s) missing from production features "
                "and excluded from drift analysis: %s",
                len(missing_cols),
                missing_cols,
            )
        result = compute_drift(
            reference_df=reference_df[common_cols],
            current_df=current_df[common_cols],
            stattest_threshold=stattest_threshold,
        )
        drift_score = result.drift_score
        n_features = result.n_features
        n_drifted = result.n_drifted
        html_report = result.html_report
        logger.info(
            "Drift score: %.4f (%d / %d features drifted)",
            drift_score,
            n_drifted,
            n_features,
        )

    # -----------------------------------------------------------------------
    # 3. Persist outputs.
    # -----------------------------------------------------------------------
    ts = datetime.now(tz=timezone.utc).isoformat()

    output_drift_json_path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "drift_score": drift_score,
        "n_features": n_features,
        "n_drifted": n_drifted,
        "run_id": run_id,
        "timestamp": ts,
    }
    with open(output_drift_json_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    logger.info("Drift summary written to %s", output_drift_json_path)

    # Upload JSON to MinIO so the API /monitoring/drift endpoint can read it.
    _upload_artifact(output_drift_json_path, "reports/drift/latest.json")

    if html_report is not None:
        output_report_dir.mkdir(parents=True, exist_ok=True)
        # ts[:10] is the YYYY-MM-DD prefix of the ISO timestamp: one report
        # file per UTC day.
        report_path = output_report_dir / f"drift_{ts[:10]}.html"
        report_path.write_bytes(html_report)
        logger.info("Evidently HTML report written to %s", report_path)
        _upload_artifact(report_path, f"reports/evidently/drift_{ts[:10]}.html")

    # -----------------------------------------------------------------------
    # 4. Expose drift score as Prometheus textfile (optional).
    # -----------------------------------------------------------------------
    if metrics_textfile is not None:
        Path(metrics_textfile).parent.mkdir(parents=True, exist_ok=True)
        # A dedicated registry keeps the textfile limited to this one gauge.
        registry = CollectorRegistry()
        g = Gauge(
            "model_feature_drift_score",
            "Evidently dataset drift score (share of drifted features).",
            registry=registry,
        )
        g.set(drift_score)
        write_to_textfile(str(metrics_textfile), registry)
        logger.info(
            "Prometheus textfile metric written to %s (drift_score=%.4f)",
            metrics_textfile,
            drift_score,
        )
        _push_to_gateway(registry, job="soccer_drift_monitor")

CLI entrypoint: monitor ML quality on finished production matches.

Downloads predictions and finished match results, joins them, computes classification quality metrics across rolling windows, writes a Prometheus textfile, and saves an Evidently HTML report.

Usage::

python -m src.pipelines monitor-ml-quality \
    --predictions-path data/predictions/predictions.parquet \
    --finished-path data/interim/finished.parquet \
    --output-dir data/analysis/ml_quality

Triggered daily by airflow/dags/ml_monitor_quality_01.py.

cli_monitor_ml_quality(predictions_path, finished_path, output_dir, evidently_report_dir, metrics_textfile)

Compute ML quality metrics for rolling windows and write monitoring outputs.

Source code in src/pipelines/monitor_ml_quality.py
@cli.command("monitor-ml-quality")
@click.option(
    "--predictions-path",
    default="data/predictions/predictions.parquet",
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path to batch_inference predictions.parquet.",
)
@click.option(
    "--finished-path",
    default="data/interim/finished.parquet",
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path to finished.parquet containing outcome_1x2.",
)
@click.option(
    "--output-dir",
    default="data/analysis/ml_quality",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    show_default=True,
    help="Directory for output JSON and Prometheus textfile.",
)
@click.option(
    "--evidently-report-dir",
    default="reports/evidently",
    type=click.Path(path_type=Path, dir_okay=True, file_okay=False),
    show_default=True,
    help="Directory for Evidently HTML reports.",
)
@click.option(
    "--metrics-textfile",
    default=None,
    type=click.Path(path_type=Path, dir_okay=False),
    show_default=True,
    help="Path for Prometheus node_exporter textfile. Defaults to <output-dir>/ml_quality.prom.",
)
def cli_monitor_ml_quality(
    predictions_path: Path,
    finished_path: Path,
    output_dir: Path,
    evidently_report_dir: Path,
    metrics_textfile: Path | None,
) -> None:
    """Compute ML quality metrics for rolling windows and write monitoring outputs.

    Args:
        predictions_path: Parquet of batch-inference predictions; downloaded
            from MinIO when missing locally.
        finished_path: Parquet of finished matches; downloaded from MinIO when
            missing locally.
        output_dir: Directory receiving the summary JSON (and the Prometheus
            textfile when ``metrics_textfile`` is not given).
        evidently_report_dir: Directory for the prediction-drift HTML report.
        metrics_textfile: Prometheus textfile path; defaults to
            ``<output_dir>/ml_quality.prom``.

    Raises:
        click.ClickException: If an input parquet is neither available locally
            nor downloadable from MinIO.
    """

    def _nan_to_none(value: float) -> float | None:
        """Map NaN to ``None`` so the summary holds JSON ``null`` instead."""
        return None if np.isnan(value) else value

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if metrics_textfile is None:
        metrics_textfile = output_dir / "ml_quality.prom"

    # Read the clock once: the report filename, the summary filename and the
    # summary timestamp then always agree, even for a run that spans midnight.
    now = datetime.now(tz=timezone.utc)
    ts = now.strftime("%Y-%m-%d")

    # 1. Ensure input data is available.
    for local_path, minio_key in [
        (predictions_path, _MINIO_PREDICTIONS_KEY),
        (finished_path, _MINIO_FINISHED_KEY),
    ]:
        if not Path(local_path).exists():
            logger.info("%s not found locally — attempting MinIO download.", local_path)
            ok = _download_artifact(Path(local_path), minio_key)
            if not ok or not Path(local_path).exists():
                raise click.ClickException(
                    f"{local_path} not available locally and could not be downloaded from MinIO."
                )

    # 2. Load and join predictions with finished match outcomes.
    df = _load_joined_df(Path(predictions_path), Path(finished_path))
    if df.empty:
        # Nothing to evaluate: exit without writing any output file.
        logger.warning("No matched finished matches — nothing to monitor.")
        return

    # 3. Compute quality metrics per rolling window.
    proba_cols = ["proba_home", "proba_draw", "proba_away"]
    quality_results: dict[str, MLQualityResult | None] = {}

    for window_label, days in _WINDOWS.items():
        df_win = _filter_window(df, days)
        if len(df_win) < _MIN_ROWS:
            logger.warning(
                "Window '%s': only %d rows (minimum %d) — skipping.",
                window_label,
                len(df_win),
                _MIN_ROWS,
            )
            # Skipped windows are recorded as None so they still appear in
            # the summary.
            quality_results[window_label] = None
            continue

        y_true = df_win["outcome_1x2"].to_numpy(dtype=int)
        y_proba = df_win[proba_cols].to_numpy(dtype=float)
        # Bind to a local first so no assert is needed to narrow the
        # Optional type stored in the dict.
        window_result = compute_ml_quality(y_true, y_proba, _LABEL_ORDER)
        quality_results[window_label] = window_result
        logger.info(
            "Window '%s': n=%d  logloss=%.4f  ece=%.4f  hit_rate=%.4f",
            window_label,
            window_result.n_matches,
            window_result.logloss,
            window_result.ece,
            window_result.hit_rate,
        )

    # 4. Compute prediction drift (recent 7d vs 30d baseline).
    pred_drift_score: float | None = None
    pred_drift_n_drifted: int | None = None
    pred_drift_html: bytes | None = None

    if "startTimeUtc" in df.columns:
        cutoff_7d = now - timedelta(days=7)
        cutoff_30d = now - timedelta(days=30)
        # Current = last 7 days; reference = the 23 days before that.
        df_current = df[df["startTimeUtc"] >= cutoff_7d][proba_cols]
        df_ref = df[
            (df["startTimeUtc"] >= cutoff_30d) & (df["startTimeUtc"] < cutoff_7d)
        ][proba_cols]

        if len(df_ref) >= _MIN_ROWS and len(df_current) >= _MIN_ROWS:
            # Best effort by design: a drift failure must not prevent the
            # quality metrics from being written.
            try:
                pred_result = compute_prediction_drift(df_ref, df_current)
                pred_drift_score = pred_result.prediction_drift_score
                pred_drift_n_drifted = pred_result.n_drifted_cols
                pred_drift_html = pred_result.html_report
                logger.info(
                    "Prediction drift: score=%.4f, drifted_cols=%d/3.",
                    pred_drift_score,
                    pred_drift_n_drifted,
                )
            except Exception:
                logger.exception("Prediction drift computation failed — skipping.")
        else:
            logger.info(
                "Insufficient data for prediction drift (ref=%d, current=%d, min=%d).",
                len(df_ref),
                len(df_current),
                _MIN_ROWS,
            )

    # 5. Persist Evidently HTML report.
    if pred_drift_html is not None:
        evidently_report_dir = Path(evidently_report_dir)
        evidently_report_dir.mkdir(parents=True, exist_ok=True)
        report_path = evidently_report_dir / f"prediction_drift_{ts}.html"
        report_path.write_bytes(pred_drift_html)
        logger.info("Evidently HTML report written to %s.", report_path)
        _upload_artifact(
            report_path,
            f"reports/evidently/prediction_drift_{ts}.html",
        )

    # 6. Write Prometheus textfile.
    _write_prometheus_textfile(
        Path(metrics_textfile),
        quality_results,
        pred_drift_score,
        pred_drift_n_drifted,
    )

    # 7. Write summary JSON.
    summary: dict = {
        "timestamp": now.isoformat(),
        "windows": {},
        "prediction_drift": {
            "score": pred_drift_score,
            "n_drifted_cols": pred_drift_n_drifted,
        },
    }
    for window_label, result in quality_results.items():
        if result is None:
            summary["windows"][window_label] = None
        else:
            summary["windows"][window_label] = {
                "n_matches": result.n_matches,
                "logloss": result.logloss,
                "ece": result.ece,
                "hit_rate": result.hit_rate,
                "hit_rate_home": _nan_to_none(result.hit_rate_home),
                "hit_rate_draw": _nan_to_none(result.hit_rate_draw),
                "hit_rate_away": _nan_to_none(result.hit_rate_away),
                "mean_confidence": result.mean_confidence,
            }

    summary_path = output_dir / f"ml_quality_{ts}.json"
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    logger.info("Quality summary written to %s.", summary_path)

    computed = [w for w, r in quality_results.items() if r is not None]
    logger.info("ML quality monitoring complete. Windows computed: %s.", computed)

Internal

Minimal configuration for ML pipeline stages.

Reads only the env vars required by training/evaluation pipelines: MLflow tracking URI, MinIO credentials, and model registry defaults.

Intentionally isolated from src.app.config (which instantiates DatabaseSettings and other app-layer services not needed during training). Extends src.shared.config.SharedInfraConfig to avoid duplicating the four shared env-var field declarations.

PipelineConfig

Bases: SharedInfraConfig

Settings used exclusively by DVC pipeline stages.

Inherits from SharedInfraConfig
  • mlflow_tracking_uri (MLFLOW_TRACKING_URL)
  • minio_endpoint_url (MINIO_ENDPOINT_URL)
  • minio_access_key (MINIO_USER)
  • minio_secret_key (MINIO_PASSWORD)

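Adds pipeline-specific model-registry defaults: mlflow_model_name (MLFLOW_MODEL_NAME, default "soccer-match-outcome") and mlflow_model_stage (MLFLOW_MODEL_STAGE, default "smoke").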

Source code in src/pipelines/_config.py
class PipelineConfig(SharedInfraConfig):
    """Configuration consumed only by the DVC pipeline stages.

    Fields inherited from SharedInfraConfig (env var in parentheses):
      - mlflow_tracking_uri  (MLFLOW_TRACKING_URL)
      - minio_endpoint_url   (MINIO_ENDPOINT_URL)
      - minio_access_key     (MINIO_USER)
      - minio_secret_key     (MINIO_PASSWORD)

    Model-registry fields declared here (env var, default):
      - mlflow_model_name    (MLFLOW_MODEL_NAME, "soccer-match-outcome")
      - mlflow_model_stage   (MLFLOW_MODEL_STAGE, "smoke")
    """

    # Name of the registered model to look up in the MLflow Model Registry.
    mlflow_model_name: str = Field(
        default="soccer-match-outcome",
        validation_alias="MLFLOW_MODEL_NAME",
    )
    # NOTE(review): named "stage", but the drift monitor passes this value as
    # a registry alias — confirm the intended semantics.
    mlflow_model_stage: str = Field(
        default="smoke",
        validation_alias="MLFLOW_MODEL_STAGE",
    )

get_pipeline_config() cached

Return the pipeline config singleton (lazy, cached).

Deferred instantiation prevents pydantic-settings from validating required env vars at import time. Pipeline stages that do not need MLflow / MinIO (e.g. load_data_from_source) can therefore import this module without triggering a ValidationError merely because sibling modules require those settings.

Use get_pipeline_config.cache_clear() in tests to reset the cache after changing env vars with monkeypatch.

Returns:

Type Description
PipelineConfig

Singleton PipelineConfig instance.

Source code in src/pipelines/_config.py
@lru_cache(maxsize=1)
def get_pipeline_config() -> PipelineConfig:
    """Build the pipeline configuration on first use and reuse it afterwards.

    The settings object is created lazily rather than at module level, so
    pydantic-settings does not validate required env vars at *import time*.
    Stages that need neither MLflow nor MinIO (e.g. load_data_from_source)
    can therefore import this module without hitting a ValidationError.

    In tests, call ``get_pipeline_config.cache_clear()`` after changing env
    vars with ``monkeypatch`` so the next call re-reads the environment.

    Returns:
        The cached :class:`PipelineConfig` instance.
    """
    # lru_cache on a zero-argument function stores exactly one result, so
    # this constructor runs once until the cache is cleared.
    config = PipelineConfig()
    return config