Skip to content

Serving

Application Entry-point

lifespan(app) async

FastAPI lifespan: create DB tables and warm up the connection pool.

Source code in src/app/main.py
@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa: ARG001
    """Run startup work for the FastAPI application, then hand control back.

    Startup creates any missing SQLModel tables and opens one short-lived
    session that executes ``SELECT 1`` so the SQLAlchemy connection pool is
    already initialised when the first request arrives.  Nothing runs after
    the ``yield`` (no shutdown work).
    """
    create_db_and_tables()

    # Pool warm-up: if the first heavy query had to initialise the pool and
    # compile ORM statements at the same time, the resulting memory spike
    # could push the pod past its k8s memory limit.
    from sqlalchemy import text as sa_text
    from sqlmodel import Session

    from src.data.database import engine

    ping = sa_text("SELECT 1")
    with Session(engine) as warmup_session:
        warmup_session.exec(ping)  # type: ignore[call-overload]

    # No ML model is loaded here: the ml Celery worker (worker_ml.py) owns it,
    # and FastAPI dispatches prediction requests to the ml queue via Celery.
    logger.info("FastAPI application has been initialized")
    yield

prometheus_metrics()

Prometheus scrape endpoint.

Uses MultiProcessCollector when PROMETHEUS_MULTIPROC_DIR is set (required for Gunicorn multi-worker deployments).

Source code in src/app/main.py
@app.get("/metrics", include_in_schema=False, tags=["ops"])
def prometheus_metrics() -> Response:
    """Expose Prometheus metrics in the text exposition format.

    When ``PROMETHEUS_MULTIPROC_DIR`` is present in the environment (required
    for Gunicorn multi-worker deployments), samples from all worker processes
    are aggregated through ``MultiProcessCollector`` on a fresh registry;
    otherwise the default process-local registry is rendered.
    """
    multiproc = "PROMETHEUS_MULTIPROC_DIR" in os.environ
    if not multiproc:
        return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

    aggregate = CollectorRegistry()
    multiprocess.MultiProcessCollector(aggregate)
    body = generate_latest(aggregate)
    return Response(content=body, media_type=CONTENT_TYPE_LATEST)

root() async

Return service name and links to the docs and health endpoints.

Source code in src/app/main.py
@app.get("/", tags=["root"])
async def root() -> dict:
    """Describe the service and point clients at the docs and health endpoints."""
    info = {"service": "SoccerPredictAI"}
    info["docs"] = "/docs"
    info["health"] = "/healthcheck/"
    return info

Database

create_db_and_tables()

Create all SQLModel-declared tables if they do not yet exist.

Source code in src/data/database.py
def create_db_and_tables():
    """Issue CREATE TABLE for every SQLModel-declared table not yet in the database."""
    metadata = SQLModel.metadata
    metadata.create_all(bind=engine)

get_session()

Yield a SQLModel Session for use as a FastAPI dependency.

Source code in src/data/database.py
def get_session():
    """FastAPI dependency that provides a SQLModel ``Session`` bound to ``engine``.

    The session is opened on first iteration and closed by its context
    manager when the generator finishes.
    """
    session_cm = Session(engine)
    with session_cm as session:
        yield session

remove_db_and_tables()

Drop all SQLModel-declared tables (used in tests and teardown).

Source code in src/data/database.py
def remove_db_and_tables():
    """Drop every SQLModel-declared table; intended for tests and teardown."""
    metadata = SQLModel.metadata
    metadata.drop_all(bind=engine)

Dependencies

get_token_header(x_api_key=None) async

Validate the X-API-Key request header.

Uses hmac.compare_digest for constant-time comparison to prevent timing-based key enumeration attacks. Returns 401 when the header is absent or does not match the configured secret (FASTAPI_HEADER_TOKEN).

Parameters:

Name Type Description Default
x_api_key Annotated[str | None, Header(alias="x-api-key")]

Value of the X-API-Key request header, or None when the header is absent.

None
Source code in src/app/dependencies.py
async def get_token_header(
    x_api_key: Annotated[str | None, Header(alias="x-api-key")] = None,
) -> None:
    """Validate the ``X-API-Key`` request header.

    Uses ``hmac.compare_digest`` for constant-time comparison to prevent
    timing-based key enumeration attacks.  Returns 401 when the header is
    absent, when no secret is configured (``FASTAPI_HEADER_TOKEN``), or when
    the header does not match the configured secret.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header, or ``None``
            when the header is absent.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: ApiKey`` header.
    """
    expected = get_security_settings().token_header
    # Fail closed when no secret is configured: otherwise an empty header
    # value would match an empty secret.
    # Compare UTF-8 bytes rather than str: ``hmac.compare_digest`` raises
    # TypeError for str arguments containing non-ASCII characters, which a
    # client can send in a header value and which would surface as a 500.
    if (
        x_api_key is None
        or not expected
        or not hmac.compare_digest(
            x_api_key.encode("utf-8"), expected.encode("utf-8")
        )
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid X-API-Key header",
            headers={"WWW-Authenticate": "ApiKey"},
        )

get_query_token(token=None) async

Validate the token query parameter.

Parameters:

Name Type Description Default
token str | None

Value of the ?token= query parameter, or None when the parameter is absent.

None
Source code in src/app/dependencies.py
async def get_query_token(token: str | None = None) -> None:
    """Validate the ``token`` query parameter.

    Uses ``hmac.compare_digest`` for constant-time comparison.  Returns 401
    when the parameter is absent, when no query-token secret is configured,
    or when it does not match.

    Args:
        token: Value of the ``?token=`` query parameter, or ``None``
            when the parameter is absent.

    Raises:
        HTTPException: 401 when the token is missing or invalid.
    """
    expected = get_security_settings().token_query
    # Fail closed on an empty/unset secret (``?token=`` would match ""), and
    # compare bytes: ``hmac.compare_digest`` raises TypeError for non-ASCII
    # str arguments, which a client can supply via the query string.
    if (
        token is None
        or not expected
        or not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid token query parameter",
        )

Routers

list_predictions(pred_lookup, future_only=Query(default=False, description='Return only future matches'), limit=Query(default=None, ge=1, le=5000, description='Max rows to return'), offset=Query(default=0, ge=0, description='Rows to skip'))

Return rows from the in-memory predictions.parquet cache.

Includes both future and historical matches unless future_only is set. Only a fixed set of display columns is returned — feature vectors are omitted. Intended for the Data Explorer UI and diagnostic tooling.

Source code in src/app/routers/predict.py
def _json_safe_cell(val):
    """Convert one cached predictions cell into a JSON-serialisable Python value.

    numpy scalars are unwrapped with ``.item()`` and timestamp-like objects
    are rendered with ``.isoformat()``.  Float NaN (e.g. a missing numeric
    value loaded via pandas) becomes ``None``, because Starlette's
    ``JSONResponse`` serialises with ``allow_nan=False`` and would otherwise
    fail the whole response with a 500.
    """
    import math

    if val is None:
        return None
    if hasattr(val, "item"):  # numpy scalar → Python native
        val = val.item()
    elif hasattr(val, "isoformat"):  # pandas Timestamp / datetime
        return val.isoformat()
    if isinstance(val, float) and math.isnan(val):
        return None
    return val


@router.get(
    "/predictions/",
    response_model=None,
    status_code=status.HTTP_200_OK,
    summary="List all precomputed predictions (bulk read from predictions.parquet)",
)
def list_predictions(
    pred_lookup: _PredLookupDep,
    future_only: bool = Query(default=False, description="Return only future matches"),
    limit: int | None = Query(
        default=None, ge=1, le=5000, description="Max rows to return"
    ),
    offset: int = Query(default=0, ge=0, description="Rows to skip"),
) -> list[dict]:
    """Return rows from the in-memory predictions.parquet cache.

    Includes both future and historical matches unless *future_only* is set.
    Only a fixed set of display columns is returned — feature vectors are
    omitted.  Intended for the Data Explorer UI and diagnostic tooling.

    Args:
        pred_lookup: Injected predictions lookup service.
        future_only: Keep only rows whose ``is_future`` flag is truthy.
        limit: Maximum number of rows after *offset*; ``None`` means no limit.
        offset: Number of rows to skip (applied after the future filter).

    Returns:
        One dict per row with the display columns; missing values and NaN
        are returned as ``None``.
    """
    rows = pred_lookup.list_matches()
    if future_only:
        rows = [r for r in rows if r.get("is_future")]
    stop = offset + limit if limit is not None else None
    rows = rows[offset:stop]
    return [
        {name: _json_safe_cell(row.get(name)) for name in _PREDICTION_DISPLAY_COLS}
        for row in rows
    ]

predict_precomputed(match_id, pred_lookup) async

Return the precomputed prediction for a match from predictions.parquet.

Predictions are produced by the batch_inference DVC stage which runs model.predict() over all matches and saves the result to MinIO. This endpoint reads directly from the in-memory cache — no Celery task, no MLflow model call at request time.

Returns 404 if the match is not found in the latest batch output.

Source code in src/app/routers/predict.py
@router.get(
    "/precomputed/{match_id}",
    response_model=PrecomputedPredictResponse,
    status_code=status.HTTP_200_OK,
    summary="Return precomputed prediction for a match (no Celery, no model call)",
)
async def predict_precomputed(
    match_id: int,
    pred_lookup: _PredLookupDep,
) -> PrecomputedPredictResponse:
    """Return the precomputed prediction for a match from predictions.parquet.

    Predictions are produced by the ``batch_inference`` DVC stage which runs
    ``model.predict()`` over all matches and saves the result to MinIO.
    This endpoint reads from the lookup service's cache — no Celery task,
    no MLflow model call at request time.

    Returns 404 if the match is not found in the latest batch output.
    """
    import asyncio

    # NOTE(review): the lookup service presumably (re)loads the parquet from
    # MinIO on a cold or stale cache — blocking I/O.  Run it in a worker
    # thread so this async handler does not stall the event loop.
    row = await asyncio.to_thread(pred_lookup.get_prediction, match_id)
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=(
                f"match_id={match_id} not found in precomputed predictions. "
                "Ensure the batch_inference DVC stage has been run and "
                "MINIO_BUCKET_PREDICTIONS is configured."
            ),
        )
    # A stored None label should render as "" (the intended default), not "None".
    label = row.get("predicted_label")
    return PrecomputedPredictResponse(
        match_id=match_id,
        proba_home=float(row["proba_home"]),
        proba_draw=float(row["proba_draw"]),
        proba_away=float(row["proba_away"]),
        predicted_class=int(row["predicted_class"]),
        predicted_label="" if label is None else str(label),
        is_future=bool(row["is_future"]) if "is_future" in row else None,
        start_time_utc=row.get("startTimeUtc"),
        home_team_name=row.get("homeTeamName"),
        away_team_name=row.get("awayTeamName"),
        model_run_id=row.get("model_run_id"),
        model_stage=row.get("model_stage"),
        predictions_computed_at=pred_lookup.predictions_computed_at,
    )

list_cards(cards_svc)

Return all precomputed predictions merged with Fonbet odds in one response.

Combines predictions.parquet and fonbet_odds.parquet on match_id. Each entry contains probabilities, predicted class, 1X2 odds, and a direct Fonbet URL (populated once the linking pipeline runs with fonbet_sport_id).

Served from the in-memory cache — no MinIO call at request time unless the underlying files have changed. Declared as a sync handler so FastAPI runs it in a thread pool and does not block the uvicorn event loop during MinIO I/O.

Source code in src/app/routers/predict.py
@router.get(
    "/cards/",
    response_model=None,
    status_code=status.HTTP_200_OK,
    summary="Precomputed predictions merged with Fonbet 1X2 odds",
)
def list_cards(cards_svc: _CardsDep) -> list[dict]:
    """Return every precomputed prediction joined with its Fonbet odds.

    ``predictions.parquet`` and ``fonbet_odds.parquet`` are merged on
    ``match_id``; each entry carries probabilities, the predicted class, 1X2
    odds and a direct Fonbet URL (filled in once the linking pipeline runs
    with ``fonbet_sport_id``).

    Data comes from the in-memory cache and only hits MinIO when the
    underlying files have changed.  The handler is deliberately sync so that
    FastAPI runs it in its thread pool and MinIO I/O never blocks the uvicorn
    event loop.
    """
    cards = cards_svc.list_cards()
    return cards

list_region_roi(roi_svc)

Return flat-stake ROI statistics per region.

Data is produced by the live-betting pipeline stage (triggered daily by the soccer_ml_live_betting_01 Airflow DAG) and cached in memory with a 60-second MinIO re-check interval. Returns an empty list when roi_by_region.csv has not been produced yet.

Source code in src/app/routers/predict.py
@router.get(
    "/region-roi/",
    response_model=list[RegionRoiEntry],
    status_code=status.HTTP_200_OK,
    summary="ROI by region from the live-betting simulation",
)
def list_region_roi(roi_svc: _RegionRoiDep) -> list[RegionRoiEntry]:
    """Return flat-stake ROI figures grouped by region.

    The numbers come from the ``live-betting`` pipeline stage, which the
    ``soccer_ml_live_betting_01`` Airflow DAG triggers daily, and are cached in
    memory with MinIO re-checked at most every 60 seconds.  An empty list is
    returned until ``roi_by_region.csv`` exists.
    """
    raw_rows = roi_svc.list_region_roi()
    return [RegionRoiEntry(**raw) for raw in raw_rows]

list_odds(odds_svc) async

Return Fonbet 1X2 odds (odd_home, odd_draw, odd_away) for all matches.

Reads from fonbet_odds.parquet in the data-raw MinIO bucket. Returns an empty list if the file has not been produced yet.

Source code in src/app/routers/predict.py
@router.get(
    "/odds/",
    response_model=None,
    status_code=status.HTTP_200_OK,
    summary="Fonbet 1X2 odds for all matches",
)
async def list_odds(odds_svc: _OddsDep) -> list[dict]:
    """Return Fonbet 1X2 odds (odd_home, odd_draw, odd_away) for all matches.

    Reads from ``fonbet_odds.parquet`` in the data-raw MinIO bucket.
    Returns an empty list if the file has not been produced yet.

    The service call does blocking MinIO I/O, so it is run in a worker
    thread to keep this async handler from blocking the event loop.
    """
    import asyncio

    return await asyncio.to_thread(odds_svc.list_odds)

predict_by_match_id(match_id, lookup, stage, request)

Submit a prediction task and wait synchronously for the result.

Features are read from match_features.parquet (in-memory cache). Returns 200 OK with the full prediction result once the Celery predict_match task completes (up to _SYNC_TIMEOUT seconds).

Returns 404 if match_id is not in the current batch feature output. Use ?stage=challenger to target the challenger model.

Source code in src/app/routers/predict.py
@router.get(
    "/{match_id}",
    status_code=status.HTTP_200_OK,
    response_model=None,
    summary="Run prediction for a match (synchronous, waits for result)",
)
def predict_by_match_id(
    match_id: int,
    lookup: _LookupDep,
    stage: StageDep,
    request: Request,
) -> JSONResponse:
    """Dispatch a ``predict_match`` Celery task and block until it finishes.

    The match's features come from the in-memory ``match_features.parquet``
    cache.  The task is sent to the ``ml`` queue and the handler waits for
    its result (bounded by ``_SYNC_TIMEOUT`` seconds) before answering 200.

    A 404 is raised when ``match_id`` is absent from the current batch
    feature output.  Pass ``?stage=challenger`` to use the challenger model.
    """
    _validate_stage(stage)

    features = lookup.get_features(match_id)
    if features is None:
        not_found = (
            f"match_id={match_id} not found in precomputed features. "
            "Ensure the batch_inference DVC stage has been run."
        )
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=not_found)

    computed_at = lookup.features_computed_at
    computed_at_iso = computed_at.isoformat() if computed_at else None
    task = predict_match_task.apply_async(
        args=[match_id, features, computed_at_iso, stage],
        queue="ml",
    )
    PREDICTION_COUNT.labels(source="sync").inc()

    # Latency covers only the wait for the Celery result.
    started = time.perf_counter()
    payload = _poll_task_result(task)
    PREDICTION_LATENCY.observe(time.perf_counter() - started)
    return JSONResponse(status_code=status.HTTP_200_OK, content=payload)

model_info(stage, request)

Submit a model-info task and wait synchronously for the result.

Returns 200 OK with MLflow model metadata once the Celery get_model_info task completes (up to _SYNC_TIMEOUT seconds). The resolved result matches the ModelInfoResponse schema.

Results are cached in-process for _MODEL_INFO_TTL seconds per stage to avoid repeatedly dispatching Celery tasks for static metadata.

Source code in src/app/routers/predict.py
@router.get(
    "/model/info",
    status_code=status.HTTP_200_OK,
    response_model=None,
    summary="Retrieve MLflow model metadata (synchronous, waits for result)",
)
def model_info(
    stage: StageDep,
    request: Request,
) -> JSONResponse:
    """Return MLflow model metadata for *stage*, fetching it via Celery if needed.

    A ``get_model_info`` task is dispatched to the ``ml`` queue and awaited
    (bounded by ``_SYNC_TIMEOUT`` seconds); its result follows the
    ``ModelInfoResponse`` schema.  Results are kept in an in-process cache per
    stage for ``_MODEL_INFO_TTL`` seconds, so repeated calls for this
    rarely-changing metadata do not each dispatch a task.
    """
    _validate_stage(stage)
    now = time.monotonic()

    entry = _model_info_cache.get(stage)
    if entry is not None:
        cached_payload, stored_at = entry[0], entry[1]
        if now - stored_at < _MODEL_INFO_TTL:
            return JSONResponse(status_code=status.HTTP_200_OK, content=cached_payload)

    task = get_model_info_task.apply_async(args=[stage], queue="ml")
    payload = _poll_task_result(task)
    # The entry is timestamped with the request start time, not completion.
    _model_info_cache[stage] = (payload, now)
    return JSONResponse(status_code=status.HTTP_200_OK, content=payload)

get_drift_status()

Return the latest feature drift summary and refresh the Prometheus gauge.

Reads reports/drift/latest.json from the local filesystem first (DVC-managed runs), then falls back to MinIO (Kubernetes / Airflow runs).

Source code in src/app/routers/monitoring.py
def _load_local_drift_report() -> dict | None:
    """Read the drift report from the local filesystem, or return None if unavailable."""
    if not _DRIFT_REPORT_PATH.exists():
        return None
    try:
        with open(_DRIFT_REPORT_PATH, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as exc:  # ValueError covers JSON/Unicode errors
        logger.exception("Failed to read local drift report: %s", exc)
        return None


def _load_minio_drift_report() -> dict | None:
    """Fetch the drift report from the predictions bucket, or return None if unavailable."""
    bucket = get_settings().minio.bucket_predictions
    if not bucket:
        return None
    try:
        s3 = create_client_s3()
        obj = s3.get_object(Bucket=bucket, Key=_DRIFT_MINIO_KEY)
        return json.loads(obj["Body"].read())
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        # A missing report is an expected state, not an error worth logging.
        if code not in ("NoSuchKey", "404"):
            logger.exception("Failed to read drift report from MinIO: %s", exc)
    except ValueError as exc:
        # Malformed JSON or undecodable bytes: degrade like the local path
        # instead of failing the request with a 500.
        logger.exception("Failed to parse drift report from MinIO: %s", exc)
    return None


@router.get("/drift")
def get_drift_status() -> dict:
    """Return the latest feature drift summary and refresh the Prometheus gauge.

    Reads ``reports/drift/latest.json`` from the local filesystem first
    (DVC-managed runs), then falls back to MinIO (Kubernetes / Airflow runs).
    Read or parse failures from either source are logged and treated as
    "no report".

    Returns:
        The report payload, or ``{"drift_score": None, "message": ...}`` when
        no report could be loaded.  When the payload has a non-null
        ``drift_score``, the ``DRIFT_SCORE`` gauge is set to it.
    """
    payload = _load_local_drift_report()
    if payload is None:
        payload = _load_minio_drift_report()

    if payload is None:
        return {"drift_score": None, "message": "Drift report not yet available."}

    score = payload.get("drift_score")
    if score is not None:
        DRIFT_SCORE.set(float(score))

    return payload

get_queue_stats()

Return active/scheduled/reserved task counts and worker stats.

Source code in src/app/routers/monitoring.py
@router.get("/celery/queues")
def get_queue_stats() -> dict:
    """Return the Celery inspect replies for active, scheduled and reserved tasks, plus worker stats."""
    inspector = celery_app.control.inspect(timeout=_INSPECT_TIMEOUT)
    # One broadcast per section, issued in this order; keys mirror method names.
    return {
        section: getattr(inspector, section)()
        for section in ("active", "scheduled", "reserved", "stats")
    }

get_workers()

Return active queues and ping status for all connected workers.

Source code in src/app/routers/monitoring.py
@router.get("/celery/workers")
def get_workers() -> dict:
    """Report the queues each connected worker consumes and its ping reply."""
    inspector = celery_app.control.inspect(timeout=_PING_TIMEOUT)
    queues = inspector.active_queues()
    pings = inspector.ping()
    return {"active_workers": queues, "ping": pings}

get_task_status(task_id)

Return the current status and result for a Celery task.

Parameters:

Name Type Description Default
task_id str

Celery task UUID returned when the task was submitted.

required

Returns:

Type Description
dict

Dict with task_id, status, and result keys; result is None while the task is pending.

Source code in src/app/routers/monitoring.py
@router.get("/task_status/{task_id}")
def get_task_status(task_id: str) -> dict:
    """Look up a Celery task's state and, once it has finished, its result.

    Args:
        task_id: Celery task UUID returned when the task was submitted.

    Returns:
        Dict with ``task_id``, ``status``, and ``result`` keys.  ``result`` is
        ``None`` until the task is ready; a stored exception, or an error
        raised while fetching the result, is returned as its string form.
    """
    from celery.result import AsyncResult

    handle = AsyncResult(task_id, app=celery_app)
    try:
        outcome = None
        if handle.ready():
            outcome = handle.result
        if isinstance(outcome, BaseException):
            # Failed tasks store the raised exception; render it as text.
            outcome = str(outcome)
    except Exception as exc:  # backend / deserialisation errors
        outcome = str(exc)
    return {"task_id": task_id, "status": handle.status, "result": outcome}

list_evidently_reports()

List all Evidently HTML reports stored in MinIO, newest first.

Returns a list of dicts with filename, last_modified, and url (presigned, valid for 1 hour).

Source code in src/app/routers/monitoring.py
@router.get("/reports")
def list_evidently_reports() -> list[dict]:
    """List all Evidently HTML reports stored in MinIO, newest first.

    Returns:
        A list of dicts with ``filename`` (object key without the Evidently
        prefix), ``last_modified`` (ISO-8601 string) and ``url`` (presigned
        GET URL valid for ``_REPORT_URL_EXPIRY`` seconds, or ``None`` if it
        could not be generated).

    Raises:
        HTTPException: 503 when ``MINIO_BUCKET_PREDICTIONS`` is not
            configured; 500 when listing the bucket fails.
    """
    settings = get_settings()
    bucket = settings.minio.bucket_predictions
    if not bucket:
        raise HTTPException(
            status_code=503, detail="MINIO_BUCKET_PREDICTIONS not configured."
        )

    s3 = create_client_s3()
    try:
        paginator = s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket, Prefix=_EVIDENTLY_PREFIX)
        objects = [
            obj
            for page in pages
            for obj in page.get("Contents", [])
            if obj["Key"].endswith(".html")
        ]
    except ClientError as exc:
        logger.exception("Failed to list Evidently reports from MinIO: %s", exc)
        # Chain the cause so tracebacks show the underlying S3 error.
        raise HTTPException(
            status_code=500, detail="Failed to list reports from MinIO."
        ) from exc

    objects.sort(key=lambda o: o["LastModified"], reverse=True)

    reports = []
    for obj in objects:
        key = obj["Key"]
        filename = key.removeprefix(_EVIDENTLY_PREFIX)
        try:
            url = s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": key},
                ExpiresIn=_REPORT_URL_EXPIRY,
            )
        except ClientError as exc:
            # Best effort: still list the report, just without a link.
            logger.warning("Could not generate presigned URL for %s: %s", key, exc)
            url = None
        reports.append(
            {
                "filename": filename,
                "last_modified": obj["LastModified"].isoformat(),
                "url": url,
            }
        )

    return reports

