Skip to content

Data

Parameters

load_params(params_path='params.yaml')

Load and return the project params.yaml as a dict.

Parameters:

Name Type Description Default
params_path str | Path

Path to the params file. Relative paths are resolved against the project root (two levels above src/data/), making the function safe to call from any working directory.

'params.yaml'

Returns:

Type Description
dict[str, Any]

Parsed YAML content as a plain dict.

Raises:

Type Description
FileNotFoundError

When the resolved path does not exist.

ValueError

When the file does not contain a YAML mapping.

Source code in src/data/params.py
def load_params(params_path: str | Path = "params.yaml") -> dict[str, Any]:
    """Load and return the project params.yaml as a dict.

    Args:
        params_path: Path to the params file.  Relative paths are resolved
            against the project root (two levels above ``src/data/``),
            making the function safe to call from any working directory.

    Returns:
        Parsed YAML content as a plain ``dict``.  An empty document
        (which YAML parses to ``None``) yields an empty dict.

    Raises:
        FileNotFoundError: When the resolved path does not exist.
        ValueError: When the file does not contain a YAML mapping.
    """
    path = Path(params_path)
    if not path.is_absolute():
        path = (_PROJECT_ROOT / path).resolve()

    if not path.exists():
        raise FileNotFoundError(
            f"params file not found: {path}\n"
            f"  (resolved relative to project root: {_PROJECT_ROOT})\n"
            f"  (cwd: {Path.cwd()})"
        )

    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # Only an empty document (None) means "no params".  Other falsy values
    # such as ``[]``, ``0``, ``""`` or ``false`` are not mappings and must be
    # rejected below instead of being silently coerced to ``{}``.
    if data is None:
        return {}

    if not isinstance(data, dict):
        raise ValueError(f"params file must be a mapping, got: {type(data).__name__}")

    return data

Source

load_data_from_source(output_path_matches, output_path_matches_raw)

Download match data files from the configured MinIO source.

Calls export_data_raw for each path; files are only re-downloaded when the remote ETag or size has changed.

Parameters:

Name Type Description Default
output_path_matches Path

Local destination for the processed matches parquet file.

required
output_path_matches_raw Path

Local destination for the raw matches parquet file.

required
Source code in src/data/source.py
def load_data_from_source(
    output_path_matches: Path, output_path_matches_raw: Path
) -> None:
    """Fetch both match data files from the configured MinIO source.

    Each destination is handed to ``export_data_raw`` in turn, processed
    file first and raw file second.

    Args:
        output_path_matches: Local destination for the processed
            matches parquet file.
        output_path_matches_raw: Local destination for the raw
            matches parquet file.
    """
    for destination in (output_path_matches, output_path_matches_raw):
        export_data_raw(destination)

Preprocessing

export_matches_metadata(df_match_raw, metadata_path)

Export ID-to-name mapping JSON files from a raw match DataFrame.

Writes one JSON file per entity type (tournamentId, regionId, stageId, seasonId, homeTeamId, awayTeamId, homeTeamCountryName, awayTeamCountryName) to metadata_path. Each file maps the numeric ID (or name) to its corresponding string name or code.

Parameters:

Name Type Description Default
df_match_raw DataFrame

Raw match DataFrame containing name columns such as tournamentName, regionName, etc.

required
metadata_path Path

Directory where JSON files are written.

required
Source code in src/data/preprocess.py
def export_matches_metadata(df_match_raw: pd.DataFrame, metadata_path: Path) -> None:
    """Write one key-to-label JSON lookup file per entity type.

    For every (key column, label column) pair below, the first row seen
    for each distinct key is kept, missing values are replaced by an
    empty string, and the resulting mapping is written to
    ``<metadata_path>/<key column>.json``.

    Args:
        df_match_raw: Raw match DataFrame holding both the key columns
            (e.g. ``tournamentId``) and their label columns
            (e.g. ``tournamentName``).
        metadata_path: Existing directory that receives the JSON files.
    """
    # (key column, label column); the key column also names the output file.
    column_pairs = (
        ("stageId", "stageName"),
        ("tournamentId", "tournamentName"),
        ("regionId", "regionName"),  # regionCode
        ("seasonId", "seasonName"),
        ("homeTeamId", "homeTeamName"),
        ("awayTeamId", "awayTeamName"),
        ("homeTeamCountryName", "homeTeamCountryCode"),
        ("awayTeamCountryName", "awayTeamCountryCode"),
    )

    for key_column, label_column in column_pairs:
        unique_rows = df_match_raw[[key_column, label_column]].drop_duplicates(
            key_column
        )
        lookup = dict(unique_rows.fillna("").values)

        target_file = metadata_path / f"{key_column}.json"
        with open(target_file, "w", encoding="utf-8") as handle:
            json.dump(lookup, handle, ensure_ascii=False, indent=2)

    logger.info("Metadata exported successfully.")

load_matches_metadata(metadata_path)

Load all ID-to-name mapping JSONs from the metadata directory.

Parameters:

Name Type Description Default
metadata_path Path

Directory containing the JSON files produced by export_matches_metadata.

required

Returns:

Type Description
dict

Dict keyed by entity type (e.g. "tournamentId") whose values are the corresponding id-to-name mapping dicts.

Source code in src/data/preprocess.py
def load_matches_metadata(metadata_path: Path) -> dict:
    """Read every entity lookup JSON back from the metadata directory.

    Args:
        metadata_path: Directory containing the JSON files produced by
            ``export_matches_metadata``.

    Returns:
        Dict keyed by entity type (e.g. ``"tournamentId"``); each value
        is the mapping parsed from ``<entity type>.json``.  Because the
        mappings come from JSON, their keys are strings.
    """
    entity_names = (
        "stageId",
        "tournamentId",
        "regionId",
        "seasonId",
        "homeTeamId",
        "awayTeamId",
        "homeTeamCountryName",
        "awayTeamCountryName",
    )

    metadata = {}
    for entity in entity_names:
        with open(metadata_path / f"{entity}.json", "r", encoding="utf-8") as handle:
            metadata[entity] = json.load(handle)
    return metadata

preprocess_and_split(df_matches, score_outlier_pct=0.9999, reference_date=None)

Preprocess raw match data and split into finished and future sets.

Drops irrelevant columns, downcasts dtypes, derives classification and regression targets, clips extreme score outliers, and splits by match status (6=finished, 1=upcoming).

Parameters:

Name Type Description Default
df_matches DataFrame

Raw match DataFrame from match_raw.parquet.

required
score_outlier_pct float

Upper quantile threshold for clipping homeScore and awayScore (e.g. 0.9999 removes the top 0.01% of scores, which are typically forfeits).

0.9999
reference_date datetime | None

Override for "now" used only in logging. Defaults to UTC now minus 3 hours.

None

Returns:

Type Description
tuple[DataFrame, DataFrame]

Tuple of (df_finished, df_future):

- df_finished: status=6 matches with targets and downcast dtypes.
- df_future: status=1 matches without score/target columns.

