from pathlib import Path
import sys
project_root = Path("../..").resolve()
sys.path.insert(0, str(project_root))
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Markdown, display
batch_inference pipeline, Airflow-driven live scraping, and /predict/cards/ API
Dima Ivanov
May 28, 2026
Previous reports used historical closing odds from football-data.co.uk (FDCO) as the benchmark. Since 2026-05-15, the system has collected live pre-match odds from Fonbet autonomously via a three-stage Airflow DAG chain (Selenoid scrape → fuzzy match → factor extraction).
This report documents:
- batch_inference: the DVC stage that produces model predictions for all upcoming and recent matches
- /predict/cards/ API endpoint that merges predictions with live odds

| Component | Type | Schedule |
|---|---|---|
| batch_inference | DVC stage | triggered after final_train or on-demand |
| soccer_etl_odds_fonbet_01_raw | Airflow DAG | every 4 h |
| soccer_etl_odds_fonbet_02_link | Airflow DAG | triggered after DAG 01 |
| soccer_etl_odds_fonbet_03_odds | Airflow DAG | triggered after DAG 02 |
The batch_inference DVC stage runs independently of the training branch. It ingests data/interim/future.parquet and data/interim/finished.parquet, recomputes features on the full timeline, and scores every match via the champion MLflow model alias.
deps = {
"Inputs": [
"data/interim/future.parquet",
"data/interim/finished.parquet",
"data/features/features_meta.parquet",
"data/metadata/homeTeamId.json",
"data/metadata/awayTeamId.json",
],
"Outputs": [
"data/predictions/match_features.parquet -> MinIO predictions bucket",
"data/predictions/predictions.parquet -> MinIO predictions bucket",
],
"Key params": [
"inference.model_name = soccer-match-outcome",
"inference.model_stage = champion",
"inference.history_years = 2 (rolling stats window; ELO uses full timeline)",
"inference.upload_match_features = true",
"inference.upload_predictions = true",
],
}
for section, items in deps.items():
    print(f"\n{section}:")
    for item in items:
        print(f" * {item}")
Inputs:
* data/interim/future.parquet
* data/interim/finished.parquet
* data/features/features_meta.parquet
* data/metadata/homeTeamId.json
* data/metadata/awayTeamId.json
Outputs:
* data/predictions/match_features.parquet -> MinIO predictions bucket
* data/predictions/predictions.parquet -> MinIO predictions bucket
Key params:
* inference.model_name = soccer-match-outcome
* inference.model_stage = champion
* inference.history_years = 2 (rolling stats window; ELO uses full timeline)
* inference.upload_match_features = true
* inference.upload_predictions = true
steps = [
("1", "concat", "Concatenate finished + future chronologically"),
("2", "rolling", "4 group-key axes x 7 window sizes x 5 stat cols -> ~140 rolling features"),
("3", "elo", "ELO on full timeline (not trimmed by history_years), joined by match id"),
("4", "select", "select_model_features() -> align to model input schema (507 features)"),
("5", "predict", "Chunked inference (50 000 rows/chunk) via models:/soccer-match-outcome@champion"),
("6", "upload", "Push match_features.parquet + predictions.parquet to MinIO"),
]
df_steps = pd.DataFrame(steps, columns=["#", "Step", "Description"])
display(Markdown(df_steps.to_markdown(index=False)))
| # | Step | Description |
|---|---|---|
| 1 | concat | Concatenate finished + future chronologically |
| 2 | rolling | 4 group-key axes x 7 window sizes x 5 stat cols -> ~140 rolling features |
| 3 | elo | ELO on full timeline (not trimmed by history_years), joined by match id |
| 4 | select | select_model_features() -> align to model input schema (507 features) |
| 5 | predict | Chunked inference (50 000 rows/chunk) via models:/soccer-match-outcome@champion |
| 6 | upload | Push match_features.parquet + predictions.parquet to MinIO |
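Step 5 can be illustrated with a minimal sketch of chunked scoring. Only the 50 000-row chunk size comes from the table above; `iter_chunks`, `predict_in_chunks` and `predict_proba_stub` are hypothetical names, and the stub stands in for the champion model that the pipeline loads from `models:/soccer-match-outcome@champion`.

```python
from typing import Callable, Iterator, List, Sequence

CHUNK_SIZE = 50_000  # rows per chunk, as stated in step 5


def iter_chunks(rows: Sequence, size: int = CHUNK_SIZE) -> Iterator[Sequence]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def predict_in_chunks(rows: Sequence, predict: Callable, size: int = CHUNK_SIZE) -> List:
    """Score `rows` chunk by chunk and concatenate the results in order."""
    out: List = []
    for chunk in iter_chunks(rows, size):
        out.extend(predict(chunk))
    return out


def predict_proba_stub(chunk: Sequence) -> List:
    # Hypothetical stand-in for the real model: uniform 1X2 probabilities.
    return [(1 / 3, 1 / 3, 1 / 3) for _ in chunk]
```

Scoring in fixed-size chunks bounds peak memory: only one chunk of features and its predictions are materialised at a time, regardless of how many rows the full timeline contains.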
_preds_path = project_root / "data" / "predictions" / "predictions.parquet"
df_preds = pd.read_parquet(_preds_path)
_future = df_preds[df_preds["is_future"]].copy()
_finished = df_preds[~df_preds["is_future"]].copy()
print(f"Total rows: {len(df_preds):,}")
print(f" Future: {len(_future):,} "
f"({_future['startTimeUtc'].min().date()} - {_future['startTimeUtc'].max().date()})")
print(f" Finished: {len(_finished):,} "
f"({_finished['startTimeUtc'].min().date()} - {_finished['startTimeUtc'].max().date()})")
print(f"\nModel stage: {df_preds['model_stage'].iloc[0]}")
print(f"Model run_id: {df_preds['model_run_id'].iloc[0][:16]}...")
Total rows: 116,394
Future: 2,712 (2010-07-21 - 2026-05-31)
Finished: 113,682 (2024-05-16 - 2026-05-16)
Model stage: champion
Model run_id: 9e156226cffa4c7d...
if "tournamentName" in _future.columns:
    _tourn = (
        _future
        .groupby(["regionName", "tournamentName"])
        .agg(matches=("is_future", "count"))
        .sort_values("matches", ascending=False)
        .head(15)
        .reset_index()
    )
    display(Markdown("**Upcoming matches by tournament (top 15)**"))
    display(Markdown(_tourn.to_markdown(index=False)))
else:
    print("tournamentName column not present")
tournamentName column not present
fig, axes = plt.subplots(1, 3, figsize=(12, 4), sharey=True)
_pcols = [
("proba_home", "#4CAF50", "P(home win)"),
("proba_draw", "#FF9800", "P(draw)"),
("proba_away", "#2196F3", "P(away win)"),
]
for ax, (col, color, label) in zip(axes, _pcols):
    ax.hist(_future[col].dropna(), bins=30, color=color, alpha=0.8, edgecolor="white")
    ax.axvline(_future[col].mean(), color="black", linestyle="--", linewidth=1.2,
               label=f"mean={_future[col].mean():.3f}")
    ax.set_xlabel(label)
    if ax is axes[0]:
        ax.set_ylabel("Matches")
    ax.legend(fontsize=8)
plt.suptitle("Predicted probability distributions - upcoming matches", y=1.02)
plt.tight_layout()
plt.show()
_label_counts = _future["predicted_label"].value_counts()
_label_colors = {"home_win": "#4CAF50", "draw": "#FF9800", "away_win": "#2196F3"}
fig, ax = plt.subplots(figsize=(5, 3.5))
bars = ax.bar(
_label_counts.index,
_label_counts.values,
color=[_label_colors.get(l, "gray") for l in _label_counts.index],
)
for bar, val in zip(bars, _label_counts.values):
    ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.5,
            str(val), ha="center", va="bottom", fontsize=9)
ax.set_ylabel("Matches")
ax.set_title("Predicted outcome class - upcoming matches")
plt.tight_layout()
plt.show()
Since 2026-05-15 live pre-match odds are collected from Fonbet via a three-stage Airflow DAG chain. All data lands in MinIO (MINIO_BUCKET_DATA_RAW).
fon.bet/sports/football
| (Selenoid headless Chrome + CDP XHR intercept)
v
MinIO: odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz <- DAG 01 (every 4 h)
| (fuzzy 3-layer: region -> league -> teams + kickoff +-90 min)
v
MinIO: match_links/fonbet_links.parquet <- DAG 02 (triggered after 01)
| (extract factors 921=home / 922=draw / 923=away from customFactors[])
v
MinIO: match_links/fonbet_odds.parquet <- DAG 03 (triggered after 02)
|
v
FastAPI: /predict/cards/ <- left-join on match_id with predictions.parquet
How it works: DAG 01 launches headless Chrome via Selenoid and, before page navigation, injects a CDP JavaScript hook (window._fonbetCapture) that intercepts all fetch and XHR requests. It then navigates to fon.bet/sports/football, captures 30–40 JSON responses (the largest, events/listBase, is ~9 MB), and saves a gzip-compressed snapshot to MinIO.
_dag01 = [
("DAG ID", "soccer_etl_odds_fonbet_01_raw"),
("Schedule", "every 4 h"),
("Timeout", "15 min, retries: 2 x 10 min"),
("Command", "python -m src.pipelines.collect_fonbet_odds"),
("Output", "MinIO: odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz"),
("Format", "gzip JSON array: [{url, body}, ...] (~30-40 responses)"),
("Key URLs", "events/listBase, geoCategories, sportCategories, factorsCatalog, logos"),
]
display(Markdown(pd.DataFrame(_dag01, columns=["Field", "Value"]).to_markdown(index=False)))
| Field | Value |
|---|---|
| DAG ID | soccer_etl_odds_fonbet_01_raw |
| Schedule | every 4 h |
| Timeout | 15 min, retries: 2 x 10 min |
| Command | python -m src.pipelines.collect_fonbet_odds |
| Output | MinIO: odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz |
| Format | gzip JSON array: [{url, body}, …] (~30-40 responses) |
| Key URLs | events/listBase, geoCategories, sportCategories, factorsCatalog, logos |
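The snapshot layout in the table above can be sketched as a gzip round trip. This is a minimal illustration, not the collector itself: the helper names are hypothetical, the example URL is made up, and it assumes each `body` is stored as already-parsed JSON (the source only states the `[{url, body}, ...]` shape).

```python
import gzip
import json
from datetime import datetime, timezone


def snapshot_key(ts: datetime) -> str:
    """Build the object key odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz."""
    return f"odds_fonbet/date={ts:%Y-%m-%d}/{ts:%H%M%S}.json.gz"


def pack_snapshot(responses: list) -> bytes:
    """Serialise the captured [{url, body}, ...] array as gzip-compressed JSON."""
    return gzip.compress(json.dumps(responses).encode("utf-8"))


def unpack_snapshot(blob: bytes) -> list:
    """Inverse of pack_snapshot: decompress and parse the JSON array."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))


def find_body(responses: list, url_fragment: str):
    """Return the body of the first response whose URL contains the fragment."""
    for item in responses:
        if url_fragment in item["url"]:
            return item["body"]
    return None


if __name__ == "__main__":
    ts = datetime(2026, 5, 15, 8, 4, 9, tzinfo=timezone.utc)
    print(snapshot_key(ts))
```

Downstream stages would typically call something like `find_body(responses, "events/listBase")` to pull the event list out of the latest snapshot.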
_dag02 = [
("DAG ID", "soccer_etl_odds_fonbet_02_link"),
("Timeout", "10 min"),
("Input", "latest odds_fonbet/*.json.gz + match_raw.parquet (MinIO)"),
("Output", "MinIO: match_links/fonbet_links.parquet (upsert)"),
("Window", "MATCH_WINDOW_DAYS = 3 (upcoming matches only)"),
("Upsert", "rows with existing fonbet_event_id are never re-matched"),
]
display(Markdown(pd.DataFrame(_dag02, columns=["Field", "Value"]).to_markdown(index=False)))
| Field | Value |
|---|---|
| DAG ID | soccer_etl_odds_fonbet_02_link |
| Timeout | 10 min |
| Input | latest odds_fonbet/*.json.gz + match_raw.parquet (MinIO) |
| Output | MinIO: match_links/fonbet_links.parquet (upsert) |
| Window | MATCH_WINDOW_DAYS = 3 (upcoming matches only) |
| Upsert | rows with existing fonbet_event_id are never re-matched |
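The upsert rule for links ("never re-matched") amounts to an insert-only merge. The sketch below assumes rows are keyed by `match_id`, which the source does not state explicitly; `upsert_links` is a hypothetical name.

```python
def upsert_links(existing: dict, new_links: list) -> dict:
    """Insert-only upsert: a match that is already linked keeps its original row.

    `existing` maps match_id -> link row; `new_links` is a list of link rows.
    Keying by match_id is an assumption made for this sketch.
    """
    merged = dict(existing)
    for link in new_links:
        # setdefault only writes when the match has no link yet.
        merged.setdefault(link["match_id"], link)
    return merged
```

Keeping the first link stable means a later, noisier snapshot cannot silently re-point a match at a different Fonbet event.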
Three-layer matching algorithm:
| Layer | Field | Method | Threshold (env var) |
|---|---|---|---|
| 1 | regionName → Fonbet country | token_sort_ratio | FONBET_COUNTRY_THR = 80 |
| 2 | tournamentName → Fonbet league | token_set_ratio + partial_ratio | FONBET_LEAGUE_THR = 65 |
| 3 | team names + kickoff ±90 min | combined + per-team score | FONBET_COMBINED_THR = 75 AND FONBET_PER_TEAM_THR = 40 |
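The layer 3 gate can be sketched as follows. Two things here are assumptions rather than the pipeline's behaviour: `difflib.SequenceMatcher` is swapped in as a stand-in 0-100 scorer, and the combined score is taken as the mean of the two per-team scores. Only the thresholds and the ±90 min kickoff tolerance come from the table.

```python
from datetime import datetime, timedelta
from difflib import SequenceMatcher

COMBINED_THR = 75   # FONBET_COMBINED_THR
PER_TEAM_THR = 40   # FONBET_PER_TEAM_THR
KICKOFF_TOL = timedelta(minutes=90)


def similarity(a: str, b: str) -> float:
    """Stand-in 0-100 similarity (difflib, not the pipeline's fuzzy scorer)."""
    return 100.0 * SequenceMatcher(None, a.lower(), b.lower()).ratio()


def layer3_match(site: dict, fonbet: dict):
    """Return the combined score if the candidate passes every gate, else None."""
    # Gate 1: kickoff times must agree within the tolerance.
    if abs(site["start"] - fonbet["start"]) > KICKOFF_TOL:
        return None
    home = similarity(site["home"], fonbet["home"])
    away = similarity(site["away"], fonbet["away"])
    combined = (home + away) / 2  # assumption: simple mean of per-team scores
    # Gate 2: both the combined score AND the weaker team must clear a threshold.
    if combined >= COMBINED_THR and min(home, away) >= PER_TEAM_THR:
        return combined
    return None
```

The per-team floor guards against a pairing where one team name matches perfectly and the other not at all, which a combined score alone could let through.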
Output schema — fonbet_links.parquet:
_ls = [
("match_id", "int64", "Site match ID"),
("fonbet_event_id", "int64", "Fonbet event ID"),
("fonbet_sport_id", "int64", "Fonbet sport node ID (used for deep-link)"),
("match_score", "float64", "Fuzzy match confidence (0-100)"),
("site_region", "str", "Site region name"),
("site_tourn", "str", "Site tournament name"),
("site_home", "str", "Site home team"),
("site_away", "str", "Site away team"),
("site_start", "datetime", "Kickoff UTC"),
("fb_country", "str", "Matched Fonbet country"),
("fb_league", "str", "Matched Fonbet league"),
]
display(Markdown(pd.DataFrame(_ls, columns=["Column", "Type", "Description"]).to_markdown(index=False)))
| Column | Type | Description |
|---|---|---|
| match_id | int64 | Site match ID |
| fonbet_event_id | int64 | Fonbet event ID |
| fonbet_sport_id | int64 | Fonbet sport node ID (used for deep-link) |
| match_score | float64 | Fuzzy match confidence (0-100) |
| site_region | str | Site region name |
| site_tourn | str | Site tournament name |
| site_home | str | Site home team |
| site_away | str | Site away team |
| site_start | datetime | Kickoff UTC |
| fb_country | str | Matched Fonbet country |
| fb_league | str | Matched Fonbet league |
_dag03 = [
("DAG ID", "soccer_etl_odds_fonbet_03_odds"),
("Timeout", "5 min"),
("Input", "match_links/fonbet_links.parquet + latest odds_fonbet/*.json.gz"),
("Output", "MinIO: match_links/fonbet_odds.parquet (upsert by fonbet_event_id)"),
("Factors", "921 = home win / 922 = draw / 923 = away win (from customFactors[])"),  # "/" instead of "|": a literal pipe splits the markdown table cell
("Upsert", "existing rows always overwritten (odds change over time)"),
]
display(Markdown(pd.DataFrame(_dag03, columns=["Field", "Value"]).to_markdown(index=False)))
| Field | Value |
|---|---|
| DAG ID | soccer_etl_odds_fonbet_03_odds |
| Timeout | 5 min |
| Input | match_links/fonbet_links.parquet + latest odds_fonbet/*.json.gz |
| Output | MinIO: match_links/fonbet_odds.parquet (upsert by fonbet_event_id) |
| Factors | 921 = home win / 922 = draw / 923 = away win (from customFactors[]) |
| Upsert | existing rows always overwritten (odds change over time) |
Output schema — fonbet_odds.parquet:
_os = [
("match_id", "int64", "Site match ID"),
("fonbet_event_id", "int64", "Fonbet event ID"),
("fonbet_sport_id", "int64", "Fonbet sport node ID"),
("odd_home", "float64", "Factor 921 - 1X2 home win"),
("odd_draw", "float64", "Factor 922 - 1X2 draw"),
("odd_away", "float64", "Factor 923 - 1X2 away win"),
("markets_count", "int64", "Total markets available for this event"),
("snapshot_key", "str", "MinIO object key of source snapshot"),
("fetched_at", "datetime", "Extraction timestamp (UTC, tz-naive)"),
]
display(Markdown(pd.DataFrame(_os, columns=["Column", "Type", "Description"]).to_markdown(index=False)))
| Column | Type | Description |
|---|---|---|
| match_id | int64 | Site match ID |
| fonbet_event_id | int64 | Fonbet event ID |
| fonbet_sport_id | int64 | Fonbet sport node ID |
| odd_home | float64 | Factor 921 - 1X2 home win |
| odd_draw | float64 | Factor 922 - 1X2 draw |
| odd_away | float64 | Factor 923 - 1X2 away win |
| markets_count | int64 | Total markets available for this event |
| snapshot_key | str | MinIO object key of source snapshot |
| fetched_at | datetime | Extraction timestamp (UTC, tz-naive) |
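Extracting the three 1X2 factors might look like the sketch below. The field names `e` (event id), `factors`, `f` (factor id) and `v` (price) are assumptions about the `customFactors[]` payload and should be checked against a real snapshot; `extract_1x2` is a hypothetical helper.

```python
# Factor ids from the DAG 03 description, mapped to fonbet_odds.parquet columns.
FACTOR_TO_COLUMN = {921: "odd_home", 922: "odd_draw", 923: "odd_away"}


def extract_1x2(custom_factors: list) -> dict:
    """Map event id -> {odd_home, odd_draw, odd_away}; missing prices stay None.

    Assumes entries shaped like {"e": <event id>, "factors": [{"f": id, "v": price}]}.
    """
    rows = {}
    for entry in custom_factors:
        row = {"odd_home": None, "odd_draw": None, "odd_away": None}
        for factor in entry.get("factors", []):
            column = FACTOR_TO_COLUMN.get(factor.get("f"))
            if column is not None:  # ignore every non-1X2 market
                row[column] = float(factor["v"])
        rows[entry["e"]] = row
    return rows
```

Leaving absent factors as `None` keeps events with a partially suspended 1X2 market in the output instead of dropping them.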
MatchCardService performs a left join on match_id with Fonbet odds as the primary (left) source, so matches that have odds but no prediction are still included. A Fonbet deep-link of the form https://fon.bet/sports/football/{sport_id}/{event_id} is built when fonbet_sport_id is available. The cache is invalidated when the _mtime of either source changes (checked every 120 s for odds and every 60 s for predictions).
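The join and deep-link logic can be sketched with plain dictionaries. This is an illustration under assumptions, not the MatchCardService code: `build_cards` and `fonbet_url` are hypothetical helpers, and only a subset of the card columns is shown.

```python
def fonbet_url(sport_id, event_id):
    """Build the deep-link, or return None when either id is missing."""
    if sport_id is None or event_id is None:
        return None
    return f"https://fon.bet/sports/football/{sport_id}/{event_id}"


def build_cards(odds_rows: list, predictions_by_match: dict) -> list:
    """Left join with odds as the left side: every odds row yields a card."""
    cards = []
    for odds in odds_rows:
        # A missing prediction becomes an empty dict, so proba_* fall back to None.
        pred = predictions_by_match.get(odds["match_id"], {})
        cards.append({
            "match_id": odds["match_id"],
            "odd_home": odds.get("odd_home"),
            "odd_draw": odds.get("odd_draw"),
            "odd_away": odds.get("odd_away"),
            "proba_home": pred.get("proba_home"),
            "proba_draw": pred.get("proba_draw"),
            "proba_away": pred.get("proba_away"),
            "fonbet_url": fonbet_url(odds.get("fonbet_sport_id"),
                                     odds.get("fonbet_event_id")),
        })
    return cards
```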
_cc = [
("match_id", "Match site ID"),
("homeTeamName", "Home team"),
("awayTeamName", "Away team"),
("tournamentName", "Tournament"),
("regionName", "Region / country"),
("startTimeUtc", "Kickoff (UTC)"),
("homeScore", "Home goals - null for future"),
("awayScore", "Away goals - null for future"),
("outcome_1x2", "True outcome - null for future"),
("proba_home", "Model P(home win)"),
("proba_draw", "Model P(draw)"),
("proba_away", "Model P(away win)"),
("predicted_class", "Predicted outcome class (0=home / 1=draw / 2=away)"),
("predicted_label", "Predicted outcome label"),
("is_future", "True = upcoming match"),
("odd_home", "Fonbet live 1X2 home win"),
("odd_draw", "Fonbet live 1X2 draw"),
("odd_away", "Fonbet live 1X2 away win"),
("fonbet_url", "Deep-link to Fonbet event page"),
("model_run_id", "MLflow run ID of scoring model"),
("model_stage", "MLflow model alias (champion)"),
]
display(Markdown(pd.DataFrame(_cc, columns=["Column", "Description"]).to_markdown(index=False)))
| Column | Description |
|---|---|
| match_id | Match site ID |
| homeTeamName | Home team |
| awayTeamName | Away team |
| tournamentName | Tournament |
| regionName | Region / country |
| startTimeUtc | Kickoff (UTC) |
| homeScore | Home goals - null for future |
| awayScore | Away goals - null for future |
| outcome_1x2 | True outcome - null for future |
| proba_home | Model P(home win) |
| proba_draw | Model P(draw) |
| proba_away | Model P(away win) |
| predicted_class | Predicted outcome class (0=home / 1=draw / 2=away) |
| predicted_label | Predicted outcome label |
| is_future | True = upcoming match |
| odd_home | Fonbet live 1X2 home win |
| odd_draw | Fonbet live 1X2 draw |
| odd_away | Fonbet live 1X2 away win |
| fonbet_url | Deep-link to Fonbet event page |
| model_run_id | MLflow run ID of scoring model |
| model_stage | MLflow model alias (champion) |
display(Markdown(r"""
**Value edge formula** - vig-stripped market probability vs model:
$$
p_{\text{market},k} = \frac{1 / \text{odd}_{k}}{\sum_j 1 / \text{odd}_j}
\qquad
\text{edge}_k = p_{\text{model},k} - p_{\text{market},k}
$$
A positive edge on outcome $k$ means the model assigns higher probability than the market implies.
**Key difference vs report 05**: FDCO used **closing** odds (static, post-kickoff),
whereas Fonbet odds here are **live pre-match** and refresh ~every 4 h.
"""))
Value edge formula - vig-stripped market probability vs model:
\[ p_{\text{market},k} = \frac{1 / \text{odd}_{k}}{\sum_j 1 / \text{odd}_j} \qquad \text{edge}_k = p_{\text{model},k} - p_{\text{market},k} \]
A positive edge on outcome \(k\) means the model assigns higher probability than the market implies.
Key difference vs report 05: FDCO used closing odds (static, post-kickoff), whereas Fonbet odds here are live pre-match and refresh ~every 4 h.
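As a worked example of the formula, odds of 1.6 / 3.2 / 3.2 have inverse prices 0.625 + 0.3125 + 0.3125 = 1.25, so the vig-stripped market probabilities are 0.50 / 0.25 / 0.25. The sketch below implements the same normalisation; the function names and the `home` / `draw` / `away` keys are illustrative choices, not the API's.

```python
def market_probabilities(odds: dict) -> dict:
    """Vig-stripped probabilities: normalise inverse decimal odds to sum to 1."""
    inverse = {k: 1.0 / v for k, v in odds.items()}
    total = sum(inverse.values())  # exceeds 1 by the bookmaker's margin
    return {k: inv / total for k, inv in inverse.items()}


def value_edges(model_proba: dict, odds: dict) -> dict:
    """edge_k = p_model,k - p_market,k for each outcome k."""
    market = market_probabilities(odds)
    return {k: model_proba[k] - market[k] for k in odds}


if __name__ == "__main__":
    odds = {"home": 1.6, "draw": 3.2, "away": 3.2}
    model = {"home": 0.55, "draw": 0.25, "away": 0.20}
    for outcome, edge in value_edges(model, odds).items():
        print(f"{outcome}: {edge:+.3f}")
```

With these inputs the model shows a positive edge of about 0.05 on the home win, no edge on the draw, and a negative edge of about 0.05 on the away win.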
_tr = [
("Data source", "football-data.co.uk (FDCO)", "Fonbet (fon.bet) - live"),
("Collection", "Batch HTTP download", "Selenoid + CDP XHR intercept"),
("Coverage", "Top-tier leagues, 3 seasons", "All football events on Fonbet"),
("Odds type", "Closing odds (post-match, static)", "Pre-match (live, ~4 h refresh)"),
("Match linking", "Tournament + date key join", "3-layer fuzzy match"),
("History", "Seasons 2324 / 2425 / 2526", "From 2026-05-15 onwards"),
("Pipeline", "DVC stage (load_odds_fdco)", "Airflow DAG chain (not in DVC)"),
("Storage", "data/raw/odds_fdco.parquet (local + DVC)", "MinIO: match_links/fonbet_odds.parquet"),
("API exposure", "Report 05 holdout analysis only", "/predict/cards/ + /predict/odds/"),
]
df_tr = pd.DataFrame(_tr, columns=["Aspect", "FDCO (historical)", "Fonbet (live)"])
display(Markdown(df_tr.to_markdown(index=False)))
| Aspect | FDCO (historical) | Fonbet (live) |
|---|---|---|
| Data source | football-data.co.uk (FDCO) | Fonbet (fon.bet) - live |
| Collection | Batch HTTP download | Selenoid + CDP XHR intercept |
| Coverage | Top-tier leagues, 3 seasons | All football events on Fonbet |
| Odds type | Closing odds (post-match, static) | Pre-match (live, ~4 h refresh) |
| Match linking | Tournament + date key join | 3-layer fuzzy match |
| History | Seasons 2324 / 2425 / 2526 | From 2026-05-15 onwards |
| Pipeline | DVC stage (load_odds_fdco) | Airflow DAG chain (not in DVC) |
| Storage | data/raw/odds_fdco.parquet (local + DVC) | MinIO: match_links/fonbet_odds.parquet |
| API exposure | Report 05 holdout analysis only | /predict/cards/ + /predict/odds/ |
display(Markdown(f"""
**Current inference state**
- `batch_inference` produces predictions for **{len(_future):,} upcoming** and
**{len(_finished):,} recently finished** matches.
- Future window: `{_future['startTimeUtc'].min().date()}` to `{_future['startTimeUtc'].max().date()}`
- Scoring model: `{df_preds['model_stage'].iloc[0]}` alias,
MLflow run `{df_preds['model_run_id'].iloc[0][:16]}...`
**Fonbet live odds** (since 2026-05-15)
- 3-stage Airflow chain, every 4 h: `fonbet_01_raw` -> `fonbet_02_link` -> `fonbet_03_odds`
- Raw snapshots: `MinIO: odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz`
- Matched odds served via `/predict/cards/` (requires `X-API-Key`)
**Next milestones**
- Accumulate Fonbet history to replicate the FDCO ROI simulation with live pre-match odds
- Add coverage dashboard: % of upcoming matches successfully linked (DAG 02 match rate)
- Monitor fuzzy-match quality (low `match_score` events)
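"""))
Current inference state
- batch_inference produces predictions for 2,712 upcoming and 113,682 recently finished matches.
- Future window: 2010-07-21 to 2026-05-31
- Scoring model: champion alias, MLflow run 9e156226cffa4c7d...
Fonbet live odds (since 2026-05-15)
- 3-stage Airflow chain, every 4 h: fonbet_01_raw -> fonbet_02_link -> fonbet_03_odds
- Raw snapshots: MinIO: odds_fonbet/date=YYYY-MM-DD/HHMMSS.json.gz
- Matched odds served via /predict/cards/ (requires X-API-Key)
Next milestones
- Accumulate Fonbet history to replicate the FDCO ROI simulation with live pre-match odds
- Add coverage dashboard: % of upcoming matches successfully linked (DAG 02 match rate)
- Monitor fuzzy-match quality (low match_score events)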