open_evidently_report(filename)

Redirect to a presigned URL for an Evidently HTML report.

Opens the report directly in the browser when accessed as a link.

Parameters:

Name Type Description Default
filename str

Report filename, e.g. drift_2026-05-26.html.

required
Source code in src/app/routers/monitoring.py
@router.get("/reports/{filename}")
def open_evidently_report(filename: str) -> RedirectResponse:
    """Redirect to a presigned URL for an Evidently HTML report.

    Opens the report directly in the browser when accessed as a link.  The
    object is checked with ``head_object`` first: ``generate_presigned_url``
    only signs a URL locally and never contacts the server, so without the
    check a missing report would still be redirected to (a MinIO error page).

    Args:
        filename: Report filename, e.g. ``drift_2026-05-26.html``.

    Raises:
        HTTPException: 400 for non-``.html`` names, 503 when
            ``MINIO_BUCKET_PREDICTIONS`` is not configured, 404 when the
            report does not exist or cannot be accessed.
    """
    if not filename.endswith(".html"):
        raise HTTPException(
            status_code=400, detail="Only .html report files are supported."
        )

    settings = get_settings()
    bucket = settings.minio.bucket_predictions
    if not bucket:
        raise HTTPException(
            status_code=503, detail="MINIO_BUCKET_PREDICTIONS not configured."
        )

    key = f"{_EVIDENTLY_PREFIX}{filename}"
    s3 = create_client_s3()
    try:
        s3.head_object(Bucket=bucket, Key=key)
        url = s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=_REPORT_URL_EXPIRY,
        )
    except ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code in ("404", "NoSuchKey", "NotFound"):
            # A missing report is a client error, not a server fault.
            logger.info("Evidently report not found: %s", key)
        else:
            logger.exception("Failed to generate presigned URL for %s: %s", key, exc)
        raise HTTPException(
            status_code=404, detail=f"Report '{filename}' not found or inaccessible."
        ) from exc

    return RedirectResponse(url=url, status_code=302)

get_livescores(year=Query(default=None, ge=1998, le=2100), month=Query(default=None, ge=1, le=12), limit=Query(default=None, ge=1, le=10000), offset=Query(default=0, ge=0))

Return all matches filtered by year and optionally month.

Parameters:

Name Type Description Default
year int

Calendar year to filter by. Defaults to the current year.

Query(default=None, ge=1998, le=2100)
month int

Month (1–12) to filter within year. When omitted the full year is returned.

Query(default=None, ge=1, le=12)
limit int

Maximum number of rows to return.

Query(default=None, ge=1, le=10000)
offset int

Number of rows to skip (for pagination).

Query(default=0, ge=0)

Returns:

Type Description
list[MatchRawLive]

List of MatchRawLive objects ordered by startTimeUtc descending.