Source code in src/data/preprocess.py
def preprocess_and_split(
    df_matches: pd.DataFrame,
    score_outlier_pct: float = 0.9999,
    reference_date: dt.datetime | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Preprocess raw match data and split into finished and future sets.

    Drops irrelevant columns, downcasts dtypes, derives classification
    and regression targets, clips extreme score outliers, and splits by
    match status (6=finished, 1=upcoming).

    Args:
        df_matches: Raw match DataFrame from ``match_raw.parquet``.
        score_outlier_pct: Upper quantile threshold for clipping
            homeScore and awayScore (e.g. 0.9999 removes the top
            0.01% of scores, which are typically forfeits).
        reference_date: Override for "now" used only in logging.
            Defaults to UTC now minus 3 hours.

    Returns:
        Tuple of (df_finished, df_future):
        - df_finished: status=6 matches with targets and downcast
          dtypes.
        - df_future: status=1 matches without score/target columns.
    """
    df_matches.index = df_matches["id"]

    df_matches.drop(
        columns=[
            "stageName",
            "tournamentName",
            "regionName",
            "regionCode",
            "seasonName",
            "homeTeamName",
            "awayTeamName",
            "homeTeamCountryCode",
            "awayTeamCountryCode",
            "stageSortOrder",
            "tournamentSortOrder",
            "isOpta",
            "startTime",
            "navigationDisplayMode",
            "hasIncidentsSummary",
            "hasPreview",
            "scoreChangedAt",
            "elapsed",
            "lastScorer",
            "isTopMatch",
            "commentCount",
            "isLineupConfirmed",
            "isStreamAvailable",
            "matchIsOpta",
            "aggregateWinnerField",
            "winnerField",
            "period",
            "extraResultField",
            "startedAtUtc",
            "firstHalfEndedAtUtc",
            "secondHalfStartedAtUtc",
            "incidents",
            "bets",
            "matchArgs",
            "matchHeader",
        ],
        inplace=True,
    )

    df_matches["id"] = df_matches["id"].astype(int32)
    df_matches["sex"] = df_matches["sex"].astype(int8)
    df_matches["tournamentId"] = df_matches["tournamentId"].astype(int16)
    df_matches["stageId"] = df_matches["stageId"].astype(int16)
    df_matches["regionId"] = df_matches["regionId"].astype(int16)
    df_matches["seasonId"] = df_matches["seasonId"].astype(int16)
    df_matches["homeTeamId"] = df_matches["homeTeamId"].astype(int16)
    df_matches["awayTeamId"] = df_matches["awayTeamId"].astype(int16)
    df_matches["homeTeamCountryName"] = df_matches["homeTeamCountryName"].astype(
        "category"
    )
    df_matches["awayTeamCountryName"] = df_matches["awayTeamCountryName"].astype(
        "category"
    )

    df_matches["startTimeUtc"] = pd.to_datetime(df_matches["startTimeUtc"], utc=True)
    df_matches.sort_values(by=["startTimeUtc"], ascending=True, inplace=True)

    ids_finished = df_matches[df_matches["status"] == 6].index
    ids_future = df_matches[df_matches["status"] == 1].index
    df_matches.drop(columns=["status"], inplace=True)

    df_finished = df_matches.loc[ids_finished].copy()
    df_finished["homeScore"] = df_finished["homeScore"].astype(int8)
    df_finished["awayScore"] = df_finished["awayScore"].astype(int8)

    # target columns for regression ["homeScore","awayScore", "sumScore", "diffScore"]
    df_finished["sumScore"] = df_finished["homeScore"] + df_finished["awayScore"]
    df_finished["diffScore"] = df_finished["homeScore"] - df_finished["awayScore"]

    df_finished["extratime"] = (
        ~df_finished["homeExtratimeScore"].isna()
        | ~df_finished["awayExtratimeScore"].isna()
    )
    df_finished["penalty"] = (
        ~df_finished["homePenaltyScore"].isna()
        | ~df_finished["awayPenaltyScore"].isna()
    )
    df_finished["fulltime"] = ~(df_finished["extratime"] | df_finished["penalty"])

    # target columns for classification ["outcome_1x2"]
    # Derived BEFORE clipping so win/draw/loss depends only on which score is larger.
    df_finished["homeWin"] = df_finished["homeScore"] > df_finished["awayScore"]
    df_finished["awayWin"] = df_finished["awayScore"] > df_finished["homeScore"]
    df_finished["draw"] = df_finished["homeScore"] == df_finished["awayScore"]
    df_finished["outcome_1x2"] = df_finished[
        ["homeWin", "draw", "awayWin"]
    ].values.argmax(axis=1)
    df_finished["outcome_1x2"] = df_finished["outcome_1x2"].astype(int8)

    # Clip outlier scores — high-percentile threshold per column on the finished split.
    # Clips only the extreme tail (~0.01% of matches: forfeits/technical results);
    # matches are kept (not dropped) so team history stays continuous.
    for _score_col in ("homeScore", "awayScore"):
        _threshold = int(df_finished[_score_col].quantile(score_outlier_pct))
        _n_clipped = int((df_finished[_score_col] > _threshold).sum())
        df_finished[_score_col] = df_finished[_score_col].clip(upper=_threshold)
        if _n_clipped:
            logger.info(
                "Clipped %d rows in %s above threshold %d (pct=%.4f)",
                _n_clipped,
                _score_col,
                _threshold,
                score_outlier_pct,
            )

    cols_to_drop = [
        "homeExtratimeScore",
        "awayExtratimeScore",
        "homePenaltyScore",
        "awayPenaltyScore",
        "homeYellowCards",
        "awayYellowCards",
        "homeRedCards",
        "awayRedCards",
        "homeWin",
        "awayWin",
        "draw",
        "fulltime",
        "extratime",
        "penalty",
    ]
    df_finished.drop(columns=cols_to_drop, inplace=True)

    df_future = df_matches.loc[ids_future].copy()

    future_features_columns = [
        "homeScore",
        "awayScore",
        "homeExtratimeScore",
        "awayExtratimeScore",
        "homePenaltyScore",
        "awayPenaltyScore",
        "homeYellowCards",
        "awayYellowCards",
        "homeRedCards",
        "awayRedCards",
    ]
    df_future.drop(columns=future_features_columns, inplace=True)

    split_info = {
        "input_matches": len(df_matches),
        "finished_matches (status=6)": len(df_finished),
        "future_matches (status=1)": len(df_future),
        "deleted_matches": len(df_matches) - len(df_finished) - len(df_future),
        "sorted_by_startTimeUtc": True,
    }
    logger.info(
        "matches data split info:\n%s",
        json.dumps(split_info, indent=2, ensure_ascii=False),
    )

    targets_info = {
        "target_col_classification": "outcome_1x2",
        "target_cols_regression": ["homeScore", "awayScore", "sumScore", "diffScore"],
    }
    logger.debug(
        "targets info: %s",
        json.dumps(targets_info, ensure_ascii=False),
    )

    start_matches_range = df_finished["startTimeUtc"].min()
    end_matches_range = df_finished["startTimeUtc"].max()

    if reference_date is not None:
        datetime_now = reference_date.replace(tzinfo=None)
    else:
        datetime_now = (
            dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=3)
        ).replace(tzinfo=None)

    start_future_matches_range = df_future["startTimeUtc"].min()
    end_future_matches_range = df_future["startTimeUtc"].max()

    logger.info(
        "Finished matches range from %s to %s",
        start_matches_range,
        end_matches_range,
    )
    logger.debug("Datetime now: %s", datetime_now.strftime("%Y-%m-%d %H:%M:%S"))
    logger.info(
        "Future matches range from %s to %s",
        start_future_matches_range,
        end_future_matches_range,
    )

    return df_finished, df_future

Splitting

split_time_based_on(df, date_test_start)

Split a match DataFrame into train/val and test sets by date.

All rows with startTimeUtc < date_test_start go to the train/val set; the remainder form the test set.

Parameters:

Name Type Description Default
df DataFrame

Match DataFrame with startTimeUtc and id columns.

required
date_test_start Timestamp

Cutoff timestamp (exclusive for train, inclusive for test).

required

Returns:

Type Description
tuple[DataFrame, DataFrame]

Tuple of (df_train_ids, df_test_ids) — each contains only the id and startTimeUtc columns.

Source code in src/data/splitting.py
def split_time_based_on(
    df: pd.DataFrame, date_test_start: pd.Timestamp
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Partition matches into a train/val part and a test part by kickoff time.

    Rows whose ``startTimeUtc`` is strictly earlier than
    ``date_test_start`` form the train/val part; every other row forms
    the test part.

    Args:
        df: Match DataFrame with ``startTimeUtc`` and ``id`` columns.
        date_test_start: Cutoff timestamp (exclusive for train,
            inclusive for test).

    Returns:
        Tuple of (df_train_ids, df_test_ids) — independent copies that
        hold only the ``id`` and ``startTimeUtc`` columns.
    """
    kept_columns = ["id", "startTimeUtc"]
    is_before_cutoff = df["startTimeUtc"] < date_test_start

    df_train_ids = df.loc[is_before_cutoff, kept_columns].copy()
    df_test_ids = df.loc[~is_before_cutoff, kept_columns].copy()

    logger.info("Split: train/val=%d, test=%d", len(df_train_ids), len(df_test_ids))
    return df_train_ids, df_test_ids

make_year_folds(df_train_val, valid_years)

Create walk-forward CV fold definitions, one fold per calendar year.

For each year in valid_years, all data before that year is used as the training fold and the named year is used as the validation fold. Folds where either split is empty are silently skipped.

Parameters:

Name Type Description Default
df_train_val DataFrame

Training/validation DataFrame with a startTimeUtc column.

required
valid_years list[int]

List of calendar years to use as validation windows (e.g. [2022, 2023, 2024]).

required

Returns:

Type Description
DataFrame

DataFrame with columns: fold, year, train_start, train_end, valid_start, valid_end, train_rows, valid_rows.

Source code in src/data/splitting.py
def make_year_folds(df_train_val: pd.DataFrame, valid_years: list[int]) -> pd.DataFrame:
    """Create walk-forward CV fold definitions, one fold per calendar year.

    For each year in ``valid_years``, all data before that year is used
    as the training fold and the named year is used as the validation
    fold.  Folds where either split is empty are silently skipped; the
    ``fold`` number is the 1-based position of the year in
    ``valid_years``, so skipped years leave gaps in the numbering.

    Args:
        df_train_val: Training/validation DataFrame with a
            ``startTimeUtc`` column.
        valid_years: List of calendar years to use as validation
            windows (e.g. ``[2022, 2023, 2024]``).

    Returns:
        DataFrame with columns: fold, year, train_start, train_end,
        valid_start, valid_end, train_rows, valid_rows.  The columns are
        present even when no fold could be built (zero rows).
    """
    fold_columns = [
        "fold",
        "year",
        "train_start",
        "train_end",
        "valid_start",
        "valid_end",
        "train_rows",
        "valid_rows",
    ]

    # Only the timestamps are needed, so filter this one column instead of
    # slicing the whole frame for every year.
    start_times = df_train_val["startTimeUtc"]

    folds = []
    for i, y in enumerate(valid_years, start=1):
        # Validation window is the half-open interval [Jan 1 of y, Jan 1 of y+1) in UTC.
        valid_start = pd.Timestamp(f"{y}-01-01", tz="UTC")
        valid_end = pd.Timestamp(f"{y + 1}-01-01", tz="UTC")

        train_times = start_times[start_times < valid_start]
        valid_times = start_times[
            (start_times >= valid_start) & (start_times < valid_end)
        ]
        if train_times.empty or valid_times.empty:
            continue

        folds.append(
            {
                "fold": i,
                "year": y,
                "train_start": train_times.min(),
                "train_end": train_times.max(),
                "valid_start": valid_times.min(),
                "valid_end": valid_times.max(),
                "train_rows": len(train_times),
                "valid_rows": len(valid_times),
            }
        )
    logger.info("Created %d folds for years: %s", len(folds), valid_years)
    # Explicit columns keep the schema stable when every fold was skipped.
    return pd.DataFrame(folds, columns=fold_columns)

Storage (DVC / MinIO)

create_client_s3()

Create a boto3 S3 client configured from environment variables.

Reads MINIO_ENDPOINT_URL, MINIO_USER, and MINIO_PASSWORD from the environment.

Returns:

Type Description
client

Configured boto3 S3 client pointed at the MinIO endpoint.

Source code in src/data/storage.py
def create_client_s3() -> boto3.client:
    """Build a boto3 S3 client from MinIO settings in the environment.

    The endpoint and credentials come from ``MINIO_ENDPOINT_URL``,
    ``MINIO_USER`` and ``MINIO_PASSWORD``; a missing variable raises
    ``KeyError``.

    Returns:
        boto3 S3 client that talks to the configured endpoint.
    """
    connection_settings = {
        "endpoint_url": os.environ["MINIO_ENDPOINT_URL"],
        "aws_access_key_id": os.environ["MINIO_USER"],
        "aws_secret_access_key": os.environ["MINIO_PASSWORD"],
    }
    return boto3.client("s3", **connection_settings)

get_file_from_minio(bucket_name, object_name, file_path=None)

Download a file from MinIO to local disk.

Parameters:

Name Type Description Default
bucket_name

Name of the MinIO bucket.

required
object_name

S3 key (path) of the object to download.

required
file_path

Local destination path. Defaults to object_name.

None

Returns:

Type Description

Local path where the file was saved.

Raises:

Type Description
Exception

Re-raises any boto3 download error after logging it.

Source code in src/data/storage.py
def get_file_from_minio(bucket_name, object_name, file_path=None):
    """Fetch one object from MinIO and store it on local disk.

    Args:
        bucket_name: Name of the MinIO bucket.
        object_name: S3 key (path) of the object to download.
        file_path: Local destination path. Defaults to ``object_name``.

    Returns:
        Local path where the file was saved.

    Raises:
        Exception: Any error raised by the download is logged and then
            re-raised unchanged.
    """
    destination = object_name if file_path is None else file_path
    client = create_client_s3()

    try:
        client.download_file(bucket_name, object_name, destination)
    except Exception as exc:
        logger.error("Failed to download %s from MinIO: %s", object_name, exc)
        raise
    return destination

get_metadata_from_minio(bucket_name, object_name)

Retrieve object metadata (HEAD) from MinIO without downloading.

Parameters:

Name Type Description Default
bucket_name

Name of the MinIO bucket.

required
object_name

S3 key (path) of the object.

required

Returns:

Type Description

HEAD response dict from boto3 (ETag, ContentLength, LastModified, etc.).

Raises:

Type Description
Exception

Re-raises any boto3 error after logging it.

Source code in src/data/storage.py
def get_metadata_from_minio(bucket_name, object_name):
    """Issue a HEAD request for an object in MinIO and return the response.

    Args:
        bucket_name: Name of the MinIO bucket.
        object_name: S3 key (path) of the object.

    Returns:
        HEAD response dict from boto3 (ETag, ContentLength,
        LastModified, etc.).

    Raises:
        Exception: Any error raised by the request is logged and then
            re-raised unchanged.
    """
    client = create_client_s3()

    try:
        head_response = client.head_object(Bucket=bucket_name, Key=object_name)
    except Exception as exc:
        logger.error("Failed to retrieve metadata for %s: %s", object_name, exc)
        raise
    return head_response

export_data_raw(local_path, bucket=None)

Download a raw data file from MinIO only when it has changed.

Compares ETag and size from a local .minio.json sidecar file against the remote object. Downloads and updates the sidecar only when the remote has changed or the local file is absent.

Parameters:

Name Type Description Default
local_path Path

Local destination path for the downloaded file.

required
bucket str | None

MinIO bucket name. Defaults to the MINIO_BUCKET_DATA_RAW environment variable.

None

Returns:

Type Description
dict

HEAD response dict for the remote object (always the live MinIO response, regardless of whether a download occurred).

Source code in src/data/storage.py
def export_data_raw(local_path: Path, bucket: str | None = None) -> dict:
    """Download a raw data file from MinIO only when it has changed.

    Compares ETag and size from a local ``.minio.json`` sidecar file
    against the remote object.  Downloads and updates the sidecar only
    when the remote has changed or the local file is absent.  A sidecar
    that cannot be read, is not valid JSON, or does not hold a JSON
    object is treated as missing, which forces a fresh download.

    The object key is the file name of ``local_path``; parent
    directories of ``local_path`` are created when needed.

    Args:
        local_path: Local destination path for the downloaded file.
        bucket: MinIO bucket name. Defaults to the
            ``MINIO_BUCKET_DATA_RAW`` environment variable.

    Returns:
        HEAD response dict for the remote object (always the live
        MinIO response, regardless of whether a download occurred).
    """
    s3 = create_client_s3()

    bucket = bucket or os.environ["MINIO_BUCKET_DATA_RAW"]

    local_path = Path(local_path).resolve()
    local_path.parent.mkdir(parents=True, exist_ok=True)

    key = local_path.name
    # Sidecar sits next to the data file: "<name><suffix>.minio.json".
    meta_path = local_path.with_suffix(local_path.suffix + ".minio.json")

    remote = s3.head_object(Bucket=bucket, Key=key)
    remote_etag = remote.get("ETag")
    remote_size = remote.get("ContentLength")
    remote_last_modified = remote.get("LastModified")

    local_meta: dict = {}
    if meta_path.exists():
        try:
            loaded_meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # Best effort: an unreadable or corrupt sidecar only costs a
            # re-download.  ValueError covers JSON and text decode errors.
            logger.warning("Ignoring unreadable sidecar %s: %s", meta_path, exc)
        else:
            # Valid JSON that is not an object (e.g. a list) has no
            # etag/size to compare, so it is treated as missing as well.
            if isinstance(loaded_meta, dict):
                local_meta = loaded_meta

    local_etag = local_meta.get("etag")
    local_size = local_meta.get("size")

    changed = (
        (not local_path.exists())
        or (local_etag != remote_etag)
        or (local_size != remote_size)
    )

    if changed:
        s3.download_file(bucket, key, str(local_path))
        meta_payload = {
            "etag": remote_etag,
            "size": remote_size,
            "last_modified": remote_last_modified.isoformat()
            if remote_last_modified
            else None,
            # bucket / key allow downstream stages to reconstruct the source URI.
            "bucket": bucket,
            "key": key,
            # ingested_at: wall-clock time when this pipeline stage pulled the object.
            # Distinct from last_modified (when the object was PUT in MinIO).
            "ingested_at": datetime.datetime.now(
                datetime.UTC
            ).isoformat(),  # timezone-aware UTC
        }
        meta_path.write_text(
            json.dumps(meta_payload, ensure_ascii=False, indent=2), encoding="utf-8"
        )

        logger.info("Downloaded/updated: %s", local_path)
    else:
        logger.debug("Up-to-date (no change): %s", local_path)

    return remote

get_dvc_hash(out_path, lock_path=None)

Look up the DVC MD5 hash for a pipeline output in dvc.lock.

Parameters:

Name Type Description Default
out_path str

DVC output path string as written in dvc.yaml (e.g. "data/interim/finished.parquet").

required
lock_path Path | None

Path to the dvc.lock file. Defaults to ./dvc.lock in the current working directory.

None

Returns:

Type Description
str

MD5 hash string for the matched output.

Raises:

Type Description
ValueError

If out_path is not found in dvc.lock.

Source code in src/data/storage.py
def get_dvc_hash(out_path: str, lock_path: Path | None = None) -> str:
    """Look up the DVC MD5 hash for a pipeline output in dvc.lock.

    Args:
        out_path: DVC output path string as written in ``dvc.yaml``
            (e.g. ``"data/interim/finished.parquet"``).
        lock_path: Path to the ``dvc.lock`` file. Defaults to
            ``./dvc.lock`` in the current working directory.

    Returns:
        MD5 hash string for the matched output.

    Raises:
        ValueError: If ``out_path`` is not found in ``dvc.lock``,
            including when the lock file is empty or has no stages.
        KeyError: If the matching output entry has no ``md5`` field.
    """
    resolved = lock_path or Path("dvc.lock")

    with open(resolved, encoding="utf-8") as f:
        meta = yaml.safe_load(f)

    # An empty file parses to None and ``stages:`` / ``outs:`` without a
    # value parse to None too; treat each as "nothing recorded" so the
    # documented ValueError is raised instead of an AttributeError.
    if not isinstance(meta, dict):
        meta = {}

    for stage in (meta.get("stages") or {}).values():
        for out in (stage or {}).get("outs") or []:
            if out.get("path") == out_path:
                return out["md5"]

    raise ValueError(f"Output {out_path} not found in dvc.lock")

ORM Models

Match

Bases: SQLModel

Compact match record used by the internal ORM (match table).

Source code in src/data/models.py
class Match(SQLModel, table=True):
    """Compact match record used by the internal ORM (``match`` table).

    Holds the tournament identifiers, match id/status/kickoff time, both
    team ids with their regular, extra-time and penalty scores, and a
    few progress fields (``elapsed``, ``period``).

    NOTE(review): several fields are annotated as non-optional
    (``int`` / ``datetime``) yet declare ``default=None`` — confirm
    whether the corresponding columns are meant to be nullable.
    """

    # Explicit table name for this model.
    __tablename__ = "match"

    # Tournament information (all four identifiers are indexed)
    tournamentId: int = Field(default=None, index=True)
    stageId: int = Field(default=None, index=True)
    regionId: int = Field(default=None, index=True)
    seasonId: int = Field(default=None, index=True)
    sex: int | None = None

    # Match information
    # ``id`` is the primary key; status and startTime are indexed.
    id: int = Field(primary_key=True)
    status: int = Field(default=None, index=True)
    startTime: datetime = Field(default=None, index=True)

    # Home team (scores are optional)
    homeTeamId: int = Field(default=None, index=True)
    homeScore: int | None = None
    homeExtratimeScore: int | None = None
    homePenaltyScore: int | None = None

    # Away team (mirrors the home-team fields)
    awayTeamId: int = Field(default=None, index=True)
    awayScore: int | None = None
    awayExtratimeScore: int | None = None
    awayPenaltyScore: int | None = None

    # Match details
    # ``elapsed`` is stored as text limited to 50 characters.
    elapsed: str | None = Field(default=None, max_length=50)
    startTimeUtc: datetime = Field(default=None, index=True)
    period: int | None = Field(default=None, index=True)

MatchRaw

Bases: SQLModel

Full raw match record mirroring the WhoScored livescores payload.

Source code in src/data/models.py
class MatchRaw(SQLModel, table=True):
    """Full raw match record mirroring the WhoScored livescores payload.

    Only ``id`` (primary key), ``tournamentId``, ``stageId``,
    ``homeTeamId`` and ``awayTeamId`` are required; every other field is
    optional and defaults to ``None``.  ``status`` is the only indexed
    non-key column.
    """

    # Explicit table name for this model.
    __tablename__ = "match_raw"

    # Tournament information
    # tournamentId and stageId are declared NOT NULL; the rest is optional.
    tournamentId: int = Field(nullable=False)
    stageId: int = Field(nullable=False)
    stageName: Optional[str] = Field(default=None, max_length=255)
    regionId: Optional[int] = None
    tournamentName: Optional[str] = Field(default=None, max_length=255)
    seasonName: Optional[str] = Field(default=None, max_length=50)
    seasonId: Optional[int] = None
    stageSortOrder: Optional[int] = None
    sex: Optional[int] = None
    tournamentSortOrder: Optional[int] = None
    regionCode: Optional[str] = Field(default=None, max_length=10)
    regionName: Optional[str] = Field(default=None, max_length=100)
    isOpta: Optional[bool] = None
    navigationDisplayMode: Optional[int] = None

    # Match information
    id: int = Field(primary_key=True)
    status: Optional[int] = Field(default=None, index=True)
    startTime: Optional[datetime] = None

    # Home team
    # homeTeamId is declared NOT NULL; names, cards and scores are optional.
    homeTeamId: int = Field(nullable=False)
    homeTeamName: Optional[str] = Field(default=None, max_length=255)
    homeYellowCards: Optional[int] = None
    homeRedCards: Optional[int] = None
    homeTeamCountryCode: Optional[str] = Field(default=None, max_length=10)
    homeTeamCountryName: Optional[str] = Field(default=None, max_length=100)
    homeScore: Optional[int] = None
    homeExtratimeScore: Optional[int] = None
    homePenaltyScore: Optional[int] = None

    # Away team (mirrors the home-team fields)
    awayTeamId: int = Field(nullable=False)
    awayTeamName: Optional[str] = Field(default=None, max_length=255)
    awayYellowCards: Optional[int] = None
    awayRedCards: Optional[int] = None
    awayTeamCountryCode: Optional[str] = Field(default=None, max_length=10)
    awayTeamCountryName: Optional[str] = Field(default=None, max_length=100)
    awayScore: Optional[int] = None
    awayExtratimeScore: Optional[int] = None
    awayPenaltyScore: Optional[int] = None

    # Match details
    hasIncidentsSummary: Optional[bool] = None
    hasPreview: Optional[bool] = None
    scoreChangedAt: Optional[datetime] = None
    elapsed: Optional[str] = Field(default=None, max_length=50)
    lastScorer: Optional[int] = None
    isTopMatch: Optional[bool] = None
    commentCount: Optional[int] = None
    isLineupConfirmed: Optional[bool] = None
    isStreamAvailable: Optional[bool] = None
    matchIsOpta: Optional[bool] = None
    startTimeUtc: Optional[datetime] = None
    aggregateWinnerField: Optional[int] = None
    winnerField: Optional[int] = None
    period: Optional[int] = None
    extraResultField: Optional[int] = None

    # Match timeline
    startedAtUtc: Optional[datetime] = None
    firstHalfEndedAtUtc: Optional[datetime] = None
    secondHalfStartedAtUtc: Optional[datetime] = None

    # Additional data
    # NOTE(review): incidents/bets are typed as int and matchArgs/matchHeader
    # as bool — confirm these types against the actual payload.
    incidents: Optional[int] = None
    bets: Optional[int] = None
    matchArgs: Optional[bool] = None
    matchHeader: Optional[bool] = None

Scraper

create_webdriver()

Create a Selenoid Remote WebDriver with a randomised user-agent.

Returns:

Type Description
Remote

Configured webdriver.Remote instance connected to the Selenoid server at get_server_for_scraper().

Source code in src/data/scraper/driver.py
def create_webdriver() -> webdriver.Remote:
    """Create a Selenoid Remote WebDriver with a randomised user-agent.

    One of a fixed list of desktop Chrome user-agent strings is chosen at
    random on every call, a fixed set of Chrome switches is applied, and a
    session is requested from the hub at
    ``http://{get_server_for_scraper()}/wd/hub`` with ``browserVersion``
    ``125.0`` and VNC enabled.

    Returns:
        Configured ``webdriver.Remote`` instance connected to the
        Selenoid server at ``get_server_for_scraper()``.
    """
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36",
    ]

    webdriver_arguments = [
        "--locale=en_US",
        f"user-agent={random.choice(user_agents)}",
        "--disable-blink-features=AutomationControlled",
        "--disable-infobars",
        "--start-maximized",
        "--disable-extensions",
        "--profile-directory=Default",
        "--incognito",
        "--disable-plugins-discovery",
        "--disable-popup-blocking",
        "--disable-gpu",
        # Chrome expects "width,height"; an "x" separator is not parsed by the
        # regular (non-headless) browser and the switch is silently ignored.
        "--window-size=1920,1080",
        # Sandbox, security and background-throttling switches
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--remote-debugging-port=9222",
        "--disable-web-security",
        "--allow-running-insecure-content",
        "--ignore-certificate-errors",
        "--disable-software-rasterizer",
        "--disable-background-timer-throttling",
        "--disable-backgrounding-occluded-windows",
        "--disable-renderer-backgrounding",
        "--mute-audio",
        "--disable-notifications",
        "--disable-translate",
        "--disable-features=IsolateOrigins,site-per-process",
        "--disable-sync",
    ]

    chrome_options = Options()

    for arg in webdriver_arguments:
        chrome_options.add_argument(arg)

    # Selenoid picks the browser image from these capabilities.
    chrome_options.set_capability("browserVersion", "125.0")
    chrome_options.set_capability("selenoid:options", {"enableVNC": True})

    scraper_server = get_server_for_scraper()

    return webdriver.Remote(
        command_executor=f"http://{scraper_server}/wd/hub", options=chrome_options
    )

