"""
Ingestie date statistice — citire din CSV/XLSX/Parquet,
transformare și salvare ca Parquet + actualizare catalog.
"""
import os
from pathlib import Path
from typing import Optional

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from rich.console import Console

from datahub_ingest.catalog import load_catalog, save_catalog

console = Console()


def read_source(source_path: str) -> pd.DataFrame:
    """Citește un fișier sursă în funcție de extensie."""
    ext = Path(source_path).suffix.lower()

    if ext == ".csv":
        return pd.read_csv(source_path)
    elif ext in (".xlsx", ".xls"):
        return pd.read_excel(source_path)
    elif ext == ".parquet":
        return pd.read_parquet(source_path)
    elif ext == ".json":
        return pd.read_json(source_path)
    else:
        raise ValueError(f"Format nesuportat: {ext}. Folosiți CSV, XLSX, Parquet sau JSON.")


def ingest_dataset(
    dataset_id: str,
    source_path: str,
    level: str,
    join_key: str,
    name: str = "",
    description: str = "",
    source_name: str = "",
    license_name: str = "",
    output_dir: str = "../data/statistics",
    catalog_path: str = "../data/catalog.json",
):
    """
    Procesează un fișier sursă de date statistice:
    1. Citește datele
    2. Validează coloana de join
    3. Salvează ca Parquet
    4. Actualizează catalogul
    """
    console.print(f"[blue]Citire sursă:[/blue] {source_path}")
    df = read_source(source_path)
    console.print(f"  → {len(df)} rânduri, {len(df.columns)} coloane")

    # Verify join key exists
    if join_key not in df.columns:
        console.print(f"[red]✕ Coloana de join '{join_key}' nu există![/red]")
        console.print(f"  Coloane disponibile: {', '.join(df.columns)}")
        raise ValueError(f"Join key '{join_key}' nu a fost găsit")

    # Convert join key to string for consistent matching
    df[join_key] = df[join_key].astype(str)

    # Detect column types
    columns_meta = []
    data_columns = [c for c in df.columns if c != join_key]

    for col in data_columns:
        dtype = df[col].dtype
        if pd.api.types.is_integer_dtype(dtype):
            col_type = "integer"
        elif pd.api.types.is_float_dtype(dtype):
            col_type = "float"
        elif pd.api.types.is_bool_dtype(dtype):
            col_type = "boolean"
        else:
            col_type = "string"

        columns_meta.append({
            "id": col,
            "name": col,  # Can be updated manually in catalog
            "description": "",
            "unit": "",
            "type": col_type,
        })

    # Save as Parquet
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{dataset_id}_{level}.parquet")
    df.to_parquet(output_path, index=False, engine="pyarrow")

    file_size = os.path.getsize(output_path)
    console.print(
        f"[green]✓[/green] Salvat: {output_path} ({file_size / 1024:.1f} KB)"
    )

    # Update catalog
    catalog = load_catalog(catalog_path)

    # Check if dataset already exists
    existing = next((d for d in catalog["datasets"] if d["id"] == dataset_id), None)

    if existing:
        # Add level if not already present
        if level not in existing.get("geometry_levels", []):
            existing["geometry_levels"].append(level)
        # Update columns if they changed
        existing["columns"] = columns_meta
        console.print(f"[yellow]↻[/yellow] Set de date existent actualizat: {dataset_id}")
    else:
        # Add new dataset entry
        new_entry = {
            "id": dataset_id,
            "name": name or dataset_id,
            "description": description,
            "source": source_name,
            "source_url": "",
            "license": license_name,
            "license_url": "",
            "acquisition": "",
            "geometry_levels": [level],
            "temporal": {
                "type": None,
                "values": [],
            },
            "join_key": join_key,
            "columns": columns_meta,
        }
        catalog["datasets"].append(new_entry)
        console.print(f"[green]✓[/green] Intrare nouă în catalog: {dataset_id}")

    save_catalog(catalog, catalog_path)
    console.print(
        f"[green]✓[/green] Catalog actualizat: {catalog_path}"
    )