Source code in src/app/routers/livescores.py
@router.get("/", response_model=list[MatchRawLive])
def get_livescores(
    year: int | None = Query(default=None, ge=1998, le=2100),
    month: int | None = Query(default=None, ge=1, le=12),
    limit: int | None = Query(default=None, ge=1, le=10000),
    offset: int = Query(default=0, ge=0),
) -> list[MatchRawLive]:
    """Return all matches filtered by year and optionally month.

    Args:
        year: Calendar year to filter by.  Defaults to the current (UTC) year.
        month: Month (1–12) to filter within *year*.  When omitted the
            full year is returned.
        limit: Maximum number of rows to return; ``None`` means no limit.
        offset: Number of rows to skip (for pagination).

    Returns:
        List of ``MatchRawLive`` objects ordered by ``startTimeUtc``
        descending.
    """
    now = datetime.now(timezone.utc)
    y = year or now.year

    # Half-open range [date_from, date_to): an inclusive 23:59:59 upper bound
    # would drop matches timestamped within the last second of the period
    # (e.g. 23:59:59.5).
    if month:
        date_from = datetime(y, month, 1, tzinfo=timezone.utc)
        # Month 12 rolls over to January of the following year.
        date_to = datetime(y + month // 12, month % 12 + 1, 1, tzinfo=timezone.utc)
    else:
        date_from = datetime(y, 1, 1, tzinfo=timezone.utc)
        date_to = datetime(y + 1, 1, 1, tzinfo=timezone.utc)

    with Session(engine) as session:
        stmt = (
            select(*_LIVE_COLS)
            .where(col(MatchRaw.startTimeUtc) >= date_from)
            .where(col(MatchRaw.startTimeUtc) < date_to)
            .order_by(col(MatchRaw.startTimeUtc).desc())
            .offset(offset)
            .limit(limit)
        )
        rows = session.exec(stmt).all()

    return [MatchRawLive(**dict(zip(_LIVE_KEYS, row))) for row in rows]

healthcheck() async

Healthcheck endpoint for Kubernetes probes. Returns information about the application state.

Returns:

Type Description
HealthCheckResponse

HealthCheckResponse with status, version, worker PID, and current process memory usage in MB.

Source code in src/app/routers/healthcheck.py
@router.get(
    "/healthcheck/",
    response_model=HealthCheckResponse,
    status_code=status.HTTP_200_OK,
    tags=["healthcheck"],
)
async def healthcheck():
    """Report application state for Kubernetes probes.

    Returns:
        HealthCheckResponse: ``status`` (always ``"healthy"``), ``version``
        taken from ``APP_VERSION`` (``"unknown"`` when unset), the worker PID,
        the process resident set size in MB, and the result of
        ``_check_database()``.
    """
    # NOTE(review): _check_database() runs inside this async handler; if it
    # performs a blocking DB round-trip it will stall the event loop — confirm.
    rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
    return HealthCheckResponse(
        status="healthy",
        version=os.getenv("APP_VERSION", "unknown"),
        worker_id=os.getpid(),
        memory_usage=rss_mb,
        database=_check_database(),
    )

Services

FeatureLookupService

Load precomputed features for all matches from the batch inference output.

The parquet file is produced by the batch_inference DVC stage and has the match id as its index. It contains both upcoming matches and finished matches (with outcome_1x2, homeScore, awayScore). The service is loaded lazily on the first call and cached in-process.

Parameters:

Name Type Description Default
features_path Path | None

Absolute path to match_features.parquet.

None
Source code in src/app/services/predict.py
class FeatureLookupService:
    """Load precomputed features for all matches from the batch inference output.

    The parquet file is produced by the ``batch_inference`` DVC stage and has
    the match ``id`` as its index.  It contains both upcoming matches and
    finished matches (with ``outcome_1x2``, ``homeScore``, ``awayScore``).
    The service is loaded lazily on the first call and cached in-process.

    Args:
        features_path: Absolute path to ``match_features.parquet``.
    """

    def __init__(self, features_path: Path | None = None) -> None:
        """Initialise with an optional override for the features parquet path."""
        self._path = features_path or (
            DATA_PREDICTIONS_PATH / _FUTURE_FEATURES_FILENAME
        )
        self._df: pd.DataFrame | None = None
        # UTC timestamp of the loaded data: local st_mtime or MinIO LastModified.
        self._mtime: float | None = None
        # monotonic time of the last MinIO head_object check (avoids per-request calls).
        self._last_minio_check: float = 0.0

    # ------------------------------------------------------------------
    # MinIO helpers
    # ------------------------------------------------------------------

    def _minio_last_modified(self) -> float | None:
        """Return the MinIO LastModified timestamp for the features object, or None."""
        bucket = get_minio_settings().bucket_predictions
        if not bucket:
            return None
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(Bucket=bucket, Key=_MINIO_PREDICTIONS_KEY)
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        """Read future_features.parquet directly from MinIO into memory."""
        bucket = get_minio_settings().bucket_predictions
        s3_url = f"s3://{bucket}/{_MINIO_PREDICTIONS_KEY}"
        try:
            df = pd.read_parquet(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info(
                "Loaded future features from MinIO (%s): %d matches", s3_url, len(df)
            )
            return df
        except Exception:  # noqa: BLE001
            logger.exception(
                "Failed to load future_features.parquet from MinIO (%s)", s3_url
            )
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Core load logic
    # ------------------------------------------------------------------

    def _load(self) -> pd.DataFrame:
        # ── Local file (dev / Airflow server) ────────────────────────────
        if self._path.exists():
            current_mtime = self._path.stat().st_mtime
            if self._df is not None and current_mtime == self._mtime:
                return self._df
            self._df = pd.read_parquet(self._path)
            self._mtime = current_mtime
            logger.info(
                "Loaded future features from local: %d matches (mtime=%s)",
                len(self._df),
                datetime.fromtimestamp(current_mtime, tz=timezone.utc).isoformat(),
            )
            return self._df

        # ── MinIO (production K8s — no local data volume) ─────────────────
        now = time.monotonic()
        should_check = (now - self._last_minio_check) >= _FEATURE_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df  # return cached; not time to re-check yet

        self._last_minio_check = now
        minio_mtime = self._minio_last_modified()

        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale feature cache")
                return self._df
            logger.warning(
                "future_features.parquet not found locally or in MinIO — "
                "run the batch_inference DVC stage and ensure "
                "MINIO_BUCKET_PREDICTIONS is configured."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df

        if self._df is not None and minio_mtime == self._mtime:
            return self._df  # MinIO file unchanged

        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def get_features(self, match_id: int) -> dict | None:
        """Return the feature dict for *match_id*, or ``None`` if not found.

        Args:
            match_id: Integer match identifier (index of the parquet file).

        Returns:
            Dict of feature name → value with NaN entries removed,
            or ``None`` when *match_id* is not in the features file.
        """
        df = self._load()
        if df.empty or match_id not in df.index:
            return None
        row = df.loc[match_id]
        # Convert to plain Python dict; drop NaN values to avoid serialisation issues
        return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

    def list_matches(self) -> list[dict]:
        """Return a lightweight list of upcoming matches for UI display.

        Returns:
            List of dicts with ``match_id``, ``startTimeUtc``,
            ``homeTeamName``, ``awayTeamName``, ``homeTeamId``, and
            ``awayTeamId`` columns — filtered to unplayed matches only.
        """
        df = self._load()
        if df.empty:
            return []
        # Keep only matches that have not been played yet (no recorded outcome).
        if "outcome_1x2" in df.columns:
            df = df[df["outcome_1x2"].isna()]
        display_cols = [
            c
            for c in (
                "startTimeUtc",
                "homeTeamName",
                "awayTeamName",
                "homeTeamId",
                "awayTeamId",
            )
            if c in df.columns
        ]
        df_display = df[display_cols].copy()
        df_display.index.name = "match_id"
        return df_display.reset_index().to_dict(orient="records")

    @property
    def features_computed_at(self) -> datetime | None:
        """Return the UTC datetime when the feature file was last written.

        Corresponds to the last ``batch_inference`` DVC stage run.

        Returns:
            UTC-aware ``datetime`` derived from the file's ``mtime``,
            or ``None`` when no file has been loaded yet.
        """
        self._load()
        if self._mtime is None:
            return None
        return datetime.fromtimestamp(self._mtime, tz=timezone.utc)

features_computed_at property

Return the UTC datetime when the feature file was last written.

Corresponds to the last batch_inference DVC stage run.

Returns:

datetime | None: UTC-aware datetime derived from the local file's mtime (or the MinIO object's LastModified timestamp), or None when no file has been loaded yet.

get_features(match_id)

Return the feature dict for match_id, or None if not found.

Parameters:

match_id (int, required): Integer match identifier (index of the parquet file).

Returns:

dict | None: Dict of feature name → value with NaN entries removed, or None when match_id is not in the features file.

Source code in src/app/services/predict.py
def get_features(self, match_id: int) -> dict | None:
    """Return the feature dict for *match_id*, or ``None`` if not found.

    Args:
        match_id: Integer match identifier (index of the parquet file).

    Returns:
        Dict of feature name → value with NaN entries removed,
        or ``None`` when *match_id* is not in the features file.
    """
    df = self._load()
    if df.empty or match_id not in df.index:
        return None
    row = df.loc[match_id]
    # Convert to plain Python dict; drop NaN values to avoid serialisation issues
    return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

list_matches()

Return a lightweight list of upcoming matches for UI display.

Returns:

list[dict]: List of dicts with match_id, startTimeUtc, homeTeamName, awayTeamName, homeTeamId, and awayTeamId columns (those present in the file), filtered to unplayed matches only.

Source code in src/app/services/predict.py
def list_matches(self) -> list[dict]:
    """Return a lightweight list of upcoming matches for UI display.

    Rows with a recorded ``outcome_1x2`` (already played) are dropped when
    that column exists, and only the display columns present in the file
    are kept.

    Returns:
        List of dicts with ``match_id``, ``startTimeUtc``,
        ``homeTeamName``, ``awayTeamName``, ``homeTeamId``, and
        ``awayTeamId`` columns — filtered to unplayed matches only.
    """
    features = self._load()
    if features.empty:
        return []

    wanted = (
        "startTimeUtc",
        "homeTeamName",
        "awayTeamName",
        "homeTeamId",
        "awayTeamId",
    )
    # No outcome column means nothing can be classified as played yet.
    upcoming = (
        features[features["outcome_1x2"].isna()]
        if "outcome_1x2" in features.columns
        else features
    )
    present = [name for name in wanted if name in upcoming.columns]
    table = upcoming[present].copy()
    table.index.name = "match_id"
    return table.reset_index().to_dict(orient="records")

PredictionLookupService

Serve precomputed batch predictions from the batch_inference DVC stage output.

Mirrors the FeatureLookupService caching pattern:

- Checks the local file first (dev / CI).
- Falls back to MinIO with a configurable re-check interval.

The parquet file is indexed by match id and must contain the columns proba_home, proba_draw, proba_away, predicted_class and predicted_label. It may optionally contain is_future, startTimeUtc, homeTeamName, awayTeamName, model_run_id and model_stage.

Source code in src/app/services/predict.py
class PredictionLookupService:
    """Serve precomputed batch predictions from the batch_inference DVC stage output.

    Mirrors the ``FeatureLookupService`` caching pattern:
    - Checks local file first (dev / CI).
    - Falls back to MinIO with a configurable re-check interval.

    The parquet file is indexed by match ``id`` and must contain columns:
    ``proba_home``, ``proba_draw``, ``proba_away``, ``predicted_class``,
    ``predicted_label``, optionally ``is_future``, ``startTimeUtc``,
    ``homeTeamName``, ``awayTeamName``, ``model_run_id``, ``model_stage``.
    """

    def __init__(self, predictions_path: Path | None = None) -> None:
        """Initialise with an optional override for the predictions parquet path."""
        self._path = predictions_path or (DATA_PREDICTIONS_PATH / _PREDICTIONS_FILENAME)
        self._df: pd.DataFrame | None = None
        self._mtime: float | None = None
        self._last_minio_check: float = 0.0

    # ------------------------------------------------------------------
    # MinIO helpers
    # ------------------------------------------------------------------

    def _minio_last_modified(self) -> float | None:
        """Return the MinIO LastModified for the predictions object, or None."""
        bucket = get_minio_settings().bucket_predictions
        if not bucket:
            return None
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(Bucket=bucket, Key=_MINIO_BATCH_PREDICTIONS_KEY)
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object (predictions) failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        """Read predictions.parquet directly from MinIO into memory."""
        bucket = get_minio_settings().bucket_predictions
        s3_url = f"s3://{bucket}/{_MINIO_BATCH_PREDICTIONS_KEY}"
        try:
            df = pd.read_parquet(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info(
                "Loaded batch predictions from MinIO (%s): %d rows", s3_url, len(df)
            )
            return df
        except Exception:  # noqa: BLE001
            logger.exception(
                "Failed to load predictions.parquet from MinIO (%s)", s3_url
            )
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Core load logic
    # ------------------------------------------------------------------

    def _load(self) -> pd.DataFrame:
        # ── Local file (dev / CI) ─────────────────────────────────────────
        if self._path.exists():
            current_mtime = self._path.stat().st_mtime
            if self._df is not None and current_mtime == self._mtime:
                return self._df
            self._df = pd.read_parquet(self._path)
            self._mtime = current_mtime
            logger.info(
                "Loaded batch predictions from local: %d rows (mtime=%s)",
                len(self._df),
                datetime.fromtimestamp(current_mtime, tz=timezone.utc).isoformat(),
            )
            return self._df

        # ── MinIO (production K8s) ────────────────────────────────────────
        now = time.monotonic()
        should_check = (
            now - self._last_minio_check
        ) >= _PREDICTIONS_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df

        self._last_minio_check = now
        minio_mtime = self._minio_last_modified()

        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale predictions cache")
                return self._df
            logger.warning(
                "predictions.parquet not found locally or in MinIO — "
                "run the batch_inference DVC stage and ensure "
                "MINIO_BUCKET_PREDICTIONS is configured."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df

        if self._df is not None and minio_mtime == self._mtime:
            return self._df

        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def get_prediction(self, match_id: int) -> dict | None:
        """Return the prediction dict for *match_id*, or ``None`` if not found.

        Args:
            match_id: Integer match identifier (index of the parquet file).

        Returns:
            Dict of column name → value with NaN entries removed,
            or ``None`` when *match_id* is not in the predictions file.
        """
        df = self._load()
        if df.empty or match_id not in df.index:
            return None
        row = df.loc[match_id]
        return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

    def list_matches(self) -> list[dict]:
        """Return all prediction rows as a list of dicts for diagnostics.

        Returns:
            List of dicts with ``match_id`` and all prediction columns.
            Empty list when no predictions file has been loaded.
        """
        df = self._load()
        if df.empty:
            return []
        return df.reset_index().to_dict(orient="records")

    @property
    def predictions_computed_at(self) -> datetime | None:
        """Return the UTC datetime when predictions.parquet was last written.

        Returns:
            UTC-aware ``datetime`` derived from the file's ``mtime``,
            or ``None`` when no file has been loaded yet.
        """
        self._load()
        if self._mtime is None:
            return None
        return datetime.fromtimestamp(self._mtime, tz=timezone.utc)

predictions_computed_at property

Return the UTC datetime when predictions.parquet was last written.

Returns:

datetime | None: UTC-aware datetime derived from the local file's mtime (or the MinIO object's LastModified timestamp), or None when no file has been loaded yet.

get_prediction(match_id)

Return the prediction dict for match_id, or None if not found.

Parameters:

match_id (int, required): Integer match identifier (index of the parquet file).

Returns:

dict | None: Dict of column name → value with NaN entries removed, or None when match_id is not in the predictions file.

Source code in src/app/services/predict.py
def get_prediction(self, match_id: int) -> dict | None:
    """Return the prediction dict for *match_id*, or ``None`` if not found.

    Args:
        match_id: Integer match identifier (index of the parquet file).

    Returns:
        Dict of column name → value with NaN entries removed,
        or ``None`` when *match_id* is not in the predictions file.
    """
    df = self._load()
    if df.empty or match_id not in df.index:
        return None
    row = df.loc[match_id]
    return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

list_matches()

Return all prediction rows as a list of dicts for diagnostics.

Returns:

list[dict]: List of dicts with match_id and all prediction columns, or an empty list when no predictions file has been loaded.

Source code in src/app/services/predict.py
def list_matches(self) -> list[dict]:
    """Return every prediction row as a dict, for diagnostics.

    The parquet index is turned back into a regular column before the rows
    are converted.

    Returns:
        List of dicts with ``match_id`` and all prediction columns.
        Empty list when no predictions file has been loaded.
    """
    predictions = self._load()
    if not predictions.empty:
        return predictions.reset_index().to_dict(orient="records")
    return []

FonbetOddsService

Serve Fonbet 1X2 odds from fonbet_odds.parquet in the data-raw MinIO bucket.

Produced by the fetch_fonbet_odds pipeline stage and indexed by match_id. Follows the same lazy-load and interval-based refresh pattern as PredictionLookupService.

Source code in src/app/services/predict.py
class FonbetOddsService:
    """Serve Fonbet 1X2 odds from ``fonbet_odds.parquet``
    in the data-raw MinIO bucket.

    Produced by the ``fetch_fonbet_odds`` pipeline stage.  Indexed by
    ``match_id``.  Follows the same lazy-load + interval-based refresh
    pattern as :class:`PredictionLookupService`.
    """

    def __init__(self) -> None:
        """Initialise the service with an empty cache."""
        self._df: pd.DataFrame | None = None
        self._mtime: float | None = None
        self._last_check: float = 0.0

    def _minio_last_modified(self) -> float | None:
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(
                Bucket=get_minio_settings().bucket_data_raw, Key=_FONBET_ODDS_KEY
            )
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object (fonbet_odds) failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        bucket = get_minio_settings().bucket_data_raw
        s3_url = f"s3://{bucket}/{_FONBET_ODDS_KEY}"
        try:
            df = pd.read_parquet(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info("Loaded Fonbet odds from MinIO: %d rows", len(df))
            return df.set_index("match_id")
        except Exception:  # noqa: BLE001
            logger.exception(
                "Failed to load fonbet_odds.parquet from MinIO (%s)", s3_url
            )
            return pd.DataFrame()

    def _load(self) -> pd.DataFrame:
        now = time.monotonic()
        should_check = (now - self._last_check) >= _ODDS_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df
        self._last_check = now
        minio_mtime = self._minio_last_modified()
        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale Fonbet odds cache")
                return self._df
            logger.warning(
                "fonbet_odds.parquet not found in MinIO — "
                "ensure the fetch_fonbet_odds stage has been run."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df
        if self._df is not None and minio_mtime == self._mtime:
            return self._df
        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def list_odds(self) -> list[dict]:
        """Return 1X2 odds for all matches.

        Returns:
            List of dicts with ``match_id``, ``odd_home``, ``odd_draw``,
            and ``odd_away`` keys.  Non-finite float values are replaced
            with ``None`` for JSON safety.
        """
        import math

        df = self._load()
        if df.empty:
            return []
        cols = [c for c in ("odd_home", "odd_draw", "odd_away") if c in df.columns]
        records = df[cols].reset_index().to_dict(orient="records")
        # Replace NaN / inf floats with None for JSON-safe serialization.
        return [
            {
                k: (None if isinstance(v, float) and not math.isfinite(v) else v)
                for k, v in row.items()
            }
            for row in records
        ]

list_odds()

Return 1X2 odds for all matches.

Returns:

list[dict]: List of dicts with match_id, odd_home, odd_draw and odd_away keys. Non-finite float values (NaN, ±inf) are replaced with None for JSON safety.

Source code in src/app/services/predict.py
def list_odds(self) -> list[dict]:
    """Return the 1X2 odds of every match.

    Only the ``odd_home`` / ``odd_draw`` / ``odd_away`` columns that exist
    are kept; the ``match_id`` index becomes a regular key.

    Returns:
        List of dicts with ``match_id``, ``odd_home``, ``odd_draw``,
        and ``odd_away`` keys.  Non-finite float values are replaced
        with ``None`` for JSON safety.
    """
    import math

    odds = self._load()
    if odds.empty:
        return []

    def _json_safe(value):
        # NaN / ±inf are not valid JSON numbers; emit null instead.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return value

    wanted = [name for name in ("odd_home", "odd_draw", "odd_away") if name in odds.columns]
    rows = odds[wanted].reset_index().to_dict(orient="records")
    return [{key: _json_safe(val) for key, val in record.items()} for record in rows]

MatchCardService

Merged match cards: precomputed predictions plus Fonbet odds in one place.

Merges predictions.parquet (via PredictionLookupService) with fonbet_odds.parquet (via FonbetOddsService) on match_id and holds the result in memory. Fonbet odds are the primary source, so every match with odds gets a card even if it has no prediction yet. The cards are rebuilt when either source reports a new _mtime value, so the service piggy-backs on their existing cache logic without adding extra MinIO calls.

Source code in src/app/services/predict.py
class MatchCardService:
    """Merged match cards: precomputed predictions + Fonbet odds in one place.

    Merges ``predictions.parquet`` (via :class:`PredictionLookupService`) with
    ``fonbet_odds.parquet`` (via :class:`FonbetOddsService`) on ``match_id``
    and holds the result in memory.  Every match with Fonbet odds gets a card,
    even when it has no prediction yet (left merge).  Before each read both
    sources run their own throttled refresh check, and the cards are rebuilt
    only when either source reports a new ``_mtime`` value — so no MinIO calls
    are added beyond the sources' existing cache logic.
    """

    def __init__(
        self,
        pred_svc: PredictionLookupService,
        odds_svc: FonbetOddsService,
    ) -> None:
        """Initialise with injected prediction and odds service instances."""
        self._pred = pred_svc
        self._odds = odds_svc
        self._df: pd.DataFrame | None = None
        # Source timestamps the cached cards were built from.
        self._pred_mtime: float | None = None
        self._odds_mtime: float | None = None

    def _needs_rebuild(self) -> bool:
        """Refresh both sources, then report whether the merged cards are stale."""
        # The sources only re-check MinIO / the local file from inside their
        # own _load() (throttled by their check intervals).  Without these
        # calls their _mtime would never advance after the first build unless
        # some other caller happened to hit the same service instances.
        self._pred._load()
        self._odds._load()
        return (
            self._df is None
            or self._pred._mtime != self._pred_mtime
            or self._odds._mtime != self._odds_mtime
        )

    def _build(self) -> pd.DataFrame:
        """Merge odds (primary) with predictions and cache the result in ``_df``."""
        import math

        pred_df = self._pred._load()
        odds_df = self._odds._load()

        if odds_df.empty:
            self._df = pd.DataFrame()
            self._pred_mtime = self._pred._mtime
            self._odds_mtime = self._odds._mtime
            return self._df

        # Use Fonbet odds as the primary source so that ALL matches with odds
        # are shown — even those not yet present in predictions.parquet (e.g.
        # newly linked matches when the inference pipeline hasn't run yet).
        odds_reset = odds_df.reset_index()
        odds_keep = [
            c
            for c in (
                "match_id",
                "fonbet_event_id",
                "fonbet_sport_id",
                "odd_home",
                "odd_draw",
                "odd_away",
            )
            if c in odds_reset.columns
        ]
        df = odds_reset[odds_keep].copy()

        if not pred_df.empty:
            # Bring match_id back as a column from the predictions index.
            pred_reset = pred_df.reset_index()
            # The parquet index may be named 'id' rather than 'match_id'.
            if "match_id" not in pred_reset.columns:
                for candidate in ("id", "index"):
                    if candidate in pred_reset.columns:
                        pred_reset = pred_reset.rename(columns={candidate: "match_id"})
                        break
            # Drop odds columns already present in df to avoid _x/_y suffixes.
            pred_reset = pred_reset.drop(
                columns=[
                    c
                    for c in ("odd_home", "odd_draw", "odd_away")
                    if c in pred_reset.columns
                ]
            )
            df = df.merge(pred_reset, on="match_id", how="left")

        # Build Fonbet deep-link when both IDs are available.
        has_event_id = "fonbet_event_id" in df.columns
        has_sport_id = "fonbet_sport_id" in df.columns
        if has_event_id:

            def _url(row: pd.Series) -> str | None:
                eid = row.get("fonbet_event_id")
                if pd.isna(eid):
                    return None
                if has_sport_id:
                    sid = row.get("fonbet_sport_id")
                    if not pd.isna(sid):
                        return f"{_FONBET_URL_BASE}/{int(sid)}/{int(eid)}"
                return None

            df["fonbet_url"] = df.apply(_url, axis=1)

        # Replace NaN/inf floats with None for JSON safety (list_cards applies
        # the same sanitisation again on output).  The loop variable is not
        # called `col` to avoid shadowing the module-level sqlmodel `col`.
        for column in (
            "odd_home",
            "odd_draw",
            "odd_away",
            "proba_home",
            "proba_draw",
            "proba_away",
        ):
            if column in df.columns:
                df[column] = df[column].apply(
                    lambda v: (
                        None if isinstance(v, float) and not math.isfinite(v) else v
                    )
                )

        self._df = df
        self._pred_mtime = self._pred._mtime
        self._odds_mtime = self._odds._mtime
        return self._df

    def list_cards(self) -> list[dict]:
        """Return merged match cards as a JSON-safe list of dicts.

        Each entry contains prediction probabilities, predicted class, Fonbet
        1X2 odds, and a direct Fonbet URL (when linking pipeline has run with
        the ``fonbet_sport_id`` column).  Missing values (``None``, ``NaT``,
        NaN, ±inf) are emitted as ``None``; datetimes as ISO-8601 strings.
        """
        if self._needs_rebuild():
            self._build()
        df = self._df
        if df is None or df.empty:
            return []

        import math

        keep = [c for c in _CARD_COLS if c in df.columns]
        result: list[dict] = []
        for row in df[keep].to_dict(orient="records"):
            entry: dict = {}
            for k, v in row.items():
                if v is None or v is pd.NaT:
                    # pd.NaT (e.g. a datetime column on a card without a
                    # matching prediction row) would otherwise reach the
                    # isoformat() branch below and be emitted as "NaT".
                    entry[k] = None
                elif hasattr(v, "item"):  # numpy scalar → Python native
                    native = v.item()
                    entry[k] = (
                        None
                        if isinstance(native, float) and not math.isfinite(native)
                        else native
                    )
                elif hasattr(v, "isoformat"):  # pandas Timestamp / datetime
                    entry[k] = v.isoformat()
                elif isinstance(v, float) and not math.isfinite(v):
                    entry[k] = None
                else:
                    entry[k] = v
            result.append(entry)
        return result

list_cards()

Return merged match cards as a JSON-safe list of dicts.

Each entry contains prediction probabilities, predicted class, Fonbet 1X2 odds, and a direct Fonbet URL (when the linking pipeline has run with the fonbet_sport_id column).

Source code in src/app/services/predict.py
def list_cards(self) -> list[dict]:
    """Return merged match cards as a JSON-safe list of dicts.

    Each entry contains prediction probabilities, predicted class, Fonbet
    1X2 odds, and a direct Fonbet URL (when linking pipeline has run with
    the ``fonbet_sport_id`` column).  Missing values (``None``, ``NaT``,
    NaN, ±inf) are emitted as ``None``; datetimes as ISO-8601 strings.
    """
    if self._needs_rebuild():
        self._build()
    df = self._df
    if df is None or df.empty:
        return []

    import math

    keep = [c for c in _CARD_COLS if c in df.columns]
    result: list[dict] = []
    for row in df[keep].to_dict(orient="records"):
        entry: dict = {}
        for k, v in row.items():
            if v is None or v is pd.NaT:
                # pd.NaT (e.g. a datetime column on a card without a matching
                # prediction row) would otherwise reach the isoformat() branch
                # below and be emitted as the string "NaT".
                entry[k] = None
            elif hasattr(v, "item"):  # numpy scalar → Python native
                native = v.item()
                entry[k] = (
                    None
                    if isinstance(native, float) and not math.isfinite(native)
                    else native
                )
            elif hasattr(v, "isoformat"):  # pandas Timestamp / datetime
                entry[k] = v.isoformat()
            elif isinstance(v, float) and not math.isfinite(v):
                entry[k] = None
            else:
                entry[k] = v
        result.append(entry)
    return result

RegionRoiService

Serve regional ROI data from roi_by_region.csv in the predictions bucket.

Written by the live-betting pipeline stage and uploaded to MinIO under analysis/live_betting/roi_by_region.csv. Follows the same lazy-load and interval-based refresh pattern as PredictionLookupService. Returns an empty list when the file has not been produced yet.

Source code in src/app/services/predict.py
class RegionRoiService:
    """Serve regional ROI data from ``roi_by_region.csv`` in the predictions bucket.

    Written by the ``live-betting`` pipeline stage and uploaded to MinIO under
    ``analysis/live_betting/roi_by_region.csv``.  Follows the same lazy-load
    and interval-based refresh pattern as :class:`PredictionLookupService`.
    Returns an empty list when the file has not been produced yet.
    """

    def __init__(self) -> None:
        """Initialise the service with an empty cache."""
        self._df: pd.DataFrame | None = None
        self._mtime: float | None = None
        self._last_check: float = 0.0

    def _minio_last_modified(self) -> float | None:
        bucket = get_minio_settings().bucket_predictions
        if not bucket:
            return None
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(Bucket=bucket, Key=_REGION_ROI_MINIO_KEY)
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object (region_roi) failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        bucket = get_minio_settings().bucket_predictions
        s3_url = f"s3://{bucket}/{_REGION_ROI_MINIO_KEY}"
        try:
            df = pd.read_csv(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info("Loaded region ROI from MinIO: %d rows", len(df))
            return df
        except Exception:  # noqa: BLE001
            logger.exception("Failed to load roi_by_region.csv from MinIO (%s)", s3_url)
            return pd.DataFrame()

    def _load(self) -> pd.DataFrame:
        now = time.monotonic()
        should_check = (now - self._last_check) >= _REGION_ROI_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df
        self._last_check = now
        minio_mtime = self._minio_last_modified()
        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale region ROI cache")
                return self._df
            logger.warning(
                "roi_by_region.csv not found in MinIO — "
                "run make live-betting to generate it."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df
        if self._df is not None and minio_mtime == self._mtime:
            return self._df
        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def list_region_roi(self) -> list[dict]:
        """Return ROI per region as a JSON-safe list of dicts.

        Returns:
            List of dicts with ``region_name``, ``roi_pct``, ``n_bets``, and
            optionally ``hit_rate`` and ``regionId``.  Empty list when the
            file has not been produced yet.
        """
        import math

        df = self._load()
        if df.empty:
            return []
        keep_cols = [
            c
            for c in ("regionId", "region_name", "roi_pct", "n_bets", "hit_rate")
            if c in df.columns
        ]
        result: list[dict] = []
        for row in df[keep_cols].to_dict(orient="records"):
            entry: dict = {}
            for k, v in row.items():
                if v is None:
                    entry[k] = None
                elif hasattr(v, "item"):  # numpy scalar → Python native
                    native = v.item()
                    entry[k] = (
                        None
                        if isinstance(native, float) and not math.isfinite(native)
                        else native
                    )
                elif isinstance(v, float) and not math.isfinite(v):
                    entry[k] = None
                else:
                    entry[k] = v
            result.append(entry)
        return result

list_region_roi()

Return ROI per region as a JSON-safe list of dicts.

Returns:

Type Description
list[dict]

List of dicts with region_name, roi_pct, n_bets, and optionally hit_rate and regionId. Empty list when the file has not been produced yet.

Source code in src/app/services/predict.py
def list_region_roi(self) -> list[dict]:
    """Return ROI per region as a JSON-safe list of dicts.

    Only the region columns actually present in the loaded frame are kept.
    NumPy scalars are unwrapped to Python natives, and ``None`` or
    non-finite floats are reported as ``None``.

    Returns:
        List of dicts with ``region_name``, ``roi_pct``, ``n_bets``, and
        optionally ``hit_rate`` and ``regionId``.  Empty list when the
        file has not been produced yet.
    """
    import math

    frame = self._load()
    if frame.empty:
        return []

    wanted = ("regionId", "region_name", "roi_pct", "n_bets", "hit_rate")
    columns = [name for name in wanted if name in frame.columns]

    def _coerce(value):
        if value is None:
            return None
        if hasattr(value, "item"):  # numpy scalar -> Python native
            value = value.item()
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return value

    return [
        {key: _coerce(value) for key, value in record.items()}
        for record in frame[columns].to_dict(orient="records")
    ]

PredictionService

Loads and serves a model from the MLflow Model Registry.

The model is loaded lazily on the first call to predict and then cached in-process for the lifetime of the worker.

Source code in src/app/services/predict.py
class PredictionService:
    """Loads and serves a model from the MLflow Model Registry.

    The model is loaded lazily on the first call to ``predict`` and then
    cached in-process for the lifetime of the worker.
    """

    def __init__(
        self, tracking_uri: str, model_name: str, model_stage: str = "champion"
    ) -> None:
        """Initialise with MLflow tracking URI, model name, and target stage."""
        self._tracking_uri = tracking_uri
        self._model_name = model_name
        self._model_stage = model_stage
        self._model: Any = None  # mlflow.pyfunc.PyFuncModel, imported lazily
        # Protects the lazy-load critical section from concurrent reads
        # (e.g. gunicorn workers sharing the same process via threads).
        self._lock: threading.Lock = threading.Lock()
        # Redis client initialised lazily; None means "not yet checked".
        self._redis: Any = None
        self._redis_checked: bool = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_model(self) -> Any:
        # Fast path: model already loaded (no lock needed after init).
        if self._model is not None:
            return self._model
        # Slow path: double-checked locking prevents duplicate load
        # when multiple threads/workers call predict() simultaneously on startup.
        with self._lock:
            if self._model is None:
                # Deferred import: mlflow is only installed in the ml environment.
                # Must never execute at module-load time (api env has no mlflow).
                import mlflow
                import mlflow.pyfunc

                # MLflow's boto3 client reads these from os.environ at download time.
                # pydantic-settings populates Python objects but does NOT write back
                # to os.environ, so we must do it explicitly here.
                os.environ.setdefault(
                    "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
                )
                os.environ.setdefault(
                    "AWS_ACCESS_KEY_ID", get_minio_settings().access_key
                )
                os.environ.setdefault(
                    "AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key
                )
                mlflow.set_tracking_uri(self._tracking_uri)
                # MLflow 3.x: use alias URI (models:/name@alias)
                # Falls back gracefully to stage URI for older MLflow servers
                model_uri = f"models:/{self._model_name}@{self._model_stage}"
                logger.info("Loading model from MLflow: %s", model_uri)
                self._model = mlflow.pyfunc.load_model(model_uri)
                logger.info(
                    "Model loaded. run_id=%s",
                    getattr(self._model.metadata, "run_id", "unknown"),
                )
        return self._model

    def load(self) -> Any:
        """Eagerly load the model. Safe to call multiple times (idempotent).

        Call this during application startup (e.g. FastAPI lifespan or
        Celery ``worker_process_init``) to avoid paying the cold-start
        penalty on the first user request.

        Returns:
            The loaded ``mlflow.pyfunc.PyFuncModel`` instance.
        """
        return self._load_model()

    # ------------------------------------------------------------------
    # Redis prediction cache
    # ------------------------------------------------------------------

    def _get_redis(self) -> Any:
        """Return a connected Redis client or *None* if unavailable.

        Tried once per process; subsequent calls return the cached result
        without retrying so a missing Redis never blocks inference.
        """
        if self._redis_checked:
            return self._redis
        self._redis_checked = True
        try:
            import redis as redis_lib

            url = os.getenv(
                "REDIS_CACHE_URL",
                os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
            )
            client = redis_lib.from_url(
                url, decode_responses=True, socket_connect_timeout=2
            )
            client.ping()
            self._redis = client
            logger.info("Prediction cache: connected to Redis at %s", url)
        except Exception as exc:
            logger.warning("Redis unavailable; prediction caching disabled: %s", exc)
        return self._redis

    def _cache_get(self, key: str) -> dict | None:
        r = self._get_redis()
        if r is None:
            return None
        try:
            value = r.get(key)
            return json.loads(value) if value else None
        except Exception as exc:
            logger.debug("Cache get failed for key=%s: %s", key, exc)
            return None

    def _cache_set(self, key: str, value: dict, ttl: int) -> None:
        r = self._get_redis()
        if r is None:
            return
        try:
            r.setex(key, ttl, json.dumps(value))
        except Exception as exc:
            logger.debug("Cache set failed for key=%s: %s", key, exc)

    def _predict_proba(self, df: pd.DataFrame) -> "Any":  # np.ndarray at runtime
        """Return (N, 3) probability array.

        Handles two common MLflow pyfunc flavours:

        - sklearn Pipeline with ``pyfunc_predict_fn='predict_proba'``:
          ``model.predict()`` returns an (N, 3) probability array.
        - sklearn Pipeline with default pyfunc:
          ``model.predict()`` returns (N,) labels; in that case we fall
          back to the underlying ``python_model``.
        """
        import numpy as np

        model = self._load_model()
        raw = model.predict(df)

        # Already a 2-D probability array
        if hasattr(raw, "ndim") and raw.ndim == 2:
            return raw

        # 1-D label output → try to access the underlying sklearn estimator
        logger.warning(
            "pyfunc.predict() returned 1-D output; attempting predict_proba fallback."
        )
        try:
            sklearn_model = model._model_impl.python_model
            proba = sklearn_model.predict_proba(df)
            return np.asarray(proba)
        except AttributeError:
            # Last resort: one-hot encode the hard labels
            labels = raw.astype(int)
            proba = np.zeros((len(labels), len(_LABEL_ORDER)), dtype=float)
            for i, lbl in enumerate(labels):
                if lbl in _LABEL_ORDER:
                    proba[i, _LABEL_ORDER.index(lbl)] = 1.0
            return proba

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def get_model_info(self) -> dict:
        """Return model metadata from the MLflow Model Registry.

        Queries the registered model for the configured stage, then fetches
        run metrics and params.  Does NOT require the model to be loaded.

        Returns:
            Dict with ``model_name``, ``stage``, ``version``, ``run_id``,
            ``status``, and nested ``metrics`` / ``params`` sub-dicts.
        """
        import mlflow
        from mlflow.tracking import MlflowClient

        os.environ.setdefault(
            "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
        )
        os.environ.setdefault("AWS_ACCESS_KEY_ID", get_minio_settings().access_key)
        os.environ.setdefault("AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key)
        mlflow.set_tracking_uri(self._tracking_uri)

        client = MlflowClient()
        try:
            # MLflow 3.x: look up by alias (e.g. "Staging")
            mv = client.get_model_version_by_alias(self._model_name, self._model_stage)
        except Exception:
            mv = None
        if mv is None:
            return {
                "model_name": self._model_name,
                "stage": self._model_stage,
                "version": "unknown",
                "run_id": "unknown",
                "metrics": {},
                "params": {},
                "feature_names": [],
                "created_at": None,
            }

        run_id = mv.run_id
        if run_id is None:
            return {
                "params": {},
                "feature_names": [],
                "created_at": None,
            }
        run = client.get_run(run_id)
        metrics = dict(run.data.metrics)
        params = dict(run.data.params)

        # Try to retrieve feature names from the input schema of the loaded model
        feature_names: list[str] = []
        try:
            model = self._load_model()
            schema = model.metadata.get_input_schema()
            if schema is not None:
                feature_names = [c.name for c in schema.inputs]
        except Exception:
            logger.debug("Could not retrieve feature names from model schema.")

        return {
            "model_name": self._model_name,
            "stage": self._model_stage,
            "version": mv.version,
            "run_id": run_id,
            "metrics": metrics,
            "params": params,
            "feature_names": feature_names,
            "created_at": str(mv.creation_timestamp) if mv.creation_timestamp else None,
        }

    def predict(
        self,
        features: dict,
        match_id: int | None = None,
        features_computed_at: datetime | None = None,
    ) -> dict:
        """Run inference for a single match.

        Args:
            features: Feature dict matching model input schema.
            match_id: Optional identifier for downstream tracing.
            features_computed_at: UTC timestamp when features were produced
                (batch_inference stage).  Stored in the response for traceability.

        Returns:
            Dict compatible with ``PredictResponse`` schema.
        """
        model = self._load_model()
        run_id = getattr(model.metadata, "run_id", "unknown")
        cache_ttl = int(os.getenv("PREDICTION_CACHE_TTL", "3600"))

        # Cache key is scoped to (match_id, run_id) so it auto-invalidates
        # when the model is updated in the registry.
        cache_key: str | None = (
            f"predict:{match_id}:{run_id}" if match_id is not None else None
        )
        if cache_key:
            cached = self._cache_get(cache_key)
            if cached is not None:
                cached["cached"] = True
                return cached

        df = pd.DataFrame([features])

        input_schema = model.metadata.get_input_schema()

        # Restore columns that were dropped during JSON serialisation (NaN is not
        # JSON-safe so get_features() omits them).  The model's sklearn pipeline
        # handles NaN via its own imputer/transformer; we just need the column to
        # exist so ColumnTransformer does not raise "columns are missing".
        if input_schema is not None:
            for col_spec in input_schema.inputs:
                if col_spec.name not in df.columns:
                    df[col_spec.name] = float("nan")

        # Align column dtypes to the model's MLflow input schema.
        # Parquet round-trips can promote int → float (e.g. when NaNs are present);
        # MLflow's schema enforcement refuses unsafe float→int conversions.
        _MLFLOW_TO_NUMPY: dict[str, str] = {
            "integer": "int32",
            "long": "int64",
            "float": "float32",
            "double": "float64",
        }
        if input_schema is not None:
            for col_spec in input_schema.inputs:
                if col_spec.name in df.columns:
                    target = _MLFLOW_TO_NUMPY.get(col_spec.type.name)
                    if target is not None and str(df[col_spec.name].dtype) != target:
                        df[col_spec.name] = df[col_spec.name].astype(target)
            # Drop columns not in the model signature to avoid MLflow warnings
            # about extra inputs (metadata cols like homeTeamName, startTimeUtc…).
            schema_cols = {c.name for c in input_schema.inputs}
            extra_cols = [c for c in df.columns if c not in schema_cols]
            if extra_cols:
                df = df.drop(columns=extra_cols)

        proba = self._predict_proba(df)

        predicted_idx = int(proba.argmax(axis=1)[0])
        predicted_class = _LABEL_ORDER[predicted_idx]

        result: dict = {
            "match_id": match_id,
            "cached": False,
            "features_computed_at": (
                features_computed_at.isoformat() if features_computed_at else None
            ),
            "prediction": {
                "predicted_class": predicted_class,
                "probabilities": {
                    str(lbl): float(proba[0, i]) for i, lbl in enumerate(_LABEL_ORDER)
                },
                "model_version": self._model_stage,
                "model_run_id": run_id,
            },
        }
        if cache_key:
            self._cache_set(cache_key, result, ttl=cache_ttl)
        return result

load()

Eagerly load the model. Safe to call multiple times (idempotent).

Call this during application startup (e.g. FastAPI lifespan or Celery worker_process_init) to avoid paying the cold-start penalty on the first user request.

Returns:

Type Description
Any

The loaded mlflow.pyfunc.PyFuncModel instance.

Source code in src/app/services/predict.py
def load(self) -> Any:
    """Load the model now instead of on first use; repeated calls are harmless.

    Intended for startup hooks such as a FastAPI lifespan handler or Celery's
    ``worker_process_init`` signal, so the first real request does not pay
    the model-download cost.

    Returns:
        The loaded ``mlflow.pyfunc.PyFuncModel`` instance.
    """
    loaded_model = self._load_model()
    return loaded_model

get_model_info()

Return model metadata from the MLflow Model Registry.

Resolves the model version registered under the configured alias, then fetches run metrics and params. Feature names are read from the model's input schema, which loads the model if it is not already cached.

Returns:

Type Description
dict

Dict with model_name, stage, version, run_id, created_at, feature_names, and nested metrics / params sub-dicts.

Source code in src/app/services/predict.py
def get_model_info(self) -> dict:
    """Return model metadata from the MLflow Model Registry.

    Resolves the model version registered under the configured alias,
    then fetches that run's metrics and params.  Feature names are read
    from the model's input schema.  This loads (and caches) the model if
    it is not loaded yet; failures there are logged and yield ``[]``.

    Returns:
        Dict with ``model_name``, ``stage``, ``version``, ``run_id``,
        ``metrics``, ``params``, ``feature_names`` and ``created_at``.
        Every path returns the full key set; fields that cannot be
        resolved are ``"unknown"``, empty, or ``None``.
    """
    import mlflow
    from mlflow.tracking import MlflowClient

    # MLflow's S3 client reads credentials from os.environ; pydantic-settings
    # does not write them back, so export them (keeping any existing values).
    os.environ.setdefault(
        "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
    )
    os.environ.setdefault("AWS_ACCESS_KEY_ID", get_minio_settings().access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key)
    mlflow.set_tracking_uri(self._tracking_uri)

    client = MlflowClient()
    info: dict = {
        "model_name": self._model_name,
        "stage": self._model_stage,
        "version": "unknown",
        "run_id": "unknown",
        "metrics": {},
        "params": {},
        "feature_names": [],
        "created_at": None,
    }
    try:
        # Look up the version by registry alias (e.g. "champion").
        mv = client.get_model_version_by_alias(self._model_name, self._model_stage)
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "Could not resolve model %s@%s in the MLflow registry: %s",
            self._model_name,
            self._model_stage,
            exc,
        )
        return info
    if mv is None:
        return info

    info["version"] = mv.version
    info["created_at"] = str(mv.creation_timestamp) if mv.creation_timestamp else None
    run_id = mv.run_id
    if run_id is None:
        # Version exists but has no linked run: nothing more to fetch.
        return info

    run = client.get_run(run_id)
    info["run_id"] = run_id
    info["metrics"] = dict(run.data.metrics)
    info["params"] = dict(run.data.params)

    # Try to retrieve feature names from the input schema of the loaded model
    try:
        schema = self._load_model().metadata.get_input_schema()
        if schema is not None:
            info["feature_names"] = [c.name for c in schema.inputs]
    except Exception:  # noqa: BLE001
        logger.debug("Could not retrieve feature names from model schema.")
    return info

predict(features, match_id=None, features_computed_at=None)

Run inference for a single match.

Parameters:

Name Type Description Default
features dict

Feature dict matching model input schema.

required
match_id int | None

Optional identifier for downstream tracing.

None
features_computed_at datetime | None

UTC timestamp when features were produced (batch_inference stage). Stored in the response for traceability.

None

Returns:

Type Description
dict

Dict compatible with PredictResponse schema.

Source code in src/app/services/predict.py
def predict(
    self,
    features: dict,
    match_id: int | None = None,
    features_computed_at: datetime | None = None,
) -> dict:
    """Score one match and return a ``PredictResponse``-shaped dict.

    When ``match_id`` is given, the result is cached in Redis under a key
    that also contains the model run id.  A newly promoted model therefore
    never serves results produced by its predecessor.

    Args:
        features: Mapping of feature name to value, following the model's
            input schema.
        match_id: Optional identifier for downstream tracing and caching.
        features_computed_at: UTC timestamp when features were produced
            (batch_inference stage); echoed back as an ISO-8601 string.

    Returns:
        Dict compatible with ``PredictResponse`` schema.
    """
    # MLflow column type name -> numpy dtype used to coerce the input frame.
    mlflow_to_numpy = {
        "integer": "int32",
        "long": "int64",
        "float": "float32",
        "double": "float64",
    }

    model = self._load_model()
    run_id = getattr(model.metadata, "run_id", "unknown")
    ttl_seconds = int(os.getenv("PREDICTION_CACHE_TTL", "3600"))

    cache_key = None if match_id is None else f"predict:{match_id}:{run_id}"
    if cache_key:
        hit = self._cache_get(cache_key)
        if hit is not None:
            hit["cached"] = True
            return hit

    frame = pd.DataFrame([features])
    schema = model.metadata.get_input_schema()

    if schema is not None:
        specs = schema.inputs
        for spec in specs:
            name = spec.name
            # NaN features are omitted from the JSON payload upstream; add the
            # column back so the pipeline's own imputer can handle it.
            if name not in frame.columns:
                frame[name] = float("nan")
            # Parquet round-trips can turn ints into floats, which MLflow's
            # schema enforcement rejects, so cast to the declared dtype.
            wanted = mlflow_to_numpy.get(spec.type.name)
            if wanted is not None and str(frame[name].dtype) != wanted:
                frame[name] = frame[name].astype(wanted)
        # Metadata columns (team names, kickoff time, ...) are not model inputs.
        allowed = {spec.name for spec in specs}
        surplus = [col for col in frame.columns if col not in allowed]
        if surplus:
            frame = frame.drop(columns=surplus)

    proba = self._predict_proba(frame)
    best = int(proba.argmax(axis=1)[0])
    label = _LABEL_ORDER[best]

    if features_computed_at:
        computed_at = features_computed_at.isoformat()
    else:
        computed_at = None

    response: dict = {
        "match_id": match_id,
        "cached": False,
        "features_computed_at": computed_at,
        "prediction": {
            "predicted_class": label,
            "probabilities": {
                str(outcome): float(proba[0, pos])
                for pos, outcome in enumerate(_LABEL_ORDER)
            },
            "model_version": self._model_stage,
            "model_run_id": run_id,
        },
    }
    if cache_key:
        self._cache_set(cache_key, response, ttl=ttl_seconds)
    return response

Schemas

PredictRequest

Bases: BaseModel

Input features for a single match prediction.

Features must match the model's training schema exactly. The model expects rolling-window difference features (side='diff', window=5) plus a categorical 'sex' column (0=men, 1=women).

Source code in src/app/schemas/predict.py
class PredictRequest(BaseModel):
    """Input features for a single match prediction.

    Features must match the model's training schema exactly.
    The model expects rolling-window difference features (side='diff', window=5)
    plus a categorical 'sex' column (0=men, 1=women).
    """

    # Optional integer identifier; not validated against any match table here.
    match_id: int | None = Field(
        None, description="Optional match identifier for tracing"
    )
    # Required mapping of feature name -> numeric value; None values are
    # accepted by the annotation.
    features: dict[str, float | int | None] = Field(
        ...,
        description=(
            "Feature dict matching the model's input schema. "
            "Keys are feature names (e.g. diff_win_5_mean), values are numeric."
        ),
    )

    # Example request body shown in the generated OpenAPI schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "match_id": 12345,
                "features": {
                    "diff_win_5_mean": 0.2,
                    "diff_goals_for_5_mean": 0.4,
                    "diff_goals_against_5_mean": -0.1,
                    "sex": 0,
                },
            }
        }
    }

PredictionResult

Bases: BaseModel

Outcome probabilities and metadata for a single match prediction.

Source code in src/app/schemas/predict.py
class PredictionResult(BaseModel):
    """Outcome probabilities and metadata for a single match prediction."""

    # Argmax over the class probabilities.
    predicted_class: int = Field(
        ..., description="Predicted outcome: 0=home win, 1=draw, 2=away win"
    )
    # Keys are the string form of each class label (str(label)), since JSON
    # object keys must be strings.
    probabilities: dict[str, float] = Field(
        ..., description="Class probabilities keyed by outcome label"
    )
    # PredictionService.predict fills this with its configured registry alias.
    model_version: str = Field(..., description="MLflow model stage/version used")
    model_run_id: str = Field(..., description="MLflow run ID for traceability")

PredictResponse

Bases: BaseModel

Full prediction response including match identity and inference metadata.

Source code in src/app/schemas/predict.py
class PredictResponse(BaseModel):
    """Full prediction response including match identity and inference metadata."""

    # Required key, but may be null for ad-hoc requests without an id.
    match_id: int | None
    prediction: PredictionResult
    features_computed_at: datetime | None = Field(
        None,
        description=(
            "UTC timestamp when the inference features were computed "
            "(batch_inference DVC stage). None for ad-hoc POST /predict/ calls."
        ),
    )
    # Set to True by PredictionService.predict on a Redis cache hit.
    cached: bool = Field(
        False,
        description="True when the result was served from the Redis prediction cache.",
    )

AsyncPredictRequest

Bases: BaseModel

Request body for POST /predict/async/.

Source code in src/app/schemas/predict.py
class AsyncPredictRequest(BaseModel):
    """Request body for POST /predict/async/."""

    # Required: only the id is sent; the features are expected to be
    # precomputed server-side.
    match_id: int = Field(..., description="Match ID with precomputed features")

AsyncPredictResponse

Bases: BaseModel

Returned immediately after submitting an async prediction task.

Source code in src/app/schemas/predict.py
class AsyncPredictResponse(BaseModel):
    """Returned immediately after submitting an async prediction task."""

    # Identifier of the submitted background task (presumably the Celery
    # task id — confirm in the router).
    task_id: str
    status: str = "submitted"
    # URL the client should poll for the task result.
    status_url: str = Field(..., description="Poll this URL to get the result")

ModelInfoResponse

Bases: BaseModel

MLflow model metadata returned by GET /predict/model/info.

Source code in src/app/schemas/predict.py
class ModelInfoResponse(BaseModel):
    """MLflow model metadata returned by GET /predict/model/info."""

    model_name: str
    # Registry alias the service resolves (e.g. "champion").
    stage: str
    # "unknown" when the registry lookup fails.
    version: str
    run_id: str
    metrics: dict[str, float] = Field(default_factory=dict)
    params: dict[str, str] = Field(default_factory=dict)
    # Column names from the model's MLflow input schema.
    feature_names: list[str] = Field(default_factory=list)
    # String form of the registry creation timestamp, or None.
    created_at: str | None = None

RegionRoiEntry

Bases: BaseModel

ROI statistics for a single region, served by GET /predict/region-roi/.

Source code in src/app/schemas/predict.py
class RegionRoiEntry(BaseModel):
    """ROI statistics for a single region, served by GET /predict/region-roi/."""

    # NOTE(review): with pydantic v2 these two fields have no default, so the
    # key is required (value may be None).  RegionRoiService.list_region_roi
    # only emits columns present in the CSV — confirm they always exist.
    region_name: str | None
    roi_pct: float | None
    # NOTE(review): required non-null int, but list_region_roi maps NaN to
    # None, which would fail validation — confirm n_bets is never missing.
    n_bets: int
    hit_rate: float | None = None
    # NOTE(review): list_region_roi emits this value under the key "regionId",
    # not "region_id"; with no alias declared it stays None unless the caller
    # renames the key — verify in the router.
    region_id: int | None = None

PrecomputedPredictResponse

Bases: BaseModel

Response for GET /predict/precomputed/{match_id}.

Served directly from predictions.parquet produced by the batch_inference DVC stage — no Celery task, no MLflow model call at request time.

Source code in src/app/schemas/predict.py
class PrecomputedPredictResponse(BaseModel):
    """Response for GET /predict/precomputed/{match_id}.

    Served directly from predictions.parquet produced by the batch_inference
    DVC stage — no Celery task, no MLflow model call at request time.
    """

    match_id: int
    # Class probabilities; expected to sum to ~1 per row.
    proba_home: float = Field(..., description="P(home win)")
    proba_draw: float = Field(..., description="P(draw)")
    proba_away: float = Field(..., description="P(away win)")
    predicted_class: int = Field(
        ..., description="Argmax class: 0=home win, 1=draw, 2=away win"
    )
    predicted_label: str = Field(
        ..., description="Human-readable label: home_win | draw | away_win"
    )
    # Optional match metadata carried along from the parquet file.
    is_future: bool | None = None
    start_time_utc: datetime | None = None
    home_team_name: str | None = None
    away_team_name: str | None = None
    model_run_id: str | None = None
    model_stage: str | None = None
    predictions_computed_at: datetime | None = Field(
        None,
        description="UTC timestamp when predictions.parquet was last refreshed from MinIO.",
    )
    # NOTE(review): the three fields below duplicate ModelInfoResponse's
    # tail and look like a copy-paste leftover for a per-match response.
    # Confirm whether any producer populates them before removing.
    params: dict[str, str] = Field(default_factory=dict)
    feature_names: list[str] = Field(default_factory=list)
    created_at: str | None = None

LivescoresUpdateData

Bases: BaseModel

Parameters for triggering a WhoScored livescores database update.

Source code in src/app/schemas/models.py
class LivescoresUpdateData(BaseModel):
    """Parameters for triggering a WhoScored livescores database update.

    Only ``date_end`` is required; the remaining fields have defaults.
    """

    date_end: datetime
    # NOTE(review): presumably the number of days fetched, counting back from date_end — confirm.
    count_days: int = 1
    update_db: bool = True  # on by default (by name: write results to the database)
    save_raw: bool = False  # off by default (by name: also persist the raw payload)

MatchRawLive

Bases: BaseModel

Projected subset of MatchRaw for live-scores display.

Source code in src/app/schemas/models.py
class MatchRawLive(BaseModel):
    """Projected subset of MatchRaw for live-scores display.

    Every field except ``id`` is ``Optional`` with no default. Under Pydantic
    v2 such fields are required keys whose value may be ``None``. Field
    names are camelCase, presumably mirroring the MatchRaw columns.
    """

    id: int
    status: Optional[int]
    startTimeUtc: Optional[datetime]
    # Home team
    homeTeamName: Optional[str]
    homeScore: Optional[int]
    # Away team
    awayScore: Optional[int]
    awayTeamName: Optional[str]
    # Tournament
    tournamentName: Optional[str]
    stageName: Optional[str]
    regionName: Optional[str]
    sex: Optional[int]  # NOTE(review): integer code; value meanings are not defined here

HealthCheckResponse

Bases: BaseModel

Schema for healthcheck endpoint response.

Source code in src/app/schemas/healthcheck.py
class HealthCheckResponse(BaseModel):
    """Schema for healthcheck endpoint response.

    ``status``, ``version``, ``worker_id`` and ``memory_usage`` are required.
    ``database`` defaults to ``True`` when the field is omitted.
    """

    status: str = Field(
        ...,
        description="Health status of the application",
        examples=["healthy", "unhealthy"],
    )
    version: str = Field(
        ...,
        description="Application version",
        examples=["1.0.0", "unknown"],
    )
    worker_id: int = Field(
        ...,
        description="Process ID of the worker handling the request",
        examples=[1234],
    )
    memory_usage: float = Field(
        ...,
        description="Memory usage in MB",
        examples=[256.5],
    )
    database: bool = Field(
        True,
        description="True if the database is reachable",
        examples=[True],
    )

    # Full example payload injected into the model's generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "status": "healthy",
                    "version": "1.0.0",
                    "worker_id": 1234,
                    "memory_usage": 256.5,
                    "database": True,
                },
            ]
        },
    }