get_server_for_scraper()

Return the Selenoid server IP from scraper settings.

Source code in src/data/scraper/driver.py
def get_server_for_scraper() -> str:
    """Look up the Selenoid server address in the scraper settings.

    Returns:
        The ``server_ip`` attribute of ``get_scraper_settings()``.
    """
    scraper_settings = get_scraper_settings()
    return scraper_settings.server_ip

Livescores Validation

get_list_livescore_matches(livescores_raw)

Parse a raw WhoScored livescores payload into ORM objects.

Strips HTML tags from livescores_raw, validates the JSON against the Pydantic Whoscored.livescores schema, and returns two parallel lists of validated match objects.

Parameters:

Name Type Description Default
livescores_raw str

Raw HTML/JSON string from the livescores scraper.

required

Returns:

Type Description
tuple[list[Match], list[MatchRaw]]

Tuple of (matches, matches_raw) where each element is a list of SQLModel instances ready to be upserted into the database.

Source code in src/data/validation/livescores.py
def get_list_livescore_matches(
    livescores_raw: str,
) -> tuple[list[Match], list[MatchRaw]]:
    """Parse a raw WhoScored livescores payload into ORM objects.

    Strips HTML tags from *livescores_raw*, validates the JSON against
    the Pydantic ``Whoscored.livescores`` schema, and returns two
    parallel lists of validated match objects.

    Each match row is built by merging the tournament fields (without its
    ``matches``), the match fields (without ``incidents`` and ``bets``), and
    two derived values: the number of incidents and a 0/1 flag for bets.

    A ``ValidationError`` is logged as a warning and stops processing; the
    rows collected up to that point are returned.  Both models are validated
    before either list is appended to, so the two lists always have the same
    length and stay index-aligned.

    Args:
        livescores_raw: Raw HTML/JSON string from the livescores scraper.

    Returns:
        Tuple of (matches, matches_raw) where each element is a list
        of SQLModel instances ready to be upserted into the database.
        Both lists are empty when the payload is empty after tag stripping.

    Raises:
        json.JSONDecodeError: When the stripped payload is not valid JSON.
    """
    json_content = re.sub(r"<[^>]+>", "", livescores_raw).strip()
    matches: list[Match] = []
    matches_raw: list[MatchRaw] = []

    if not json_content:
        logger.warning("Empty JSON content in livescores payload")
        return matches, matches_raw

    # Parsed outside the try block, so malformed JSON propagates to the caller.
    data = json.loads(json_content)
    try:
        validated_data = get_settings().whoscored.livescores(**data)
        for tournament in validated_data.tournaments:
            tournament_data = tournament.model_dump(exclude={"matches"})

            for match in tournament.matches:
                match_data = match.model_dump(exclude={"incidents", "bets"})

                # Nested collections are reduced to scalar summary columns.
                add_data = {
                    "incidents": len(match.incidents),
                    "bets": 1 if match.bets else 0,
                }

                merged = {
                    **tournament_data,
                    **match_data,
                    **add_data,
                }

                # Validate both models first: if the second one fails, nothing
                # has been appended yet and the lists cannot drift apart.
                match_row = Match.model_validate(merged)
                match_raw_row = MatchRaw.model_validate(merged)
                matches.append(match_row)
                matches_raw.append(match_raw_row)

    except ValidationError as e:
        logger.warning("Validation error: %s", e)

    return matches, matches_raw

