Skip to content

Models

Metrics

plot_confusion_matrix_multiclass(y_true, y_pred, labels, normalize=None, title=None)

Plot a confusion matrix for a multiclass classification result.

Parameters:

Name Type Description Default
y_true

Ground-truth labels.

required
y_pred

Predicted labels.

required
labels

Ordered list of class labels for axis ticks.

required
normalize

Normalization mode passed to sklearn.metrics.confusion_matrix (None, "true", "pred", or "all").

None
title

Figure title. Defaults to "Confusion Matrix".

None

Returns:

Type Description

Matplotlib Figure with a single confusion-matrix axis.

Source code in src/models/metrics.py
def plot_confusion_matrix_multiclass(
    y_true, y_pred, labels, normalize=None, title=None
):
    """Render a multiclass confusion matrix as a Matplotlib figure.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        labels: Ordered list of class labels; passed to
            ``confusion_matrix`` and used as the axis tick labels.
        normalize: Normalization mode handed to
            ``sklearn.metrics.confusion_matrix`` (``None``, ``"true"``,
            ``"pred"``, or ``"all"``).
        title: Figure title. A falsy value falls back to
            ``"Confusion Matrix"``.

    Returns:
        Matplotlib Figure with a single confusion-matrix axis.
    """
    # Normalized cells are fractions; raw cells are integer counts.
    cell_format = ".2f" if normalize else "d"
    heading = title or "Confusion Matrix"

    matrix = confusion_matrix(y_true, y_pred, labels=labels, normalize=normalize)
    fig, ax = plt.subplots(figsize=(6, 6))
    ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=labels).plot(
        ax=ax, values_format=cell_format, cmap=None, colorbar=False
    )
    ax.set_title(heading)
    plt.tight_layout()
    return fig

multiclass_brier_score(y_true, proba, labels)

Compute the mean multiclass Brier score (one-hot encoding).

Parameters:

Name Type Description Default
y_true

Iterable of ground-truth class labels.

required
proba

Predicted probability matrix, shape (n, n_classes).

required
labels

Ordered list of class labels matching proba columns.

required

Returns:

Type Description

Mean Brier score across all samples (lower is better).

Source code in src/models/metrics.py
def multiclass_brier_score(y_true, proba, labels):
    """Return the mean multiclass Brier score against one-hot targets.

    Args:
        y_true: Iterable of ground-truth class labels, one per row of
            ``proba``. Every value must appear in ``labels``.
        proba: Predicted probability matrix, shape ``(n, n_classes)``.
        labels: Ordered list of class labels matching ``proba`` columns.

    Returns:
        Mean over samples of the summed squared difference between the
        predicted probabilities and the one-hot target (lower is better).
    """
    column_of = {cls: position for position, cls in enumerate(labels)}

    # One-hot encode the targets into a matrix shaped like ``proba``.
    target = np.zeros_like(proba, dtype=float)
    for row, observed in enumerate(y_true):
        target[row, column_of[observed]] = 1.0

    squared_error = (proba - target) ** 2
    per_sample = np.sum(squared_error, axis=1)
    return np.mean(per_sample)

compute_ece(y_true, proba, labels, n_bins=10)

Compute macro-averaged multiclass Expected Calibration Error.

Parameters:

Name Type Description Default
y_true ndarray

Ground-truth class labels.

required
proba ndarray

Predicted probability matrix, shape (n, n_classes).

required
labels list

Ordered list of class labels matching proba columns.

required
n_bins int

Number of calibration bins.

10

Returns:

Type Description
float

Macro-averaged ECE across all classes (one-vs-rest, lower is better).

Source code in src/models/metrics.py
def compute_ece(
    y_true: np.ndarray,
    proba: np.ndarray,
    labels: list,
    n_bins: int = 10,
) -> float:
    """Return the macro-averaged one-vs-rest Expected Calibration Error.

    For each class, the predicted probabilities of that class are placed
    into ``n_bins`` equal-width bins over ``[0, 1]``. The per-class ECE is
    the sample-weighted mean absolute gap between the observed positive
    rate and the mean predicted probability of each non-empty bin.

    Args:
        y_true: Ground-truth class labels.
        proba: Predicted probability matrix, shape ``(n, n_classes)``.
        labels: Ordered list of class labels matching ``proba`` columns.
        n_bins: Number of calibration bins.

    Returns:
        Mean of the per-class ECE values (lower is better).
    """
    truth = np.asarray(y_true)
    summed_ece = 0.0
    # Only the interior edges are needed: digitize then yields 0..n_bins-1.
    inner_edges = np.linspace(0.0, 1.0, n_bins + 1)[1:-1]
    for col, cls in enumerate(labels):
        hits = (truth == cls).astype(float)
        scores = proba[:, col]
        bin_of = np.digitize(scores, inner_edges)
        weighted_gap = 0.0
        for bin_id in range(n_bins):
            members = bin_of == bin_id
            count = members.sum()
            if count == 0:
                continue
            gap = abs(hits[members].mean() - scores[members].mean())
            weighted_gap += count * gap
        summed_ece += weighted_gap / len(truth)
    return summed_ece / len(labels)

extract_feature_importance(pipe, X_cols)

Extract feature importances from the clf step of a sklearn Pipeline.

Parameters:

Name Type Description Default
pipe

Fitted sklearn Pipeline with a clf named step.

required
X_cols list[str]

Feature column names in the order the pipeline was trained on.

required

Returns:

Type Description
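DataFrame | None

DataFrame with feature and importance columns sorted by importance in descending order, or None when the pipeline has no clf step, the step exposes neither feature_importances_ nor coef_, or the number of importances does not match X_cols.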

Source code in src/models/metrics.py
def extract_feature_importance(pipe, X_cols: list[str]) -> pd.DataFrame | None:
    """Extract feature importances from the ``clf`` step of a sklearn Pipeline.

    Args:
        pipe: Fitted sklearn Pipeline with a ``clf`` named step.
        X_cols: Feature column names in the order the pipeline was trained on.

    Returns:
        DataFrame with ``feature`` and ``importance`` columns sorted
        descending, or ``None`` when the step has no importances attribute.
    """
    clf = getattr(pipe, "named_steps", {}).get("clf")
    if clf is None:
        return None
    if hasattr(clf, "feature_importances_"):
        importances = clf.feature_importances_
    elif hasattr(clf, "coef_"):
        importances = np.abs(clf.coef_).mean(axis=0)
    else:
        return None
    if len(importances) != len(X_cols):
        return None
    return (
        pd.DataFrame({"feature": X_cols, "importance": importances})
        .sort_values("importance", ascending=False)
        .reset_index(drop=True)
    )

plot_feature_importance(df_imp, top_n=20, title='Feature Importance')

Plot a horizontal bar chart of the top-N feature importances.

Parameters:

Name Type Description Default
df_imp DataFrame

DataFrame with feature and importance columns.

required
top_n int

Number of top features to display.

20
title str

Chart title.

'Feature Importance'

Returns:

Type Description
Figure

Matplotlib Figure with a horizontal bar chart.

Source code in src/models/metrics.py
def plot_feature_importance(
    df_imp: pd.DataFrame,
    top_n: int = 20,
    title: str = "Feature Importance",
) -> plt.Figure:
    """Plot a horizontal bar chart of the first ``top_n`` feature importances.

    The first ``top_n`` rows of ``df_imp`` are plotted in their given order,
    so the frame is expected to be sorted by importance already.

    Args:
        df_imp: DataFrame with ``feature`` and ``importance`` columns.
        top_n: Number of leading rows (features) to display.
        title: Chart title.

    Returns:
        Matplotlib Figure with a horizontal bar chart.
    """
    # Reverse so the first row of df_imp ends up at the top of the chart.
    df_plot = df_imp.head(top_n).iloc[::-1]
    # Scale the height by the bars actually drawn: when df_imp has fewer than
    # top_n rows, sizing by top_n would leave an oversized, mostly empty figure.
    fig, ax = plt.subplots(figsize=(8, max(4, len(df_plot) * 0.35)))
    ax.barh(df_plot["feature"], df_plot["importance"])
    ax.set_xlabel("Importance")
    ax.set_title(title)
    # Lay out this figure explicitly rather than pyplot's current figure.
    fig.tight_layout()
    return fig

compute_segment_metrics(y_true, proba, labels, segments, segment_cols, min_samples=1)

Compute logloss and Brier score per segment value.

Parameters:

Name Type Description Default
y_true Series

Ground-truth class labels.

required
proba ndarray

Predicted probability matrix, shape (n, n_classes).

required
labels list

Ordered list of class labels.

required
segments DataFrame

DataFrame with segment columns aligned with y_true.

required
segment_cols list[str]

Column names in segments to group by.

required
min_samples int

Minimum segment size; smaller groups are skipped.

1

Returns:

Type Description
DataFrame

DataFrame with columns segment_col, segment_value, n, logloss, brier — one row per (column, value) pair.

Source code in src/models/metrics.py
def compute_segment_metrics(
    y_true: pd.Series,
    proba: np.ndarray,
    labels: list,
    segments: pd.DataFrame,
    segment_cols: list[str],
    min_samples: int = 1,
) -> pd.DataFrame:
    """Compute logloss and Brier score per segment value.

    Rows of ``y_true``, ``proba`` and ``segments`` are matched by position.
    Segment columns missing from ``segments`` are skipped with a warning,
    and null segment values are ignored.

    Args:
        y_true: Ground-truth class labels.
        proba: Predicted probability matrix, shape ``(n, n_classes)``.
        labels: Ordered list of class labels.
        segments: DataFrame with segment columns aligned with *y_true*.
        segment_cols: Column names in *segments* to group by.
        min_samples: Minimum segment size; smaller groups are skipped.

    Returns:
        DataFrame with columns ``segment_col``, ``segment_value``, ``n``,
        ``logloss``, ``brier`` — one row per (column, value) pair.
        ``logloss`` is NaN for a segment when ``log_loss`` raises; the
        failure is logged as a warning.
    """
    y_arr = y_true.to_numpy()
    rows = []
    for col in segment_cols:
        if col not in segments.columns:
            logger.warning(
                "Segment column '%s' not found in segments DataFrame; skipping.", col
            )
            continue
        seg_vals = segments[col].to_numpy()
        for val in np.unique(seg_vals[~pd.isnull(seg_vals)]):
            mask = seg_vals == val
            n_seg = int(mask.sum())
            if n_seg < min_samples:
                continue
            y_seg = y_arr[mask]
            proba_seg = proba[mask]
            try:
                ll = log_loss(y_seg, proba_seg, labels=labels)
            except Exception as exc:
                # Best effort: one bad segment must not abort the whole
                # report, but the failure should be visible in the logs.
                logger.warning(
                    "log_loss failed for segment %s=%r (n=%d): %s; recording NaN.",
                    col,
                    val,
                    n_seg,
                    exc,
                )
                ll = float("nan")
            brier = multiclass_brier_score(y_seg, proba_seg, labels)
            rows.append(
                {
                    "segment_col": col,
                    "segment_value": val,
                    "n": n_seg,
                    "logloss": float(ll),
                    "brier": float(brier),
                }
            )
    # Explicit columns keep the schema stable even when no rows were produced.
    return pd.DataFrame(
        rows, columns=["segment_col", "segment_value", "n", "logloss", "brier"]
    )

plot_calibration_curves(y_true, proba, labels, label_names)

Plot one-vs-rest calibration reliability diagrams for each class.

Parameters:

Name Type Description Default
y_true Series

Ground-truth class labels.

required
proba ndarray

Predicted probability matrix, shape (n, n_classes).

required
labels list

Ordered list of class labels.

required
label_names dict

Mapping from label value to human-readable name for subplot titles.

required

Returns:

Type Description
Figure

Matplotlib Figure with one reliability diagram per class.

Source code in src/models/metrics.py
def plot_calibration_curves(
    y_true: pd.Series,
    proba: np.ndarray,
    labels: list,
    label_names: dict,
) -> plt.Figure:
    """Draw a one-vs-rest reliability diagram for every class, side by side.

    Args:
        y_true: Ground-truth class labels.
        proba: Predicted probability matrix, shape ``(n, n_classes)``.
        labels: Ordered list of class labels.
        label_names: Mapping from label value to human-readable name for
            subplot titles; labels missing from it are shown as ``str(label)``.

    Returns:
        Matplotlib Figure with one reliability diagram per class.
    """
    class_count = len(labels)
    fig, axes = plt.subplots(1, class_count, figsize=(5 * class_count, 4))
    # With a single column, subplots returns a bare Axes instead of an array.
    panels = [axes] if class_count == 1 else axes

    if hasattr(y_true, "to_numpy"):
        truth = y_true.to_numpy()
    else:
        truth = np.asarray(y_true)

    for panel, (col, cls) in zip(panels, enumerate(labels)):
        is_cls = (truth == cls).astype(int)
        observed_rate, predicted_mean = calibration_curve(
            is_cls, proba[:, col], n_bins=10
        )
        panel.plot(predicted_mean, observed_rate, "s-", label="Model")
        panel.plot([0, 1], [0, 1], "k--", label="Perfect")
        panel.set_xlabel("Mean predicted probability")
        panel.set_ylabel("Fraction of positives")
        panel.set_title(label_names.get(cls, str(cls)))
        panel.legend(loc="upper left")

    plt.tight_layout()
    return fig

evaluate_clf(y, proba, label_order)

Evaluate a multiclass classifier and return a flat metrics dict.

Parameters:

Name Type Description Default
y DataFrame

Ground-truth labels (Series or array-like).

required
proba ndarray

Predicted probabilities, shape (n, n_classes). Rows with NaN/Inf are replaced with a uniform prior and a warning is emitted.

required
label_order list

Ordered list of class labels aligned with proba columns.

required

Returns:

Type Description
dict

Dict with keys logloss, brier, ece, roc_auc_ovr, accuracy, balanced_accuracy, f1_macro, f1_weighted, precision_class_{label}, recall_class_{label}.

Source code in src/models/metrics.py
def evaluate_clf(
    y: pd.Series | np.ndarray,
    proba: np.ndarray,
    label_order: list,
) -> dict[str, float]:
    """Evaluate a multiclass classifier and return a flat metrics dict.

    Hard predictions are the ``argmax`` of each probability row, mapped
    through ``label_order``.

    Args:
        y: Ground-truth labels (Series or 1-D array-like), one per row of
            ``proba``.
        proba: Predicted probabilities, shape ``(n, n_classes)``.
            Rows with NaN/Inf are replaced with a uniform prior and
            a warning is emitted. The input array is not modified.
        label_order: Ordered list of class labels aligned with
            ``proba`` columns.

    Returns:
        Flat dict of float metrics keyed by ``logloss``, ``brier``, ``ece``,
        ``roc_auc_ovr``, ``accuracy``, ``balanced_accuracy``, ``f1_macro``,
        ``f1_weighted``, plus ``precision_class_{label}`` and
        ``recall_class_{label}`` for every label in ``label_order``.
        ``roc_auc_ovr`` is NaN when ``roc_auc_score`` raises; the failure is
        logged as a warning.
    """
    import logging as _logging

    log = _logging.getLogger(__name__)

    # Work on a float copy so the caller's array is never mutated below.
    proba = np.array(proba, dtype=float)
    nan_rows = ~np.isfinite(proba).all(axis=1)
    if nan_rows.any():
        log.warning(
            "evaluate_clf: %d/%d rows have NaN/Inf probabilities — "
            "replacing with uniform prior (model likely diverged).",
            int(nan_rows.sum()),
            len(proba),
        )
        proba[nan_rows] = 1.0 / len(label_order)

    # Probability-based metrics.
    logloss = log_loss(y, proba, labels=label_order)
    brier = multiclass_brier_score(y, proba, label_order)
    ece = compute_ece(np.asarray(y), proba, label_order)

    # Hard-label metrics.
    pred_test = np.array(label_order)[np.argmax(proba, axis=1)]
    accuracy = accuracy_score(y, pred_test)
    bal_acc = balanced_accuracy_score(y, pred_test)

    f1_macro = f1_score(y, pred_test, average="macro")
    f1_weighted = f1_score(y, pred_test, average="weighted")

    try:
        roc_auc_ovr = roc_auc_score(
            y, proba, multi_class="ovr", average="macro", labels=label_order
        )
    except Exception as exc:
        # Best effort: keep the remaining metrics, but make the failure visible.
        log.warning(
            "evaluate_clf: ROC AUC (OvR) could not be computed (%s); reporting NaN.",
            exc,
        )
        roc_auc_ovr = float("nan")

    precision_per_class = precision_score(
        y, pred_test, labels=label_order, average=None, zero_division=0
    )
    recall_per_class = recall_score(
        y, pred_test, labels=label_order, average=None, zero_division=0
    )

    metrics: dict[str, float] = {
        "logloss": float(logloss),
        "brier": float(brier),
        "ece": float(ece),
        "roc_auc_ovr": float(roc_auc_ovr),
        "accuracy": float(accuracy),
        "balanced_accuracy": float(bal_acc),
        "f1_macro": float(f1_macro),
        "f1_weighted": float(f1_weighted),
    }
    for i, label in enumerate(label_order):
        metrics[f"precision_class_{label}"] = float(precision_per_class[i])
        metrics[f"recall_class_{label}"] = float(recall_per_class[i])
    return metrics

Pipelines (sklearn)

WeightedXGBClassifier

Bases: XGBClassifier

XGBClassifier extended to accept the sklearn-style class_weight parameter.

XGBoost does not support class_weight natively in its constructor. This wrapper converts class_weight to a sample_weight array on each :meth:fit call, mirroring the behaviour of sklearn estimators.

Source code in src/models/pipelines.py
class WeightedXGBClassifier(XGBClassifier):
    """XGBClassifier variant that understands a sklearn-style ``class_weight``.

    ``class_weight`` is stored on the estimator and, at :meth:`fit` time,
    turned into a per-sample ``sample_weight`` array unless the caller
    supplies one explicitly.
    """

    def __init__(self, *, class_weight=None, **kwargs):
        """Forward ``kwargs`` to ``XGBClassifier`` and remember ``class_weight``."""
        super().__init__(**kwargs)
        self.class_weight = class_weight

    def get_xgb_params(self):
        """Return the parent's XGBoost params with ``class_weight`` removed."""
        booster_params = super().get_xgb_params()
        booster_params.pop("class_weight", None)
        return booster_params

    def fit(self, X, y, sample_weight=None, **kwargs):
        """Fit the model, deriving sample weights from ``class_weight`` if needed.

        Weights are computed with
        ``sklearn.utils.class_weight.compute_sample_weight`` only when
        ``class_weight`` is set and no ``sample_weight`` was passed.

        Args:
            X: Feature matrix.
            y: Target labels.
            sample_weight: Optional explicit sample weight array.
                When provided, ``class_weight`` is ignored.
            **kwargs: Additional arguments forwarded to
                ``XGBClassifier.fit``.

        Returns:
            Whatever ``XGBClassifier.fit`` returns (the fitted estimator).
        """
        weights = sample_weight
        if weights is None and self.class_weight is not None:
            weights = compute_sample_weight(self.class_weight, y)
        return super().fit(X, y, sample_weight=weights, **kwargs)

get_xgb_params()

Return XGBoost booster params, excluding sklearn-only class_weight.

Source code in src/models/pipelines.py
def get_xgb_params(self):
    """Return the parent's XGBoost params with ``class_weight`` removed."""
    booster_params = super().get_xgb_params()
    booster_params.pop("class_weight", None)
    return booster_params

fit(X, y, sample_weight=None, **kwargs)

Fit the XGBoost model, computing sample weights when needed.

When class_weight was set in __init__ and no explicit sample_weight is supplied, computes per-sample weights via sklearn.utils.class_weight.compute_sample_weight.

Parameters:

Name Type Description Default
X

Feature matrix.

required
y

Target labels.

required
sample_weight

Optional explicit sample weight array. When provided, class_weight is ignored.

None
**kwargs

Additional arguments forwarded to XGBClassifier.fit.

{}

Returns:

Type Description

Fitted estimator (self).

Source code in src/models/pipelines.py
def fit(self, X, y, sample_weight=None, **kwargs):
    """Fit the model, deriving sample weights from ``class_weight`` if needed.

    Weights are computed with
    ``sklearn.utils.class_weight.compute_sample_weight`` only when
    ``class_weight`` is set and no ``sample_weight`` was passed.

    Args:
        X: Feature matrix.
        y: Target labels.
        sample_weight: Optional explicit sample weight array.
            When provided, ``class_weight`` is ignored.
        **kwargs: Additional arguments forwarded to
            ``XGBClassifier.fit``.

    Returns:
        Whatever ``XGBClassifier.fit`` returns (the fitted estimator).
    """
    weights = sample_weight
    if weights is None and self.class_weight is not None:
        weights = compute_sample_weight(self.class_weight, y)
    return super().fit(X, y, sample_weight=weights, **kwargs)

get_models_with_pipeline_for_clf(num_cols, cat_cols, enabled=None, class_weight=None)

Build sklearn Pipelines for each supported classifier.

Available models: "baseline", "logreg", "sgd_logloss", "hgb_numonly", "xgb".

Parameters:

Name Type Description Default
num_cols list[str]

Numeric feature column names.

required
cat_cols list[str]

Categorical feature column names.

required
enabled list[str] | None

Optional allowlist of model keys to include. When None, all models are returned.

None
class_weight

Class-weight spec forwarded to each classifier that supports it. Accepts None, "balanced", or a dict. OmegaConf string-keyed dicts are coerced to int keys.

None

Returns:

Type Description
dict[str, Pipeline]

Dict mapping model name to an unfitted, ready-to-fit estimator: an sklearn Pipeline for every model except "baseline", which is a bare DummyClassifier.

Source code in src/models/pipelines.py
def get_models_with_pipeline_for_clf(
    num_cols: list[str],
    cat_cols: list[str],
    enabled: list[str] | None = None,
    class_weight=None,
) -> dict[str, Pipeline]:
    """Build unfitted estimators for each supported classifier.

    Available models: ``"baseline"``, ``"logreg"``, ``"sgd_logloss"``,
    ``"hgb_numonly"``, ``"xgb"``.

    Every returned pipeline owns its own preprocessing objects, so fitting
    one model never refits or overwrites the fitted state of another.

    Args:
        num_cols: Numeric feature column names.
        cat_cols: Categorical feature column names.
        enabled: Optional allowlist of model keys to include. When
            ``None``, all models are returned. Keys that are not a known
            model name are ignored.
        class_weight: Class-weight spec forwarded to each classifier
            that supports it. Accepts ``None``, ``"balanced"``, or a
            dict. It is passed through ``_coerce_class_weight`` first.

    Returns:
        Dict mapping model name to an unfitted estimator. All entries are
        sklearn Pipelines with ``prep`` and ``clf`` steps, except
        ``"baseline"``, which is a bare ``DummyClassifier``.
    """
    _cw = _coerce_class_weight(class_weight)

    def _linear_preprocessor():
        """Return a fresh impute+scale / impute+one-hot ColumnTransformer."""
        numeric_transformer = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="median")),
                # with_mean=False: scale only, no centering.
                ("scaler", StandardScaler(with_mean=False)),
            ]
        )
        categorical_transformer = Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="most_frequent")),
                ("ohe", OneHotEncoder(handle_unknown="ignore")),
            ]
        )
        return ColumnTransformer(
            transformers=[
                ("num", numeric_transformer, num_cols),
                ("cat", categorical_transformer, cat_cols),
            ],
            remainder="drop",
        )

    def _numeric_only_preprocessor():
        """Return a fresh median-imputing ColumnTransformer over ``num_cols``."""
        return ColumnTransformer(
            transformers=[
                (
                    "num",
                    Pipeline(
                        steps=[
                            ("imputer", SimpleImputer(strategy="median")),
                        ]
                    ),
                    num_cols,
                ),
            ],
            remainder="drop",
        )

    def _baseline():
        """Return the prior-probability dummy baseline (not a Pipeline)."""
        return DummyClassifier(strategy="prior")

    def _logreg():
        """Return the LogisticRegression pipeline."""
        return Pipeline(
            steps=[
                ("prep", _linear_preprocessor()),
                (
                    "clf",
                    LogisticRegression(
                        max_iter=3000,
                        solver="saga",
                        class_weight=_cw,
                        random_state=42,
                    ),
                ),
            ]
        )

    def _sgd_logloss():
        """Return the log-loss SGDClassifier pipeline."""
        return Pipeline(
            steps=[
                ("prep", _linear_preprocessor()),
                (
                    "clf",
                    SGDClassifier(
                        loss="log_loss",
                        alpha=1e-4,
                        penalty="l2",
                        max_iter=3000,
                        tol=1e-3,
                        random_state=42,
                        class_weight=_cw,
                    ),
                ),
            ]
        )

    def _hgb_numonly():
        """Return the numeric-only HistGradientBoosting pipeline."""
        return Pipeline(
            steps=[
                ("prep", _numeric_only_preprocessor()),
                (
                    "clf",
                    HistGradientBoostingClassifier(
                        max_depth=6,
                        learning_rate=0.05,
                        max_iter=300,
                        random_state=42,
                        class_weight=_cw,
                    ),
                ),
            ]
        )

    def _xgb():
        """Return the numeric-only XGBoost pipeline."""
        return Pipeline(
            steps=[
                ("prep", _numeric_only_preprocessor()),
                (
                    "clf",
                    WeightedXGBClassifier(
                        eval_metric="mlogloss",
                        tree_method="hist",  # memory-efficient histogram algorithm; required for large datasets
                        max_bin=128,  # reduce histogram memory (default 256); ~2x less RAM with negligible accuracy loss
                        random_state=42,
                        n_jobs=1,
                        class_weight=_cw,
                    ),
                ),
            ]
        )

    # Insertion order fixes the key order of the returned dict.
    builders = {
        "baseline": _baseline,
        "logreg": _logreg,
        "sgd_logloss": _sgd_logloss,
        "hgb_numonly": _hgb_numonly,
        "xgb": _xgb,
    }
    # Only construct the estimators that were actually requested.
    return {
        name: build()
        for name, build in builders.items()
        if enabled is None or name in enabled
    }

Hyperparameter Tuning

Hyperparameter optimisation for model candidates via Optuna.

Design decisions

  • One Optuna study per (model, experiment_name, frac) combination so results are comparable across data-size ablations.
  • Each trial is a nested MLflow run so the full trial history is visible in the MLflow UI alongside the standard training runs.
  • Objective: minimise mean CV log-loss across walk-forward folds (identical evaluation protocol to make_classification_runs).
  • n_jobs=1 inside XGBoost to prevent thread contention when Optuna spawns multiple trials (n_jobs on the study controls parallelism).
  • Best parameters are written back to the parent run as tuned.* tags so the register-model step can read them without additional MLflow queries.
  • Three model families are tuned independently and compared in select_model: LogisticRegression, HistGradientBoosting, XGBoost.

run_xgb_tuning(experiment_name, tracking_uri, df_dataset, df_train_ids, df_folds, X_cols, y_col, num_cols, cat_cols, n_trials=20, frac=1.0, study_name=None, run_kind=None, feat_params=None)

Run an Optuna study to tune XGBoost hyperparameters.

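Each Optuna trial is logged as its own MLflow run named tuning | frac={frac} | trial NNN (prefixed with "smoke | " when run_kind is "smoke"). After the study finishes, the best trial's run is reopened and tagged tuning.is_best=true.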

Parameters

experiment_name: MLflow experiment name (same as the main classification experiment). tracking_uri: MLflow tracking server URI. df_dataset: Full dataset with features + target. df_train_ids: DataFrame with an id column marking training-set matches. df_folds: Walk-forward fold definitions from split_data stage. X_cols: Feature columns (cat + num). y_col: Target column name. num_cols: Numeric feature columns (for ColumnTransformer). cat_cols: Categorical feature columns (for ColumnTransformer). n_trials: Number of Optuna trials. frac: Fraction of training data to use (same semantics as main training). study_name: Optional Optuna study name. Defaults to f"xgb_tuning_{experiment_name}_frac{frac}". run_kind: Optional run kind; "smoke" prefixes the MLflow run names with "smoke | ". feat_params: Optional feature-selection params dict; its class_weight entry is forwarded to the classifier and the dict is logged on every trial run.

Returns

dict Dictionary with two keys: best_params (the best hyperparameters found by Optuna) and cv_logloss (the CV log-loss of the best trial).

Source code in src/models/tuning.py
def run_xgb_tuning(
    experiment_name: str,
    tracking_uri: str,
    df_dataset: pd.DataFrame,
    df_train_ids: pd.DataFrame,
    df_folds: pd.DataFrame,
    X_cols: list[str],
    y_col: str,
    num_cols: list[str],
    cat_cols: list[str],
    n_trials: int = 20,
    frac: float = 1.0,
    study_name: str | None = None,
    run_kind: str | None = None,
    feat_params: dict | None = None,
) -> dict:
    """Run an Optuna study to tune XGBoost hyperparameters.

    Each Optuna trial is logged as its own MLflow run named
    ``tuning | frac={frac} | trial NNN`` (prefixed with ``smoke | `` when
    ``run_kind`` is ``"smoke"``). After the study finishes, the best trial's
    run is reopened, tagged ``tuning.is_best=true`` and extended with the
    best parameters and metrics.

    Trial pipelines and the final best-trial pipeline are both created by
    ``get_models_with_pipeline_for_clf`` so the tuned hyperparameters are
    evaluated on the same base XGBoost configuration they are selected for.

    Parameters
    ----------
    experiment_name:
        MLflow experiment name (same as the main classification experiment).
    tracking_uri:
        MLflow tracking server URI.
    df_dataset:
        Full dataset with features + target and an ``id`` column.
    df_train_ids:
        DataFrame with an ``id`` column marking training-set matches.
    df_folds:
        Walk-forward fold definitions from ``split_data`` stage.
    X_cols:
        Feature columns (cat + num).
    y_col:
        Target column name.
    num_cols:
        Numeric feature columns, forwarded to the pipeline builder.
    cat_cols:
        Categorical feature columns, forwarded to the pipeline builder.
    n_trials:
        Number of Optuna trials.
    frac:
        Fraction of training data to use; the last
        ``int(len(train) * frac)`` training rows are kept.
    study_name:
        Optional Optuna study name. Defaults to
        ``f"xgb_tuning_{experiment_name}_frac{frac}"``.
    run_kind:
        Optional run kind. ``"smoke"`` prefixes the MLflow run names with
        ``smoke | ``; any other value leaves the names unprefixed.
    feat_params:
        Optional feature-selection params dict. Its ``class_weight`` entry
        (if any) is forwarded to the classifier, and when the dict is
        non-empty the output of ``build_features_selected_params`` is
        logged on every trial run.

    Returns
    -------
    dict
        ``{"best_params": ..., "cv_logloss": ...}`` — the best
        hyperparameters found by Optuna and the objective value (CV
        log-loss) of the best trial.
    """
    import optuna

    optuna.logging.set_verbosity(optuna.logging.WARNING)

    mlflow.set_tracking_uri(tracking_uri)
    set_experiment_active(experiment_name)

    df_train = df_dataset[df_dataset["id"].isin(df_train_ids["id"])].copy()
    df_train = df_train.tail(int(len(df_train) * frac))
    target_labels = sorted(df_train[y_col].unique())

    study_name = study_name or f"xgb_tuning_{experiment_name}_frac{frac}"

    _rk = run_kind or "tuning"
    _rk_prefix = "smoke | " if _rk == "smoke" else ""
    _base_name = f"{_rk_prefix}tuning | frac={frac}"
    # Trial number -> MLflow run id, so the best trial's run can be reopened.
    _trial_run_ids: dict[int, str] = {}

    _cw = _coerce_class_weight(feat_params.get("class_weight") if feat_params else None)

    def _build_xgb_pipe(hparams: dict):
        """Return a fresh shared-builder XGBoost pipeline with ``hparams`` applied."""
        pipe = get_models_with_pipeline_for_clf(
            num_cols=num_cols, cat_cols=cat_cols, enabled=["xgb"], class_weight=_cw
        )["xgb"]
        pipe.set_params(**{f"clf__{k}": v for k, v in hparams.items()})
        return pipe

    def objective(trial: optuna.Trial) -> float:
        """Optuna objective: evaluate XGBoost params on walk-forward CV."""
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 100, 600, step=50),
            "max_depth": trial.suggest_int("max_depth", 3, 8),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.2, log=True),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
            "min_child_weight": trial.suggest_int("min_child_weight", 1, 20),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 10.0, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-4, 10.0, log=True),
        }

        # Same base pipeline as the final best-trial evaluation below.
        pipe = _build_xgb_pipe(params)

        cv_loss = _cv_logloss(pipe, df_train, df_folds, X_cols, y_col, target_labels)

        with mlflow.start_run(
            run_name=f"{_base_name} | trial {trial.number:03d}"
        ) as _trial_run:
            _trial_run_ids[trial.number] = _trial_run.info.run_id
            mlflow.log_params(
                {
                    "tuning.n_trials": n_trials,
                    "tuning.frac": frac,
                    "tuning.n_features": len(X_cols),
                    "tuning.study_name": study_name,
                    **(
                        build_features_selected_params(feat_params)
                        if feat_params
                        else {}
                    ),
                }
            )
            mlflow.log_params({f"xgb.{k}": v for k, v in params.items()})
            mlflow.log_metric("cv.logloss_mean", cv_loss)
            mlflow.log_metric("trial.number", trial.number)

        return cv_loss

    study = optuna.create_study(
        study_name=study_name,
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    best_params = study.best_params
    best_value = study.best_value

    # Reopen the best trial's run and attach the summary to it.
    with mlflow.start_run(run_id=_trial_run_ids[study.best_trial.number]):
        mlflow.set_tag("tuning.is_best", "true")
        mlflow.log_params({f"best.{k}": v for k, v in best_params.items()})
        mlflow.log_metric("best.cv_logloss", best_value)
        best_pipe = _build_xgb_pipe(best_params)
        _log_best_trial_metrics(
            best_pipe, df_train, df_folds, X_cols, y_col, target_labels
        )

    logger.info(
        "Optuna study finished: best CV logloss=%.4f, params=%s",
        best_value,
        best_params,
    )

    return {"best_params": best_params, "cv_logloss": best_value}

run_logreg_tuning(experiment_name, tracking_uri, df_dataset, df_train_ids, df_folds, X_cols, y_col, num_cols, cat_cols, n_trials=20, frac=1.0, study_name=None, run_kind=None, feat_params=None)

Run an Optuna study to tune LogisticRegression hyperparameters.

Searches over regularisation strength (C) and penalty type.

Parameters

experiment_name: MLflow experiment name. tracking_uri: MLflow tracking server URI. df_dataset: Full dataset with features, target, and split IDs. df_train_ids: DataFrame with id column for training matches. df_folds: Cross-validation fold definitions. X_cols: Feature column names. y_col: Target column name. num_cols: Numeric feature column names. cat_cols: Categorical feature column names. n_trials: Number of Optuna trials. frac: Fraction of training data to use. study_name: Optuna study name. Auto-generated when None. run_kind: MLflow run kind tag (e.g. "tuning" or "smoke"). feat_params: Feature-selection params dict for MLflow logging.

Returns

dict Best hyperparameters compatible with sklearn.linear_model.LogisticRegression.

Source code in src/models/tuning.py
def run_logreg_tuning(
    experiment_name: str,
    tracking_uri: str,
    df_dataset: pd.DataFrame,
    df_train_ids: pd.DataFrame,
    df_folds: pd.DataFrame,
    X_cols: list[str],
    y_col: str,
    num_cols: list[str],
    cat_cols: list[str],
    n_trials: int = 20,
    frac: float = 1.0,
    study_name: str | None = None,
    run_kind: str | None = None,
    feat_params: dict | None = None,
) -> dict:
    """Run an Optuna study to tune LogisticRegression hyperparameters.

    Searches over regularisation strength (C) and penalty type (``"l1"`` or
    ``"l2"``).  Solver (``"saga"``), ``max_iter`` (3000) and ``random_state``
    (42) are held fixed for every trial.  Each trial is logged as its own
    MLflow run; the run of the best trial is afterwards reopened, tagged
    ``tuning.is_best`` and passed to ``_log_best_trial_metrics``.

    Parameters
    ----------
    experiment_name:
        MLflow experiment name.
    tracking_uri:
        MLflow tracking server URI.
    df_dataset:
        Full dataset with features, target, and split IDs.
    df_train_ids:
        DataFrame with ``id`` column for training matches.
    df_folds:
        Cross-validation fold definitions.
    X_cols:
        Feature column names.
    y_col:
        Target column name.
    num_cols:
        Numeric feature column names.
    cat_cols:
        Categorical feature column names.
    n_trials:
        Number of Optuna trials.
    frac:
        Fraction of training data to use.  The last
        ``int(len(train) * frac)`` training rows are kept.
    study_name:
        Optuna study name.  Auto-generated when ``None``.
    run_kind:
        MLflow run kind tag (e.g. ``"tuning"`` or ``"smoke"``).  ``"smoke"``
        prefixes the MLflow run names with ``"smoke | "``.
    feat_params:
        Feature-selection params dict for MLflow logging.  Its
        ``"class_weight"`` entry, if present, is passed through
        ``_coerce_class_weight`` and used as the classifier class weight.

    Returns
    -------
    dict
        ``{"best_params": ..., "cv_logloss": ...}`` where ``best_params``
        holds the best trial's sampled values (``C`` and ``penalty``,
        compatible with ``sklearn.linear_model.LogisticRegression``) and
        ``cv_logloss`` is the best objective value of the study.
    """
    import optuna
    from sklearn.compose import ColumnTransformer
    from sklearn.impute import SimpleImputer
    from sklearn.linear_model import LogisticRegression
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    optuna.logging.set_verbosity(optuna.logging.WARNING)

    mlflow.set_tracking_uri(tracking_uri)
    set_experiment_active(experiment_name)

    df_train = df_dataset[df_dataset["id"].isin(df_train_ids["id"])].copy()
    df_train = df_train.tail(int(len(df_train) * frac))
    target_labels = sorted(df_train[y_col].unique())

    study_name = study_name or f"logreg_tuning_{experiment_name}_frac{frac}"
    _rk = run_kind or "tuning"
    _rk_prefix = "smoke | " if _rk == "smoke" else ""
    _base_name = f"{_rk_prefix}logreg_tuning | frac={frac}"
    _trial_run_ids: dict[int, str] = {}
    _cw = _coerce_class_weight(feat_params.get("class_weight") if feat_params else None)

    # Settings that are NOT sampled by Optuna.  They are shared between the
    # objective and the best-trial re-evaluation so both fit the same model.
    _fixed_params = {"solver": "saga", "max_iter": 3000, "random_state": 42}

    def objective(trial: optuna.Trial) -> float:
        """Optuna objective for logistic regression hyperparameter search."""
        # Sampling order (penalty, then C) is kept stable for the seeded sampler.
        penalty = trial.suggest_categorical("penalty", ["l1", "l2"])
        params = {
            "C": trial.suggest_float("C", 1e-4, 10.0, log=True),
            "penalty": penalty,
            **_fixed_params,
        }

        preprocessor = ColumnTransformer(
            transformers=[
                (
                    "num",
                    Pipeline(
                        [
                            ("imp", SimpleImputer(strategy="median")),
                            ("scl", StandardScaler()),
                        ]
                    ),
                    num_cols,
                ),
                ("cat", SimpleImputer(strategy="most_frequent"), cat_cols),
            ],
            remainder="drop",
        )
        pipe = Pipeline(
            [
                ("prep", preprocessor),
                ("clf", LogisticRegression(**params, class_weight=_cw)),
            ]
        )

        cv_loss = _cv_logloss(pipe, df_train, df_folds, X_cols, y_col, target_labels)

        with mlflow.start_run(
            run_name=f"{_base_name} | trial {trial.number:03d}"
        ) as _trial_run:
            # Remember the run so the best trial can be reopened afterwards.
            _trial_run_ids[trial.number] = _trial_run.info.run_id
            mlflow.log_params(
                {
                    "tuning.model": "logreg",
                    "tuning.n_trials": n_trials,
                    "tuning.frac": frac,
                    "tuning.n_features": len(X_cols),
                    "tuning.study_name": study_name,
                    **(
                        build_features_selected_params(feat_params)
                        if feat_params
                        else {}
                    ),
                }
            )
            mlflow.log_params({f"logreg.{k}": v for k, v in params.items()})
            mlflow.log_metric("cv.logloss_mean", cv_loss)

        return cv_loss

    study = optuna.create_study(
        study_name=study_name,
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    best_params = study.best_params
    best_value = study.best_value

    with mlflow.start_run(run_id=_trial_run_ids[study.best_trial.number]):
        mlflow.set_tag("tuning.is_best", "true")
        mlflow.log_params({f"best.{k}": v for k, v in best_params.items()})
        mlflow.log_metric("best.cv_logloss", best_value)
        best_pipe = get_models_with_pipeline_for_clf(
            num_cols=num_cols, cat_cols=cat_cols, enabled=["logreg"], class_weight=_cw
        )["logreg"]
        # ``study.best_params`` only contains the sampled values (C, penalty).
        # Apply the fixed trial settings too; otherwise the factory's default
        # solver is used, which may not support the winning penalty and would
        # not reproduce the configuration that achieved ``best_value``.
        best_pipe.set_params(
            **{f"clf__{k}": v for k, v in {**_fixed_params, **best_params}.items()}
        )
        _log_best_trial_metrics(
            best_pipe, df_train, df_folds, X_cols, y_col, target_labels
        )

    logger.info(
        "LogReg Optuna study finished: best CV logloss=%.4f, params=%s",
        best_value,
        best_params,
    )

    return {"best_params": best_params, "cv_logloss": best_value}

run_hgb_tuning(experiment_name, tracking_uri, df_dataset, df_train_ids, df_folds, X_cols, y_col, num_cols, cat_cols, n_trials=20, frac=1.0, study_name=None, run_kind=None, feat_params=None)

Run an Optuna study to tune HistGradientBoostingClassifier hyperparameters.

Searches over max_depth, learning_rate, max_iter, l2_regularization, and min_samples_leaf.

Parameters

experiment_name: MLflow experiment name. tracking_uri: MLflow tracking server URI. df_dataset: Full dataset with features, target, and split IDs. df_train_ids: DataFrame with id column for training matches. df_folds: Cross-validation fold definitions. X_cols: Feature column names. y_col: Target column name. num_cols: Numeric feature column names. cat_cols: Categorical feature column names. n_trials: Number of Optuna trials. frac: Fraction of training data to use. study_name: Optuna study name. Auto-generated when None. run_kind: MLflow run kind tag (e.g. "tuning" or "smoke"). feat_params: Feature-selection params dict for MLflow logging.

Returns

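dict Dictionary with two keys: "best_params" (the best trial's sampled hyperparameters, compatible with sklearn.ensemble.HistGradientBoostingClassifier) and "cv_logloss" (the best cross-validated log-loss found by the study).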

Source code in src/models/tuning.py
def run_hgb_tuning(
    experiment_name: str,
    tracking_uri: str,
    df_dataset: pd.DataFrame,
    df_train_ids: pd.DataFrame,
    df_folds: pd.DataFrame,
    X_cols: list[str],
    y_col: str,
    num_cols: list[str],
    cat_cols: list[str],
    n_trials: int = 20,
    frac: float = 1.0,
    study_name: str | None = None,
    run_kind: str | None = None,
    feat_params: dict | None = None,
) -> dict:
    """Tune HistGradientBoostingClassifier hyperparameters with Optuna.

    Every trial samples ``max_depth``, ``learning_rate``, ``max_iter``,
    ``l2_regularization`` and ``min_samples_leaf``, scores the candidate
    pipeline with ``_cv_logloss`` and records the outcome in its own MLflow
    run.  Once the study is done, the MLflow run of the best trial is
    reopened, tagged ``tuning.is_best`` and handed to
    ``_log_best_trial_metrics`` together with the ``"hgb_numonly"`` factory
    pipeline configured with the best parameters.

    Parameters
    ----------
    experiment_name:
        MLflow experiment name.
    tracking_uri:
        MLflow tracking server URI.
    df_dataset:
        Full dataset with features, target, and split IDs.
    df_train_ids:
        DataFrame whose ``id`` column selects the training matches.
    df_folds:
        Cross-validation fold definitions.
    X_cols:
        Feature column names.
    y_col:
        Target column name.
    num_cols:
        Numeric feature column names (the only columns the trial
        preprocessor keeps).
    cat_cols:
        Categorical feature column names (forwarded to the pipeline factory).
    n_trials:
        Number of Optuna trials.
    frac:
        Fraction of training data to use; the trailing
        ``int(n_rows * frac)`` training rows are kept.
    study_name:
        Optuna study name.  Auto-generated when ``None``.
    run_kind:
        MLflow run kind (e.g. ``"tuning"`` or ``"smoke"``).  ``"smoke"``
        prefixes run names with ``"smoke | "``.
    feat_params:
        Feature-selection params dict for MLflow logging; its
        ``"class_weight"`` entry, if any, is coerced and used as the
        classifier class weight.

    Returns
    -------
    dict
        ``{"best_params": ..., "cv_logloss": ...}``: the best trial's
        sampled hyperparameters (compatible with
        ``sklearn.ensemble.HistGradientBoostingClassifier``) and the best
        objective value.
    """
    import optuna
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import HistGradientBoostingClassifier
    from sklearn.impute import SimpleImputer
    from sklearn.pipeline import Pipeline

    optuna.logging.set_verbosity(optuna.logging.WARNING)

    mlflow.set_tracking_uri(tracking_uri)
    set_experiment_active(experiment_name)

    # Restrict to training matches, then keep only the trailing ``frac`` share.
    is_train_row = df_dataset["id"].isin(df_train_ids["id"])
    train_df = df_dataset.loc[is_train_row].copy()
    n_keep = int(train_df.shape[0] * frac)
    train_df = train_df.tail(n_keep)
    class_labels = sorted(train_df[y_col].unique())

    if not study_name:
        study_name = f"hgb_tuning_{experiment_name}_frac{frac}"
    name_prefix = "smoke | " if run_kind == "smoke" else ""
    run_name_base = f"{name_prefix}hgb_tuning | frac={frac}"

    # trial number -> MLflow run id, so the winning run can be reopened later.
    run_id_by_trial: dict[int, str] = {}

    raw_class_weight = feat_params.get("class_weight") if feat_params else None
    class_weight = _coerce_class_weight(raw_class_weight)

    def objective(trial: optuna.Trial) -> float:
        """Score one sampled HGB configuration and log it as an MLflow run."""
        # The suggest_* calls stay in this order for the seeded sampler.
        max_depth = trial.suggest_int("max_depth", 3, 10)
        learning_rate = trial.suggest_float("learning_rate", 1e-3, 0.2, log=True)
        max_iter = trial.suggest_int("max_iter", 100, 500, step=50)
        l2_regularization = trial.suggest_float(
            "l2_regularization", 1e-4, 10.0, log=True
        )
        min_samples_leaf = trial.suggest_int("min_samples_leaf", 10, 100)
        hgb_params = {
            "max_depth": max_depth,
            "learning_rate": learning_rate,
            "max_iter": max_iter,
            "l2_regularization": l2_regularization,
            "min_samples_leaf": min_samples_leaf,
            "random_state": 42,
        }

        # Only numeric columns are imputed and kept; everything else is dropped.
        numeric_only = ColumnTransformer(
            transformers=[("num", SimpleImputer(strategy="median"), num_cols)],
            remainder="drop",
        )
        classifier = HistGradientBoostingClassifier(
            class_weight=class_weight, **hgb_params
        )
        candidate = Pipeline([("prep", numeric_only), ("clf", classifier)])

        loss = _cv_logloss(
            candidate, train_df, df_folds, X_cols, y_col, class_labels
        )

        trial_run_name = f"{run_name_base} | trial {trial.number:03d}"
        with mlflow.start_run(run_name=trial_run_name) as active_run:
            run_id_by_trial[trial.number] = active_run.info.run_id

            tuning_context = {
                "tuning.model": "hgb",
                "tuning.n_trials": n_trials,
                "tuning.frac": frac,
                "tuning.n_features": len(X_cols),
                "tuning.study_name": study_name,
            }
            if feat_params:
                tuning_context.update(build_features_selected_params(feat_params))
            mlflow.log_params(tuning_context)
            mlflow.log_params({f"hgb.{k}": v for k, v in hgb_params.items()})
            mlflow.log_metric("cv.logloss_mean", loss)

        return loss

    sampler = optuna.samplers.TPESampler(seed=42)
    study = optuna.create_study(
        study_name=study_name, direction="minimize", sampler=sampler
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    best_params = study.best_params
    best_value = study.best_value
    best_run_id = run_id_by_trial[study.best_trial.number]

    # Reopen the winning trial's run and attach the "best" markers to it.
    with mlflow.start_run(run_id=best_run_id):
        mlflow.set_tag("tuning.is_best", "true")
        mlflow.log_params({f"best.{k}": v for k, v in best_params.items()})
        mlflow.log_metric("best.cv_logloss", best_value)

        factory_pipes = get_models_with_pipeline_for_clf(
            num_cols=num_cols,
            cat_cols=cat_cols,
            enabled=["hgb_numonly"],
            class_weight=class_weight,
        )
        best_pipe = factory_pipes["hgb_numonly"]
        clf_overrides = {f"clf__{k}": v for k, v in best_params.items()}
        best_pipe.set_params(**clf_overrides)
        _log_best_trial_metrics(
            best_pipe, train_df, df_folds, X_cols, y_col, class_labels
        )

    logger.info(
        "HGB Optuna study finished: best CV logloss=%.4f, params=%s",
        best_value,
        best_params,
    )

    return {"best_params": best_params, "cv_logloss": best_value}

Classification

make_classification_runs(experiment_name, tracking_uri, dataset_path, df_dataset, df_train_ids, df_test_ids, df_folds, X_cols, y_cols, models, frac, cat_cols, num_cols, experiment_hypothesis=None, dvc_params=None, run_name=None)

Train and evaluate every model in models at the given training fraction, logging each one to its own MLflow run.

Artifacts are NOT registered in the Model Registry here. Registration (including alias assignment) is the responsibility of the dedicated register_model pipeline stage.

Parameters:

Name Type Description Default
experiment_name str

MLflow experiment name.

required
tracking_uri str

MLflow tracking server URI.

required
dataset_path str

Filesystem path to the dataset parquet (logged as MLflow input source).

required
df_dataset DataFrame

Full dataset with features, target, and split IDs.

required
df_train_ids DataFrame

DataFrame with id column for training matches.

required
df_test_ids DataFrame

DataFrame with id column for holdout matches.

required
df_folds DataFrame

Cross-validation fold definitions, logged as CSV.

required
X_cols list

Feature column names in model-expected order.

required
y_cols str

Target column name.

required
models dict

Dict of model name → sklearn Pipeline.

required
frac float

Fraction of training data to use (for smoke runs).

required
cat_cols list

Categorical feature column names.

required
num_cols list

Numeric feature column names.

required
experiment_hypothesis str | None

Optional hypothesis tag for the MLflow run.

None
dvc_params dict | None

Full DVC params dict to flatten and log.

None
run_name str | None

Optional display name prefix for the run.

None

Returns:

Type Description
tuple[str, str, str]

Tuple of (run_id, model_uri, model_name) for the best run (lowest holdout logloss). model_uri is the authoritative URI returned by log_model — version-agnostic across MLflow 2.x and 3.x.

Source code in src/models/classification.py
def make_classification_runs(
    experiment_name: str,
    tracking_uri: str,
    dataset_path: str,
    df_dataset: pd.DataFrame,
    df_train_ids: pd.DataFrame,
    df_test_ids: pd.DataFrame,
    df_folds: pd.DataFrame,
    X_cols: list,
    y_cols: str,
    models: dict,
    frac: float,
    cat_cols: list,
    num_cols: list,
    experiment_hypothesis: str | None = None,
    dvc_params: dict | None = None,
    run_name: str | None = None,
) -> tuple[str, str, str]:
    """Train and evaluate all model/frac combinations, log to MLflow.

    Artifacts are NOT registered in the Model Registry here.
    Registration (including alias assignment) is the responsibility of
    the dedicated ``register_model`` pipeline stage.

    Args:
        experiment_name: MLflow experiment name.
        tracking_uri: MLflow tracking server URI.
        dataset_path: Filesystem path to the dataset parquet (logged as
            MLflow input source).
        df_dataset: Full dataset with features, target, and split IDs.
        df_train_ids: DataFrame with ``id`` column for training matches.
        df_test_ids: DataFrame with ``id`` column for holdout matches.
        df_folds: Cross-validation fold definitions, logged as CSV.
        X_cols: Feature column names in model-expected order.
        y_cols: Target column name.
        models: Dict of model name → sklearn Pipeline.
        frac: Fraction of training data to use (for smoke runs).
        cat_cols: Categorical feature column names.
        num_cols: Numeric feature column names.
        experiment_hypothesis: Optional hypothesis tag for the MLflow run.
        dvc_params: Full DVC params dict to flatten and log.
        run_name: Optional display name prefix for the run.

    Returns:
        Tuple of (run_id, model_uri, model_name) for the best run (lowest
        holdout logloss).  ``model_uri`` is the authoritative URI returned by
        ``log_model`` — version-agnostic across MLflow 2.x and 3.x.
    """
    mlflow.set_tracking_uri(tracking_uri)
    set_experiment_active(experiment_name)

    stage = "train_eval"
    _is_smoke = infer_run_kind(experiment_name, stage) == "smoke"
    _rk_prefix = "smoke | " if _is_smoke else ""

    # Accumulate (run_id, model_uri, holdout_logloss, model_name) for all model runs
    _run_candidates: list[tuple[str, str, float, str]] = []

    _base_name = run_name if run_name else f"frac={frac}"
    _base_name = f"{_rk_prefix}{_base_name}"

    df_train = df_dataset[df_dataset["id"].isin(df_train_ids["id"])].copy()
    df_holdout = df_dataset[df_dataset["id"].isin(df_test_ids["id"])].copy()

    target_col = y_cols
    target_labels = sorted(df_train[target_col].unique())

    for name_model, pipe in models.items():
        with mlflow.start_run(run_name=f"{_base_name} | {name_model}") as model_run:
            logger.info(
                "Starting run: %s (id=%s)",
                f"{_base_name} | {name_model}",
                model_run.info.run_id,
            )
            mlflow.set_tags(build_pipeline_context_tags())
            mlflow.set_tag("pipeline.stage", "classification_models")
            mlflow.set_tag(
                "pipeline.run_kind", infer_run_kind(experiment_name, "train_eval")
            )
            if experiment_hypothesis:
                mlflow.set_tag("experiment.hypothesis", experiment_hypothesis)
            if dvc_params:
                mlflow.log_params(_flatten_params(dvc_params))
            mlflow.log_text(df_folds.to_csv(index=False), "folds.csv")
            fold_metrics_list = []

            for _, fold in df_folds.iterrows():
                df_cv_train = df_train[
                    (df_train["startTimeUtc"] >= fold.train_start)
                    & (df_train["startTimeUtc"] <= fold.train_end)
                ]
                df_cv_valid = df_train[
                    (df_train["startTimeUtc"] >= fold.valid_start)
                    & (df_train["startTimeUtc"] <= fold.valid_end)
                ]
                df_cv_train = df_cv_train.tail(int(df_cv_train.shape[0] * frac))

                X_train = df_cv_train[X_cols].copy()
                y_train = df_cv_train[target_col].copy()
                X_valid = df_cv_valid[X_cols].copy()
                y_valid = df_cv_valid[target_col].copy()

                start_time = pd.Timestamp.now()
                pipe.fit(X_train, y_train)
                time_fit = (pd.Timestamp.now() - start_time).total_seconds()

                metrics = evaluate_clf(
                    y=y_valid,
                    proba=pipe.predict_proba(X_valid),
                    label_order=target_labels,
                )
                metrics["time_fit_sec"] = time_fit
                fold_metrics_list.append(metrics)

            df_cv = pd.DataFrame(fold_metrics_list)
            cv_summary = {}
            for col in df_cv.columns:
                cv_summary[f"cv.{col}_mean"] = float(df_cv[col].mean())
                cv_summary[f"cv.{col}_std"] = float(df_cv[col].std())
                cv_summary[f"cv.{col}_min"] = float(df_cv[col].min())
                cv_summary[f"cv.{col}_max"] = float(df_cv[col].max())
            df_cv["fold"] = df_folds["fold"]
            mlflow.log_metrics(cv_summary)
            mlflow.log_text(df_cv.to_csv(index=False), "cv_metrics.csv")

            df_frac_train = df_train.tail(int(df_train.shape[0] * frac))

            X_train = df_frac_train[X_cols].copy()
            y_train = df_frac_train[target_col].copy()
            X_valid = df_holdout[X_cols].copy()
            y_valid = df_holdout[target_col].copy()

            dataset = mlflow.data.from_pandas(
                X_train,
                source=dataset_path,
                name="matches_train",
            )
            dvc_hash = get_dvc_hash(dataset_path)
            mlflow.log_input(dataset, context="training")

            # ── Lineage, pipeline context, model metadata ──────────
            mlflow.set_tags(
                build_data_lineage_tags(
                    dataset_path=dataset_path,
                    df_train=df_train,
                    df_test=df_holdout,
                    dvc_hash=dvc_hash,
                )
            )
            mlflow.log_params(
                {
                    **build_model_metadata_params(
                        model_name=name_model,
                        target=target_col,
                        num_feature_count=len(num_cols),
                        cat_feature_count=len(cat_cols),
                    ),
                    "data.train_frac": frac,
                    "features.cat": len(cat_cols),
                    "features.num": len(num_cols),
                }
            )

            start_time = pd.Timestamp.now()
            pipe.fit(X_train, y_train)
            time_fit = (pd.Timestamp.now() - start_time).total_seconds()

            metrics = evaluate_clf(
                y=y_valid,
                proba=pipe.predict_proba(X_valid),
                label_order=target_labels,
            )
            metrics["time_fit_sec"] = time_fit

            mlflow.log_metrics({f"final.{k}": float(v) for k, v in metrics.items()})
            # Cast integer feature columns to float64 so the logged
            # MLflow signature uses float dtype.  Without this, columns
            # that have NaN at inference time (cold-start rolling features)
            # are silently promoted to float64 by NumPy, which then
            # triggers MLflow's schema enforcement error.
            int_cols = X_train.select_dtypes(include=["integer"]).columns.tolist()
            X_example = X_train.head(5).astype({c: "float64" for c in int_cols})
            model_info = mlflow.sklearn.log_model(
                sk_model=pipe,
                name=_ARTIFACT_PATH,
                pyfunc_predict_fn="predict_proba",
                input_example=X_example,
            )
            logger.info(
                "log_model done: run_id=%s artifact_uri=%s",
                model_run.info.run_id,
                model_info.model_uri,
            )

            fig = plot_confusion_matrix_multiclass(
                y_valid, pipe.predict(X_valid), target_labels
            )

            mlflow.log_figure(
                fig,
                f"confusion_matrix_frac-{frac}_features-{len(X_cols)}.png",
            )
            _run_candidates.append(
                (
                    model_run.info.run_id,
                    model_info.model_uri,  # authoritative URI from MLflow
                    metrics.get("logloss", float("inf")),
                    name_model,
                )
            )

    best_run_id, best_model_uri, _, best_model_name = min(
        _run_candidates, key=lambda x: x[2]
    )
    logger.info(
        "Best run_id=%s model_uri=%s model=%s (holdout logloss)",
        best_run_id,
        best_model_uri,
        best_model_name,
    )
    return best_run_id, best_model_uri, best_model_name

Final Training

Final training stage: retrain the winning model on the full training set.

Design decisions

  • This is the ONLY stage that evaluates on the holdout set for the purpose of reporting. All model selection (screening, tuning) is done exclusively on cross-validation to prevent test-set leakage.
  • best_params are applied via the sklearn Pipeline param convention (step__param) so they are compatible with any Pipeline-wrapped model.
  • For XGBoost the tuned params replace the defaults from get_models_with_pipeline_for_clf.
  • When calibration is enabled, the model is trained on the chronologically earliest (1 - calib_frac) portion of the training set, then a CalibratedClassifierCV wrapper is fitted on the most recent calib_frac portion (temporal split — no random split on time-series data).
  • The calibrated pipeline is registered to MLflow; raw ECE is also logged for comparison so the calibration benefit is visible in the report.

make_final_train_run(experiment_name, tracking_uri, dataset_path, df_dataset, df_train_ids, df_test_ids, X_cols, y_col, model_name, best_params, num_cols, cat_cols, calibration_config=None, run_kind='final_train', feat_params=None)

Retrain winning model on full training set; evaluate once on holdout.

Parameters

experiment_name: MLflow experiment name (same as classification experiment). tracking_uri: MLflow tracking server URI. dataset_path: Filesystem path to the dataset parquet (logged as MLflow input source). df_dataset: Full dataset containing features, target, and split identifiers. df_train_ids: DataFrame with id column marking training-set matches. df_test_ids: DataFrame with id column marking holdout-set matches. X_cols: Feature columns (cat + num) in the order the model expects. y_col: Target column name. model_name: Key into get_models_with_pipeline_for_clf (e.g. "xgb"). best_params: Hyperparameters from the tuning stage. Applied via sklearn Pipeline step__param convention (e.g. {"n_estimators": 300} becomes clf__n_estimators=300). Pass an empty dict to use defaults. num_cols: Numeric feature columns (passed to pipeline factory). cat_cols: Categorical feature columns (passed to pipeline factory). calibration_config: Optional dict with keys enabled (bool), method (str), calib_frac (float), min_calib_samples (int). When enabled=True a temporal calibration split is applied: the model is trained on the earliest (1 - calib_frac) training rows, then a CalibratedClassifierCV wrapper is fitted on the remaining most-recent training rows. The calibrated pipeline is registered.

Returns

tuple[str, str] (run_id, model_uri) for the final MLflow run.

Source code in src/models/final_train.py
def make_final_train_run(
    experiment_name: str,
    tracking_uri: str,
    dataset_path: str,
    df_dataset: pd.DataFrame,
    df_train_ids: pd.DataFrame,
    df_test_ids: pd.DataFrame,
    X_cols: list[str],
    y_col: str,
    model_name: str,
    best_params: dict,
    num_cols: list[str],
    cat_cols: list[str],
    calibration_config: dict | None = None,
    run_kind: str = "final_train",
    feat_params: dict | None = None,
) -> tuple[str, str]:
    """Retrain winning model on full training set; evaluate once on holdout.

    Everything happens inside a single MLflow run: lineage tags and params
    are logged, the pipeline is fitted (optionally followed by post-hoc
    calibration), holdout metrics are computed, the model is logged, and a
    set of diagnostic artifacts (confusion matrix, feature importances,
    holdout probabilities, segment metrics, calibration curves, reference
    feature snapshot) is attached to the run.

    Parameters
    ----------
    experiment_name:
        MLflow experiment name (same as classification experiment).
    tracking_uri:
        MLflow tracking server URI.
    dataset_path:
        Filesystem path to the dataset parquet (logged as MLflow input source).
    df_dataset:
        Full dataset containing features, target, and split identifiers.
        Must contain an ``id`` column; row order is preserved when the
        train/holdout subsets are selected.
    df_train_ids:
        DataFrame with ``id`` column marking training-set matches.
    df_test_ids:
        DataFrame with ``id`` column marking holdout-set matches.
    X_cols:
        Feature columns (cat + num) in the order the model expects.
    y_col:
        Target column name.
    model_name:
        Key into ``get_models_with_pipeline_for_clf`` (e.g. ``"xgb"``).
    best_params:
        Hyperparameters from the tuning stage.  Applied via sklearn Pipeline
        ``step__param`` convention (e.g. ``{"n_estimators": 300}`` becomes
        ``clf__n_estimators=300``).  Pass an empty dict to use defaults.
    num_cols:
        Numeric feature columns (passed to pipeline factory).
    cat_cols:
        Categorical feature columns (passed to pipeline factory).
    calibration_config:
        Optional dict with keys ``enabled`` (bool, default ``False``),
        ``method`` (str, default ``"isotonic"``), ``calib_frac`` (float,
        default ``0.15``), ``min_calib_samples`` (int, default ``100``).
        When ``enabled=True`` a temporal calibration split is applied:
        the model is trained on the earliest (1 - calib_frac) training rows,
        then a ``CalibratedClassifierCV`` wrapper is fitted on the remaining
        most-recent training rows.  The calibrated pipeline is registered.
        ``method="auto"`` fits both ``"sigmoid"`` and ``"isotonic"`` and
        keeps the one with the lower ECE on the calibration rows.  If the
        calibration slice would be at least as large as the training set,
        the pipeline is fitted on all training rows without calibration.
    run_kind:
        Run category label.  Only the value ``"smoke"`` has an effect in
        this function: it is prefixed to the MLflow run name.
    feat_params:
        Optional feature-stage parameters.  Its ``class_weight`` entry (if
        present) is forwarded to the pipeline factory, and the dict is
        passed to ``build_features_selected_params`` for parameter logging.

    Returns
    -------
    tuple[str, str]
        ``(run_id, model_uri)`` for the final MLflow run.

    Raises
    ------
    ValueError
        If ``model_name`` is not a key of the mapping returned by
        ``get_models_with_pipeline_for_clf``.
    """
    mlflow.set_tracking_uri(tracking_uri)
    set_experiment_active(experiment_name)

    # Boolean-mask selection keeps df_dataset's row order in both subsets.
    df_train = df_dataset[df_dataset["id"].isin(df_train_ids["id"])].copy()
    df_holdout = df_dataset[df_dataset["id"].isin(df_test_ids["id"])].copy()

    # Class order is derived from the training target only and reused for
    # every metric / plot below.
    target_labels = sorted(df_train[y_col].unique())

    _class_weight = feat_params.get("class_weight") if feat_params else None
    models = get_models_with_pipeline_for_clf(
        num_cols=num_cols, cat_cols=cat_cols, class_weight=_class_weight
    )
    if model_name not in models:
        raise ValueError(
            f"Unknown model_name={model_name!r}. Available: {list(models.keys())}"
        )
    pipe = models[model_name]

    # Apply tuned hyperparameters via sklearn Pipeline convention (clf__param_name)
    if best_params:
        sklearn_params = {f"clf__{k}": v for k, v in best_params.items()}
        pipe.set_params(**sklearn_params)
        logger.info(
            "Applied %d tuned hyperparameters to pipeline '%s'",
            len(best_params),
            model_name,
        )
    else:
        logger.info(
            "No best_params provided — using default hyperparameters for '%s'",
            model_name,
        )

    X_train = df_train[X_cols].copy()
    y_train = df_train[y_col].copy()
    X_holdout = df_holdout[X_cols].copy()
    y_holdout = df_holdout[y_col].copy()

    # Only smoke runs get a run-name prefix; every other run_kind is unprefixed.
    _rk_prefix = f"{run_kind} | " if run_kind == "smoke" else ""
    run_name = f"{_rk_prefix}final_train | model={model_name}"
    description = (
        f"Final retrain of '{model_name}' on full training set "
        f"({len(X_train)} rows). Holdout evaluation is performed here only."
    )

    with mlflow.start_run(run_name=run_name, description=description) as run:
        dvc_hash = get_dvc_hash(dataset_path)
        mlflow.set_tags(build_pipeline_context_tags())
        mlflow.set_tags(
            build_data_lineage_tags(
                dataset_path=dataset_path,
                df_train=df_train,
                df_test=df_holdout,
                dvc_hash=dvc_hash,
            )
        )

        # Free large DataFrames before training — X/y arrays carry everything needed.
        # Keeps only the groupby columns required for post-training segment metrics.
        _seg_cols = [c for c in ["tournamentId", "regionId"] if c in df_holdout.columns]
        _df_holdout_seg = (
            df_holdout[_seg_cols].reset_index(drop=True) if _seg_cols else None
        )
        del df_train, df_holdout

        mlflow.log_params(
            {
                **build_model_metadata_params(
                    model_name=model_name,
                    target=y_col,
                    num_feature_count=len(num_cols),
                    cat_feature_count=len(cat_cols),
                    best_params=best_params,
                ),
                # NOTE(review): logged as a constant 1.0 even when a calibration
                # slice is held out of the base-model fit below.
                "data.train_frac": 1.0,
                "features.num": len(num_cols),
                "features.cat": len(cat_cols),
                **(build_features_selected_params(feat_params) if feat_params else {}),
                **{f"best.{k}": v for k, v in best_params.items()},
            }
        )

        dataset = mlflow.data.from_pandas(
            X_train,
            source=dataset_path,
            name="matches_train_final",
        )
        mlflow.log_input(dataset, context="training")

        # ── Training (with optional post-hoc calibration) ──────────────────
        calib_cfg = calibration_config or {}
        calib_enabled: bool = bool(calib_cfg.get("enabled", False))
        production_pipe = pipe  # replaced below if calibration succeeds

        # time_fit below covers the base fit plus any calibration fitting.
        start_time = pd.Timestamp.now()
        if calib_enabled:
            method: str = str(calib_cfg.get("method", "isotonic"))
            calib_frac: float = float(calib_cfg.get("calib_frac", 0.15))
            min_calib: int = int(calib_cfg.get("min_calib_samples", 100))
            # Calibration slice: calib_frac of the training rows, but never
            # fewer than min_calib_samples.
            n_calib = max(min_calib, int(len(X_train) * calib_frac))

            if n_calib >= len(X_train):
                # NOTE(review): no calibration.* params are logged on this
                # fallback path, although calibration was requested.
                logger.warning(
                    "Calibration set size (%d) >= training set (%d) — "
                    "falling back to full training without calibration.",
                    n_calib,
                    len(X_train),
                )
                pipe.fit(X_train, y_train)
            else:
                # Temporal split: earliest rows for training, most-recent for calibration.
                # NOTE(review): this is a positional split; it is only temporal if
                # df_dataset is already in chronological order — no sort happens
                # in this function. Confirm upstream ordering.
                X_tr, X_cal = X_train.iloc[:-n_calib], X_train.iloc[-n_calib:]
                y_tr, y_cal = y_train.iloc[:-n_calib], y_train.iloc[-n_calib:]
                pipe.fit(X_tr, y_tr)
                frozen = FrozenEstimator(pipe)

                if method == "auto":
                    # Compare sigmoid vs isotonic; register the one with lower ECE
                    # on the calibration set — no holdout leakage.
                    # The ECE is measured on the same rows each calibrator was
                    # fitted on.
                    _candidates: dict[str, tuple] = {}
                    for _m in ("sigmoid", "isotonic"):
                        _cal = CalibratedClassifierCV(frozen, method=_m)
                        _cal.fit(X_cal, y_cal)
                        _ece = compute_ece(
                            y_cal.to_numpy(),
                            _cal.predict_proba(X_cal),
                            target_labels,
                        )
                        _candidates[_m] = (_cal, _ece)
                        logger.info(
                            "Calibration candidate %s: calib-set ECE=%.4f", _m, _ece
                        )
                    chosen_method = min(_candidates, key=lambda m: _candidates[m][1])
                    calibrated, _ = _candidates[chosen_method]
                    mlflow.log_params(
                        {
                            "calibration.auto.sigmoid_ece": float(
                                _candidates["sigmoid"][1]
                            ),
                            "calibration.auto.isotonic_ece": float(
                                _candidates["isotonic"][1]
                            ),
                            "calibration.auto.chosen_method": chosen_method,
                        }
                    )
                    logger.info("Auto-calibration: chose '%s'", chosen_method)
                    # From here on `method` names the concrete method that was chosen.
                    method = chosen_method
                else:
                    # FrozenEstimator (sklearn 1.6+) prevents CalibratedClassifierCV
                    # from refitting the estimator — equivalent to the removed cv='prefit'.
                    calibrated = CalibratedClassifierCV(frozen, method=method)
                    calibrated.fit(X_cal, y_cal)

                production_pipe = calibrated
                mlflow.log_params(
                    {
                        "calibration.enabled": True,
                        "calibration.method": method,
                        "calibration.calib_frac": calib_frac,
                        "calibration.n_calib_samples": n_calib,
                    }
                )
                logger.info(
                    "Calibration applied: method=%s, n_calib=%d",
                    method,
                    n_calib,
                )
        else:
            mlflow.log_param("calibration.enabled", False)
            pipe.fit(X_train, y_train)

        time_fit = (pd.Timestamp.now() - start_time).total_seconds()

        # ── Holdout evaluation — the ONE authoritative evaluation ──────────
        metrics = evaluate_clf(
            y=y_holdout,
            proba=production_pipe.predict_proba(X_holdout),
            label_order=target_labels,
        )
        metrics["time_fit_sec"] = time_fit
        mlflow.log_metrics({f"final.{k}": float(v) for k, v in metrics.items()})

        # When calibration was applied, log raw ECE for comparison.
        # `production_pipe is not pipe` is False on the fallback path above,
        # where calibration was requested but skipped.
        if calib_enabled and production_pipe is not pipe:
            ece_raw = compute_ece(
                y_holdout.to_numpy(), pipe.predict_proba(X_holdout), target_labels
            )
            mlflow.log_metric("final.ece_raw", float(ece_raw))
            mlflow.log_metric("final.ece_calibrated", metrics["ece"])
            logger.info(
                "Calibration ECE: raw=%.4f → calibrated=%.4f",
                ece_raw,
                metrics["ece"],
            )

        logger.info(
            "Final holdout evaluation: logloss=%.4f, accuracy=%.4f, ece=%.4f",
            metrics.get("logloss", float("nan")),
            metrics.get("accuracy", float("nan")),
            metrics.get("ece", float("nan")),
        )

        # ── Log model (production_pipe = calibrated if enabled, raw otherwise) ──
        # Integer columns of the 5-row input example are cast to float64
        # (presumably so the inferred MLflow signature tolerates missing
        # values in those columns — confirm).
        int_cols = X_train.select_dtypes(include=["integer"]).columns.tolist()
        X_example = X_train.head(5).astype({c: "float64" for c in int_cols})
        model_info = mlflow.sklearn.log_model(
            sk_model=production_pipe,
            name=_ARTIFACT_PATH,
            pyfunc_predict_fn="predict_proba",
            input_example=X_example,
        )
        logger.info(
            "log_model done: run_id=%s artifact_uri=%s",
            run.info.run_id,
            model_info.model_uri,
        )

        # ── Artifacts ─────────────────────────────────────────────────────
        fig_cm = plot_confusion_matrix_multiclass(
            y_holdout, production_pipe.predict(X_holdout), target_labels
        )
        mlflow.log_figure(fig_cm, "confusion_matrix_final.png")
        plt.close(fig_cm)

        # Feature importance is extracted from the raw pipeline (the calibrated
        # wrapper does not expose feature_importances_ directly).
        df_imp = extract_feature_importance(pipe, X_cols)
        if df_imp is not None:
            mlflow.log_text(df_imp.to_csv(index=False), "feature_importances.csv")
            fig_imp = plot_feature_importance(
                df_imp,
                top_n=20,
                title=f"Feature importance — {model_name} final (top 20)",
            )
            mlflow.log_figure(fig_imp, "feature_importance_top20.png")
            plt.close(fig_imp)

        # Holdout probabilities are recomputed here (the evaluate_clf call above
        # did not keep them) and reused by the remaining artifacts.
        proba_holdout = production_pipe.predict_proba(X_holdout)
        df_proba = pd.DataFrame(
            proba_holdout,
            columns=[f"proba_class_{c}" for c in target_labels],
        )
        df_proba["y_true"] = y_holdout.values
        mlflow.log_text(df_proba.to_csv(index=False), "holdout_probabilities.csv")

        # Segment metrics are only produced when at least one of the segment
        # columns existed in the holdout frame.
        segment_cols_available = _seg_cols
        if segment_cols_available and _df_holdout_seg is not None:
            df_seg = compute_segment_metrics(
                y_true=y_holdout,
                proba=proba_holdout,
                labels=target_labels,
                segments=_df_holdout_seg,
                segment_cols=segment_cols_available,
            )
            mlflow.log_text(df_seg.to_csv(index=False), "segment_metrics.csv")
            logger.info(
                "Logged segment metrics for %d segment column(s)",
                len(segment_cols_available),
            )

        # Display names for the calibration plot; any label other than 0/1/2
        # falls back to its string form.
        label_names = {0: "Home win", 1: "Draw", 2: "Away win"}
        fig_cal = plot_calibration_curves(
            y_true=y_holdout,
            proba=proba_holdout,
            labels=target_labels,
            label_names={lbl: label_names.get(lbl, str(lbl)) for lbl in target_labels},
        )
        mlflow.log_figure(fig_cal, "calibration_curves.png")
        plt.close(fig_cal)

        # Reference feature snapshot for drift monitoring.
        # A simple random sample (fixed seed, without replacement) of up to
        # 10 000 training-feature rows is logged as a parquet artifact so the
        # monitor_drift stage can use it as its baseline.
        _ref_size = min(10_000, len(X_train))
        X_ref = X_train.sample(n=_ref_size, random_state=42, replace=False)
        with tempfile.TemporaryDirectory() as _tmp:
            _ref_path = Path(_tmp) / "reference_features.parquet"
            X_ref.to_parquet(_ref_path, index=False)
            mlflow.log_artifact(str(_ref_path), artifact_path="reference")
        logger.info("Logged reference_features.parquet (%d rows) to MLflow.", _ref_size)

        return run.info.run_id, model_info.model_uri

ROI Simulation (Evaluation)

ROI simulation for portfolio evaluation.

Pure functions — no IO, no MLflow, no side effects. Called from the evaluation pipeline stage; IO is at the caller boundary.

Design notes

  • No real betting odds are available in the dataset (WhoScored does not provide them). The baseline is therefore a uniform prior (1/3 per outcome), which represents the "naive bettor who bets every match equally".
  • The Kelly-inspired flat-stake simulation places 1 unit on the outcome with the highest model-implied edge over the reference probability.
  • Results are clearly labelled as a simulation on historical data; they carry no implication of live profitability.
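  • If market odds become available (e.g. from a Pinnacle / Betfair feed), convert them to vig-stripped probabilities with vig_strip and pass the result as reference_proba to get a realistic edge estimate; the raw decimal odds can additionally be passed as actual_odds so that payouts are realistic too.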

compute_flat_stake_roi(y_true, model_proba, label_order, reference_proba=None, actual_odds=None, stake=1.0)

Simulate flat-stake ROI: bet on outcome where model edge > 0.

The model bets stake on the outcome with the largest positive edge (model_proba - reference_proba). If no outcome has positive edge for a match, no bet is placed.

Parameters

y_true: True match outcomes, same encoding as label_order. model_proba: Model predicted probabilities, shape (n, n_classes). label_order: Ordered list of class labels matching model_proba columns. reference_proba: Reference probabilities used to compute edge. Defaults to uniform (1 / n_classes) when None. actual_odds: Real bookmaker decimal odds, shape (n, n_classes) aligned to model_proba. When provided, payout = stake * decimal_odd (realistic). When None, payout = stake / model_proba (optimistic model-implied). NaN values fall back to model-implied. stake: Flat stake per bet (in abstract units).

Returns

dict with keys: - n_matches — total matches in slice - n_bets — matches where edge > 0 - bet_rate — n_bets / n_matches - n_correct_bets — bets placed on the correct outcome - hit_rate — correct bets / total bets - total_staked — n_bets * stake - gross_return — sum of payouts for winning bets - net_profit — gross_return - total_staked - roi_pct — net_profit / total_staked * 100

Source code in src/models/evaluation/roi_simulation.py
def compute_flat_stake_roi(
    y_true: np.ndarray,
    model_proba: np.ndarray,
    label_order: list,
    reference_proba: np.ndarray | None = None,
    actual_odds: np.ndarray | None = None,
    stake: float = 1.0,
) -> dict[str, float]:
    """Run a flat-stake betting simulation and summarise its return.

    For every match the outcome with the largest edge
    (``model_proba - reference_proba``) is selected; a bet of ``stake`` is
    placed on it only when that edge is strictly positive.

    Parameters
    ----------
    y_true:
        Realised outcomes, using the labels found in ``label_order``.
    model_proba:
        Predicted probabilities, shape ``(n, n_classes)``.
    label_order:
        Class labels in the column order of ``model_proba``.
    reference_proba:
        Probabilities the model is compared against.  ``None`` means a
        uniform ``1 / n_classes`` reference.
    actual_odds:
        Bookmaker decimal odds, shape ``(n, n_classes)``, column-aligned with
        ``model_proba``.  A winning bet pays ``stake * decimal_odd``; where
        the odd of the selected outcome is NaN, or when ``actual_odds`` is
        ``None``, it pays the model-implied ``stake / model_proba`` instead.
    stake:
        Amount wagered on each placed bet (abstract units).

    Returns
    -------
    dict with keys:
        - ``n_matches``       — number of matches supplied
        - ``n_bets``          — matches with a strictly positive best edge
        - ``bet_rate``        — n_bets / n_matches
        - ``n_correct_bets``  — placed bets that hit the true outcome
        - ``hit_rate``        — n_correct_bets / n_bets (NaN without bets)
        - ``total_staked``    — n_bets * stake
        - ``gross_return``    — summed payouts of the winning bets
        - ``net_profit``      — gross_return - total_staked
        - ``roi_pct``         — net_profit / total_staked * 100 (NaN without bets)
    """
    column_of = {label: col for col, label in enumerate(label_order)}

    if reference_proba is None:
        uniform = 1.0 / len(label_order)
        reference_proba = np.full_like(model_proba, fill_value=uniform)

    # Per match: the column with the largest edge and whether it is worth a bet.
    edge = model_proba - reference_proba  # shape (n, n_classes)
    picked_col = np.argmax(edge, axis=1)
    picked_edge = edge[np.arange(len(edge)), picked_col]
    placed = picked_edge > 0
    n_bets = int(placed.sum())

    if not n_bets:
        # Nothing was staked, so the ratio metrics are undefined.
        undefined = float("nan")
        return {
            "n_matches": len(y_true),
            "n_bets": 0,
            "bet_rate": 0.0,
            "n_correct_bets": 0,
            "hit_rate": undefined,
            "total_staked": 0.0,
            "gross_return": 0.0,
            "net_profit": 0.0,
            "roi_pct": undefined,
        }

    outcome_col = np.array([column_of[label] for label in np.asarray(y_true)])
    picked_proba = model_proba[np.arange(len(model_proba)), picked_col]
    won = placed & (picked_col == outcome_col)

    total_staked = n_bets * stake

    if actual_odds is None:
        # Optimistic payout: the model prices its own bet at 1 / model_proba.
        gross_return = float((stake / picked_proba[won]).sum())
    else:
        # Realistic payout at the bookmaker odd; winners whose odd is missing
        # are paid at the model-implied price instead.
        picked_odds = actual_odds[np.arange(len(actual_odds)), picked_col]
        odds_missing = np.isnan(picked_odds)
        paid_by_market = (stake * picked_odds[won & ~odds_missing]).sum()
        paid_by_model = (stake / picked_proba[won & odds_missing]).sum()
        gross_return = float(paid_by_market + paid_by_model)
    net_profit = gross_return - total_staked

    return {
        "n_matches": len(y_true),
        "n_bets": n_bets,
        "bet_rate": float(n_bets / len(y_true)),
        "n_correct_bets": int(won.sum()),
        "hit_rate": float(won[placed].mean()),
        "total_staked": float(total_staked),
        "gross_return": gross_return,
        "net_profit": net_profit,
        "roi_pct": float(net_profit / total_staked * 100),
    }

compute_roi_by_segment(y_true, model_proba, label_order, segments, segment_col, reference_proba=None, actual_odds=None, stake=1.0, min_bets=10)

Compute flat-stake ROI per segment value (e.g. per league / region).

Only segments with at least min_bets placed bets are returned.

Parameters

y_true: True match outcomes, same encoding as label_order. model_proba: Model predicted probabilities, shape (n, n_classes). label_order: Ordered list of class labels matching model_proba columns. segments: DataFrame with segment columns aligned with y_true. segment_col: Column name in segments to group by. reference_proba: Reference probabilities used to compute edge. Defaults to uniform. actual_odds: Real bookmaker decimal odds, shape (n, n_classes). stake: Flat stake per bet. min_bets: Minimum number of placed bets for a segment to be included.

Returns

pd.DataFrame One row per qualifying segment value, with columns from compute_flat_stake_roi plus the segment identifier column.

Source code in src/models/evaluation/roi_simulation.py
def compute_roi_by_segment(
    y_true: np.ndarray,
    model_proba: np.ndarray,
    label_order: list,
    segments: pd.DataFrame,
    segment_col: str,
    reference_proba: np.ndarray | None = None,
    actual_odds: np.ndarray | None = None,
    stake: float = 1.0,
    min_bets: int = 10,
) -> pd.DataFrame:
    """Break the flat-stake ROI simulation down by one segment column.

    The simulation is run separately on the rows of each distinct non-null
    value of ``segments[segment_col]``; segments that place fewer than
    ``min_bets`` bets are left out of the result.

    Parameters
    ----------
    y_true:
        Realised outcomes, using the labels found in ``label_order``.
    model_proba:
        Predicted probabilities, shape ``(n, n_classes)``.
    label_order:
        Class labels in the column order of ``model_proba``.
    segments:
        DataFrame whose rows line up positionally with *y_true*.
    segment_col:
        Name of the column in *segments* that defines the groups.
    reference_proba:
        Optional reference probabilities for the edge; sliced per segment.
        ``None`` is forwarded unchanged (uniform reference).
    actual_odds:
        Optional bookmaker decimal odds, shape ``(n, n_classes)``; sliced
        per segment.
    stake:
        Amount wagered on each placed bet.
    min_bets:
        Smallest number of placed bets a segment needs to be reported.

    Returns
    -------
    pd.DataFrame
        One row per reported segment: the segment value under
        ``segment_col`` followed by the keys of ``compute_flat_stake_roi``.
    """
    segment_values = segments[segment_col].to_numpy()
    distinct_values = np.unique(segment_values[~pd.isnull(segment_values)])

    def _roi_for(value) -> dict:
        """Run the simulation on the rows that belong to one segment value."""
        in_segment = segment_values == value
        return compute_flat_stake_roi(
            y_true=y_true[in_segment],
            model_proba=model_proba[in_segment],
            label_order=label_order,
            reference_proba=(
                None if reference_proba is None else reference_proba[in_segment]
            ),
            actual_odds=None if actual_odds is None else actual_odds[in_segment],
            stake=stake,
        )

    records = []
    for value in distinct_values:
        stats = _roi_for(value)
        if stats["n_bets"] >= min_bets:
            records.append({segment_col: value, **stats})
    return pd.DataFrame(records)

compute_roi_by_threshold(y_true, model_proba, label_order, reference_proba=None, actual_odds=None, thresholds=None, stake=1.0, min_bets=20)

Compute ROI for progressively stricter edge thresholds.

For each threshold t, only bets where best_edge > t are placed. Helps identify the minimum edge required for positive expected value.

Parameters

thresholds: List of minimum-edge values to test. Defaults to [0.0, 0.02, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30]. min_bets: Skip a threshold row if fewer than this many bets would be placed.

Source code in src/models/evaluation/roi_simulation.py
def compute_roi_by_threshold(
    y_true: np.ndarray,
    model_proba: np.ndarray,
    label_order: list,
    reference_proba: np.ndarray | None = None,
    actual_odds: np.ndarray | None = None,
    thresholds: list[float] | None = None,
    stake: float = 1.0,
    min_bets: int = 20,
) -> pd.DataFrame:
    """Compute ROI for progressively stricter edge thresholds.

    For each threshold ``t``, only bets where ``best_edge > t`` are placed.
    Helps identify the minimum edge required for positive expected value.

    Parameters
    ----------
    y_true:
        True match outcomes, same encoding as ``label_order``.
    model_proba:
        Model predicted probabilities, shape ``(n, n_classes)``.
    label_order:
        Ordered list of class labels matching ``model_proba`` columns.
    reference_proba:
        Reference probabilities used to compute edge, shape
        ``(n, n_classes)``.  NaN entries are replaced by the uniform prior
        ``1 / n_classes``; ``None`` means a fully uniform reference.
    actual_odds:
        Real bookmaker decimal odds, shape ``(n, n_classes)``; forwarded to
        ``compute_flat_stake_roi`` for the selected rows.
    thresholds:
        List of minimum-edge values to test.  Defaults to
        [0.0, 0.02, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30].
    stake:
        Flat stake per bet.
    min_bets:
        Skip a threshold row if fewer than this many bets would be placed.

    Returns
    -------
    pd.DataFrame
        One row per retained threshold: ``min_edge`` followed by the keys of
        ``compute_flat_stake_roi``, with ``n_matches`` / ``n_bets`` /
        ``bet_rate`` expressed relative to the full input.
    """
    if thresholds is None:
        thresholds = [0.0, 0.02, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30]

    n_classes = len(label_order)
    if reference_proba is None:
        ref = np.full_like(model_proba, 1.0 / n_classes)
    else:
        ref = np.where(np.isnan(reference_proba), 1.0 / n_classes, reference_proba)

    edge = model_proba - ref
    best_edge_idx = np.argmax(edge, axis=1)
    best_edge_val = edge[np.arange(len(edge)), best_edge_idx]

    n_total = len(best_edge_val)
    rows = []
    for t in thresholds:
        mask = best_edge_val > t
        n_bets = int(mask.sum())
        if n_bets < min_bets:
            continue
        result = compute_flat_stake_roi(
            y_true=y_true[mask],
            model_proba=model_proba[mask],
            label_order=label_order,
            # Pass the NaN-filled reference that produced `mask`.  Forwarding
            # the raw reference_proba would make the inner simulation drop
            # rows whose reference is NaN, so total_staked / hit_rate /
            # roi_pct would no longer match the n_bets reported below.
            reference_proba=ref[mask],
            actual_odds=actual_odds[mask] if actual_odds is not None else None,
            stake=stake,
        )
        # Override n_matches/n_bets/bet_rate to reflect coverage over the full input,
        # not just the pre-filtered slice (where bet_rate would be trivially 1.0).
        result["n_matches"] = n_total
        result["n_bets"] = n_bets
        # n_total is 0 only for empty input combined with min_bets <= 0.
        result["bet_rate"] = n_bets / n_total if n_total else 0.0
        rows.append({"min_edge": t, **result})
    return pd.DataFrame(rows)

vig_strip(odds)

Convert decimal odds matrix to vig-stripped (fair) probabilities.

Parameters

odds: Decimal odds, shape (n, n_classes). NaN values are propagated.

Returns

np.ndarray of shape (n, n_classes) where each row sums to 1.0.

Source code in src/models/evaluation/roi_simulation.py
def vig_strip(odds: np.ndarray) -> np.ndarray:
    """Turn decimal odds into fair probabilities by removing the overround.

    Each odd is inverted to an implied probability and every row is then
    rescaled by the sum of its non-NaN implied probabilities.

    Parameters
    ----------
    odds:
        Decimal odds, shape ``(n, n_classes)``.  NaN entries stay NaN in the
        output.

    Returns
    -------
    np.ndarray of shape ``(n, n_classes)``; the non-NaN entries of each row
    sum to 1.0, and a row with no valid odds is all NaN.
    """
    implied = 1.0 / odds
    row_totals = np.nansum(implied, axis=1, keepdims=True)
    # nansum yields 0 for an all-NaN row; use NaN there so the division
    # below does not divide by zero.
    safe_totals = np.where(row_totals == 0, np.nan, row_totals)
    return implied / safe_totals

compute_kelly_roi(y_true, model_proba, label_order, actual_odds, fraction=0.25, initial_bankroll=100.0, min_edge=0.02)

Fractional Kelly staking simulation with real market odds.

Kelly formula per bet: f = edge / (odds - 1) where edge = model_proba - market_proba (vig-stripped). Actual stake = fraction * f * current_bankroll (fractional Kelly). Bankroll is updated after each bet. No bet is placed when edge <= min_edge or odds are NaN.

Parameters

y_true: True match outcomes encoded with label_order. model_proba: Model probabilities, shape (n, n_classes). label_order: Class labels aligned with model_proba columns. actual_odds: Bookmaker decimal odds, shape (n, n_classes). Rows with all-NaN odds are skipped. fraction: Kelly fraction (0 < fraction ≤ 1). Default 0.25 (quarter-Kelly). initial_bankroll: Starting bankroll in abstract units. min_edge: Minimum edge required to place a bet.

Returns

dict with: n_matches, n_bets, bet_rate, n_correct_bets, hit_rate, total_staked, gross_return, net_profit, roi_pct, final_bankroll, bankroll_growth_pct.

Source code in src/models/evaluation/roi_simulation.py
def compute_kelly_roi(
    y_true: np.ndarray,
    model_proba: np.ndarray,
    label_order: list,
    actual_odds: np.ndarray,
    fraction: float = 0.25,
    initial_bankroll: float = 100.0,
    min_edge: float = 0.02,
) -> dict[str, float]:
    """Fractional Kelly staking simulation with real market odds.

    Kelly formula per bet: ``f = edge / (odds - 1)`` where
    ``edge = model_proba - market_proba`` (vig-stripped).
    Actual stake = ``fraction * f * current_bankroll`` (fractional Kelly).
    Bankroll is updated after each bet, in input order. No bet is placed when
    ``edge <= min_edge``, when the odds of the selected outcome are NaN, or
    when the resulting stake is negligible (below ``1e-8``, e.g. for odds
    ``<= 1`` or an exhausted bankroll).

    Parameters
    ----------
    y_true:
        True match outcomes encoded with ``label_order``.
    model_proba:
        Model probabilities, shape ``(n, n_classes)``.
    label_order:
        Class labels aligned with ``model_proba`` columns.
    actual_odds:
        Bookmaker decimal odds, shape ``(n, n_classes)``.  Rows with all-NaN
        odds are skipped.
    fraction:
        Kelly fraction (0 < fraction ≤ 1). Default 0.25 (quarter-Kelly).
    initial_bankroll:
        Starting bankroll in abstract units.
    min_edge:
        Minimum edge required to place a bet.

    Returns
    -------
    dict with:
        ``n_matches``, ``n_bets``, ``bet_rate``, ``n_correct_bets``,
        ``hit_rate``, ``total_staked``, ``gross_return``, ``net_profit``,
        ``roi_pct``, ``final_bankroll``, ``bankroll_growth_pct``.
        ``n_bets`` (and therefore ``bet_rate`` and ``hit_rate``) counts only
        bets that were actually staked, so it is consistent with
        ``total_staked`` and ``n_correct_bets``.
    """
    label_to_idx = {lbl: i for i, lbl in enumerate(label_order)}

    market_proba = vig_strip(actual_odds)
    edge = model_proba - market_proba  # (n, n_classes)
    best_idx = np.argmax(edge, axis=1)
    best_edge = edge[np.arange(len(edge)), best_idx]
    best_odds = actual_odds[np.arange(len(actual_odds)), best_idx]

    # Rows that qualify for a bet; a NaN edge compares False and is excluded.
    candidate_mask = (best_edge > min_edge) & ~np.isnan(best_odds)

    n_matches = len(y_true)
    bankroll = float(initial_bankroll)
    total_staked = 0.0
    gross_return = 0.0
    n_bets = 0
    n_correct = 0

    if candidate_mask.any():
        # Labels are only mapped when there is something to bet on.
        y_true_arr = np.asarray(y_true)
        true_idx = np.array([label_to_idx[lbl] for lbl in y_true_arr])

        for i in np.flatnonzero(candidate_mask):
            o = float(best_odds[i])
            e = float(best_edge[i])
            # Kelly fraction: f* = edge / (odds - 1)
            kelly_f = e / (o - 1.0) if o > 1.0 else 0.0
            kelly_f = max(0.0, min(kelly_f, 1.0))  # clamp to [0, 1]
            stake = fraction * kelly_f * bankroll

            if stake < 1e-8:
                # Negligible stake: no bet is placed, so it must not be
                # counted in n_bets / hit_rate either.
                continue

            n_bets += 1
            total_staked += stake
            if best_idx[i] == true_idx[i]:
                payout = stake * o
                gross_return += payout
                bankroll += payout - stake
                n_correct += 1
            else:
                bankroll -= stake

    net_profit = gross_return - total_staked
    undefined = float("nan")

    return {
        "n_matches": n_matches,
        "n_bets": n_bets,
        "bet_rate": float(n_bets / n_matches) if n_bets else 0.0,
        "n_correct_bets": n_correct,
        "hit_rate": float(n_correct / n_bets) if n_bets > 0 else undefined,
        "total_staked": float(total_staked),
        "gross_return": float(gross_return),
        "net_profit": float(net_profit),
        "roi_pct": float(net_profit / total_staked * 100)
        if total_staked > 0
        else undefined,
        "final_bankroll": float(bankroll),
        # A zero starting bankroll can never change, so growth is 0 rather
        # than a division by zero.
        "bankroll_growth_pct": float(
            (bankroll - initial_bankroll) / initial_bankroll * 100
        )
        if initial_bankroll
        else 0.0,
    }

compute_roi_timeseries(y_true, model_proba, label_order, actual_odds, dates=None, min_edge=0.0, stake=1.0)

Build a bet-by-bet P&L record sorted chronologically.

Each row in the output corresponds to one match where a bet was placed (edge > min_edge and odds are available). Accumulating this DataFrame over time gives the strategy's running P&L curve.

Parameters

y_true: True match outcomes encoded with label_order. model_proba: Model probabilities, shape (n, n_classes). label_order: Class labels aligned with model_proba columns. actual_odds: Bookmaker decimal odds, shape (n, n_classes). dates: Optional array of match dates (datetime or str) for chronological ordering. When None, input order is preserved. min_edge: Minimum edge to place a bet. stake: Flat stake per bet.

Returns

pd.DataFrame with columns: date, bet_outcome (label), true_outcome (label), model_proba_bet, market_proba_bet, edge, odds, stake, payout, profit, cumulative_profit, cumulative_staked, cumulative_roi_pct, cumulative_bets.

Source code in src/models/evaluation/roi_simulation.py
def compute_roi_timeseries(
    y_true: np.ndarray,
    model_proba: np.ndarray,
    label_order: list,
    actual_odds: np.ndarray,
    dates: np.ndarray | None = None,
    min_edge: float = 0.0,
    stake: float = 1.0,
) -> pd.DataFrame:
    """Build a bet-by-bet P&L record sorted chronologically.

    For every match the outcome with the largest edge (model probability
    minus the market probability returned by ``vig_strip(actual_odds)``) is
    selected.  A bet is recorded when that edge is strictly greater than
    ``min_edge`` and the odds of the selected outcome are not NaN.  Each row
    of the output is one such bet; the cumulative columns give the
    strategy's running P&L curve.

    Parameters
    ----------
    y_true:
        True match outcomes encoded with ``label_order``.
    model_proba:
        Model probabilities, shape ``(n, n_classes)``.
    label_order:
        Class labels aligned with ``model_proba`` columns.
    actual_odds:
        Bookmaker decimal odds, shape ``(n, n_classes)``.
    dates:
        Optional array of match dates (datetime or str) for chronological
        ordering.  When ``None``, input order is preserved.  Bets that share
        a date keep their input order.
    min_edge:
        Minimum edge to place a bet.
    stake:
        Flat stake per bet.

    Returns
    -------
    pd.DataFrame with columns, in this order:
        ``date``, ``bet_outcome`` (label), ``true_outcome`` (label),
        ``model_proba_bet``, ``market_proba_bet``, ``edge``, ``odds``,
        ``stake``, ``payout``, ``profit``,
        ``cumulative_profit``, ``cumulative_staked``, ``cumulative_roi_pct``,
        ``cumulative_bets``.
        The frame is empty (same columns) when no bet qualifies.

    Raises
    ------
    KeyError
        If ``y_true`` contains a label that is not in ``label_order``.
    """
    # Single source of truth for the column layout so the empty and the
    # non-empty result always agree with each other and with the docstring.
    columns = [
        "date",
        "bet_outcome",
        "true_outcome",
        "model_proba_bet",
        "market_proba_bet",
        "edge",
        "odds",
        "stake",
        "payout",
        "profit",
        "cumulative_profit",
        "cumulative_staked",
        "cumulative_roi_pct",
        "cumulative_bets",
    ]

    # Coerce array-likes up front: the positional fancy indexing below only
    # works on ndarrays (no-op when ndarrays are passed, as annotated).
    model_proba = np.asarray(model_proba)
    actual_odds = np.asarray(actual_odds)
    y_true_arr = np.asarray(y_true)

    # Row indices produced below are positions, not labels.  A pandas Series
    # carrying a non-default index (e.g. a test split) must therefore be read
    # through ``.iloc``; every other container is indexed as before.
    date_lookup = dates.iloc if isinstance(dates, pd.Series) else dates

    label_to_idx = {lbl: i for i, lbl in enumerate(label_order)}

    market_proba = vig_strip(actual_odds)
    edge_mat = model_proba - market_proba
    row_idx = np.arange(len(edge_mat))
    best_idx = np.argmax(edge_mat, axis=1)
    best_edge = edge_mat[row_idx, best_idx]
    best_odds = actual_odds[row_idx, best_idx]

    # Comparisons with NaN are False, so a NaN edge never triggers a bet;
    # NaN odds are excluded explicitly.
    bet_mask = (best_edge > min_edge) & ~np.isnan(best_odds)

    true_idx = np.array([label_to_idx[lbl] for lbl in y_true_arr])

    rows = []
    for i in np.flatnonzero(bet_mask):
        pick = best_idx[i]
        o = float(best_odds[i])
        # Decimal odds: a winning bet returns stake * odds, a losing one 0.
        payout = stake * o if pick == true_idx[i] else 0.0
        rows.append(
            {
                "date": date_lookup[i] if date_lookup is not None else None,
                "bet_outcome": label_order[pick],
                "true_outcome": y_true_arr[i],
                "model_proba_bet": float(model_proba[i, pick]),
                "market_proba_bet": float(market_proba[i, pick]),
                "edge": float(best_edge[i]),
                "odds": o,
                "stake": float(stake),
                "payout": payout,
                "profit": payout - stake,
            }
        )

    if not rows:
        return pd.DataFrame(columns=columns)

    df = pd.DataFrame(rows)
    if dates is not None:
        # The default quicksort is not stable; mergesort keeps bets that
        # share a date in input order so the cumulative curve is
        # reproducible.
        df = df.sort_values("date", kind="mergesort").reset_index(drop=True)

    df["cumulative_profit"] = df["profit"].cumsum()
    df["cumulative_staked"] = df["stake"].cumsum()
    df["cumulative_roi_pct"] = df["cumulative_profit"] / df["cumulative_staked"] * 100
    df["cumulative_bets"] = np.arange(1, len(df) + 1)

    # Return the documented column order.
    return df[columns]