Celery Tasks

Celery task: asynchronous match outcome prediction.

Submitted by POST /predict/async/ and executed by the ml Celery worker. The result is stored in the Celery result backend and can be retrieved via GET /monitoring/task_status/{task_id}.

The task result has the same shape as PredictResponse so the Streamlit polling page can display it directly.

Architecture note

The PredictionService is initialised once per worker process via the worker_process_init Celery signal. This avoids loading the MLflow model (potentially hundreds of MB from MinIO) on every task invocation.

predict_match(self, match_id, features, features_computed_at=None, model_stage=None)

Run 1×2 inference for match_id using pre-computed features.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| match_id | int | Identifier used for logging and response tracing. | required |
| features | dict | Feature dict matching the model's input schema, produced by the batch_inference DVC stage. | required |
| features_computed_at | str \| None | ISO-8601 UTC string of when the features were computed (batch_inference mtime). Stored in the response for end-to-end traceability. | None |
| model_stage | str \| None | MLflow alias/stage to use (e.g. "champion", "challenger"). Defaults to settings.mlflow.model_stage when None. | None |

Returns:

| Type | Description |
|---|---|
| dict | Dict compatible with PredictResponse. |

Source code in src/app/tasks/predict.py
@celery_app.task(
    name="predict_match",
    bind=True,
    max_retries=2,
    default_retry_delay=10,
    queue="ml",
)
def predict_match(
    self,
    match_id: int,
    features: dict,
    features_computed_at: str | None = None,
    model_stage: str | None = None,
) -> dict:
    """Run 1×2 inference for *match_id* using pre-computed *features*.

    Args:
        match_id: Identifier used for logging and response tracing.
        features: Feature dict matching the model's input schema, produced by
            the ``batch_inference`` DVC stage.
        features_computed_at: ISO-8601 string of when the features were
            computed (batch_inference mtime).  A naive value is taken to be
            UTC; a value with an explicit offset is converted to UTC.
            Stored in the response for end-to-end traceability.
        model_stage: MLflow alias/stage to use (e.g. "champion", "challenger").
            Defaults to ``settings.mlflow.model_stage`` when None.

    Returns:
        Dict compatible with ``PredictResponse``.

    Raises:
        ValueError: Re-raised immediately without retry, e.g. for an unknown
            *model_stage* or an unparseable *features_computed_at*.
        Exception: Any other failure is passed to ``self.retry`` (at most
            ``max_retries=2`` attempts, ``default_retry_delay=10`` seconds).
    """
    self.update_state(
        state="PROGRESS",
        meta={
            "status": f"Running inference for match_id={match_id} stage={model_stage}…"
        },
    )

    try:
        fca: datetime | None = None
        if features_computed_at:
            fca = datetime.fromisoformat(features_computed_at)
            # Attach UTC only to naive values. Aware values are converted
            # rather than relabelled, so a non-UTC offset does not silently
            # shift the recorded instant.
            fca = (
                fca.replace(tzinfo=timezone.utc)
                if fca.tzinfo is None
                else fca.astimezone(timezone.utc)
            )
        _t0 = time.perf_counter()
        result = _get_service(model_stage).predict(
            features=features, match_id=match_id, features_computed_at=fca
        )
        INFERENCE_LATENCY.observe(time.perf_counter() - _t0)

        # Track predicted probability per outcome class.
        for label_key, prob in (
            result.get("prediction", {}).get("probabilities", {}).items()
        ):
            PREDICTION_CONFIDENCE.labels(
                outcome=_LABEL_NAMES.get(str(label_key), str(label_key))
            ).observe(float(prob))

        logger.info(
            "Async prediction complete: match_id=%s stage=%s run_id=%s cached=%s",
            match_id,
            model_stage,
            result.get("prediction", {}).get("model_run_id", "?"),
            result.get("cached", False),
        )
        return result
    except ValueError as exc:
        # Unknown stage (or another ValueError such as a malformed
        # timestamp) — retrying won't help, fail immediately.
        logger.error("Unknown model stage: %s", exc)
        raise
    except Exception as exc:
        logger.exception("Async prediction failed: match_id=%s", match_id)
        raise self.retry(exc=exc)

get_model_info(self, model_stage=None)

Retrieve MLflow model metadata from the registry.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model_stage | str \| None | Stage/alias to query. Defaults to settings.mlflow.model_stage. | None |

Returns:

| Type | Description |
|---|---|
| dict | Dict compatible with ModelInfoResponse. |

Source code in src/app/tasks/predict.py
@celery_app.task(
    name="get_model_info",
    bind=True,
    max_retries=1,
    default_retry_delay=5,
    queue="ml",
)
def get_model_info(self, model_stage: str | None = None) -> dict:
    """Fetch registry metadata for the model behind *model_stage*.

    Args:
        model_stage: MLflow stage/alias to look up; ``None`` falls back to
            ``settings.mlflow.model_stage``.

    Returns:
        A dict shaped like ``ModelInfoResponse``.

    Raises:
        ValueError: Unknown stage; logged and re-raised without a retry.
        Exception: Anything else is logged with its traceback and handed to
            ``self.retry`` (``max_retries=1``, ``default_retry_delay=5``).
    """
    try:
        service = _get_service(model_stage)
        info = service.get_model_info()
    except ValueError as err:
        # A bad stage name will not fix itself on a retry.
        logger.error("Unknown model stage: %s", err)
        raise
    except Exception as err:
        logger.exception("get_model_info task failed")
        raise self.retry(exc=err)
    return info

Prometheus Metrics

Prometheus metrics registry for the SoccerPredictAI service.

All metric objects are defined here as module-level singletons so they are shared across the FastAPI app and Celery worker within the same process.

Gunicorn multiprocess note

When running under Gunicorn with multiple workers, prometheus-client requires PROMETHEUS_MULTIPROC_DIR to be set to a writable directory. The /metrics endpoint in main.py uses MultiProcessCollector automatically when that variable is present.

Worker

Celery worker entrypoint for the ML worker.

Handles ML inference tasks. Loads the MLflow model into memory on startup via worker_process_init signal defined in tasks.predict.

Start command

celery -A app.worker_ml:celery_app worker -Q ml

App-layer Storage

create_client_s3()

Return a boto3 S3 client configured for the MinIO endpoint.

Source code in src/app/data/storage.py
def create_client_s3():
    """Return a boto3 S3 client configured for the MinIO endpoint.

    The settings object is resolved once, so the endpoint URL and both
    credentials come from the same snapshot.

    Returns:
        A boto3 S3 client bound to ``settings.minio.endpoint_url`` using the
        configured MinIO access/secret keys.
    """
    minio_settings = get_settings().minio
    return boto3.client(
        "s3",
        endpoint_url=minio_settings.endpoint_url,
        aws_access_key_id=minio_settings.access_key,
        aws_secret_access_key=minio_settings.secret_key,
    )

save_file_to_minio(file_path, bucket_name, object_name=None)

Upload a local file to a MinIO bucket.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_path | | Path to the local file to upload. | required |
| bucket_name | | Target MinIO bucket name. | required |
| object_name | | Object key in the bucket. Defaults to the basename of file_path. | None |