Odds (Football-Data.co.uk)

Download and normalize closing odds from football-data.co.uk.

Two data sources are supported:

  1. Standard leagues (mmz4281): URL: https://www.football-data.co.uk/mmz4281/{season}/{league_code}.csv Odds: B365H/D/A (Bet365 opening/closing)

  2. Extra leagues (/new/): URL: https://www.football-data.co.uk/new/{country_code}.csv Odds: PSCH/PSCD/PSCA (Pinnacle closing) — ~95% row coverage; B365CH/CD/CA (Bet365 closing) — ~5-15% row coverage Uses Pinnacle closing as p_home/p_draw/p_away reference. b365h/d/a set from B365CH/CD/CA where available, else NaN.

Output schema (one row per match):
  • season : str — e.g. "2425" or "2024" (extra leagues)
  • league_code : str — e.g. "E0" or "NOR"
  • league_name : str — human-readable name from params.yaml
  • date : datetime64 — match date (UTC midnight)
  • home_team : str — home team name as per FDCO
  • away_team : str — away team name as per FDCO
  • ftr : str — full-time result (H/D/A), if available
  • b365h : float — Bet365 decimal odds home win (NaN for extra w/o B365)
  • b365d : float — Bet365 decimal odds draw (NaN for extra w/o B365)
  • b365a : float — Bet365 decimal odds away win (NaN for extra w/o B365)
  • vig : float — bookmaker margin (1/h + 1/d + 1/a)
  • p_home : float — vig-stripped implied probability home win
  • p_draw : float — vig-stripped implied probability draw
  • p_away : float — vig-stripped implied probability away win

fetch_league_csv(season, league_code, timeout=30)

Download a single season/league CSV from football-data.co.uk.

Returns the raw DataFrame with all available columns. Raises requests.HTTPError on non-200 responses.

Source code in src/data/odds_fdco.py
def fetch_league_csv(season: str, league_code: str, timeout: int = 30) -> pd.DataFrame:
    """Download a single season/league CSV from football-data.co.uk.

    The response bytes are decoded as latin-1 by pandas, independent of any
    charset the HTTP layer would guess.

    Args:
        season: season code used in the URL path, e.g. "2425".
        league_code: league code used as the file name, e.g. "E0".
        timeout: request timeout in seconds.

    Returns:
        The raw DataFrame with all available columns.

    Raises:
        requests.HTTPError: on non-200 responses.
    """
    url = f"{_BASE_URL}/{season}/{league_code}.csv"
    logger.info("Fetching %s", url)
    resp = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
    resp.raise_for_status()
    # Hand pandas the raw bytes: ``encoding`` is ignored for an already-decoded
    # text buffer, so wrapping ``resp.text`` left the decoding to requests'
    # charset guess instead of the latin-1 requested here.
    return pd.read_csv(io.BytesIO(resp.content), encoding="latin-1")

normalize_fdco(df, season, league_code)

Extract and normalize key columns from a raw FDCO DataFrame.

Drops rows with missing Bet365 odds or team names. Computes vig-stripped implied probabilities.

Returns DataFrame with schema defined in module docstring. Returns empty DataFrame if any required odds column is missing.

Source code in src/data/odds_fdco.py
def normalize_fdco(df: pd.DataFrame, season: str, league_code: str) -> pd.DataFrame:
    """Extract and normalize key columns from a raw FDCO DataFrame.

    Drops rows with missing Bet365 odds or team names, and rows whose date
    cannot be parsed (dates are read day-first).
    Computes vig-stripped implied probabilities.

    Args:
        df: raw DataFrame as returned by ``fetch_league_csv``.
        season: season code written to the ``season`` column.
        league_code: league code written to the ``league_code`` column.

    Returns:
        DataFrame with columns ``season``, ``league_code``, ``date``,
        ``home_team``, ``away_team``, ``ftr`` (only when present in the
        input), ``b365h``, ``b365d``, ``b365a``, ``vig``, ``p_home``,
        ``p_draw``, ``p_away``.
        Returns empty DataFrame if any required odds column, or the
        ``Date``/``HomeTeam``/``AwayTeam`` columns, are missing.
    """
    missing_odds = [c for c in _ODDS_COLS if c not in df.columns]
    if missing_odds:
        logger.warning(
            "League %s/%s: missing odds columns %s — skipping",
            season,
            league_code,
            missing_odds,
        )
        return pd.DataFrame()

    available = [c for c in _KEY_COLS if c in df.columns]

    # Without these columns the dropna/rename/date parsing below would raise
    # KeyError; skip the file instead, as is done for missing odds columns.
    missing_required = [
        c for c in ("Date", "HomeTeam", "AwayTeam") if c not in available
    ]
    if missing_required:
        logger.warning(
            "League %s/%s: missing required columns %s — skipping",
            season,
            league_code,
            missing_required,
        )
        return pd.DataFrame()

    out = df[available].dropna(subset=_ODDS_COLS + ["HomeTeam", "AwayTeam"]).copy()

    rename_map: dict[str, str] = {
        "Date": "date",
        "HomeTeam": "home_team",
        "AwayTeam": "away_team",
        "FTR": "ftr",
        "B365H": "b365h",
        "B365D": "b365d",
        "B365A": "b365a",
    }
    out = out.rename(columns={k: v for k, v in rename_map.items() if k in out.columns})
    out["date"] = pd.to_datetime(out["date"], dayfirst=True, errors="coerce")
    out = out.dropna(subset=["date"])

    # vig is the sum of inverse odds; dividing each inverse by it makes the
    # three probabilities sum to 1.
    out["vig"] = 1.0 / out["b365h"] + 1.0 / out["b365d"] + 1.0 / out["b365a"]
    out["p_home"] = (1.0 / out["b365h"]) / out["vig"]
    out["p_draw"] = (1.0 / out["b365d"]) / out["vig"]
    out["p_away"] = (1.0 / out["b365a"]) / out["vig"]

    out["season"] = season
    out["league_code"] = league_code

    base_cols = ["season", "league_code", "date", "home_team", "away_team"]
    opt_cols = ["ftr"] if "ftr" in out.columns else []
    odds_cols = ["b365h", "b365d", "b365a", "vig", "p_home", "p_draw", "p_away"]

    return out[base_cols + opt_cols + odds_cols].reset_index(drop=True)

fetch_extra_league_csv(country_code, timeout=30)

Download the extra-league CSV for a country from football-data.co.uk/new/.

The /new/ endpoint returns a single file covering all available seasons. Raises requests.HTTPError on non-200 responses.

Source code in src/data/odds_fdco.py
def fetch_extra_league_csv(country_code: str, timeout: int = 30) -> pd.DataFrame:
    """Download the extra-league CSV for a country from football-data.co.uk/new/.

    The /new/ endpoint returns a single file covering all available seasons.
    The response bytes are decoded as latin-1 by pandas, independent of any
    charset the HTTP layer would guess.

    Args:
        country_code: country code used as the file name, e.g. "NOR".
        timeout: request timeout in seconds.

    Returns:
        The raw DataFrame with all available columns.

    Raises:
        requests.HTTPError: on non-200 responses.
    """
    url = f"{_EXTRA_BASE_URL}/{country_code}.csv"
    logger.info("Fetching %s", url)
    resp = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
    resp.raise_for_status()
    # Hand pandas the raw bytes: ``encoding`` is ignored for an already-decoded
    # text buffer, so wrapping ``resp.text`` left the decoding to requests'
    # charset guess instead of the latin-1 requested here.
    return pd.read_csv(io.BytesIO(resp.content), encoding="latin-1")

normalize_fdco_extra(df, country_code, league_name, season_years)