Returns:

| Type | Description |
|---|---|
| | True on success. |

Raises:

| Type | Description |
|---|---|
| Exception | Re-raised after logging on upload failure. |

Source code in src/app/data/storage.py
def save_file_to_minio(file_path, bucket_name, object_name=None):
    """Push a file from local disk into a MinIO bucket.

    Args:
        file_path: Local path of the file to send.
        bucket_name: Destination bucket.
        object_name: Key to store the file under; when omitted, the final
            path component of *file_path* is used.

    Returns:
        ``True`` once the upload has completed.

    Raises:
        Exception: Whatever the S3 client raised, after it has been logged.
    """
    key = os.path.basename(file_path) if object_name is None else object_name
    client = create_client_s3()
    try:
        client.upload_file(file_path, bucket_name, key)
    except Exception as err:
        logger.error("Failed to upload %s to MinIO: %s", key, err)
        raise
    return True

save_json_to_minio(data, bucket_name, object_name)

Serialise data to JSON and upload it to MinIO.

Parameters:

Name Type Description Default
data

JSON-serialisable object.

required
bucket_name

Target MinIO bucket name.

required
object_name

Object key in the bucket.

required

Returns:

Type Description

True on success.

Source code in src/app/data/storage.py
def save_json_to_minio(data, bucket_name, object_name):
    """Serialise *data* to JSON and upload it to MinIO.

    The payload is staged in a temporary ``.json`` file that is always
    removed afterwards, including when serialisation itself fails.

    Args:
        data: JSON-serialisable object.
        bucket_name: Target MinIO bucket name.
        object_name: Object key in the bucket.

    Returns:
        ``True`` on success.

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialise.
        Exception: Any upload error propagated from ``save_file_to_minio``.
    """
    temp_file_path = None
    try:
        # The with-block lives inside try/finally so a failing json.dump
        # (delete=False keeps the file on disk) still gets cleaned up.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".json", encoding="utf-8"
        ) as temp_file:
            temp_file_path = temp_file.name
            json.dump(data, temp_file, indent=4)

        save_file_to_minio(temp_file_path, bucket_name, object_name)
        return True
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

save_binary_to_minio(data, bucket_name, object_name)

Upload a binary file-like object to MinIO.

Parameters:

Name Type Description Default
data

File-like object opened in binary mode (must support seek(0) and read()).

required
bucket_name

Target MinIO bucket name.

required
object_name

Object key in the bucket.

required

Returns:

Type Description

True on success.

Source code in src/app/data/storage.py
def save_binary_to_minio(data, bucket_name, object_name):
    """Upload a binary file-like object to MinIO.

    *data* is rewound and copied into a temporary file, which is always
    removed afterwards, including when reading from *data* fails.

    Args:
        data: File-like object opened in binary mode (must support
            ``seek(0)`` and ``read()``).
        bucket_name: Target MinIO bucket name.
        object_name: Object key in the bucket.

    Returns:
        ``True`` on success.

    Raises:
        Exception: Errors from ``data.seek``/``data.read`` or from
            ``save_file_to_minio`` propagate unchanged.
    """
    temp_file_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file_path = temp_file.name
            data.seek(0)
            # Write through the already-open handle (binary mode by default)
            # instead of reopening the path, which needs a second handle.
            temp_file.write(data.read())

        save_file_to_minio(temp_file_path, bucket_name, object_name)
        return True
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

save_text_to_minio(data, bucket_name, object_name)

Upload a plain-text string to MinIO.

Parameters:

Name Type Description Default
data

String content to upload.

required
bucket_name

Target MinIO bucket name.

required
object_name

Object key in the bucket.

required

Returns:

Type Description

True on success.

Source code in src/app/data/storage.py
def save_text_to_minio(data, bucket_name, object_name):
    """Upload a plain-text string to MinIO.

    The text is encoded as UTF-8 independently of the host locale and staged
    in a temporary file. That file is always removed afterwards, including
    when writing fails.

    Args:
        data: String content to upload.
        bucket_name: Target MinIO bucket name.
        object_name: Object key in the bucket.

    Returns:
        ``True`` on success.

    Raises:
        TypeError: If *data* is not a ``str``.
        Exception: Any upload error propagated from ``save_file_to_minio``.
    """
    temp_file_path = None
    try:
        with tempfile.NamedTemporaryFile(
            delete=False, mode="w", encoding="utf-8"
        ) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(data)

        save_file_to_minio(temp_file_path, bucket_name, object_name)
        return True
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

save_dataframe_to_minio(df, bucket_name, object_name)

Serialise a DataFrame to CSV and upload it to MinIO.

Parameters:

Name Type Description Default
df

pandas DataFrame to upload.

required
bucket_name

Target MinIO bucket name.

required
object_name

Object key in the bucket.

required

Returns:

Type Description

True on success.

Source code in src/app/data/storage.py
def save_dataframe_to_minio(df, bucket_name, object_name):
    """Serialise a DataFrame to CSV and upload it to MinIO.

    The CSV is written as UTF-8, the encoding ``pd.read_csv`` assumes by
    default on the download side. It is staged in a temporary file that is
    always removed afterwards, including when serialisation fails.

    Args:
        df: pandas DataFrame to upload.
        bucket_name: Target MinIO bucket name.
        object_name: Object key in the bucket.

    Returns:
        ``True`` on success.

    Raises:
        Exception: Errors from ``df.to_csv`` or ``save_file_to_minio``
            propagate unchanged.
    """
    temp_file_path = None
    try:
        # newline="" is required by pandas for text handles; otherwise the
        # CSV line terminator is translated a second time (\r\r\n on Windows).
        with tempfile.NamedTemporaryFile(
            delete=False, mode="w", suffix=".csv", encoding="utf-8", newline=""
        ) as temp_file:
            temp_file_path = temp_file.name
            df.to_csv(temp_file, index=False)

        save_file_to_minio(temp_file_path, bucket_name, object_name)
        return True
    finally:
        if temp_file_path is not None and os.path.exists(temp_file_path):
            os.remove(temp_file_path)

get_file_from_minio(bucket_name, object_name, file_path=None)

Download an object from MinIO to a local file.

Parameters:

Name Type Description Default
bucket_name

Source MinIO bucket name.

required
object_name

Object key to download.

required
file_path

Local destination path. Defaults to object_name.

None

Returns:

Type Description

The resolved local file_path.

Raises:

Type Description
Exception

Re-raised after logging on download failure.

Source code in src/app/data/storage.py
def get_file_from_minio(bucket_name, object_name, file_path=None):
    """Fetch a MinIO object and store it on local disk.

    Args:
        bucket_name: Bucket holding the object.
        object_name: Key of the object to fetch.
        file_path: Where to write the download; when omitted, the object key
            itself is used as the local path.

    Returns:
        The local path the object was written to.

    Raises:
        Exception: Whatever the S3 client raised, after it has been logged.
    """
    destination = object_name if file_path is None else file_path
    client = create_client_s3()
    try:
        client.download_file(bucket_name, object_name, destination)
    except Exception as err:
        logger.error("Failed to download %s from MinIO: %s", object_name, err)
        raise
    return destination

get_binary_from_minio(bucket_name, object_name)

Retrieve binary data from MinIO and return it as bytes.

Parameters:

Name Type Description Default
bucket_name

Source MinIO bucket name.

required
object_name

Object key to retrieve.

required

Returns:

Type Description

Raw bytes content of the object.

Source code in src/app/data/storage.py
def get_binary_from_minio(bucket_name, object_name):
    """Download a MinIO object and hand back its raw bytes.

    The object is staged in a temporary ``.bin`` file that is deleted
    before returning, whether or not the download succeeded.

    Args:
        bucket_name: Bucket holding the object.
        object_name: Key of the object to fetch.

    Returns:
        The object's content as ``bytes``.
    """
    # Only reserve a unique path here; the download writes the content.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as placeholder:
        local_path = placeholder.name

    try:
        get_file_from_minio(bucket_name, object_name, local_path)
        with open(local_path, "rb") as handle:
            return handle.read()
    finally:
        if os.path.exists(local_path):
            os.remove(local_path)

get_text_from_minio(bucket_name, object_name)

Retrieve a text object from MinIO and return it as a string.

Parameters:

Name Type Description Default
bucket_name

Source MinIO bucket name.

required
object_name

Object key to retrieve.

required

Returns:

Type Description

String content of the object.

Source code in src/app/data/storage.py
def get_text_from_minio(bucket_name, object_name):
    """Retrieve a text object from MinIO and return it as a string.

    The content is decoded as UTF-8 rather than with the host's locale
    encoding, so the result does not depend on where the code runs.

    Args:
        bucket_name: Source MinIO bucket name.
        object_name: Object key to retrieve.

    Returns:
        String content of the object.

    Raises:
        UnicodeDecodeError: If the object is not valid UTF-8.
        Exception: Download errors propagated from ``get_file_from_minio``.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
        temp_file_path = temp_file.name

    try:
        get_file_from_minio(bucket_name, object_name, temp_file_path)
        with open(temp_file_path, "r", encoding="utf-8") as file:
            data = file.read()
        return data
    finally:
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

get_json_from_minio(bucket_name, object_name)

Download a JSON object from MinIO and parse it.

Parameters:

Name Type Description Default
bucket_name

Source MinIO bucket name.

required
object_name

Object key to retrieve.

required

Returns:

Type Description

Parsed Python object (dict or list).

Source code in src/app/data/storage.py
def get_json_from_minio(bucket_name, object_name):
    """Download a JSON object from MinIO and parse it.

    The file is decoded as UTF-8, the standard encoding for JSON
    interchange, instead of the host's locale encoding.

    Args:
        bucket_name: Source MinIO bucket name.
        object_name: Object key to retrieve.

    Returns:
        Parsed Python object (dict or list).

    Raises:
        json.JSONDecodeError: If the object is not valid JSON.
        Exception: Download errors propagated from ``get_file_from_minio``.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as temp_file:
        temp_file_path = temp_file.name

    try:
        get_file_from_minio(bucket_name, object_name, temp_file_path)
        with open(temp_file_path, "r", encoding="utf-8") as file:
            data = json.load(file)
        return data
    finally:
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

get_dataframe_from_minio(bucket_name, object_name)

Download a CSV object from MinIO and parse it as a DataFrame.

Parameters:

Name Type Description Default
bucket_name

Source MinIO bucket name.

required
object_name

Object key to retrieve.

required

Returns:

Type Description

pandas DataFrame parsed from the CSV content.

Source code in src/app/data/storage.py
def get_dataframe_from_minio(bucket_name, object_name):
    """Fetch a CSV object from MinIO and load it with pandas.

    The CSV is staged in a temporary ``.csv`` file that is deleted before
    returning, whether or not the download and parse succeeded.

    Args:
        bucket_name: Bucket holding the CSV.
        object_name: Key of the CSV object.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_csv``.
    """
    # Only reserve a unique path here; the download writes the content.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as placeholder:
        csv_path = placeholder.name
    try:
        get_file_from_minio(bucket_name, object_name, csv_path)
        return pd.read_csv(csv_path)
    finally:
        if os.path.exists(csv_path):
            os.remove(csv_path)

file_exists_in_minio(bucket_name, object_name)

Check whether an object exists in a MinIO bucket.

Parameters:

Name Type Description Default
bucket_name

MinIO bucket to query.

required
object_name

Object key to check.

required

Returns:

Type Description

True if the object exists, False otherwise.

Source code in src/app/data/storage.py
def file_exists_in_minio(bucket_name, object_name):
    """Check whether an object exists in a MinIO bucket.

    Any error from ``head_object`` yields ``False``. A plain "not found"
    (error code ``404``, ``NoSuchKey`` or ``NotFound``) is the expected
    negative answer. Any other failure, such as access denied or an
    unreachable endpoint, is also logged as a warning so it cannot pass
    silently as "object missing".

    Args:
        bucket_name: MinIO bucket to query.
        object_name: Object key to check.

    Returns:
        ``True`` if the object exists, ``False`` otherwise (including when
        the existence check itself failed).
    """
    s3_client = create_client_s3()

    try:
        s3_client.head_object(Bucket=bucket_name, Key=object_name)
        return True
    except Exception as exc:
        # botocore ClientError exposes the parsed error as
        # exc.response["Error"]["Code"]; other exception types may lack the
        # attribute or hold a non-dict value there.
        response = getattr(exc, "response", None)
        error_code = (
            response.get("Error", {}).get("Code")
            if isinstance(response, dict)
            else None
        )
        if error_code not in ("404", "NoSuchKey", "NotFound"):
            logger.warning(
                "Could not check existence of %s/%s in MinIO: %s",
                bucket_name,
                object_name,
                exc,
            )
        return False

list_files_in_minio(bucket_name, prefix='')

List all object keys in a MinIO bucket under the given prefix.

Parameters:

Name Type Description Default
bucket_name

MinIO bucket to list.

required
prefix

Key prefix to filter results. Defaults to "" (list all objects).

''

Returns:

Type Description

List of object key strings.

Source code in src/app/data/storage.py
def list_files_in_minio(bucket_name, prefix=""):
    """Collect every object key in *bucket_name* that starts with *prefix*.

    Results are paged through ``list_objects_v2``, so buckets with more keys
    than fit in one listing response are fully enumerated.

    Args:
        bucket_name: Bucket to enumerate.
        prefix: Only keys starting with this string are returned; the empty
            default matches every key.

    Returns:
        A list of key strings in the order the listing returned them.
    """
    paginator = create_client_s3().get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    # Pages with no matching objects have no "Contents" entry at all.
    return [entry["Key"] for page in pages for entry in page.get("Contents", [])]

Configuration

Whoscored

Bases: BaseModel

Root container for WhoScored livescores API response schemas.

Source code in src/app/config/validate.py
class Whoscored(BaseModel):
    """Root container for WhoScored livescores API response schemas.

    The feed-level models are declared as nested classes and exposed through
    the class-valued fields at the bottom (``livescores``,
    ``match_live_args``, ``match_live_header``). Every nested model sets
    ``extra="forbid"``, so an undeclared key in the payload fails validation.
    """

    class Livescores(BaseModel):
        """WhoScored livescores feed payload."""

        createdAt: datetime
        version: int
        timeout: int
        # Quoted annotations are forward references to models defined
        # elsewhere in this module.
        tournaments: List["Tournament"]
        localization: "Localization"
        meta: "Meta"
        showOddsToggle: bool
        isSummary: bool
        oddsToggled: bool
        bookmakerCode: str
        bookmakers: List[str]
        timezoneOffset: int
        locale: str
        calendarType: Optional[Any] = None
        fixtureDate: Optional[Any] = None

        model_config = ConfigDict(extra="forbid")

    class MatchArgs(BaseModel):
        """Arguments payload for a WhoScored match centre request."""

        matchId: int
        matchCentreData: "MatchCentreData"
        matchCentreEventTypeJson: Dict[str, float]  # constant
        formationIdNameMappings: Dict[str, str]  # constant

        model_config = ConfigDict(extra="forbid")

    class MatchHeader(BaseModel):
        """Header metadata for a WhoScored match entry."""

        id: int
        homeTeamId: int
        awayTeamId: int
        homeTeamName: str
        awayTeamName: str
        kickoffTime: datetime  # string input parsed by parse_datetime below
        submissionTime: datetime  # string input parsed by parse_datetime below
        status: int
        elapsed: str
        # Score strings are required keys but may be null.
        halftimeScore: Optional[str]
        fulltimeScore: Optional[str]
        extratimeScore: Optional[str]
        penaltyScore: Optional[str]
        finalScore: Optional[str]
        homeTeamCountryName: str
        awayTeamCountryName: str

        @field_validator("kickoffTime", "submissionTime", mode="before")
        @classmethod
        def parse_datetime(cls, v):
            """Parse a datetime string in ``dd/mm/YYYY HH:MM:SS`` format."""
            # Non-string inputs (e.g. datetime objects) are left for the
            # standard datetime validation that runs after this hook.
            if isinstance(v, str):
                return datetime.strptime(v, "%d/%m/%Y %H:%M:%S")
            return v

        model_config = ConfigDict(extra="forbid")

    # Class-valued fields exposing the nested schema models.
    # NOTE(review): presumably read as attributes to pick the model for each
    # feed type — confirm against callers.
    livescores: type[Livescores] = Livescores
    match_live_args: type[MatchArgs] = MatchArgs
    match_live_header: type[MatchHeader] = MatchHeader

Livescores

Bases: BaseModel

WhoScored livescores feed payload.

Source code in src/app/config/validate.py
class Livescores(BaseModel):
    """WhoScored livescores feed payload.

    Unknown keys are rejected (``extra="forbid"``), so any upstream schema
    change surfaces as a validation error.
    """

    createdAt: datetime
    version: int
    timeout: int
    # Quoted annotations are forward references to models defined elsewhere
    # in this module.
    tournaments: List["Tournament"]
    localization: "Localization"
    meta: "Meta"
    showOddsToggle: bool
    isSummary: bool
    oddsToggled: bool
    bookmakerCode: str
    bookmakers: List[str]
    timezoneOffset: int
    locale: str
    # Contents not modelled; accepted as-is and default to None when absent.
    calendarType: Optional[Any] = None
    fixtureDate: Optional[Any] = None

    model_config = ConfigDict(extra="forbid")

MatchArgs

Bases: BaseModel

Arguments payload for a WhoScored match centre request.

Source code in src/app/config/validate.py
class MatchArgs(BaseModel):
    """Arguments payload for a WhoScored match centre request.

    Unknown keys are rejected (``extra="forbid"``).
    """

    matchId: int
    matchCentreData: "MatchCentreData"  # forward reference, defined later in this module
    matchCentreEventTypeJson: Dict[str, float]  # constant
    formationIdNameMappings: Dict[str, str]  # constant

    model_config = ConfigDict(extra="forbid")

MatchHeader

Bases: BaseModel

Header metadata for a WhoScored match entry.

Source code in src/app/config/validate.py
class MatchHeader(BaseModel):
    """Header metadata for a WhoScored match entry.

    ``kickoffTime`` and ``submissionTime`` accept ``dd/mm/YYYY HH:MM:SS``
    strings (see ``parse_datetime``). Unknown keys are rejected
    (``extra="forbid"``).
    """

    id: int
    homeTeamId: int
    awayTeamId: int
    homeTeamName: str
    awayTeamName: str
    kickoffTime: datetime
    submissionTime: datetime
    status: int
    elapsed: str
    # Score strings are required keys but may be null.
    halftimeScore: Optional[str]
    fulltimeScore: Optional[str]
    extratimeScore: Optional[str]
    penaltyScore: Optional[str]
    finalScore: Optional[str]
    homeTeamCountryName: str
    awayTeamCountryName: str

    @field_validator("kickoffTime", "submissionTime", mode="before")
    @classmethod
    def parse_datetime(cls, v):
        """Parse a datetime string in ``dd/mm/YYYY HH:MM:SS`` format."""
        # Non-string inputs fall through to the standard datetime validation.
        if isinstance(v, str):
            return datetime.strptime(v, "%d/%m/%Y %H:%M:%S")
        return v

    model_config = ConfigDict(extra="forbid")

parse_datetime(v) classmethod

Parse a datetime string in dd/mm/YYYY HH:MM:SS format.

Source code in src/app/config/validate.py
@field_validator("kickoffTime", "submissionTime", mode="before")
@classmethod
def parse_datetime(cls, v):
    """Convert a ``dd/mm/YYYY HH:MM:SS`` string into a ``datetime``.

    Anything that is not a string is returned untouched so the regular
    datetime validation can handle it. A malformed string raises the
    ``ValueError`` from ``strptime``.
    """
    if not isinstance(v, str):
        return v
    return datetime.strptime(v, "%d/%m/%Y %H:%M:%S")

Incident

Bases: BaseModel

A single match incident (goal, card, or substitution).

Source code in src/app/config/validate.py
class Incident(BaseModel):
    """A single match incident (goal, card, or substitution).

    Unknown keys are rejected (``extra="forbid"``).
    """

    minute: str  # kept as a string, not an int
    # NOTE(review): integer codes for the incident kind/variant; the mapping is not defined here.
    type: int
    subType: int
    playerName: Optional[str] = None
    playerId: int
    # Second player involved (presumably e.g. the assister or substituted player) — confirm.
    participatingPlayerName: Optional[str] = None
    participatingPlayerId: int
    runningScore: Optional[Any] = None
    field: int  # NOTE(review): presumably home/away side code — confirm
    period: int
    model_config = ConfigDict(extra="forbid")

Match

Bases: BaseModel

WhoScored livescores summary entry for a single match.

Source code in src/app/config/validate.py
class Match(BaseModel):
    """WhoScored livescores summary entry for a single match.

    Unknown keys are rejected (``extra="forbid"``). An empty-string
    ``scoreChangedAt`` is coerced to ``None`` by ``handle_empty_datetime``.
    """

    stageId: int
    id: int
    status: int
    startTime: datetime
    # Home side
    homeTeamId: int
    homeTeamName: str
    homeYellowCards: int
    homeRedCards: int
    # Away side
    awayTeamId: int
    awayTeamName: str
    awayYellowCards: int
    awayRedCards: int
    hasIncidentsSummary: bool
    hasPreview: bool
    scoreChangedAt: Optional[datetime] = None
    elapsed: str
    lastScorer: Optional[int] = None
    isTopMatch: bool
    homeTeamCountryCode: Optional[str] = None
    awayTeamCountryCode: Optional[str] = None
    commentCount: int
    isLineupConfirmed: bool
    isStreamAvailable: bool
    matchIsOpta: bool
    homeTeamCountryName: str
    awayTeamCountryName: str
    startTimeUtc: datetime
    # Scores are absent/null before kick-off (presumably) and default to None.
    homeScore: Optional[int] = None
    awayScore: Optional[int] = None
    # Pydantic copies field defaults per instance, so this list is not shared.
    incidents: List[Incident] = []
    bets: Optional["Bets"] = None  # forward reference to a model defined elsewhere
    aggregateWinnerField: Optional[Any] = None
    winnerField: Optional[Any] = None
    period: int
    extraResultField: Optional[Any] = None
    homeExtratimeScore: Optional[int] = None
    awayExtratimeScore: Optional[int] = None
    homePenaltyScore: Optional[int] = None
    awayPenaltyScore: Optional[int] = None
    startedAtUtc: Optional[datetime] = None
    firstHalfEndedAtUtc: Optional[datetime] = None
    secondHalfStartedAtUtc: Optional[datetime] = None

    @field_validator("scoreChangedAt", mode="before")
    @classmethod
    def handle_empty_datetime(cls, v):
        """Coerce empty string to None for optional datetime fields."""
        # Runs before type coercion, so "" never reaches datetime parsing.
        if v == "":
            return None
        return v

    model_config = ConfigDict(extra="forbid")

handle_empty_datetime(v) classmethod

Coerce empty string to None for optional datetime fields.

Source code in src/app/config/validate.py
@field_validator("scoreChangedAt", mode="before")
@classmethod
def handle_empty_datetime(cls, v):
    """Map an empty-string timestamp to ``None``; pass anything else through."""
    return None if v == "" else v

Tournament

Bases: BaseModel

A tournament stage grouping a set of matches.