Normalize a raw extra-league DataFrame from the /new/ endpoint.

Extra-league CSVs differ from standard ones
  • Team columns are Home/Away (not HomeTeam/AwayTeam).
  • Odds columns are Pinnacle closing (PSCH/PSCD/PSCA) and optionally Bet365 closing (B365CH/B365CD/B365CA).
  • A numeric Season column (year) is used for date filtering.

Reference probabilities are vig-stripped from Pinnacle closing odds. b365h/d/a are filled from B365CH/CD/CA where available, else NaN.

Parameters:

Name Type Description Default
df DataFrame

raw DataFrame from fetch_extra_league_csv.

required
country_code str

country code used as league_code (e.g. "NOR").

required
league_name str

human-readable name (e.g. "Norway Eliteserien").

required
season_years list[int]

list of calendar years to keep (e.g. [2023, 2024, 2025, 2026]).

required

Returns:

Type Description
DataFrame

Normalized DataFrame with the same schema as normalize_fdco.

DataFrame

Empty DataFrame if Pinnacle odds columns are absent.

Source code in src/data/odds_fdco.py
def normalize_fdco_extra(
    df: pd.DataFrame,
    country_code: str,
    league_name: str,
    season_years: list[int],
) -> pd.DataFrame:
    """Normalize a raw extra-league DataFrame from the /new/ endpoint.

    Extra-league CSVs differ from standard ones:
      - Team columns are ``Home``/``Away`` (not ``HomeTeam``/``AwayTeam``).
      - Odds columns are Pinnacle closing (PSCH/PSCD/PSCA) and optionally
        Bet365 closing (B365CH/B365CD/B365CA).
      - A ``Season`` column may be present; it is copied (as text) into the
        output ``season`` column but is not used for filtering.

    Reference probabilities are vig-stripped from Pinnacle closing odds.
    b365h/d/a are filled from B365CH/CD/CA where available, else NaN.

    Args:
        df: raw DataFrame from ``fetch_extra_league_csv``.
        country_code: country code used as ``league_code`` (e.g. "NOR").
        league_name: human-readable name (e.g. "Norway Eliteserien"),
            written to the ``league_name`` column.
        season_years: calendar years of interest
            (e.g. [2023, 2024, 2025, 2026]).  Only the earliest year is
            used, as a lower bound on the match date; later rows are all
            kept.  An empty list disables the date filter.

    Returns:
        Normalized DataFrame with the columns of ``normalize_fdco`` plus
        ``league_name``.
        Empty DataFrame if the Pinnacle odds, ``Home``/``Away`` or ``Date``
        columns are absent, or if no row survives filtering.
    """
    # Strip a leading BOM from column names, whether it was decoded as U+FEFF
    # or as the three latin-1 characters for the bytes EF BB BF.
    df = df.rename(columns=lambda c: c.lstrip("\ufeff\xef\xbb\xbf"))

    missing_ps = [c for c in _EXTRA_PS_COLS if c not in df.columns]
    if missing_ps:
        logger.warning(
            "Extra league %s: missing Pinnacle columns %s — skipping",
            country_code,
            missing_ps,
        )
        return pd.DataFrame()

    if "Home" not in df.columns or "Away" not in df.columns:
        logger.warning(
            "Extra league %s: missing Home/Away columns — skipping", country_code
        )
        return pd.DataFrame()

    # The date parsing below would raise KeyError without a Date column.
    if "Date" not in df.columns:
        logger.warning(
            "Extra league %s: missing Date column — skipping", country_code
        )
        return pd.DataFrame()

    out = df.copy()

    # Parse dates first, then filter by year (Season column format varies
    # across countries: int 2024, str "2024/2025", str "2024" — date is reliable).
    out = out.rename(columns={"Home": "home_team", "Away": "away_team", "Date": "date"})
    out["date"] = pd.to_datetime(out["date"], dayfirst=True, errors="coerce")
    out = out.dropna(subset=["date"])

    if season_years:
        min_year = min(season_years)
        out = out[out["date"].dt.year >= min_year].copy()

    # Drop rows with missing Pinnacle odds or team names
    out = out.dropna(subset=_EXTRA_PS_COLS + ["home_team", "away_team"])
    if out.empty:
        return pd.DataFrame()

    # Pinnacle vig-stripped reference probabilities
    out["vig"] = 1.0 / out["PSCH"] + 1.0 / out["PSCD"] + 1.0 / out["PSCA"]
    out["p_home"] = (1.0 / out["PSCH"]) / out["vig"]
    out["p_draw"] = (1.0 / out["PSCD"]) / out["vig"]
    out["p_away"] = (1.0 / out["PSCA"]) / out["vig"]

    # Bet365 closing as actual payout odds (NaN where unavailable)
    for src, dst in zip(_EXTRA_B365C_COLS, ["b365h", "b365d", "b365a"]):
        out[dst] = out[src] if src in out.columns else float("nan")

    # FTR from Res column if present; any value other than H/D/A becomes NaN
    if "Res" in out.columns:
        res_map = {"H": "H", "D": "D", "A": "A"}
        out["ftr"] = out["Res"].map(res_map)

    out["season"] = (
        out["Season"].astype(str) if "Season" in out.columns else country_code
    )
    out["league_code"] = country_code
    out["league_name"] = league_name

    # league_name is part of the returned columns; it was previously assigned
    # above but then dropped by the final column selection.
    base_cols = [
        "season",
        "league_code",
        "league_name",
        "date",
        "home_team",
        "away_team",
    ]
    opt_cols = ["ftr"] if "ftr" in out.columns else []
    odds_cols = ["b365h", "b365d", "b365a", "vig", "p_home", "p_draw", "p_away"]

    return out[base_cols + opt_cols + odds_cols].reset_index(drop=True)

load_odds_fdco(output_path, seasons, leagues, extra_leagues=None)

Download, normalize, and save closing odds to parquet.

Parameters:

Name Type Description Default
output_path Path

destination .parquet file path.

required
seasons list[str]

list of season codes for standard leagues, e.g. ["2425", "2324"].

required
leagues list[dict[str, str]]

list of dicts with "code" and "name" keys (standard mmz4281 leagues).

required
extra_leagues list[dict[str, str]] | None

optional list of dicts with "code" and "name" keys for extra-league countries (fetched from /new/{code}.csv). These use Pinnacle closing odds as reference probabilities.

None

Raises:

Type Description
RuntimeError

if no data could be fetched from any source.

Source code in src/data/odds_fdco.py
def load_odds_fdco(
    output_path: Path,
    seasons: list[str],
    leagues: list[dict[str, str]],
    extra_leagues: list[dict[str, str]] | None = None,
) -> None:
    """Download, normalize, and save closing odds to parquet.

    Every season/league combination is fetched and normalized in turn; a
    combination that fails with an HTTP error, or normalizes to an empty
    frame, is skipped.  All remaining frames are concatenated and written to
    *output_path* (parent directories are created as needed).

    Args:
        output_path: destination .parquet file path.
        seasons: list of season codes for standard leagues, e.g. ["2425", "2324"].
            Four-character codes also determine the calendar years passed to
            the extra-league normalizer ("2324" → 2023 and 2024).
        leagues: list of dicts with ``"code"`` and ``"name"`` keys
            (standard mmz4281 leagues).  ``"name"`` falls back to the code.
        extra_leagues: optional list of dicts with "code" and "name" keys for
            extra-league countries (fetched from /new/{code}.csv).
            These use Pinnacle closing odds as reference probabilities.

    Raises:
        RuntimeError: if no data could be fetched from any source.
    """
    frames: list[pd.DataFrame] = []

    # --- Standard leagues (mmz4281 / B365) ---
    for season in seasons:
        for league in leagues:
            code = league["code"]
            name = league.get("name", code)
            try:
                raw = fetch_league_csv(season, code)
                normalized = normalize_fdco(raw, season, code)
                if not normalized.empty:
                    normalized["league_name"] = name
                    frames.append(normalized)
                    logger.info("  %s/%s: %d rows", season, code, len(normalized))
            except requests.HTTPError as exc:
                logger.warning("Skipping %s/%s: HTTP %s", season, code, exc)

    # --- Extra leagues (/new/ / Pinnacle closing) ---
    if extra_leagues:
        # Derive calendar years to keep from seasons list (e.g. "2324" → 2023, 2024)
        years: set[int] = set()
        for s in seasons:
            if len(s) == 4:
                years.add(int("20" + s[:2]))
                years.add(int("20" + s[2:]))
        season_years = sorted(years)

        for league in extra_leagues:
            code = league["code"]
            name = league.get("name", code)
            try:
                raw = fetch_extra_league_csv(code)
                normalized = normalize_fdco_extra(raw, code, name, season_years)
                if not normalized.empty:
                    # Set league_name here as for standard leagues, so every
                    # frame in the concat carries it regardless of which
                    # columns the normalizer returned.
                    normalized["league_name"] = name
                    frames.append(normalized)
                    logger.info("  extra/%s: %d rows", code, len(normalized))
            except requests.HTTPError as exc:
                logger.warning("Skipping extra/%s: HTTP %s", code, exc)

    if not frames:
        raise RuntimeError(
            "No odds data fetched — check params.yaml odds_fdco.seasons and odds_fdco.leagues"
        )

    df = pd.concat(frames, ignore_index=True)

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)

    logger.info(
        "Saved %d rows across %d league-seasons to %s",
        len(df),
        len(frames),
        output_path,
    )