Source code in src/app/config/validate.py
class Tournament(BaseModel):
    """A tournament stage grouping a set of matches.

    Unknown keys are rejected (``extra="forbid"``).
    """

    tournamentId: int
    stageId: int
    stageName: str
    regionId: int
    tournamentName: Optional[str] = None
    seasonName: str
    seasonId: int
    stageSortOrder: int
    sex: int  # NOTE(review): integer code; value meanings are not defined here
    tournamentSortOrder: Optional[int] = None
    regionCode: Optional[str] = None
    regionName: str
    isOpta: bool
    navigationDisplayMode: int
    matches: List[Match]  # each entry validated against the Match schema

    model_config = ConfigDict(extra="forbid")

Localization

Bases: BaseModel

UI label strings returned by the WhoScored livescores feed.

Source code in src/app/config/validate.py
class Localization(BaseModel):
    """UI label strings returned by the WhoScored livescores feed.

    Every key is a required string. Field names must match the feed keys
    exactly because unknown keys are rejected (``extra="forbid"``).
    """

    # Match-status filter labels.
    match_status_all: str
    match_status_live: str
    match_status_upcoming: str
    # Livescores info/notification labels.
    livescores_info_refresh: str
    livescores_info_starts_hrs_mins: str
    livescores_info_starts_mins: str
    livescores_info_loading: str
    favourite_info_please: str
    # Spelling ("singin") must stay as-is: it is matched against the feed key.
    favourite_lnk_singin: str
    favourite_lnk_register: str
    info_today: str
    lbl_user_odds_display_type: str
    sex_abbrv_woman: str
    ls_view_in_mc: str
    lnk_datepicker_view_current_week: str
    lnk_datepicker_view_current_month: str
    fltr_livescores_summary_yesterday: str
    fltr_livescores_summary_today: str
    fltr_livescores_summary_tomorrow: str
    # Labels keyed by pitch-position codes (GK, DC, FW, ...).
    GK: str
    DR: str
    DC: str
    DL: str
    DMR: str
    DMC: str
    DML: str
    MR: str
    MC: str
    ML: str
    AMR: str
    AMC: str
    AM: str
    AML: str
    FWR: str
    FW: str
    FC: str
    FWL: str
    Sub: str
    # NOTE(review): presumably match-status abbreviations (abandoned, after
    # extra time, cancelled, ...) — confirm against the feed.
    abd: str
    aet: str
    can: str
    et1: str
    et2: str
    ft: str
    ht: str
    pen: str
    post: str
    susp: str

    model_config = ConfigDict(extra="forbid")

Meta

Bases: BaseModel

WhoScored feed metadata containing the global CDN base URL.

Source code in src/app/config/validate.py
class Meta(BaseModel):
    """WhoScored feed metadata containing the global CDN base URL.

    Unknown keys are rejected (``extra="forbid"``).
    """

    imagesUrlBase: HttpUrl  # validated as a URL, not just any string

    model_config = ConfigDict(extra="forbid")

MatchCentreData

Bases: BaseModel

Full match centre payload including events, stats, and formations.