Odds (Fonbet)

Production module for collecting fon.bet odds snapshots via Selenium.

fon.bet is a single-page application (SPA) — there is no public REST API available via a simple HTTP GET. All data is loaded via XHR/fetch requests made by the browser JavaScript bundle. This module injects a Chrome DevTools Protocol (CDP) script into the page before navigation, intercepts every JSON response the SPA fetches, and stores the complete snapshot.

Provides

save_daily_snapshot(output_dir) -> str Launch a headless Chrome via Selenoid, navigate to the fon.bet football page, capture all JSON API responses, and persist a single compressed archive per run:

Writes to MinIO (primary):
    s3://{MINIO_BUCKET_DATA_RAW}/odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz
Falls back to local filesystem:
    {output_dir}/date=YYYY-MM-DD/HHMMSS.json.gz
Snapshot format

A gzip-compressed JSON array of response objects::

[
    {"url": "https://…/events/listBase?…", "body": "{…raw JSON string…}"},
    {"url": "https://…/geoCategories?…",   "body": "{…}"},
    …
]

The body field contains the raw JSON string exactly as returned by the server — not re-parsed. Each captured entry is a JSON API response the SPA loaded during page initialisation. Typically 30–40 responses per snapshot, with the main events/listBase response (~9 MB) being the largest.

Key endpoints captured (by URL fragment): events/listBase — all events + sports hierarchy + geo versions sportEvent/geoCategories — country/region lookup (ISO + internal IDs) sportEvent/sportCategories — league/tournament categories factorsCatalog/… — available bet types (factors) line/logos — team/league logo metadata

Usage in downstream code::

import gzip, json
from src.data.odds_fonbet import save_daily_snapshot, load_snapshot

# collect a fresh snapshot
path = save_daily_snapshot()

# load an existing snapshot
captured = load_snapshot(path)
events_raw = json.loads(
    next(c["body"] for c in captured if "events/listBase" in c["url"])
)

save_daily_snapshot(output_dir='data/raw/odds_fonbet')

Capture the full fon.bet page load and save all JSON responses.

Uses Selenium + CDP to intercept every JSON API call the SPA makes on page load, then writes the complete list as a gzip-compressed JSON file.

Partition layout (Hive-compatible)::

odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz

Writes to MinIO when MINIO_* environment variables are configured (primary)::

s3://{MINIO_BUCKET_DATA_RAW}/odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz

Falls back to local filesystem::

{output_dir}/date=YYYY-MM-DD/HHMMSS.json.gz

Parameters:

Name Type Description Default
output_dir Path | str

Local fallback root directory. Ignored when MinIO is available.

'data/raw/odds_fonbet'

Returns:

Type Description
str

URI or path of the written file as a string.

Raises:

Type Description
RuntimeError

if Selenium capture returns no data.

Source code in src/data/odds_fonbet.py
def save_daily_snapshot(output_dir: Path | str = "data/raw/odds_fonbet") -> str:
    """Capture one fon.bet page load and persist every intercepted JSON response.

    The responses returned by ``_capture_fonbet_json()`` are serialised as a
    single JSON document, gzip-compressed, and stored under a
    Hive-compatible, UTC-timestamped partition::

        odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz

    The MinIO bucket is tried first::

        s3://{MINIO_BUCKET_DATA_RAW}/odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz

    If anything in that attempt raises, a warning is logged and the same
    bytes are written to the local filesystem instead::

        {output_dir}/date=YYYY-MM-DD/HHMMSS.json.gz

    Args:
        output_dir: Local fallback root directory. Ignored when MinIO is available.

    Returns:
        URI or path of the written file as a string.

    Raises:
        RuntimeError: if Selenium capture returns no data.
    """
    import fsspec

    responses = _capture_fonbet_json()
    if not responses:
        raise RuntimeError("fon.bet capture returned no data — skipping save")

    # One UTC timestamp drives both the date partition and the file name.
    captured_at = datetime.now(timezone.utc)
    day = captured_at.strftime("%Y-%m-%d")
    stem = captured_at.strftime("%H%M%S")
    object_key = f"odds_fonbet/date={day}/{stem}.json.gz"

    serialized = _json.dumps(responses, ensure_ascii=False)
    payload = gzip.compress(serialized.encode("utf-8"))

    try:
        minio = get_minio_settings()
        target_uri = f"s3://{minio.bucket_data_raw}/{object_key}"
        with fsspec.open(target_uri, "wb", **minio.storage_options) as remote:
            remote.write(payload)
        logger.info(
            "Saved fon.bet snapshot (%d responses, %d bytes) to %s",
            len(responses),
            len(payload),
            target_uri,
        )
        return target_uri
    except Exception as exc:
        # Best-effort primary target: any failure routes to the local copy.
        logger.warning(
            "MinIO not available (%s); falling back to local filesystem", exc
        )

    fallback_file = Path(output_dir) / f"date={day}" / f"{stem}.json.gz"
    fallback_file.parent.mkdir(parents=True, exist_ok=True)
    fallback_file.write_bytes(payload)
    logger.info(
        "Saved fon.bet snapshot (%d responses, %d bytes) to %s",
        len(responses),
        len(payload),
        fallback_file,
    )
    return str(fallback_file)

load_snapshot(path)

Load a previously saved fon.bet snapshot from a local or S3 path.

Parameters:

Name Type Description Default
path str | Path

Local path or s3://… URI to a .json.gz snapshot file.

required

Returns:

Type Description
list[dict]

List of {"url": str, "body": str} dicts.

Source code in src/data/odds_fonbet.py
def load_snapshot(path: str | Path) -> list[dict]:
    """Load a previously saved fon.bet snapshot from a local or S3 path.

    Args:
        path: Local path or ``s3://…`` URI to a ``.json.gz`` snapshot file.

    Returns:
        List of ``{"url": str, "body": str}`` dicts.
    """
    import fsspec

    path_str = str(path)
    if path_str.startswith("s3://"):
        settings = get_minio_settings()
        with fsspec.open(path_str, "rb", **settings.storage_options) as fh:
            raw_bytes = fh.read()
    else:
        raw_bytes = Path(path_str).read_bytes()

    return _json.loads(gzip.decompress(raw_bytes).decode("utf-8"))

Odds (Join)

Join football-data.co.uk odds to the holdout set by team name and date.

The WhoScored dataset uses numeric teamIds; FDCO uses string team names. Join strategy: 1. Build teamId -> name index from data/metadata/homeTeamId.json + awayTeamId.json 2. Exact match on (date, home_team_lower, away_team_lower) 3. Fuzzy fallback via fuzzywuzzy (threshold=85) for unmatched rows

Returns a tuple of three arrays aligned to df_holdout row order

reference_proba : np.ndarray (n, 3) — vig-stripped implied probabilities actual_odds : np.ndarray (n, 3) — raw Bet365 decimal odds (b365h/d/a) league_codes : np.ndarray (n,) — FDCO league_code str, or None if unmatched

Unmatched rows contain NaN in reference_proba and actual_odds.

join_odds_to_holdout(df_holdout, df_odds, metadata_dir)

Return odds arrays aligned to df_holdout row order.

Parameters:

Name Type Description Default
df_holdout DataFrame

holdout DataFrame; must contain homeTeamId, awayTeamId, and startTimeUtc columns.

required
df_odds DataFrame

normalized FDCO DataFrame; must contain date, home_team, away_team, p_home, p_draw, p_away, b365h, b365d, b365a, league_code columns.

required
metadata_dir Path

path to the data/metadata/ directory.

required

Returns:

Type Description
tuple[ndarray, ndarray, ndarray]

Tuple of three arrays, all shape (len(df_holdout), ...):

  • reference_proba: (n, 3) float — vig-stripped probs; NaN if unmatched
  • actual_odds: (n, 3) float — B365 decimal odds; NaN if unmatched
  • league_codes: (n,) object — FDCO league_code str; None if unmatched

Source code in src/data/odds_join.py
def join_odds_to_holdout(
    df_holdout: pd.DataFrame,
    df_odds: pd.DataFrame,
    metadata_dir: Path,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Return odds arrays aligned to df_holdout row order.

    The join runs in two phases:

    1. A vectorised exact merge on (calendar day, lower-cased home name,
       lower-cased away name), where holdout team names come from the
       teamId -> name index loaded from ``metadata_dir``.
    2. A fuzzy fallback for rows that are still unmatched and whose day
       appears in ``df_odds``.  The home name is matched against that day's
       FDCO home teams first, then the away name against the away teams
       paired with the chosen home team.  ``rapidfuzz`` is used when it is
       installed, otherwise ``fuzzywuzzy``; both names must score at least
       ``_FUZZY_THRESHOLD``.

    If the team index is empty or a required column is missing, a warning is
    logged and all-NaN / all-None arrays are returned.

    Args:
        df_holdout: holdout DataFrame; must contain homeTeamId, awayTeamId,
                    and startTimeUtc columns.  Team IDs that cannot be
                    converted to ``int`` (e.g. NaN) are left unmatched.
        df_odds: normalized FDCO DataFrame; must contain date, home_team,
                 away_team, p_home, p_draw, p_away, b365h, b365d, b365a
                 columns.  ``league_code`` is optional; when absent, matched
                 rows get a league code of None.
        metadata_dir: path to the data/metadata/ directory.

    Returns:
        Tuple of three arrays, all shape (len(df_holdout), ...):
        - reference_proba : (n, 3) float  — vig-stripped probs; NaN if unmatched
        - actual_odds     : (n, 3) float  — B365 decimal odds; NaN if unmatched
        - league_codes    : (n,) object   — FDCO league_code str; None if unmatched
    """

    n = len(df_holdout)
    result = np.full((n, 3), fill_value=np.nan)
    actual_odds = np.full((n, 3), fill_value=np.nan)
    league_codes = np.full(n, fill_value=None, dtype=object)

    if n == 0:
        # Nothing to join.  Returning here also keeps an empty numeric ID
        # column away from the ``.str`` accessor used below, which pandas
        # only accepts on string/object data.
        return result, actual_odds, league_codes

    team_index = _load_team_index(metadata_dir)
    if not team_index:
        logger.warning("Empty team index — odds join will return all NaN")
        return result, actual_odds, league_codes

    required_holdout = {"homeTeamId", "awayTeamId", "startTimeUtc"}
    missing = required_holdout - set(df_holdout.columns)
    if missing:
        logger.warning("df_holdout missing columns %s — odds join skipped", missing)
        return result, actual_odds, league_codes

    prob_cols = ["p_home", "p_draw", "p_away"]
    odds_cols = ["b365h", "b365d", "b365a"]
    required_odds = {"date", "home_team", "away_team", *prob_cols, *odds_cols}
    missing_odds = required_odds - set(df_odds.columns)
    if missing_odds:
        logger.warning("df_odds missing columns %s — odds join skipped", missing_odds)
        return result, actual_odds, league_codes

    def _team_name(team_id: object) -> str:
        # Missing or non-numeric IDs map to "" so the row simply stays
        # unmatched instead of aborting the whole join.
        try:
            return team_index.get(int(team_id), "")
        except (TypeError, ValueError, OverflowError):
            return ""

    def _match_day(values: pd.Series) -> pd.Series:
        # Midnight of each timestamp's own calendar day.  Any timezone is
        # dropped afterwards so tz-aware holdout timestamps can be compared
        # with naive FDCO dates.
        days = pd.to_datetime(values).dt.normalize()
        if days.dt.tz is not None:
            days = days.dt.tz_localize(None)
        return days

    # ------------------------------------------------------------------ #
    # Phase 1: vectorised exact merge                                      #
    # ------------------------------------------------------------------ #
    # Normalised key columns on the holdout side; ``_row`` remembers each
    # row's position in the output arrays.
    hdf = pd.DataFrame({"_row": np.arange(n)}, index=df_holdout.index)
    hdf["_date"] = _match_day(df_holdout["startTimeUtc"])
    hdf["_home"] = df_holdout["homeTeamId"].map(_team_name).str.strip().str.lower()
    hdf["_away"] = df_holdout["awayTeamId"].map(_team_name).str.strip().str.lower()

    # Normalised key columns on the odds side.
    odf = df_odds[["date", "home_team", "away_team", *prob_cols, *odds_cols]].copy()
    if "league_code" in df_odds.columns:
        odf["league_code"] = df_odds["league_code"]
    else:
        odf["league_code"] = None
    odf["_date"] = _match_day(odf["date"])
    odf["_home"] = odf["home_team"].str.strip().str.lower()
    odf["_away"] = odf["away_team"].str.strip().str.lower()
    odf = odf.drop(columns=["date", "home_team", "away_team"])

    merged = hdf.merge(odf, on=["_date", "_home", "_away"], how="left")

    exact_mask = merged["p_home"].notna()
    exact_rows = merged[exact_mask]
    exact_idx = exact_rows["_row"].to_numpy()
    # Duplicate FDCO keys fan one holdout row out into several merged rows,
    # so count distinct holdout rows rather than merged rows.
    exact = int(np.unique(exact_idx).size)

    result[exact_idx] = exact_rows[prob_cols].to_numpy()
    actual_odds[exact_idx] = exact_rows[odds_cols].to_numpy()
    league_codes[exact_idx] = exact_rows["league_code"].to_numpy()

    # ------------------------------------------------------------------ #
    # Phase 2: fuzzy fallback — only for unmatched rows on covered dates  #
    # ------------------------------------------------------------------ #
    unmatched = merged.loc[~exact_mask, ["_row", "_date", "_home", "_away"]]
    # Each holdout row is considered at most once, and never after it has
    # already been matched exactly (both can happen with duplicate FDCO keys).
    unmatched = unmatched[~unmatched["_row"].isin(exact_idx)].drop_duplicates(
        subset="_row"
    )
    unmatched_on_covered = unmatched[unmatched["_date"].isin(odf["_date"])]

    fuzzy = 0

    if not unmatched_on_covered.empty:
        # Import the matcher only when there is something to match.
        # rapidfuzz is a C extension and much faster than fuzzywuzzy.
        try:
            from rapidfuzz import fuzz as rffuzz  # noqa: PLC0415
            from rapidfuzz import process as rfprocess  # noqa: PLC0415

            ratio = rffuzz.ratio
        except ImportError:  # fallback — still correct, just slower
            from fuzzywuzzy import fuzz as fwfuzz  # noqa: PLC0415

            rfprocess = None
            ratio = fwfuzz.ratio

        # (date, fdco_home) -> list[fdco_away]
        fdco_date_away: dict = {}
        # (date, fdco_home, fdco_away) -> (3 probs, 3 odds, league_code);
        # on duplicate keys the last FDCO row wins.
        odds_lookup_normed: dict = {}
        for rec in odf.to_dict("records"):
            d, h, a = rec["_date"], rec["_home"], rec["_away"]
            fdco_date_away.setdefault((d, h), []).append(a)
            odds_lookup_normed[(d, h, a)] = (
                rec["p_home"],
                rec["p_draw"],
                rec["p_away"],
                rec["b365h"],
                rec["b365d"],
                rec["b365a"],
                rec["league_code"],
            )

        # date -> list[fdco_home]; scopes the home-name search to one day.
        fdco_date_home: dict = {}
        for d, h in fdco_date_away:
            fdco_date_home.setdefault(d, []).append(h)

        def _fill(row_idx: int, key: tuple) -> bool:
            # Copy the FDCO values stored under ``key`` into output row
            # ``row_idx``; report whether the key existed.
            val = odds_lookup_normed.get(key)
            if val is None:
                return False
            result[row_idx] = val[:3]
            actual_odds[row_idx] = val[3:6]
            league_codes[row_idx] = val[6]
            return True

        if rfprocess is not None:
            # rapidfuzz path: score every unique unmatched home name against
            # that day's FDCO home names in a single cdist call per date.
            for match_date, grp in unmatched_on_covered.groupby("_date"):
                same_day_fdco_homes = fdco_date_home.get(match_date, [])
                if not same_day_fdco_homes:
                    continue
                unique_homes = grp["_home"].unique().tolist()
                scores = rfprocess.cdist(
                    unique_homes, same_day_fdco_homes, scorer=ratio, workers=1
                )
                best_idx = np.argmax(scores, axis=1)
                best_scores = scores[np.arange(len(unique_homes)), best_idx]
                # WhoScored home name -> best FDCO home name, or None when
                # the best score is below the threshold.
                home_day_cache = {
                    ws_home: (
                        same_day_fdco_homes[best_idx[i]]
                        if best_scores[i] >= _FUZZY_THRESHOLD
                        else None
                    )
                    for i, ws_home in enumerate(unique_homes)
                }
                for rec in grp.to_dict("records"):
                    fdco_home = home_day_cache.get(rec["_home"])
                    if fdco_home is None:
                        continue
                    away_candidates = fdco_date_away.get((match_date, fdco_home), [])
                    if not away_candidates:
                        continue
                    match = rfprocess.extractOne(
                        rec["_away"],
                        away_candidates,
                        scorer=ratio,
                        score_cutoff=_FUZZY_THRESHOLD,
                    )
                    if match is None:
                        continue
                    if _fill(rec["_row"], (match_date, fdco_home, match[0])):
                        fuzzy += 1
        else:

            def _best(name: str, candidates: list) -> tuple:
                # First candidate with the strictly highest ratio, plus its
                # score; (None, 0.0) when nothing scores above zero.
                best_score, best_cand = 0.0, None
                for cand in candidates:
                    score = ratio(name, cand)
                    if score > best_score:
                        best_score, best_cand = score, cand
                return best_cand, best_score

            # fuzzywuzzy fallback: same two-step match, scored row by row.
            for rec in unmatched_on_covered.to_dict("records"):
                match_date = rec["_date"]
                fdco_home, home_score = _best(
                    rec["_home"], fdco_date_home.get(match_date, [])
                )
                if fdco_home is None or home_score < _FUZZY_THRESHOLD:
                    continue
                fdco_away, away_score = _best(
                    rec["_away"], fdco_date_away.get((match_date, fdco_home), [])
                )
                if fdco_away is None or away_score < _FUZZY_THRESHOLD:
                    continue
                if _fill(rec["_row"], (match_date, fdco_home, fdco_away)):
                    fuzzy += 1

    total = exact + fuzzy
    logger.info(
        "Odds join: %d/%d matched (%d exact, %d fuzzy), %d unmatched (uniform fallback)",
        total,
        n,
        exact,
        fuzzy,
        n - total,
    )
    return result, actual_odds, league_codes