Source code in src/app/config/validate.py
class MatchCentreData(BaseModel):
    """Full match centre payload including events, stats, and formations.

    ``timeStamp``, ``startTime`` and ``startDate`` accept ``datetime``
    objects or strings in any of the formats handled by
    ``parse_multi_format_datetime``. ``timeStamp`` may also be null. Unknown
    keys are rejected (``extra="forbid"``).
    """

    playerIdNameDictionary: Dict[str, str]
    periodMinuteLimits: Dict[str, int]  # constant
    timeStamp: Optional[datetime] = None
    attendance: int
    venueName: Optional[str] = None
    referee: "Referee"
    weatherCode: str
    elapsed: str
    startTime: datetime
    startDate: datetime
    score: str
    htScore: str
    ftScore: str
    etScore: str
    pkScore: str
    statusCode: int
    periodCode: int
    home: "Team"
    away: "Team"
    maxMinute: int
    minuteExpanded: int
    maxPeriod: int
    expandedMinutes: "ExpandedMinutes"
    expandedMaxMinute: int
    periodEndMinutes: Dict[str, int]
    commonEvents: List
    events: List["IncidentEvents"]
    timeoutInSeconds: int

    @field_validator("timeStamp", "startTime", "startDate", mode="before")
    @classmethod
    def parse_multi_format_datetime(cls, v):
        """Parse datetime strings in multiple WhoScored date formats.

        ``None`` and ``datetime`` values pass through unchanged. This lets
        the ``Optional`` ``timeStamp`` accept an explicit null, while the
        field type still rejects ``None`` for ``startTime`` / ``startDate``.

        Raises:
            ValueError: If *v* is a string matching none of the accepted
                formats, or a value of any other unsupported type.
        """
        if v is None or isinstance(v, datetime):
            return v
        if isinstance(v, str):
            for fmt in ("%d/%m/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
                try:
                    return datetime.strptime(v, fmt)
                except ValueError:
                    continue
        raise ValueError(f"Invalid datetime format: {v}")

    model_config = ConfigDict(extra="forbid")

parse_multi_format_datetime(v)

Parse datetime strings in multiple WhoScored date formats.

Source code in src/app/config/validate.py
@field_validator("timeStamp", "startTime", "startDate", mode="before")
@classmethod
def parse_multi_format_datetime(cls, v):
    """Parse datetime strings in multiple WhoScored date formats.

    Accepted inputs:
        * ``datetime`` instances, returned unchanged;
        * ``None``, returned unchanged so the field's own type decides
          (allowed for the optional ``timeStamp``, rejected by pydantic for
          the required ``startTime``/``startDate``);
        * strings in ``%d/%m/%Y %H:%M:%S``, ``%Y-%m-%d %H:%M:%S`` or
          ``%Y-%m-%dT%H:%M:%S``, tried in that order.

    Raises:
        ValueError: for any other type, or a string matching none of the
            formats (pydantic wraps it in a ``ValidationError``).
    """
    # An explicit null for the Optional ``timeStamp`` must not be rejected
    # here; before this check it fell through to the raise below.
    if v is None or isinstance(v, datetime):
        return v
    if isinstance(v, str):
        for fmt in ("%d/%m/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.strptime(v, fmt)
            except ValueError:
                continue
    raise ValueError(f"Invalid datetime format: {v}")

Referee

Bases: BaseModel

Match referee information from the WhoScored match centre.

Source code in src/app/config/validate.py
class Referee(BaseModel):
    """Match referee information from the WhoScored match centre.

    All fields are required; unknown keys are rejected (``extra="forbid"``).
    """

    officialId: int
    firstName: str
    lastName: str
    hasParticipatedMatches: bool
    # Display name as supplied by the feed; it is not derived from
    # firstName/lastName by this model.
    name: str

    model_config = ConfigDict(extra="forbid")

Team

Bases: BaseModel

Team data payload from the WhoScored match centre.

Source code in src/app/config/validate.py
class Team(BaseModel):
    """Team data payload from the WhoScored match centre.

    Used for both the ``home`` and ``away`` entries of ``MatchCentreData``.
    Unknown keys are rejected (``extra="forbid"``).
    """

    teamId: int
    # Presumably one snapshot per formation/lineup change (each entry carries
    # its own start/end minute) — confirm against the feed.
    formations: List["Formation"]
    stats: "TeamStats"
    incidentEvents: List["IncidentEvents"]
    shotZones: "ShotZones"
    name: str
    countryName: str
    players: List["Player"]
    managerName: Optional[str] = None
    scores: "Scores"
    # NOTE(review): side label from the feed, presumably "home"/"away"; not
    # constrained here — verify.
    field: str
    averageAge: float

    model_config = ConfigDict(extra="forbid")

Formation

Bases: BaseModel

Tactical formation snapshot for a given match period.

Source code in src/app/config/validate.py
class Formation(BaseModel):
    """Tactical formation snapshot for a given match period.

    Unknown keys are rejected (``extra="forbid"``).
    """

    formationId: int
    formationName: str
    captainPlayerId: Optional[int] = None
    period: int
    startMinuteExpanded: int
    endMinuteExpanded: int
    # NOTE(review): the three lists below (and formationPositions) look like
    # parallel, index-aligned arrays per slot; lengths are not cross-checked here.
    jerseyNumbers: List[int]
    formationSlots: List[int]
    playerIds: List[int]
    formationPositions: List["Position"]
    # Set when this snapshot was opened by a substitution (optional in the feed).
    subOnPlayerId: Optional[int] = None
    subOffPlayerId: Optional[int] = None

    model_config = ConfigDict(extra="forbid")

Position

Bases: BaseModel

Pitch coordinate for a player's formation slot.

Source code in src/app/config/validate.py
class Position(BaseModel):
    """Pitch coordinate for a player's formation slot.

    Both coordinates are required floats; their units/range are not
    validated here. Unknown keys are rejected (``extra="forbid"``).
    """

    vertical: float
    horizontal: float

    model_config = ConfigDict(extra="forbid")

TeamStats

Bases: BaseModel

Aggregate per-minute statistics for a team in a match.

Source code in src/app/config/validate.py
class TeamStats(BaseModel):
    """Aggregate per-minute statistics for a team in a match.

    Every stat is a ``Dict[str, number]``. The string keys are presumably
    match minutes (compare ``minutesWithStats``) — TODO confirm. All fields
    default to empty containers; pydantic copies non-hashable defaults per
    instance, so the literal ``{}``/``[]`` defaults are not shared.
    Unknown keys are rejected (``extra="forbid"``).
    """

    minutesWithStats: List[int] = []
    ratings: Dict[str, float] = {}

    # Shooting
    shotsTotal: Dict[str, int] = {}
    shotsOnTarget: Dict[str, int] = {}
    shotsOffTarget: Dict[str, int] = {}
    shotsBlocked: Dict[str, int] = {}
    shotsOnPost: Dict[str, int] = {}

    # Defensive actions
    clearances: Dict[str, int] = {}
    interceptions: Dict[str, int] = {}

    # Possession
    possession: Dict[str, int] = {}
    touches: Dict[str, int] = {}

    # Passing
    passesTotal: Dict[str, int] = {}
    passesAccurate: Dict[str, int] = {}
    passesKey: Dict[str, int] = {}
    passSuccess: Dict[str, float] = {}

    # Aerial duels
    aerialsTotal: Dict[str, int] = {}
    aerialsWon: Dict[str, int] = {}
    aerialSuccess: Dict[str, float] = {}
    offensiveAerials: Dict[str, int] = {}
    defensiveAerials: Dict[str, int] = {}

    # Set pieces
    cornersTotal: Dict[str, int] = {}
    cornersAccurate: Dict[str, int] = {}
    throwInsTotal: Dict[str, int] = {}
    throwInsAccurate: Dict[str, int] = {}
    throwInAccuracy: Dict[str, float] = {}

    # Fouls and offsides
    offsidesCaught: Dict[str, int] = {}
    # Spelling matches the upstream key; renaming would break parsing under extra="forbid".
    foulsCommited: Dict[str, int] = {}

    # Tackles
    tacklesTotal: Dict[str, int] = {}
    tackleSuccessful: Dict[str, int] = {}
    # Spelling matches the upstream key; do not "fix".
    tackleUnsuccesful: Dict[str, int] = {}
    tackleSuccess: Dict[str, float] = {}

    # Dribbling
    dribbledPast: Dict[str, int] = {}
    dribblesWon: Dict[str, int] = {}
    dribblesAttempted: Dict[str, int] = {}
    dribblesLost: Dict[str, int] = {}
    dribbleSuccess: Dict[str, float] = {}
    dispossessed: Dict[str, int] = {}

    # Errors
    errors: Dict[str, int] = {}

    model_config = ConfigDict(extra="forbid")

IncidentEvents

Bases: BaseModel

A single event entry from the WhoScored match centre event stream.

Source code in src/app/config/validate.py
class IncidentEvents(BaseModel):
    """A single event entry from the WhoScored match centre event stream.

    Required: ``id``, ``minute``, ``teamId``, ``x``, ``y``, ``type``,
    ``outcomeType``, ``qualifiers``, ``satisfiedEventsTypes`` and
    ``isTouch``; everything else is optional. Unknown keys are rejected
    (``extra="forbid"``).
    """

    # Declared as float (unlike the int ``eventId``) — presumably because the
    # feed's ids do not fit/arrive as plain ints; confirm before changing.
    id: float
    eventId: Optional[int] = None
    minute: int
    second: Optional[int] = None
    teamId: int
    playerId: Optional[int] = None
    relatedEventId: Optional[int] = None
    relatedPlayerId: Optional[int] = None
    # Pitch coordinates of the event; units/range are not validated here.
    x: float
    y: float
    expandedMinute: Optional[int] = None
    period: Optional["ValueDisplayName"] = None
    # ``type`` is the upstream key name (shadows the builtin only in the class namespace).
    type: "ValueDisplayName"
    outcomeType: "ValueDisplayName"
    qualifiers: List["Qualifier"]
    satisfiedEventsTypes: List[int]
    isTouch: bool
    blockedX: Optional[float] = None
    blockedY: Optional[float] = None
    goalMouthZ: Optional[float] = None
    goalMouthY: Optional[float] = None
    isOwnGoal: Optional[bool] = None
    isGoal: Optional[bool] = None
    isShot: Optional[bool] = None
    endX: Optional[float] = None
    endY: Optional[float] = None
    cardType: Optional["ValueDisplayName"] = None

    model_config = ConfigDict(extra="forbid")

ValueDisplayName

Bases: BaseModel

Integer code paired with a human-readable display name.

Source code in src/app/config/validate.py
class ValueDisplayName(BaseModel):
    """Integer code paired with a human-readable display name.

    Shared by several feed fields (event type, outcome, period, card type,
    qualifier type). Unknown keys are rejected (``extra="forbid"``).
    """

    value: int
    displayName: str

    model_config = ConfigDict(extra="forbid")

Qualifier

Bases: BaseModel

Event qualifier providing additional context for an incident.

Source code in src/app/config/validate.py
class Qualifier(BaseModel):
    """Event qualifier providing additional context for an incident.

    Unknown keys are rejected (``extra="forbid"``).
    """

    type: "ValueDisplayName"
    # Optional payload kept as a raw string; it is not parsed by this model.
    value: Optional[str] = None

    model_config = ConfigDict(extra="forbid")

ShotStat

Bases: BaseModel

Goal and attempt counts for a single shot zone.

Source code in src/app/config/validate.py
class ShotStat(BaseModel):
    """Goal and attempt counts for a single shot zone.

    Both counts are required ints; ``goalCount <= count`` is not enforced.
    Unknown keys are rejected (``extra="forbid"``).
    """

    goalCount: int
    count: int

    model_config = ConfigDict(extra="forbid")

ZoneStats

Bases: BaseModel

Shot statistics keyed by period for a pitch zone.

Source code in src/app/config/validate.py
class ZoneStats(BaseModel):
    """Shot statistics keyed by period for a pitch zone.

    Defaults to an empty mapping (pydantic copies the ``{}`` default per
    instance). Unknown keys are rejected (``extra="forbid"``).
    """

    # NOTE(review): keys presumably period identifiers as strings — confirm.
    stats: Dict[str, ShotStat] = {}

    model_config = ConfigDict(extra="forbid")

ShotZones

Bases: BaseModel

Shot outcome counts broken down by pitch zone for a team.

Source code in src/app/config/validate.py
class ShotZones(BaseModel):
    """Shot outcome counts broken down by pitch zone for a team.

    Every zone is optional and defaults to an empty ``ZoneStats``. pydantic
    copies non-hashable defaults per instance, so the ``ZoneStats()`` default
    objects are not shared between models. Unknown keys are rejected
    (``extra="forbid"``).
    """

    # High shots (above crossbar height)
    missHighLeft: ZoneStats = ZoneStats()
    missHighCentre: ZoneStats = ZoneStats()
    missHighRight: ZoneStats = ZoneStats()

    # Side misses
    missLeft: ZoneStats = ZoneStats()
    missRight: ZoneStats = ZoneStats()

    # Post/crossbar hits
    postLeft: ZoneStats = ZoneStats()
    postCentre: ZoneStats = ZoneStats()
    postRight: ZoneStats = ZoneStats()

    # On target high shots
    onTargetHighLeft: ZoneStats = ZoneStats()
    onTargetHighCentre: ZoneStats = ZoneStats()
    onTargetHighRight: ZoneStats = ZoneStats()

    # On target low shots
    onTargetLowLeft: ZoneStats = ZoneStats()
    onTargetLowCentre: ZoneStats = ZoneStats()
    onTargetLowRight: ZoneStats = ZoneStats()

    model_config = ConfigDict(extra="forbid")

PlayerStats

Bases: BaseModel

Per-minute player statistics from the WhoScored match centre.

Source code in src/app/config/validate.py
class PlayerStats(BaseModel):
    """Per-minute player statistics from the WhoScored match centre.

    Mirrors many ``TeamStats`` keys, but every value is a ``float``. It adds
    player-only keys such as goals, assists, goalkeeper and discipline stats.
    Every field defaults to an empty dict, so an empty payload validates.
    String keys are presumably match minutes — TODO confirm. Unknown keys are
    rejected (``extra="forbid"``).
    """

    ratings: Dict[str, float] = {}

    # Shooting
    shotsTotal: Dict[str, float] = {}
    shotsOnTarget: Dict[str, float] = {}
    shotsOffTarget: Dict[str, float] = {}
    shotsBlocked: Dict[str, float] = {}
    shotsOnPost: Dict[str, float] = {}
    goals: Dict[str, float] = {}
    assists: Dict[str, float] = {}

    # Defensive actions
    clearances: Dict[str, float] = {}
    interceptions: Dict[str, float] = {}

    # Possession
    possession: Dict[str, float] = {}
    touches: Dict[str, float] = {}

    # Passing
    passesTotal: Dict[str, float] = {}
    passesAccurate: Dict[str, float] = {}
    passesKey: Dict[str, float] = {}
    passSuccess: Dict[str, float] = {}

    # Aerial duels
    aerialsTotal: Dict[str, float] = {}
    aerialsWon: Dict[str, float] = {}
    aerialSuccess: Dict[str, float] = {}
    offensiveAerials: Dict[str, float] = {}
    defensiveAerials: Dict[str, float] = {}

    # Set pieces
    cornersTotal: Dict[str, float] = {}
    cornersAccurate: Dict[str, float] = {}
    throwInsTotal: Dict[str, float] = {}
    throwInsAccurate: Dict[str, float] = {}
    throwInAccuracy: Dict[str, float] = {}

    # Fouls and offsides
    offsidesCaught: Dict[str, float] = {}

    # Tackles
    tacklesTotal: Dict[str, float] = {}
    tackleSuccessful: Dict[str, float] = {}
    # Spelling matches the upstream key; renaming would break parsing under extra="forbid".
    tackleUnsuccesful: Dict[str, float] = {}
    tackleSuccess: Dict[str, float] = {}
    dribbledPast: Dict[str, float] = {}

    # Goalkeeper specific stats
    totalSaves: Dict[str, float] = {}
    collected: Dict[str, float] = {}
    parriedSafe: Dict[str, float] = {}
    parriedDanger: Dict[str, float] = {}
    claimsHigh: Dict[str, float] = {}

    # Dribbling stats
    dribblesWon: Dict[str, float] = {}
    dribblesAttempted: Dict[str, float] = {}
    dribblesLost: Dict[str, float] = {}
    dribbleSuccess: Dict[str, float] = {}
    dispossessed: Dict[str, float] = {}

    # Fouls and discipline
    # Spelling matches the upstream key; do not "fix".
    foulsCommited: Dict[str, float] = {}
    foulsDrawn: Dict[str, float] = {}
    yellowCards: Dict[str, float] = {}
    redCards: Dict[str, float] = {}

    # Errors
    errors: Dict[str, float] = {}

    # Other stats
    claimsTotal: Dict[str, float] = {}
    claimsGround: Dict[str, float] = {}

    model_config = ConfigDict(extra="forbid")

Player

Bases: BaseModel

Player entry in a team's squad for a specific match.

Source code in src/app/config/validate.py
class Player(BaseModel):
    """Player entry in a team's squad for a specific match.

    Only ``playerId``, ``name`` and ``field`` are required. A missing
    ``stats`` object becomes an empty ``PlayerStats``. Unknown keys are
    rejected (``extra="forbid"``).
    """

    playerId: int
    shirtNo: Optional[int] = None
    name: str
    position: Optional[str] = None
    # Physical attributes as supplied by the feed; units are not stated here.
    height: Optional[int] = None
    weight: Optional[int] = None
    age: Optional[int] = None
    # Substitution details; all None for players who were not substituted.
    subbedInPlayerId: Optional[int] = None
    subbedInPeriod: Optional["ValueDisplayName"] = None
    subbedInExpandedMinute: Optional[int] = None
    subbedOutPlayerId: Optional[int] = None
    subbedOutPeriod: Optional["ValueDisplayName"] = None
    subbedOutExpandedMinute: Optional[int] = None
    isFirstEleven: bool = False
    isManOfTheMatch: bool = False
    # NOTE(review): side label, presumably "home"/"away" — not constrained here.
    field: str
    stats: PlayerStats = Field(default_factory=PlayerStats)

    model_config = ConfigDict(extra="forbid")

Scores

Bases: BaseModel

Score totals at each period boundary for a team.

Source code in src/app/config/validate.py
class Scores(BaseModel):
    """Score totals at each period boundary for a team.

    Every score is optional. Presumably a boundary that has not been reached
    or played is null/absent — confirm. Unknown keys are rejected
    (``extra="forbid"``).
    """

    halftime: Optional[int] = None
    fulltime: Optional[int] = None
    running: Optional[int] = None
    extratime: Optional[int] = None
    penalty: Optional[int] = None

    model_config = ConfigDict(extra="forbid")

ExpandedMinutes

Bases: BaseModel

Mapping from real match minute to expanded minute index per period.

Source code in src/app/config/validate.py
class ExpandedMinutes(BaseModel):
    """Mapping from real match minute to expanded minute index per period.

    The feed uses the numeric keys ``"1"``..``"4"``. They are mapped to valid
    Python identifiers through field aliases. Because ``extra="forbid"`` is
    set, any other period key in the payload fails validation.

    NOTE(review): ``populate_by_name`` is not enabled, so constructing the
    model in Python must use the aliases (``ExpandedMinutes(**{"1": ...})``),
    not ``period_1=...``.
    """

    period_1: Dict[str, int] = Field(default_factory=dict, alias="1")
    period_2: Dict[str, int] = Field(default_factory=dict, alias="2")
    period_3: Dict[str, int] = Field(default_factory=dict, alias="3")
    period_4: Dict[str, int] = Field(default_factory=dict, alias="4")

    model_config = ConfigDict(extra="forbid")

Offer

Bases: BaseModel

A bookmaker odds offer for a single bet outcome.

Source code in src/app/config/validate.py
class Offer(BaseModel):
    """A bookmaker odds offer for a single bet outcome.

    All fields are required; unknown keys are rejected (``extra="forbid"``).
    """

    # Odds are kept as the strings supplied by the feed; they are not
    # converted to numbers by this model.
    oddsDecimal: str
    oddsFractional: str
    oddsUS: str
    clickOutUrl: str  # plain ``str``, not URL-validated
    bettingProvider: str
    providerId: int

    model_config = ConfigDict(extra="forbid")

BetOption

Bases: BaseModel

A named bet option with one or more bookmaker offers.

Source code in src/app/config/validate.py
class BetOption(BaseModel):
    """A named bet option with one or more bookmaker offers.

    NOTE: an empty ``offers`` list still validates; "one or more" is not
    enforced here. Unknown keys are rejected (``extra="forbid"``).
    """

    betName: str
    betId: int
    offers: List[Offer]

    model_config = ConfigDict(extra="forbid")

Bets

Bases: BaseModel

1X2 odds container with home, draw, and away bet options.

Source code in src/app/config/validate.py
class Bets(BaseModel):
    """1X2 odds container with home, draw, and away bet options.

    All three outcomes are required; unknown keys are rejected
    (``extra="forbid"``).
    """

    home: BetOption
    draw: BetOption
    away: BetOption

    model_config = ConfigDict(extra="forbid")

TopParameters

Bases: BaseModel

Fonbet API parameters controlling top-events and sports filtering.

Source code in src/app/config/validate_bets.py
class TopParameters(BaseModel):
    """Fonbet API parameters controlling top-events and sports filtering.

    All counters are required ints; unknown keys are rejected
    (``extra="forbid"``).
    """

    topEventsCount: int
    topLiveEventsCount: int
    topSportsCount: int
    minEventPriorityForSport: int
    minPriorityEventsCountPerSport: int
    minEventsCountPerSport: int
    minLiveEventsCountPerSport: int

    model_config = ConfigDict(extra="forbid")

TournamentInfo

Bases: BaseModel

Fonbet tournament metadata entry.

Source code in src/app/config/validate_bets.py
class TournamentInfo(BaseModel):
    """Fonbet tournament metadata entry.

    Only ``id`` and ``caption`` are required. Unknown keys are rejected
    (``extra="forbid"``).
    """

    id: int
    caption: str
    basicSportId: Optional[int] = None
    # NOTE(review): integer colour, presumably packed RGB — confirm.
    backgroundColor: Optional[int] = None
    icon: Optional[str] = None
    tabCaption: Optional[str] = None
    hasSeasonStandings: Optional[bool] = None
    tabTournamentAlias: Optional[str] = None

    model_config = ConfigDict(extra="forbid")

Sport

Bases: BaseModel

Fonbet sport or competition node from the events catalogue.

Source code in src/app/config/validate_bets.py
class Sport(BaseModel):
    """Fonbet sport or competition node from the events catalogue.

    Only ``id`` and ``name`` are required. ``startTime`` uses pydantic's
    default datetime parsing (no custom validator here). Unknown keys are
    rejected (``extra="forbid"``).
    """

    id: int
    name: str
    # Parent node in the catalogue tree (None for roots, presumably).
    parentId: Optional[int] = None
    regionId: Optional[int] = None
    startTime: Optional[datetime] = None
    kind: Optional[str] = None  # str here, whereas ``Event.kind`` is an int
    sortOrder: Optional[str] = None
    alias: Optional[str] = None
    parentIds: Optional[List[int]] = None
    outrightTableOraIds: Optional[List[int]] = None
    outrightTableLinks: Optional[List[Dict[str, Any]]] = None  # items not validated
    tournamentInfoId: Optional[int] = None
    sportCategoryId: Optional[int] = None
    geoCategoryId: Optional[int] = None
    tabCaption: Optional[str] = None
    specialTableId: Optional[int] = None
    searchKeyWords: Optional[str] = None

    model_config = ConfigDict(extra="forbid")

Event

Bases: BaseModel

Fonbet betting event entry (match or tournament node).

Source code in src/app/config/validate_bets.py
class Event(BaseModel):
    """Fonbet betting event entry (match or tournament node).

    Only ``id`` and ``sportId`` are required; every other key is optional.
    Unknown keys are rejected (``extra="forbid"``).
    """

    id: int
    sportId: int
    parentId: Optional[int] = None
    level: Optional[int] = None
    team1: Optional[str] = None
    team2: Optional[str] = None
    startTime: Optional[datetime] = None
    sortOrder: Optional[str] = None
    num: Optional[int] = None
    kind: Optional[int] = None  # int here, whereas ``Sport.kind`` is a str
    rootKind: Optional[int] = None
    team1Id: Optional[int] = None
    team2Id: Optional[int] = None
    name: Optional[str] = None
    place: Optional[str] = None
    statisticsType: Optional[str] = None
    statisticsTypes: Optional[str] = None
    priority: Optional[int] = None
    matchOfTheDay: Optional[bool] = None
    applicability: Optional[List[str]] = None
    specialTableId: Optional[int] = None
    team1RegionId: Optional[int] = None
    team1UseRegionFlag: Optional[bool] = None
    team2RegionId: Optional[int] = None
    team2UseRegionFlag: Optional[bool] = None
    # NOTE(review): team name variants (presumably short/medium forms) — confirm.
    team1s: Optional[str] = None
    team1m: Optional[str] = None
    team2s: Optional[str] = None
    team2m: Optional[str] = None
    tourEventIds: Optional[List[str]] = None
    tv: Optional[List[int]] = None
    info: Optional[str] = None
    team1Aliases: Optional[str] = None
    team2Aliases: Optional[str] = None
    noEventView: Optional[bool] = None
    willStartSoon: Optional[bool] = None
    state: Optional[dict] = None  # untyped: contents are not validated
    notMatch: Optional[bool] = None
    substituteRootEvent: Optional[bool] = None
    kindShortName: Optional[str] = None
    tvFrame: Optional[list] = None  # untyped: contents are not validated
    minExpress: Optional[int] = None
    hasPromos: Optional[List[str]] = None

    model_config = ConfigDict(extra="forbid")

CustomFactor

Bases: BaseModel

Custom Fonbet betting factors for a specific event.

Source code in src/app/config/validate_bets.py
class CustomFactor(BaseModel):
    """Custom Fonbet betting factors for a specific event.

    Unknown keys are rejected (``extra="forbid"``).
    """

    e: int  # event id (presumably ``Event.id``)
    # Individual factor dicts are not validated beyond being str-keyed mappings.
    factors: List[Dict[str, Any]]
    countAll: Optional[int] = None

    model_config = ConfigDict(extra="forbid")

PariEvents

Bases: BaseModel

Root Fonbet events catalogue packet from the live odds feed.

Source code in src/app/config/validate_bets.py
class PariEvents(BaseModel):
    """Root Fonbet events catalogue packet from the live odds feed.

    All ``*Version`` counters, ``topParameters``, ``tournamentInfos``,
    ``sports``, ``events`` and ``customFactors`` are required. The remaining
    collections are optional and loosely typed (``Any``). Unknown keys are
    rejected (``extra="forbid"``).
    """

    # Packet/catalogue version counters as supplied by the feed.
    packetVersion: int
    fromVersion: int
    catalogTablesVersion: int
    catalogSpecialTablesVersion: int
    catalogEventViewVersion: int
    sportBasicMarketsVersion: int
    sportBasicFactorsVersion: int
    independentFactorsVersion: int
    factorsVersion: int
    comboFactorsVersion: int
    sportKindsVersion: int
    topCompetitionsVersion: int
    eventSmartFiltersVersion: int
    geoCategoriesVersion: int
    sportCategoriesVersion: int
    topParameters: TopParameters
    tournamentInfos: List[TournamentInfo]
    sports: List[Sport]
    events: List[Event]
    # Optional collections whose elements are not validated (``Any``).
    topEvents: Optional[Dict[str, Any]] = None
    eventBlocks: Optional[List[Any]] = None
    eventMiscs: Optional[List[Any]] = None
    liveEventInfos: Optional[List[Any]] = None
    customFactors: List[CustomFactor]
    publicPromos: Optional[List[Any]] = None

    model_config = ConfigDict(extra="forbid")

ColumnType

Bases: str, Enum

SQL column types

Source code in src/app/config/database.py
class ColumnType(str, Enum):
    """SQL column types.

    The ``str`` mixin makes each member compare equal to its lowercase SQL
    type name (e.g. ``ColumnType.TEXT == "text"``), so members can be mixed
    with plain strings.
    """

    INTEGER = "integer"
    VARCHAR = "varchar"
    TEXT = "text"
    TIMESTAMP = "timestamp"
    DATE = "date"
    REAL = "real"
    BOOLEAN = "boolean"
    SERIAL = "serial"
    BIGINT = "bigint"
    DECIMAL = "decimal"
    INET = "inet"

ColumnConstraint

Bases: str, Enum

SQL column constraints

Source code in src/app/config/database.py
class ColumnConstraint(str, Enum):
    """SQL column constraints.

    Each value is the literal SQL keyword(s) appended after a column's type.
    """

    PRIMARY_KEY = "PRIMARY KEY"
    NOT_NULL = "NOT NULL"
    UNIQUE = "UNIQUE"
    # Only the keyword: the referenced table/column is not part of this value
    # and must be supplied separately by the caller.
    FOREIGN_KEY = "REFERENCES"

Column

Bases: BaseModel

Database column definition

Source code in src/app/config/database.py
class Column(BaseModel):
    """Database column definition.

    Holds the name, type, optional length and optional constraint keywords
    needed to render one column inside a ``CREATE TABLE`` statement.
    """

    name: str
    type: str
    length: Optional[int] = None
    constraints: Optional[List[str]] = None

    def to_sql(self) -> str:
        """Render this column as a SQL column-definition fragment.

        The name is wrapped in double quotes so reserved words can be used
        as column names. ``length`` is appended as ``(n)`` only for VARCHAR
        and DECIMAL columns and only when it is truthy. Constraints follow,
        space-separated, in the order given.

        Returns:
            A fragment such as ``"stageName" varchar(255) NOT NULL``.
        """
        sized_types = (ColumnType.VARCHAR, ColumnType.DECIMAL)
        if self.length and self.type in sized_types:
            type_sql = f"{self.type}({self.length})"
        else:
            type_sql = f"{self.type}"

        pieces = [f'"{self.name}"', type_sql]
        if self.constraints:
            pieces.append(" ".join(self.constraints))
        return " ".join(pieces)

to_sql()

Convert column definition to SQL syntax

Source code in src/app/config/database.py
def to_sql(self) -> str:
    """Convert column definition to SQL syntax"""
    type_str = self.type
    if self.length and self.type in [ColumnType.VARCHAR, ColumnType.DECIMAL]:
        type_str = f"{self.type}({self.length})"

    # Quote the column name to handle reserved keywords
    column_def = f'"{self.name}" {type_str}'

    if self.constraints:
        column_def += f" {' '.join(self.constraints)}"

    return column_def

Table

Bases: BaseModel

Database table definition

Source code in src/app/config/database.py
class Table(BaseModel):
    """Database table definition.

    A named, ordered collection of ``Column`` objects that can render its
    own ``CREATE TABLE IF NOT EXISTS`` statement.
    """

    name: str
    columns: List[Column]

    def create_statement(self) -> str:
        """Build the ``CREATE TABLE IF NOT EXISTS`` statement for this table.

        Column fragments come from ``Column.to_sql`` and are joined with
        ``", "`` in declaration order. The table name is emitted unquoted.
        ``IF NOT EXISTS`` makes re-running the statement a no-op when the
        table already exists; an existing table is not altered.
        """
        fragments = [column.to_sql() for column in self.columns]
        column_list = ", ".join(fragments)
        return f"CREATE TABLE IF NOT EXISTS {self.name} ({column_list});"

create_statement()

Generate CREATE TABLE SQL statement

Source code in src/app/config/database.py
def create_statement(self) -> str:
    """Build the ``CREATE TABLE IF NOT EXISTS`` statement for this table.

    Column fragments come from each column's ``to_sql`` and are joined with
    ``", "`` in declaration order. The table name is emitted unquoted.
    ``IF NOT EXISTS`` makes re-running the statement a no-op when the table
    already exists; an existing table is not altered.
    """
    fragments = [column.to_sql() for column in self.columns]
    column_list = ", ".join(fragments)
    return f"CREATE TABLE IF NOT EXISTS {self.name} ({column_list});"

DatabaseSettings

Bases: BaseSettings

PostgreSQL connection settings for all project databases.

Reads credentials and connection parameters from SOCCER_POSTGRES_* env vars. Toggle use_internal to switch between external (docker-compose dev) and in-cluster (K8s) host/port.

Source code in src/app/config/database.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
class DatabaseSettings(BaseSettings):
    """PostgreSQL connection settings for all project databases.

    Reads credentials and connection parameters from ``SOCCER_POSTGRES_*``
    env vars.  Toggle ``use_internal`` to switch between external
    (docker-compose dev) and in-cluster (K8s) host/port.
    """

    user: str = Field("postgres", validation_alias="SOCCER_POSTGRES_USER")
    # Required: there is no default password.
    password: str = Field(..., validation_alias="SOCCER_POSTGRES_PASSWORD")
    # External endpoint, used when ``use_internal`` is False.
    host: str = Field("localhost", validation_alias="SOCCER_POSTGRES_HOST")
    port: int = Field(5432, validation_alias="SOCCER_POSTGRES_PORT")
    use_internal: bool = Field(True, validation_alias="SOCCER_POSTGRES_USE_INTERNAL")
    # Internal endpoint, used when ``use_internal`` is True (the default).
    host_internal: str = Field(
        "localhost", validation_alias="SOCCER_POSTGRES_HOST_INTERNAL"
    )
    port_internal: int = Field(5432, validation_alias="SOCCER_POSTGRES_PORT_INTERNAL")
    # Database names; ``db_soccer`` is the default target of get_connection_string.
    db_soccer: str = Field("tickets", validation_alias="SOCCER_POSTGRES_DB_SOCCER")
    db_mlflow: str = Field("mlflow", validation_alias="SOCCER_POSTGRES_DB_MLFLOW")
    db_airflow: str = Field("airflow", validation_alias="SOCCER_POSTGRES_DB_AIRFLOW")
    db_base: str = Field("base", validation_alias="SOCCER_POSTGRES_DB_BASE")

    # Table definitions (class-level schema registry, keyed by table name).
    tables: ClassVar[Dict[str, Table]] = {
        "matches": Table(
            name="matches",
            columns=[
                # Tournament information
                Column(
                    name="tournamentId",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.NOT_NULL],
                ),
                Column(
                    name="stageId",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.NOT_NULL],
                ),
                Column(name="stageName", type=ColumnType.VARCHAR, length=255),
                Column(name="regionId", type=ColumnType.INTEGER),
                Column(name="tournamentName", type=ColumnType.VARCHAR, length=255),
                Column(name="seasonName", type=ColumnType.VARCHAR, length=50),
                Column(name="seasonId", type=ColumnType.INTEGER),
                Column(name="stageSortOrder", type=ColumnType.INTEGER),
                Column(  # Participant gender (1 — male, 2 — female)
                    name="sex", type=ColumnType.INTEGER
                ),
                Column(name="tournamentSortOrder", type=ColumnType.INTEGER),
                Column(name="regionCode", type=ColumnType.VARCHAR, length=10),
                Column(name="regionName", type=ColumnType.VARCHAR, length=100),
                Column(  # Flag indicating data is provided by Opta (a stats company)
                    name="isOpta", type=ColumnType.BOOLEAN
                ),
                Column(  # Navigation display mode (likely internal logic) - [0, 2, 3]
                    name="navigationDisplayMode", type=ColumnType.INTEGER
                ),
                # Match information
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.PRIMARY_KEY],
                ),
                Column(  # Match status
                    name="status",
                    type=ColumnType.INTEGER,
                ),
                Column(name="startTime", type=ColumnType.TIMESTAMP),
                # Home team
                Column(
                    name="homeTeamId",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.NOT_NULL],
                ),
                Column(name="homeTeamName", type=ColumnType.VARCHAR, length=255),
                Column(name="homeYellowCards", type=ColumnType.INTEGER),
                Column(name="homeRedCards", type=ColumnType.INTEGER),
                Column(name="homeTeamCountryCode", type=ColumnType.VARCHAR, length=10),
                Column(name="homeTeamCountryName", type=ColumnType.VARCHAR, length=100),
                Column(name="homeScore", type=ColumnType.INTEGER),
                Column(name="homeExtratimeScore", type=ColumnType.INTEGER),
                Column(name="homePenaltyScore", type=ColumnType.INTEGER),
                # Away team
                Column(
                    name="awayTeamId",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.NOT_NULL],
                ),
                Column(name="awayTeamName", type=ColumnType.VARCHAR, length=255),
                Column(name="awayYellowCards", type=ColumnType.INTEGER),
                Column(name="awayRedCards", type=ColumnType.INTEGER),
                Column(name="awayTeamCountryCode", type=ColumnType.VARCHAR, length=10),
                Column(name="awayTeamCountryName", type=ColumnType.VARCHAR, length=100),
                Column(name="awayScore", type=ColumnType.INTEGER),
                Column(name="awayExtratimeScore", type=ColumnType.INTEGER),
                Column(name="awayPenaltyScore", type=ColumnType.INTEGER),
                # Match details
                Column(  # Whether a brief incidents summary for the match exists
                    name="hasIncidentsSummary", type=ColumnType.BOOLEAN
                ),
                Column(  # Whether a match preview exists
                    name="hasPreview", type=ColumnType.BOOLEAN
                ),
                Column(  # Time of the last score change (used to drive LIVESCORE)
                    name="scoreChangedAt", type=ColumnType.TIMESTAMP
                ),
                Column(  # Match status or match time in minutes (format - 21')
                    # ['45+' - extra time in 1H, 'HT' - half-time, 'FT' - full time, '90+' - added time]
                    # [ 'Can', 'PEN', '', '?', 'Abd', 'Post', 'AET']
                    name="elapsed",
                    type=ColumnType.VARCHAR,
                    length=50,
                ),
                Column(name="lastScorer", type=ColumnType.INTEGER),
                Column(name="isTopMatch", type=ColumnType.BOOLEAN),
                Column(name="commentCount", type=ColumnType.INTEGER),
                Column(  # Whether the team line-ups are confirmed
                    name="isLineupConfirmed", type=ColumnType.BOOLEAN
                ),
                Column(  # Whether a match stream is available
                    name="isStreamAvailable", type=ColumnType.BOOLEAN
                ),
                Column(  # Flag indicating the match is from Opta data
                    name="matchIsOpta", type=ColumnType.BOOLEAN
                ),
                Column(name="startTimeUtc", type=ColumnType.TIMESTAMP),
                Column(  # Identifier of the aggregate-score winner over two matches (if any)
                    name="aggregateWinnerField", type=ColumnType.INTEGER
                ),
                Column(  # Identifier of the match winner
                    name="winnerField", type=ColumnType.INTEGER
                ),
                Column(  # Match period identifier
                    # in finished matches: 0 - no ML, [1, 2, 3, 5, 6, 7] - has ML
                    # in current matches (still unclear): 1 2 3 7 - FT, 5 - AET, 6 - PEN
                    name="period",
                    type=ColumnType.INTEGER,
                ),
                Column(name="extraResultField", type=ColumnType.INTEGER),
                # Match timeline
                Column(name="startedAtUtc", type=ColumnType.TIMESTAMP),
                Column(name="firstHalfEndedAtUtc", type=ColumnType.TIMESTAMP),
                Column(name="secondHalfStartedAtUtc", type=ColumnType.TIMESTAMP),
                # Additional data
                Column(name="incidents", type=ColumnType.INTEGER),
                Column(name="bets", type=ColumnType.INTEGER),
                Column(name="matchArgs", type=ColumnType.BOOLEAN),
                Column(name="matchHeader", type=ColumnType.BOOLEAN),
            ],
        ),
        "matches_live_header": Table(
            name="matches_live_header",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                        ColumnConstraint.UNIQUE,
                    ],
                ),
                Column(name="homeTeamId", type=ColumnType.INTEGER),
                Column(name="awayTeamId", type=ColumnType.INTEGER),
                Column(name="homeTeamName", type=ColumnType.VARCHAR, length=255),
                Column(name="awayTeamName", type=ColumnType.VARCHAR, length=255),
                Column(name="kickoffTime", type=ColumnType.TIMESTAMP),
                Column(name="submissionTime", type=ColumnType.TIMESTAMP),
                Column(name="status", type=ColumnType.INTEGER),
                Column(name="elapsed", type=ColumnType.VARCHAR, length=50),
                Column(name="halftimeScore", type=ColumnType.VARCHAR, length=20),
                Column(name="fulltimeScore", type=ColumnType.VARCHAR, length=20),
                Column(name="extratimeScore", type=ColumnType.VARCHAR, length=20),
                Column(name="penaltyScore", type=ColumnType.VARCHAR, length=20),
                Column(name="finalScore", type=ColumnType.VARCHAR, length=20),
                Column(name="homeTeamCountryName", type=ColumnType.VARCHAR, length=100),
                Column(name="awayTeamCountryName", type=ColumnType.VARCHAR, length=100),
            ],
        ),
        "matches_live_players": Table(
            name="matches_live_players",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="playerId", type=ColumnType.INTEGER),
                Column(name="playerName", type=ColumnType.VARCHAR, length=255),
            ],
        ),
        "matches_live_info": Table(
            name="matches_live_info",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="timeStamp", type=ColumnType.TIMESTAMP),
                Column(name="attendance", type=ColumnType.INTEGER),
                Column(name="venueName", type=ColumnType.VARCHAR, length=255),
                Column(name="weatherCode", type=ColumnType.VARCHAR, length=50),
                Column(name="elapsed", type=ColumnType.VARCHAR, length=20),
                Column(name="startTime", type=ColumnType.TIMESTAMP),
                Column(name="startDate", type=ColumnType.TIMESTAMP),
                Column(name="score", type=ColumnType.VARCHAR, length=20),
                Column(name="htScore", type=ColumnType.VARCHAR, length=20),
                Column(name="ftScore", type=ColumnType.VARCHAR, length=20),
                Column(name="etScore", type=ColumnType.VARCHAR, length=20),
                Column(name="pkScore", type=ColumnType.VARCHAR, length=20),
                Column(name="statusCode", type=ColumnType.INTEGER),
                Column(name="periodCode", type=ColumnType.INTEGER),
                Column(name="maxMinute", type=ColumnType.INTEGER),
                Column(name="minuteExpanded", type=ColumnType.INTEGER),
                Column(name="maxPeriod", type=ColumnType.INTEGER),
                Column(name="expandedMaxMinute", type=ColumnType.INTEGER),
                Column(name="timeoutInSeconds", type=ColumnType.INTEGER),
            ],
        ),
        "matches_live_referees": Table(
            name="matches_live_referees",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(
                    name="officialId",
                    type=ColumnType.INTEGER,
                    constraints=[ColumnConstraint.NOT_NULL],
                ),
                Column(name="firstName", type=ColumnType.VARCHAR, length=100),
                Column(name="lastName", type=ColumnType.VARCHAR, length=100),
                Column(name="hasParticipatedMatches", type=ColumnType.BOOLEAN),
                Column(name="name", type=ColumnType.VARCHAR, length=200),
            ],
        ),
        "matches_live_home_info": Table(
            name="matches_live_home_info",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="teamId", type=ColumnType.INTEGER),
                Column(name="name", type=ColumnType.VARCHAR, length=255),
                Column(name="countryName", type=ColumnType.VARCHAR, length=100),
                Column(name="managerName", type=ColumnType.VARCHAR, length=255),
                Column(name="field", type=ColumnType.VARCHAR, length=10),
                Column(name="averageAge", type=ColumnType.REAL),
                Column(name="halftime", type=ColumnType.INTEGER),
                Column(name="fulltime", type=ColumnType.INTEGER),
                Column(name="running", type=ColumnType.INTEGER),
                Column(name="extratime", type=ColumnType.INTEGER),
                Column(name="penalty", type=ColumnType.INTEGER),
            ],
        ),
        "matches_live_away_info": Table(
            name="matches_live_away_info",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="teamId", type=ColumnType.INTEGER),
                Column(name="name", type=ColumnType.VARCHAR, length=255),
                Column(name="countryName", type=ColumnType.VARCHAR, length=100),
                Column(name="managerName", type=ColumnType.VARCHAR, length=255),
                Column(name="field", type=ColumnType.VARCHAR, length=10),
                Column(name="averageAge", type=ColumnType.REAL),
                Column(name="halftime", type=ColumnType.INTEGER),
                Column(name="fulltime", type=ColumnType.INTEGER),
                Column(name="running", type=ColumnType.INTEGER),
                Column(name="extratime", type=ColumnType.INTEGER),
                Column(name="penalty", type=ColumnType.INTEGER),
            ],
        ),
        "matches_live_home_stats": Table(
            name="matches_live_home_stats",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="minute", type=ColumnType.INTEGER),
                Column(name="period", type=ColumnType.INTEGER),
                Column(name="ratings", type=ColumnType.REAL),
                Column(name="shotsTotal", type=ColumnType.INTEGER),
                Column(name="shotsOnTarget", type=ColumnType.INTEGER),
                Column(name="shotsOffTarget", type=ColumnType.INTEGER),
                Column(name="shotsBlocked", type=ColumnType.INTEGER),
                Column(name="shotsOnPost", type=ColumnType.INTEGER),
                Column(name="clearances", type=ColumnType.INTEGER),
                Column(name="interceptions", type=ColumnType.INTEGER),
                Column(name="possession", type=ColumnType.INTEGER),
                Column(name="touches", type=ColumnType.INTEGER),
                Column(name="passesTotal", type=ColumnType.INTEGER),
                Column(name="passesAccurate", type=ColumnType.INTEGER),
                Column(name="passesKey", type=ColumnType.INTEGER),
                Column(name="passSuccess", type=ColumnType.REAL),
                Column(name="aerialsTotal", type=ColumnType.INTEGER),
                Column(name="aerialsWon", type=ColumnType.INTEGER),
                Column(name="aerialSuccess", type=ColumnType.REAL),
                Column(name="offensiveAerials", type=ColumnType.INTEGER),
                Column(name="defensiveAerials", type=ColumnType.INTEGER),
                Column(name="cornersTotal", type=ColumnType.INTEGER),
                Column(name="cornersAccurate", type=ColumnType.INTEGER),
                Column(name="throwInsTotal", type=ColumnType.INTEGER),
                Column(name="throwInsAccurate", type=ColumnType.INTEGER),
                Column(name="throwInAccuracy", type=ColumnType.REAL),
                Column(name="offsidesCaught", type=ColumnType.INTEGER),
                # NOTE: "foulsCommited" / "tackleUnsuccesful" spellings are the
                # real column names; do not "fix" them without a migration.
                Column(name="foulsCommited", type=ColumnType.INTEGER),
                Column(name="tacklesTotal", type=ColumnType.INTEGER),
                Column(name="tackleSuccessful", type=ColumnType.INTEGER),
                Column(name="tackleUnsuccesful", type=ColumnType.INTEGER),
                Column(name="tackleSuccess", type=ColumnType.REAL),
                Column(name="dribbledPast", type=ColumnType.INTEGER),
                Column(name="dribblesWon", type=ColumnType.INTEGER),
                Column(name="dribblesAttempted", type=ColumnType.INTEGER),
                Column(name="dribblesLost", type=ColumnType.INTEGER),
                Column(name="dribbleSuccess", type=ColumnType.REAL),
                Column(name="dispossessed", type=ColumnType.INTEGER),
                Column(name="errors", type=ColumnType.INTEGER),
            ],
        ),
        "matches_live_away_stats": Table(
            name="matches_live_away_stats",
            columns=[
                Column(
                    name="id",
                    type=ColumnType.INTEGER,
                    constraints=[
                        ColumnConstraint.NOT_NULL,
                        "REFERENCES matches(id)",
                    ],
                ),
                Column(name="minute", type=ColumnType.INTEGER),
                Column(name="period", type=ColumnType.INTEGER),
                Column(name="ratings", type=ColumnType.REAL),
                Column(name="shotsTotal", type=ColumnType.INTEGER),
                Column(name="shotsOnTarget", type=ColumnType.INTEGER),
                Column(name="shotsOffTarget", type=ColumnType.INTEGER),
                Column(name="shotsBlocked", type=ColumnType.INTEGER),
                Column(name="shotsOnPost", type=ColumnType.INTEGER),
                Column(name="clearances", type=ColumnType.INTEGER),
                Column(name="interceptions", type=ColumnType.INTEGER),
                Column(name="possession", type=ColumnType.INTEGER),
                Column(name="touches", type=ColumnType.INTEGER),
                Column(name="passesTotal", type=ColumnType.INTEGER),
                Column(name="passesAccurate", type=ColumnType.INTEGER),
                Column(name="passesKey", type=ColumnType.INTEGER),
                Column(name="passSuccess", type=ColumnType.REAL),
                Column(name="aerialsTotal", type=ColumnType.INTEGER),
                Column(name="aerialsWon", type=ColumnType.INTEGER),
                Column(name="aerialSuccess", type=ColumnType.REAL),
                Column(name="offensiveAerials", type=ColumnType.INTEGER),
                Column(name="defensiveAerials", type=ColumnType.INTEGER),
                Column(name="cornersTotal", type=ColumnType.INTEGER),
                Column(name="cornersAccurate", type=ColumnType.INTEGER),
                Column(name="throwInsTotal", type=ColumnType.INTEGER),
                Column(name="throwInsAccurate", type=ColumnType.INTEGER),
                Column(name="throwInAccuracy", type=ColumnType.REAL),
                Column(name="offsidesCaught", type=ColumnType.INTEGER),
                Column(name="foulsCommited", type=ColumnType.INTEGER),
                Column(name="tacklesTotal", type=ColumnType.INTEGER),
                Column(name="tackleSuccessful", type=ColumnType.INTEGER),
                Column(name="tackleUnsuccesful", type=ColumnType.INTEGER),
                Column(name="tackleSuccess", type=ColumnType.REAL),
                Column(name="dribbledPast", type=ColumnType.INTEGER),
                Column(name="dribblesWon", type=ColumnType.INTEGER),
                Column(name="dribblesAttempted", type=ColumnType.INTEGER),
                Column(name="dribblesLost", type=ColumnType.INTEGER),
                Column(name="dribbleSuccess", type=ColumnType.REAL),
                Column(name="dispossessed", type=ColumnType.INTEGER),
                Column(name="errors", type=ColumnType.INTEGER),
            ],
        ),
        "proxy": Table(
            name="proxy",
            columns=[
                Column(name="source", type=ColumnType.VARCHAR, length=50),
                # VARCHAR(15) fits dotted-quad IPv4 only; IPv6 would not fit.
                Column(name="ip", type=ColumnType.VARCHAR, length=15),
                Column(name="port", type=ColumnType.INTEGER),
                Column(name="code", type=ColumnType.VARCHAR, length=4),
                Column(name="anonymity", type=ColumnType.VARCHAR, length=32),
                Column(name="https", type=ColumnType.BOOLEAN),
                Column(name="datetime_add", type=ColumnType.TIMESTAMP),
                Column(name="last_checked", type=ColumnType.TIMESTAMP),
            ],
        ),
    }

    model_config = {
        "env_file": PACKAGE_ROOT / ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

    # Maps lowercase SQL type names to pandas dtype strings (nullable
    # "Int64", "string", "datetime64[ns]", ...).  Keys presumably match the
    # ColumnType values used in ``tables`` above — confirm against ColumnType.
    type_map: ClassVar[dict] = {
        "integer": "Int64",
        "bigint": "Int64",
        "real": "float",
        "decimal": "float",
        "boolean": "bool",
        "timestamp": "datetime64[ns]",
        "date": "datetime64[ns]",
        "varchar": "string",
        "text": "string",
        "serial": "Int64",
        "inet": "string",
    }

    def get_connection_string(self, db_name: Optional[str] = None) -> str:
        """Build a PostgreSQL connection URL for *db_name*.

        The user name and password are percent-encoded, so reserved URL
        characters in credentials (``@``, ``:``, ``/``, ``%``, ...) cannot
        corrupt the URL.  Credentials made only of letters, digits and
        ``-._~`` are emitted unchanged.

        Args:
            db_name: Target database name.  Defaults to ``self.db_soccer``
                when ``None`` or empty.

        Returns:
            SQLAlchemy-compatible ``postgresql://user:pass@host:port/db``
            connection string.  The host/port pair is the internal one when
            ``use_internal`` is true, otherwise the external one.
        """
        from urllib.parse import quote

        db = db_name or self.db_soccer
        host = self.host_internal if self.use_internal else self.host
        port = self.port_internal if self.use_internal else self.port
        # safe="" makes quote() escape "/" as well (it is kept by default),
        # so credentials can never be read as a path or host separator.
        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return f"postgresql://{user}:{password}@{host}:{port}/{db}"

get_connection_string(db_name=None)

Build a PostgreSQL connection URL for db_name.

Parameters:

Name Type Description Default
db_name Optional[str]

Target database name. Defaults to self.db_soccer.

None

Returns:

Type Description
str

SQLAlchemy-compatible postgresql://user:pass@host:port/db connection string.

Source code in src/app/config/database.py
def get_connection_string(self, db_name: Optional[str] = None) -> str:
    """Build a PostgreSQL connection URL for *db_name*.

    The user name and password are percent-encoded, so reserved URL
    characters in credentials (``@``, ``:``, ``/``, ``%``, ...) cannot
    corrupt the URL.  Credentials made only of letters, digits and
    ``-._~`` are emitted unchanged.

    Args:
        db_name: Target database name.  Defaults to ``self.db_soccer``
            when ``None`` or empty.

    Returns:
        SQLAlchemy-compatible ``postgresql://user:pass@host:port/db``
        connection string.  The host/port pair is the internal one when
        ``use_internal`` is true, otherwise the external one.
    """
    from urllib.parse import quote

    db = db_name or self.db_soccer
    host = self.host_internal if self.use_internal else self.host
    port = self.port_internal if self.use_internal else self.port
    # safe="" makes quote() escape "/" as well (it is kept by default),
    # so credentials can never be read as a path or host separator.
    user = quote(self.user, safe="")
    password = quote(self.password, safe="")
    return f"postgresql://{user}:{password}@{host}:{port}/{db}"

get_db_settings() cached

Return the database settings singleton (lazy, cached).

Source code in src/app/config/database.py
@lru_cache(maxsize=1)
def get_db_settings() -> DatabaseSettings:
    """Build ``DatabaseSettings`` on first use and hand back the same object after.

    ``lru_cache(maxsize=1)`` memoises the single zero-argument call, so the
    env vars / ``.env`` file are read only once until the cache is cleared.
    """
    settings = DatabaseSettings()
    return settings

MLFlowSettings

Bases: BaseSettings

MLflow tracking server and model registry settings.

Credentials and server URL are read from MLFLOW_* env vars. model_name and model_stage default to values from the inference block in params.yaml and can be overridden via env vars for CI/CD or hotfixes.

Source code in src/app/config/mlflow.py
class MLFlowSettings(BaseSettings):
    """MLflow tracking server and model registry settings.

    Credentials and server URL are read from ``MLFLOW_*`` env vars.
    ``model_name`` and ``model_stage`` default to values from the
    ``inference`` block in ``params.yaml`` and can be overridden via
    env vars for CI/CD or hotfixes.
    """

    user: str = Field(..., validation_alias="MLFLOW_USER")
    password: str = Field(..., validation_alias="MLFLOW_PASSWORD")
    tracking_uri: str = Field(..., validation_alias="MLFLOW_TRACKING_URL")
    # Defaults come from params.yaml; env vars override for CI/CD or hotfixes.
    model_name: str = Field(_DEFAULT_MODEL_NAME, validation_alias="MLFLOW_MODEL_NAME")
    # Primary stage / alias used when no explicit stage is requested.
    model_stage: str = Field(
        _DEFAULT_MODEL_STAGE, validation_alias="MLFLOW_MODEL_STAGE"
    )
    # Optional CSV env override: MLFLOW_MODEL_STAGES=champion,challenger
    # When unset (or blank), effective_stages uses the params.yaml list.
    # Stored as plain str to prevent pydantic-settings JSON decoding.
    model_stages_csv: str = Field("", validation_alias="MLFLOW_MODEL_STAGES")

    @property
    def effective_stages(self) -> list[str]:
        """Stages to pre-load in the ML worker.

        Priority:
        1. MLFLOW_MODEL_STAGES env var (CSV) — for CI/CD overrides.  A value
           made only of commas/whitespace is treated as unset.
        2. inference.model_stages list from params.yaml.
        3. Fallback: [model_stage].

        Returns:
            A new list on every call, so callers may mutate it without
            altering the module-level params.yaml defaults.
        """
        from_env = [s.strip() for s in self.model_stages_csv.split(",") if s.strip()]
        if from_env:
            return from_env
        # Copy the module-level default so callers cannot mutate shared state.
        if _DEFAULT_MODEL_STAGES:
            return list(_DEFAULT_MODEL_STAGES)
        return [self.model_stage]

    model_config = {
        "env_file": PACKAGE_ROOT / ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

effective_stages property

Stages to pre-load in the ML worker.

Priority:

1. MLFLOW_MODEL_STAGES env var (CSV) — for CI/CD overrides.
2. inference.model_stages list from params.yaml.
3. Fallback: [model_stage].

get_mlflow_settings() cached

Return the MLflow settings singleton (lazy, cached).

Source code in src/app/config/mlflow.py
@lru_cache(maxsize=1)
def get_mlflow_settings() -> MLFlowSettings:
    """Build ``MLFlowSettings`` on first use and hand back the same object after.

    ``lru_cache(maxsize=1)`` memoises the single zero-argument call, so the
    ``MLFLOW_*`` env vars / ``.env`` file are read only once until the cache
    is cleared.
    """
    settings = MLFlowSettings()
    return settings

ScraperSettings

Bases: BaseSettings

Selenoid scraper infrastructure settings.

Reads the Selenoid server IP from the SCRAPER_IP env var.

Source code in src/app/config/scraper.py
class ScraperSettings(BaseSettings):
    """Settings for the Selenoid-based scraping infrastructure.

    Holds a single value, the Selenoid server address, taken from the
    ``SCRAPER_IP`` environment variable or the package ``.env`` file.
    """

    model_config = dict(
        env_file=PACKAGE_ROOT / ".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Required (no default): the Selenoid server IP.
    server_ip: str = Field(..., validation_alias="SCRAPER_IP")

get_scraper_settings() cached

Return the scraper settings singleton (lazy, cached).

Source code in src/app/config/scraper.py
@lru_cache(maxsize=1)
def get_scraper_settings() -> ScraperSettings:
    """Build ``ScraperSettings`` on first use and hand back the same object after.

    ``lru_cache(maxsize=1)`` memoises the single zero-argument call, so
    ``SCRAPER_IP`` is read only once until the cache is cleared.
    """
    settings = ScraperSettings()
    return settings

SecuritySettings

Bases: BaseSettings

API authentication and CORS configuration.

Reads the header token (FASTAPI_HEADER_TOKEN), query token (FASTAPI_QUERY_TOKEN), and the CORS allow-list CSV (CORS_ALLOWED_ORIGINS) from env vars.

Source code in src/app/config/security.py
class SecuritySettings(BaseSettings):
    """API authentication and CORS configuration.

    Reads the header token (``FASTAPI_HEADER_TOKEN``), query token
    (``FASTAPI_QUERY_TOKEN``), and the CORS allow-list CSV
    (``CORS_ALLOWED_ORIGINS``) from env vars.
    """

    token_header: str = Field(..., validation_alias="FASTAPI_HEADER_TOKEN")
    token_query: str = Field(..., validation_alias="FASTAPI_QUERY_TOKEN")
    # CORS allow-list: comma-separated origins, e.g.
    #   CORS_ALLOWED_ORIGINS="https://ui.example.com,https://admin.example.com"
    # Empty string = no cross-origin requests allowed (safe default for prod).
    # Use "*" only for local development.
    cors_allowed_origins_csv: str = Field("", validation_alias="CORS_ALLOWED_ORIGINS")

    @property
    def cors_allowed_origins(self) -> list[str]:
        """Parse the CORS allow-list from the ``CORS_ALLOWED_ORIGINS`` CSV env var.

        Each entry is stripped of surrounding whitespace and of trailing
        ``/``: a browser ``Origin`` header is ``scheme://host[:port]`` with
        no trailing slash, so ``"https://ui.example.com/"`` would otherwise
        never match an exact-string comparison (presumably Starlette's
        ``CORSMiddleware`` is the consumer — confirm).

        Returns:
            Empty list if unset (blocks all cross-origin requests),
            ``["*"]`` for the wildcard value, or a list of origin
            strings for explicit allow-lists.
        """
        raw = self.cors_allowed_origins_csv.strip()
        if not raw:
            return []
        if raw == "*":
            return ["*"]
        normalized = (item.strip().rstrip("/") for item in raw.split(","))
        return [origin for origin in normalized if origin]

    model_config = {
        "env_file": PACKAGE_ROOT / ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

cors_allowed_origins property

Parse the CORS allow-list from the CORS_ALLOWED_ORIGINS CSV env var.

Returns:

Type Description
list[str]

Empty list if unset (blocks all cross-origin requests), ["*"] for the wildcard value, or a list of origin strings for explicit allow-lists.

get_security_settings() cached

Return the security settings singleton (lazy, cached).

Source code in src/app/config/security.py
@lru_cache(maxsize=1)
def get_security_settings() -> SecuritySettings:
    """Build ``SecuritySettings`` on first use and hand back the same object after.

    ``lru_cache(maxsize=1)`` memoises the single zero-argument call, so the
    token and CORS env vars are read only once until the cache is cleared.
    """
    settings = SecuritySettings()
    return settings

MinioSettings

Bases: BaseSettings

MinIO / S3-compatible object storage settings.

Reads credentials and endpoint URL from MINIO_* env vars. All project buckets are declared explicitly so that references are discoverable and auditable in one place.

Source code in src/app/config/storage.py
class MinioSettings(BaseSettings):
    """MinIO / S3-compatible object storage settings.

    Reads credentials and endpoint URL from ``MINIO_*`` env vars.
    All project buckets are declared explicitly so that references
    are discoverable and auditable in one place.
    """

    access_key: str = Field(..., validation_alias="MINIO_USER")
    secret_key: str = Field(..., validation_alias="MINIO_PASSWORD")
    endpoint_url: str = Field(..., validation_alias="MINIO_ENDPOINT_URL")
    bucket_data_raw: str = Field(..., validation_alias="MINIO_BUCKET_DATA_RAW")
    bucket_livescores_raw: str = Field(
        ..., validation_alias="MINIO_BUCKET_LIVESCORES_RAW"
    )
    bucket_livescores: str = Field(..., validation_alias="MINIO_BUCKET_LIVESCORES")
    bucket_matches_live_raw: str = Field(
        ..., validation_alias="MINIO_BUCKET_MATCHES_LIVES_RAW"
    )
    # NOTE: misspelled field name kept for backward compatibility; prefer the
    # ``bucket_ui`` alias property below in new code.
    bucker_ui: str = Field(..., validation_alias="MINIO_BUCKET_UI")
    # Optional: empty string when MINIO_BUCKET_PREDICTIONS is not set.
    bucket_predictions: str = Field("", validation_alias="MINIO_BUCKET_PREDICTIONS")

    @property
    def bucket_ui(self) -> str:
        """Correctly spelled, read-only alias for ``bucker_ui`` (``MINIO_BUCKET_UI``)."""
        return self.bucker_ui

    @property
    def storage_options(self) -> dict:
        """Return fsspec/s3fs storage options for pandas/pyarrow I/O.

        Returns:
            Dict with ``key``, ``secret``, and ``client_kwargs``
            (containing ``endpoint_url``) suitable for
            ``pd.read_parquet(..., storage_options=...)``.
        """
        return {
            "key": self.access_key,
            "secret": self.secret_key,
            "client_kwargs": {
                "endpoint_url": self.endpoint_url,
            },
        }

    model_config = {
        "env_file": PACKAGE_ROOT / ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

storage_options property

Return fsspec/s3fs storage options for pandas/pyarrow I/O.

Returns:

Type Description
dict

Dict with key, secret, and client_kwargs (containing endpoint_url) suitable for pd.read_parquet(..., storage_options=...).

get_minio_settings() cached

Return the MinIO settings singleton (lazy, cached).

Source code in src/app/config/storage.py
@lru_cache(maxsize=1)
def get_minio_settings() -> MinioSettings:
    """Build ``MinioSettings`` on first use and hand back the same object after.

    ``lru_cache(maxsize=1)`` memoises the single zero-argument call, so the
    ``MINIO_*`` env vars / ``.env`` file are read only once until the cache
    is cleared.
    """
    settings = MinioSettings()
    return settings

on_starting(server)

Log a message when the Gunicorn arbiter starts.

Source code in src/app/config/gunicorn.py
def on_starting(server):  # noqa: ARG001
    """Gunicorn hook: note in the log that the arbiter is starting up.

    *server* is accepted to match the hook signature but is not used.
    """
    message = "Starting Gunicorn server..."
    _logger.info(message)

when_ready(server)

Log a message when Gunicorn finishes initialisation and is ready.

Source code in src/app/config/gunicorn.py
def when_ready(server):  # noqa: ARG001
    """Gunicorn hook: announce that initialisation finished and the server is ready.

    Logs the module-level ``bind`` setting; *server* is unused.
    """
    listen_on = bind
    _logger.info("Gunicorn is ready. Listening on: %s", listen_on)

worker_int(worker)

Log worker PID when the worker process receives SIGINT.

Source code in src/app/config/gunicorn.py
def worker_int(worker):
    """Gunicorn hook: record which worker process got SIGINT.

    Args:
        worker: The Gunicorn worker; only its ``pid`` attribute is read.
    """
    pid = worker.pid
    _logger.info("Worker %s received SIGINT", pid)

worker_abort(worker)

Log worker PID when the worker process receives SIGABRT.

Source code in src/app/config/gunicorn.py
def worker_abort(worker):
    """Log the worker PID when the worker process receives SIGABRT.

    Per Gunicorn's settings documentation, this hook generally fires when
    the arbiter aborts a worker that exceeded its ``timeout``.  That is an
    abnormal event (a hung or too-slow request), so it is logged at WARNING
    rather than INFO to stay visible under typical log-level filters.

    Args:
        worker: The Gunicorn worker; only its ``pid`` attribute is read.
    """
    _logger.warning("Worker %s received SIGABRT", worker.pid)