{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "8f955f11-3f2a-4d07-8c6c-ad9f1549dc6a",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ[\"HADOOP_HOME\"] = \"C:\\\\hadoop\"\n",
    "os.environ[\"PATH\"] += \";C:\\\\hadoop\\\\bin\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4e6b39b7",
   "metadata": {},
   "source": [
    "# DE2 — Final Project Notebook\n",
    "**Data Engineering II — Data-Intensive Workloads — ESIEE 2025-2026**\n",
    "\n",
    "**Auteurs :** Sara AISSAOUI & Yannick PRAT  \n",
    "**Track :** C — Micromobilité (Citi Bike JC-202604)  \n",
    "**Dataset :** Jersey City, avril 2026 — 82 272 trajets\n",
    "\n",
    "---\n",
    "\n",
    "## Pipeline end-to-end\n",
    "\n",
    "```\n",
    "CSV brut (18 MB)\n",
    "  → [1] Bronze  : landing immutable\n",
    "  → [2] Silver  : nettoyage, typage, déduplication\n",
    "  → [3] Gold    : tables analytiques partitionnées\n",
    "  → [4] Streaming : fenêtre 15 min + watermark → Parquet\n",
    "  → [5] Text    : index inversé sur noms de stations\n",
    "  → [6] Clustering : KMeans sur features stations\n",
    "  → [7] LLM Ready : dataset curé pour RAG\n",
    "  → [8] Evidence : plans + métriques consolidées\n",
    "```\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "29b3b430",
   "metadata": {},
   "source": [
    "## 0. Setup — SparkSession & Config"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "7365cc17",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Spark version : 4.0.0\n",
      "Spark UI      : http://127.0.0.1:4040\n",
      "Spark UI (WSL): http://localhost:4040\n",
      "Config chargée : ['track', 'dataset', 'team', 'paths', 'spark', 'schema', 'layout', 'streaming', 'text', 'clustering', 'llm', 'slos']\n"
     ]
    }
   ],
   "source": [
    "import os, sys, time, datetime, pathlib, json, shutil, io, csv\n",
    "import statistics\n",
    "from urllib.parse import urlparse\n",
    "\n",
    "import yaml\n",
    "from pyspark.sql import SparkSession, functions as F\n",
    "from pyspark.sql.types import (\n",
    "    StructType, StructField,\n",
    "    StringType, TimestampType, DoubleType, IntegerType, LongType\n",
    ")\n",
    "\n",
    "# ── Load config ───────────────────────────────────────────────────────────────\n",
    "with open(\"de2_project_config.yml\") as f:\n",
    "    CFG = yaml.safe_load(f)\n",
    "\n",
    "# ── Network config (WSL / Linux compat) ───────────────────────────────────────\n",
    "DE2_SPARK_DRIVER_HOST  = os.environ.get(\"DE2_SPARK_DRIVER_HOST\",  \"127.0.0.1\")\n",
    "DE2_SPARK_BIND_ADDRESS = os.environ.get(\"DE2_SPARK_BIND_ADDRESS\", \"0.0.0.0\")\n",
    "os.environ.setdefault(\"SPARK_LOCAL_IP\", DE2_SPARK_DRIVER_HOST)\n",
    "\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(CFG[\"spark\"][\"app_name\"])\n",
    "    .master(CFG[\"spark\"][\"master\"])\n",
    "    .config(\"spark.driver.host\",        DE2_SPARK_DRIVER_HOST)\n",
    "    .config(\"spark.driver.bindAddress\", DE2_SPARK_BIND_ADDRESS)\n",
    "    .config(\"spark.ui.bindAddress\",     DE2_SPARK_BIND_ADDRESS)\n",
    "    .config(\"spark.sql.shuffle.partitions\",\n",
    "            str(CFG[\"spark\"][\"shuffle_partitions_default\"]))\n",
    "    .getOrCreate()\n",
    ")\n",
    "\n",
    "ui_url = spark.sparkContext.uiWebUrl\n",
    "ui_port = urlparse(ui_url).port or 4040\n",
    "print(\"Spark version :\", spark.version)\n",
    "print(\"Spark UI      :\", ui_url)\n",
    "print(\"Spark UI (WSL):\", f\"http://localhost:{ui_port}\")\n",
    "print(\"Config chargée :\", list(CFG.keys()))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "5e091f3d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tous les répertoires créés.\n",
      "Dataset source : data/JC-202604-citibike-tripdata.csv\n"
     ]
    }
   ],
   "source": [
    "# ── Créer tous les répertoires de sortie ──────────────────────────────────────\n",
    "for key, path in CFG[\"paths\"].items():\n",
    "    if key not in (\"raw_csv_glob\",):\n",
    "        pathlib.Path(path).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# ── Registro de métriques — sera rempli tout au long du pipeline ───────────────\n",
    "METRICS = []  # list of dicts → project_metrics_log.csv\n",
    "\n",
    "def log_metric(run_id, stage, task, metric_name, metric_value, notes=\"\"):\n",
    "    METRICS.append({\n",
    "        \"run_id\":       run_id,\n",
    "        \"stage\":        stage,\n",
    "        \"task\":         task,\n",
    "        \"metric_name\":  metric_name,\n",
    "        \"metric_value\": metric_value,\n",
    "        \"notes\":        notes,\n",
    "        \"timestamp\":    datetime.datetime.utcnow().isoformat()\n",
    "    })\n",
    "\n",
    "RAW_CSV = CFG[\"paths\"][\"raw_csv_glob\"]\n",
    "print(\"Tous les répertoires créés.\")\n",
    "print(f\"Dataset source : {RAW_CSV}\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f79a1eaa",
   "metadata": {},
   "source": [
    "## 1. Bronze — Landing immutable\n",
    "\n",
    "Le Bronze est une **copie brute** du CSV original, sans aucune transformation.  \n",
    "On le stocke en Parquet (1 partition) pour conserver une trace immuable.\n",
    "\n",
    "**Principe :** on ne transforme jamais la source — en cas d'erreur en Silver/Gold,  \n",
    "on peut tout rejouer depuis le Bronze.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "f02e1742",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Bronze — 82,272 lignes écrites en 6.1s\n",
      "Chemin : outputs/project/bronze\n",
      "root\n",
      " |-- ride_id: string (nullable = true)\n",
      " |-- rideable_type: string (nullable = true)\n",
      " |-- started_at: string (nullable = true)\n",
      " |-- ended_at: string (nullable = true)\n",
      " |-- start_station_name: string (nullable = true)\n",
      " |-- start_station_id: string (nullable = true)\n",
      " |-- end_station_name: string (nullable = true)\n",
      " |-- end_station_id: string (nullable = true)\n",
      " |-- start_lat: string (nullable = true)\n",
      " |-- start_lng: string (nullable = true)\n",
      " |-- end_lat: string (nullable = true)\n",
      " |-- end_lng: string (nullable = true)\n",
      " |-- member_casual: string (nullable = true)\n",
      "\n",
      "+----------------+-------------+-----------------------+-----------------------+------------------+----------------+------------------------------------------+--------------+------------------+------------------+-----------------+------------------+-------------+\n",
      "|ride_id         |rideable_type|started_at             |ended_at               |start_station_name|start_station_id|end_station_name                          |end_station_id|start_lat         |start_lng         |end_lat          |end_lng           |member_casual|\n",
      "+----------------+-------------+-----------------------+-----------------------+------------------+----------------+------------------------------------------+--------------+------------------+------------------+-----------------+------------------+-------------+\n",
      "|558250BE9BDDEF62|classic_bike |2026-04-08 11:01:58.516|2026-04-08 11:15:28.078|City Hall         |JC003           |Southwest Park - Jackson St & Observer Hwy|HB401         |40.7177325        |-74.043845        |40.73755127245804|-74.04166370630264|member       |\n",
      "|DE08A3A0DC829851|electric_bike|2026-04-04 14:28:31.751|2026-04-04 14:31:34.921|6 St & Grand St   |HB302           |Willow Ave & 12 St                        |HB505         |40.744397833095604|-74.03450086712837|40.7518674823282 |-74.03037697076797|member       |\n",
      "|B0434D0A2865B3E2|electric_bike|2026-04-27 18:25:42.870|2026-04-27 18:30:55.446|6 St & Grand St   |HB302           |Southwest Park - Jackson St & Observer Hwy|HB401         |40.744397833095604|-74.03450086712837|40.73755127245804|-74.04166370630264|casual       |\n",
      "|1D805A4835F0A8AB|classic_bike |2026-04-19 16:59:12.513|2026-04-19 17:04:03.409|6 St & Grand St   |HB302           |Willow Ave & 12 St                        |HB505         |40.744397833095604|-74.03450086712837|40.7518674823282 |-74.03037697076797|casual       |\n",
      "|854CE9A4A8705F8F|electric_bike|2026-04-27 18:46:25.717|2026-04-27 18:50:36.255|6 St & Grand St   |HB302           |Willow Ave & 12 St                        |HB505         |40.744397833095604|-74.03450086712837|40.7518674823282 |-74.03037697076797|casual       |\n",
      "+----------------+-------------+-----------------------+-----------------------+------------------+----------------+------------------------------------------+--------------+------------------+------------------+-----------------+------------------+-------------+\n",
      "only showing top 5 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Schéma explicite Bronze (tout en String pour être fidèle au CSV brut) ──────\n",
    "bronze_schema = StructType([\n",
    "    StructField(\"ride_id\",            StringType(), True),\n",
    "    StructField(\"rideable_type\",      StringType(), True),\n",
    "    StructField(\"started_at\",         StringType(), True),\n",
    "    StructField(\"ended_at\",           StringType(), True),\n",
    "    StructField(\"start_station_name\", StringType(), True),\n",
    "    StructField(\"start_station_id\",   StringType(), True),\n",
    "    StructField(\"end_station_name\",   StringType(), True),\n",
    "    StructField(\"end_station_id\",     StringType(), True),\n",
    "    StructField(\"start_lat\",          StringType(), True),\n",
    "    StructField(\"start_lng\",          StringType(), True),\n",
    "    StructField(\"end_lat\",            StringType(), True),\n",
    "    StructField(\"end_lng\",            StringType(), True),\n",
    "    StructField(\"member_casual\",      StringType(), True),\n",
    "])\n",
    "\n",
    "t0 = time.time()\n",
    "\n",
    "df_bronze = (\n",
    "    spark.read\n",
    "    .schema(bronze_schema)\n",
    "    .option(\"header\", \"true\")\n",
    "    .csv(RAW_CSV)\n",
    ")\n",
    "\n",
    "n_bronze = df_bronze.count()\n",
    "\n",
    "# Écriture Bronze immutable (mode overwrite pour idempotence)\n",
    "df_bronze.write.mode(\"overwrite\").parquet(CFG[\"paths\"][\"bronze\"])\n",
    "\n",
    "t_bronze = time.time() - t0\n",
    "\n",
    "print(f\"Bronze — {n_bronze:,} lignes écrites en {t_bronze:.1f}s\")\n",
    "print(f\"Chemin : {CFG['paths']['bronze']}\")\n",
    "df_bronze.printSchema()\n",
    "df_bronze.show(5, truncate=False)\n",
    "\n",
    "log_metric(\"r1\", \"etl\", \"bronze_landing\", \"row_count\",    n_bronze,      \"raw CSV rows\")\n",
    "log_metric(\"r1\", \"etl\", \"bronze_landing\", \"elapsed_sec\",  round(t_bronze,2), \"\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "19f5160d",
   "metadata": {},
   "source": [
    "## 2. Silver — Nettoyage, typage, déduplication\n",
    "\n",
    "Transformations appliquées (contrats de schéma) :\n",
    "1. **Cast** des timestamps et coordonnées GPS\n",
    "2. **Calcul** de `duration_sec`\n",
    "3. **Filtre domaine** : durée entre 60s et 7200s, `ride_id` non nul\n",
    "4. **Filtre domaine** : `member_casual` ∈ {member, casual}\n",
    "5. **Déduplication** sur `ride_id`\n",
    "6. **Partitionnement** par `rideable_type`\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "710ab301",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Silver — 81,975 lignes (297 lignes filtrées) en 6.6s\n",
      "Taux de rétention : 99.6%\n",
      "root\n",
      " |-- ride_id: string (nullable = true)\n",
      " |-- rideable_type: string (nullable = true)\n",
      " |-- started_at: timestamp (nullable = true)\n",
      " |-- ended_at: timestamp (nullable = true)\n",
      " |-- start_station_name: string (nullable = true)\n",
      " |-- start_station_id: string (nullable = true)\n",
      " |-- end_station_name: string (nullable = true)\n",
      " |-- end_station_id: string (nullable = true)\n",
      " |-- start_lat: double (nullable = true)\n",
      " |-- start_lng: double (nullable = true)\n",
      " |-- end_lat: double (nullable = true)\n",
      " |-- end_lng: double (nullable = true)\n",
      " |-- member_casual: string (nullable = true)\n",
      " |-- duration_sec: double (nullable = true)\n",
      "\n",
      "+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+----------------+------------------+-----------------+------------------+-------------+------------+\n",
      "|         ride_id|rideable_type|          started_at|            ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|       start_lat|         start_lng|          end_lat|           end_lng|member_casual|duration_sec|\n",
      "+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+----------------+------------------+-----------------+------------------+-------------+------------+\n",
      "|000C19EFDFE7A5CC|electric_bike|2026-04-26 19:15:...|2026-04-26 19:21:...|   4 St & Grand St|           HB301|Hoboken Ave at Mo...|         JC105|40.7422579775645|-74.03511106967926|40.73520838045357|-74.04696375131607|       member|       354.0|\n",
      "|00100FE62F3CC0D3|electric_bike|2026-04-21 18:28:...|2026-04-21 18:56:...|        Newark Ave|           JC032|Baldwin at Montgo...|         JC020|     40.72152515|     -74.046304543|       40.7236589|       -74.0641943|       member|      1713.0|\n",
      "|0016765A9F4849D0|electric_bike|2026-04-01 14:51:...|2026-04-01 15:02:...|         City Hall|           JC003|Pacific Ave & Com...|         JC118|      40.7177325|        -74.043845|        40.711354|        -74.062446|       member|       640.0|\n",
      "|0023B419BF7728C2| classic_bike|2026-04-29 07:52:...|2026-04-29 07:56:...| Madison St & 1 St|           HB402|Hoboken Terminal ...|         HB101|        40.73879|          -74.0393|40.73593758446329|-74.03030455112457|       member|       257.0|\n",
      "|002EFC2EE42983D7|electric_bike|2026-04-09 18:09:...|2026-04-09 18:16:...|        Newark Ave|           JC032|      Journal Square|         JC103|     40.72152515|     -74.046304543|         40.73367|          -74.0625|       member|       449.0|\n",
      "+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+----------------+------------------+-----------------+------------------+-------------+------------+\n",
      "only showing top 5 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "t0 = time.time()\n",
    "\n",
    "# Recharge depuis Bronze pour garantir la lignée\n",
    "df_b = spark.read.parquet(CFG[\"paths\"][\"bronze\"])\n",
    "\n",
    "# ── Contrats de schéma ────────────────────────────────────────────────────────\n",
    "df_silver = (\n",
    "    df_b\n",
    "    # 1. Cast des types\n",
    "    .withColumn(\"started_at\",  F.to_timestamp(\"started_at\"))\n",
    "    .withColumn(\"ended_at\",    F.to_timestamp(\"ended_at\"))\n",
    "    .withColumn(\"start_lat\",   F.col(\"start_lat\").cast(DoubleType()))\n",
    "    .withColumn(\"start_lng\",   F.col(\"start_lng\").cast(DoubleType()))\n",
    "    .withColumn(\"end_lat\",     F.col(\"end_lat\").cast(DoubleType()))\n",
    "    .withColumn(\"end_lng\",     F.col(\"end_lng\").cast(DoubleType()))\n",
    "    # 2. Durée en secondes\n",
    "    .withColumn(\"duration_sec\",\n",
    "        (F.unix_timestamp(\"ended_at\") - F.unix_timestamp(\"started_at\"))\n",
    "        .cast(DoubleType()))\n",
    "    # 3. Filtres domaine\n",
    "    .filter(F.col(\"ride_id\").isNotNull())\n",
    "    .filter(\n",
    "        (F.col(\"duration_sec\") >= CFG[\"schema\"][\"duration_min_sec\"]) &\n",
    "        (F.col(\"duration_sec\") <= CFG[\"schema\"][\"duration_max_sec\"])\n",
    "    )\n",
    "    # 4. Filtre valeurs valides\n",
    "    .filter(F.col(\"member_casual\").isin(CFG[\"schema\"][\"valid_member_casual\"]))\n",
    "    .filter(F.col(\"rideable_type\").isin(CFG[\"schema\"][\"valid_rideable_type\"]))\n",
    "    # 5. Déduplication\n",
    "    .dropDuplicates([\"ride_id\"])\n",
    ")\n",
    "\n",
    "n_silver = df_silver.count()\n",
    "n_dropped = n_bronze - n_silver\n",
    "\n",
    "# 6. Écriture partitionnée par rideable_type\n",
    "(\n",
    "    df_silver\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .partitionBy(*CFG[\"layout\"][\"partition_by\"])\n",
    "    .parquet(CFG[\"paths\"][\"silver\"])\n",
    ")\n",
    "\n",
    "t_silver = time.time() - t0\n",
    "\n",
    "print(f\"Silver — {n_silver:,} lignes ({n_dropped:,} lignes filtrées) en {t_silver:.1f}s\")\n",
    "print(f\"Taux de rétention : {n_silver/n_bronze*100:.1f}%\")\n",
    "df_silver.printSchema()\n",
    "df_silver.show(5)\n",
    "\n",
    "log_metric(\"r1\", \"etl\", \"silver_clean\", \"row_count\",       n_silver,        \"après filtres\")\n",
    "log_metric(\"r1\", \"etl\", \"silver_clean\", \"rows_dropped\",    n_dropped,       \"\")\n",
    "log_metric(\"r1\", \"etl\", \"silver_clean\", \"retention_pct\",   round(n_silver/n_bronze*100,2), \"\")\n",
    "log_metric(\"r1\", \"etl\", \"silver_clean\", \"elapsed_sec\",     round(t_silver,2), \"\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "031d74a6",
   "metadata": {},
   "source": [
    "## 3. Gold — Tables analytiques\n",
    "\n",
    "Deux tables Gold :\n",
    "- **`gold/trips_by_station`** — agrégats par station de départ (clé naturelle : `start_station_id`)\n",
    "- **`gold/trips_by_day`** — volume quotidien par type d'utilisateur, partitionné par `member_casual`\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "dc2fd6ae",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Gold Q1 — trips_by_station :\n",
      "+----------------+--------------------+--------------+-----------------+----------------+----------------+------------------+------------------+------------+------------+------------------+\n",
      "|start_station_id|  start_station_name|num_departures| avg_duration_sec|min_duration_sec|max_duration_sec|           avg_lat|           avg_lng|member_trips|casual_trips|      member_ratio|\n",
      "+----------------+--------------------+--------------+-----------------+----------------+----------------+------------------+------------------+------------+------------+------------------+\n",
      "|           JC115|       Grove St PATH|          3559|435.2624332677718|            61.0|          6240.0| 40.71940999999987|-74.04308999999965|        3065|         494|0.8611969654394883|\n",
      "|           HB106|River St & Newark St|          2809| 420.559985760057|            61.0|          6478.0| 40.73672227316329|-74.02900660353771|        2237|         572|0.7963688145244584|\n",
      "|           HB101|Hoboken Terminal ...|          2158|429.2775718257646|            80.0|          6296.0|40.735937584463386|-74.03030455112457|        1608|         550|0.7451343836882552|\n",
      "|           JC109|Bergen Ave & Sip Ave|          1968|511.3810975609756|            69.0|          6339.0| 40.73100888698721|-74.06443700194359|        1626|         342|0.8262195121947021|\n",
      "|           JC009|       Hamilton Park|          1798|518.5889877641824|            63.0|          7096.0| 40.72759596600006|-74.04424731099989|        1441|         357|0.8014460511675187|\n",
      "|           JC116|         Exchange Pl|          1637|571.4660965180208|            64.0|          4754.0|40.716365961850826|  -74.034343957901|        1268|         369|0.7745876603538335|\n",
      "|           HB502|11 St & Washingto...|          1636|469.9278728606357|            68.0|          6679.0| 40.74998490907148|-74.02715027332306|        1253|         383|0.7658924205374291|\n",
      "|           HB609|     River St & 1 St|          1635|461.3737003058104|            61.0|          6139.0| 40.73721535335391|-74.02886540972781|        1263|         372|0.7724770642197111|\n",
      "|           HB103|South Waterfront ...|          1556|678.1953727506427|            72.0|          6326.0|40.736982218187194|-74.02778059244156|        1058|         498| 0.679948586117815|\n",
      "|           HB202|14 St Ferry - 14 ...|          1545|610.6252427184467|            62.0|          5762.0|40.752960630465054|-74.02435272932053|        1115|         430| 0.721682847895973|\n",
      "+----------------+--------------------+--------------+-----------------+----------------+----------------+------------------+------------------+------------+------------+------------------+\n",
      "only showing top 10 rows\n",
      "\n",
      "Gold Q2 — trips_by_day (30 jours) :\n",
      "+----------+-------------+-------------+----------+------------------+\n",
      "| trip_date|member_casual|rideable_type|trip_count|  avg_duration_sec|\n",
      "+----------+-------------+-------------+----------+------------------+\n",
      "|2026-04-01|       casual|electric_bike|       452| 638.7300884955753|\n",
      "|2026-04-01|       casual| classic_bike|       152| 957.5065789473684|\n",
      "|2026-04-01|       member|electric_bike|      1496| 437.1610962566845|\n",
      "|2026-04-01|       member| classic_bike|       943|491.93955461293746|\n",
      "|2026-04-02|       casual|electric_bike|       299|494.66220735785953|\n",
      "|2026-04-02|       casual| classic_bike|        69| 733.2173913043479|\n",
      "|2026-04-02|       member|electric_bike|      1239| 376.9499596448749|\n",
      "|2026-04-02|       member| classic_bike|       614|406.29967426710095|\n",
      "|2026-04-03|       casual|electric_bike|       506| 672.0612648221344|\n",
      "|2026-04-03|       casual| classic_bike|       169|1176.4556213017752|\n",
      "+----------+-------------+-------------+----------+------------------+\n",
      "only showing top 10 rows\n",
      "\n",
      "Gold écrit en 5.1s — 108 stations, 30 jours\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "t0 = time.time()\n",
    "\n",
    "df_s = spark.read.parquet(CFG[\"paths\"][\"silver\"])\n",
    "\n",
    "# ── Gold Q1 : stats par station de départ ────────────────────────────────────\n",
    "gold_station = (\n",
    "    df_s\n",
    "    .groupBy(\"start_station_id\", \"start_station_name\")\n",
    "    .agg(\n",
    "        F.count(\"*\").alias(\"num_departures\"),\n",
    "        F.avg(\"duration_sec\").alias(\"avg_duration_sec\"),\n",
    "        F.min(\"duration_sec\").alias(\"min_duration_sec\"),\n",
    "        F.max(\"duration_sec\").alias(\"max_duration_sec\"),\n",
    "        F.avg(\"start_lat\").alias(\"avg_lat\"),\n",
    "        F.avg(\"start_lng\").alias(\"avg_lng\"),\n",
    "        F.sum(F.when(F.col(\"member_casual\") == \"member\", 1).otherwise(0))\n",
    "         .alias(\"member_trips\"),\n",
    "        F.sum(F.when(F.col(\"member_casual\") == \"casual\", 1).otherwise(0))\n",
    "         .alias(\"casual_trips\"),\n",
    "    )\n",
    "    .withColumn(\"member_ratio\",\n",
    "        F.col(\"member_trips\") / (F.col(\"num_departures\") + 1e-9))\n",
    "    .orderBy(F.desc(\"num_departures\"))\n",
    ")\n",
    "\n",
    "gold_station.write.mode(\"overwrite\").parquet(\n",
    "    f\"{CFG['paths']['gold']}/trips_by_station\")\n",
    "\n",
    "print(\"Gold Q1 — trips_by_station :\")\n",
    "gold_station.show(10)\n",
    "\n",
    "# ── Gold Q2 : volume quotidien par member_casual ──────────────────────────────\n",
    "gold_daily = (\n",
    "    df_s\n",
    "    .withColumn(\"trip_date\", F.to_date(\"started_at\"))\n",
    "    .groupBy(\"trip_date\", \"member_casual\", \"rideable_type\")\n",
    "    .agg(\n",
    "        F.count(\"*\").alias(\"trip_count\"),\n",
    "        F.avg(\"duration_sec\").alias(\"avg_duration_sec\"),\n",
    "    )\n",
    "    .orderBy(\"trip_date\", \"member_casual\")\n",
    ")\n",
    "\n",
    "(\n",
    "    gold_daily\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .partitionBy(*CFG[\"layout\"][\"gold_partition_by\"])\n",
    "    .parquet(f\"{CFG['paths']['gold']}/trips_by_day\")\n",
    ")\n",
    "\n",
    "t_gold = time.time() - t0\n",
    "\n",
    "n_stations = gold_station.count()\n",
    "n_days     = gold_daily.select(\"trip_date\").distinct().count()\n",
    "\n",
    "print(f\"\\nGold Q2 — trips_by_day ({n_days} jours) :\")\n",
    "gold_daily.show(10)\n",
    "\n",
    "print(f\"\\nGold écrit en {t_gold:.1f}s — {n_stations} stations, {n_days} jours\")\n",
    "\n",
    "log_metric(\"r1\", \"etl\", \"gold_station\",  \"row_count\",   n_stations,       \"stations uniques\")\n",
    "log_metric(\"r1\", \"etl\", \"gold_daily\",    \"row_count\",   n_days,           \"jours distincts\")\n",
    "log_metric(\"r1\", \"etl\", \"gold_build\",    \"elapsed_sec\", round(t_gold,2),  \"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "2efc0dac",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Plan ETL sauvegardé → proof/plan_etl.txt\n",
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan (9)\n",
      "+- Sort (8)\n",
      "   +- Exchange (7)\n",
      "      +- Project (6)\n",
      "         +- HashAggregate (5)\n",
      "            +- Exchange (4)\n",
      "               +- HashAggregate (3)\n",
      "                  +- Project (2)\n",
      "                     +- Scan parquet  (1)\n",
      "\n",
      "\n",
      "(1) Scan parquet \n",
      "Output [7]: [start_station_name#469, start_station_id#470, start_lat#473, start_lng#474, member_casual#477, duration_sec#478, rideable_type#479]\n",
      "Batched: true\n",
      "Location: InMemoryFileIndex [file:/C:/Users/yoyod/Downloads/outputs/project/silver]\n",
      "ReadSchema: struct<start_station_name:string,start_station_id:string,start_lat:double,start_lng:double,member_casual:string,duration_sec:double>\n",
      "\n",
      "(2) Project\n",
      "Output [6]: [start_station_name#469, start_station_id#470, start_lat#473, start_lng#474, member_casual#477, duration_sec#478]\n",
      "Input [7]: [start_station_name#469, start_station_id#470, start_lat#473, start_lng#474, member_casual#477, duration_sec#478, rideable_type#479]\n",
      "\n",
      "(3) HashAggregate\n",
      "Input [6]: [start_station_name#469, start_station_id#470, start_lat#473, start_lng#474, member_casual#477, duration_sec#478]\n",
      "Keys [2]: [start_station_id#470, start_station_name#469]\n",
      "Functions [8]: [partial_count(1), partial_avg(duration_sec#478), partial_min(duration_sec#478), partial_max(duration_sec#478), partial_avg(start_lat#473), partial_avg(start_lng#474), partial_sum(CASE WHEN (member_casual#477 = member) THEN 1 ELSE 0 END), partial_sum(CASE WHEN (member_casual#477 = casual) THEN 1 ELSE 0 END)]\n",
      "Aggregate Attributes [11]: [count#512L, sum#513, count#514L, min#515, max#516, sum#517, count#518L, sum#519, count#520L, sum#521L, sum#522L]\n",
      "Results [13]: [start_station_id#470, start_station_name#469, count#523L, sum#524, count#525L, min#526, max#527, sum#528, count#529L, sum#530, count#531L, sum#532L, sum#533L]\n",
      "\n",
      "(4) Exchange\n",
      "Input [13]: [start_station_id#470, start_station_name#469, count#523L, sum#524, count#525L, min#526, max#527, sum#528, count#529L, sum#530, count#531L, sum#532L, sum#533L]\n",
      "Arguments: hashpart\n"
     ]
    }
   ],
   "source": [
    "# ── Plan d'exécution ETL (gold_station) ──────────────────────────────────────\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buf = io.StringIO()\n",
    "gold_station.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "plan_etl = buf.getvalue()\n",
    "\n",
    "with open(f\"{CFG['paths']['proof']}/plan_etl.txt\", \"w\") as f:\n",
    "    f.write(plan_etl)\n",
    "\n",
    "print(\"Plan ETL sauvegardé → proof/plan_etl.txt\")\n",
    "print(plan_etl[:2000])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9d063817",
   "metadata": {},
   "source": [
    "## 4. Streaming — Fenêtre 15 min + Watermark → Parquet\n",
    "\n",
    "Pipeline Structured Streaming (réutilisé et amélioré depuis Lab 1) :\n",
    "- Source : fichiers CSV déposés progressivement dans `data/landing/`\n",
    "- Watermark : 10 min (tolérance aux retards réseau vélos connectés)\n",
    "- Fenêtre tumbling : 15 min (granularité naturelle Track C)\n",
    "- Sortie : Parquet en mode `append` avec checkpoint exactly-once\n",
    "\n",
    "**Deux runs comparés :**\n",
    "\n",
    "| Paramètre | Run 1 (baseline) | Run 2 (optimisé) |\n",
    "|---|---|---|\n",
    "| Trigger | 10s | 5s |\n",
    "| Watermark | 10 min | 5 min |\n",
    "| Shuffle partitions | 4 | 2 |\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "b9a234c3",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Schéma streaming : 13 colonnes\n",
      "EVENT_TIME_COL  = started_at\n",
      "WINDOW_DURATION = 15 minutes\n",
      "WATERMARK_DELAY = 10 minutes\n",
      "Dataset total   : 82,272 trajets\n",
      "10 fichiers (~8,227 trajets/fichier) créés dans data/staging/\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "\n",
    "# ── Schéma streaming ─────────────────────────────────────────────────────────\n",
    "stream_schema = StructType([\n",
    "    StructField(\"ride_id\",            StringType(),    True),\n",
    "    StructField(\"rideable_type\",      StringType(),    True),\n",
    "    StructField(\"started_at\",         TimestampType(), True),\n",
    "    StructField(\"ended_at\",           TimestampType(), True),\n",
    "    StructField(\"start_station_name\", StringType(),    True),\n",
    "    StructField(\"start_station_id\",   StringType(),    True),\n",
    "    StructField(\"end_station_name\",   StringType(),    True),\n",
    "    StructField(\"end_station_id\",     StringType(),    True),\n",
    "    StructField(\"start_lat\",          DoubleType(),    True),\n",
    "    StructField(\"start_lng\",          DoubleType(),    True),\n",
    "    StructField(\"end_lat\",            DoubleType(),    True),\n",
    "    StructField(\"end_lng\",            DoubleType(),    True),\n",
    "    StructField(\"member_casual\",      StringType(),    True),\n",
    "])\n",
    "\n",
    "EVENT_TIME_COL  = CFG[\"streaming\"][\"event_time_col\"]\n",
    "WINDOW_DURATION = CFG[\"streaming\"][\"window_duration\"]\n",
    "WATERMARK_DELAY = CFG[\"streaming\"][\"watermark\"]\n",
    "LANDING_DIR     = CFG[\"paths\"][\"streaming_landing\"]\n",
    "STAGING_DIR     = CFG[\"paths\"][\"streaming_staging\"]\n",
    "\n",
    "# ── Split CSV en 10 fichiers pour simuler le flux ─────────────────────────────\n",
    "pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(STAGING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Nettoyage staging\n",
    "shutil.rmtree(STAGING_DIR, ignore_errors=True)\n",
    "pathlib.Path(STAGING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "df_full = pd.read_csv(RAW_CSV)\n",
    "n_files = CFG[\"streaming\"][\"n_split_files\"]\n",
    "chunk_size = len(df_full) // n_files\n",
    "\n",
    "for i in range(n_files):\n",
    "    chunk = df_full.iloc[i*chunk_size:(i+1)*chunk_size]\n",
    "    chunk.to_csv(f\"{STAGING_DIR}batch_{i:02d}.csv\", index=False)\n",
    "\n",
    "print(f\"Schéma streaming : {len(stream_schema.fields)} colonnes\")\n",
    "print(f\"EVENT_TIME_COL  = {EVENT_TIME_COL}\")\n",
    "print(f\"WINDOW_DURATION = {WINDOW_DURATION}\")\n",
    "print(f\"WATERMARK_DELAY = {WATERMARK_DELAY}\")\n",
    "print(f\"Dataset total   : {len(df_full):,} trajets\")\n",
    "print(f\"{n_files} fichiers (~{chunk_size:,} trajets/fichier) créés dans {STAGING_DIR}\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "5c89a74c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# ── Fonction helper — construit et lance la streaming query ───────────────────\n",
    "def run_streaming_query(watermark, trigger_interval, shuffle_parts,\n",
    "                        sink_dir, ckpt_dir, label):\n",
    "    \"\"\"Lance une streaming query, dépose les fichiers, capture lastProgress.\"\"\"\n",
    "\n",
    "    # Reset landing + outputs\n",
    "    shutil.rmtree(LANDING_DIR, ignore_errors=True)\n",
    "    shutil.rmtree(sink_dir,    ignore_errors=True)\n",
    "    shutil.rmtree(ckpt_dir,    ignore_errors=True)\n",
    "    pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "    pathlib.Path(sink_dir).mkdir(parents=True, exist_ok=True)\n",
    "    pathlib.Path(ckpt_dir).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "    spark.conf.set(\"spark.sql.shuffle.partitions\", str(shuffle_parts))\n",
    "\n",
    "    df_stream = (\n",
    "        spark.readStream\n",
    "        .schema(stream_schema)\n",
    "        .option(\"header\", \"true\")\n",
    "        .option(\"maxFilesPerTrigger\",\n",
    "                CFG[\"streaming\"][\"max_files_per_trigger\"])\n",
    "        .csv(LANDING_DIR)\n",
    "    )\n",
    "\n",
    "    df_stream = df_stream.withColumn(\n",
    "        \"duration_sec\",\n",
    "        (F.unix_timestamp(\"ended_at\") - F.unix_timestamp(\"started_at\"))\n",
    "        .cast(DoubleType())\n",
    "    )\n",
    "\n",
    "    windowed = (\n",
    "        df_stream\n",
    "        .withWatermark(EVENT_TIME_COL, watermark)\n",
    "        .groupBy(\n",
    "            F.window(EVENT_TIME_COL, WINDOW_DURATION),\n",
    "            F.col(\"start_station_id\"),\n",
    "            F.col(\"member_casual\"),\n",
    "            F.col(\"rideable_type\"),\n",
    "        )\n",
    "        .agg(\n",
    "            F.count(\"*\").alias(\"trip_count\"),\n",
    "            F.avg(\"duration_sec\").alias(\"avg_duration_sec\"),\n",
    "            F.min(\"duration_sec\").alias(\"min_duration_sec\"),\n",
    "            F.max(\"duration_sec\").alias(\"max_duration_sec\"),\n",
    "            F.approx_count_distinct(\"ride_id\").alias(\"unique_rides\"),\n",
    "        )\n",
    "    )\n",
    "\n",
    "    query = (\n",
    "        windowed.writeStream\n",
    "        .format(\"parquet\")\n",
    "        .outputMode(\"append\")\n",
    "        .option(\"path\", sink_dir)\n",
    "        .option(\"checkpointLocation\", ckpt_dir)\n",
    "        .trigger(processingTime=trigger_interval)\n",
    "        .start()\n",
    "    )\n",
    "\n",
    "    print(f\"[{label}] Query démarrée — trigger={trigger_interval}, \"\n",
    "          f\"watermark={watermark}, shuffle={shuffle_parts}\")\n",
    "\n",
    "    # Dépôt progressif des fichiers\n",
    "    staging_files = sorted(pathlib.Path(STAGING_DIR).glob(\"*.csv\"))\n",
    "    for i, src in enumerate(staging_files):\n",
    "        shutil.copy(str(src), str(pathlib.Path(LANDING_DIR) / src.name))\n",
    "        print(f\"  [{i+1:02d}/{n_files}] {src.name} déposé\")\n",
    "        time.sleep(20)\n",
    "\n",
    "    time.sleep(10)\n",
    "    progress = query.lastProgress\n",
    "    query.stop()\n",
    "\n",
    "    n_parquets = len(list(pathlib.Path(sink_dir).rglob(\"*.parquet\")))\n",
    "    print(f\"[{label}] Terminé — {n_parquets} fichiers Parquet écrits\")\n",
    "    return progress, n_parquets\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "ca9f43ca",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Run1-baseline] Query démarrée — trigger=10 seconds, watermark=10 minutes, shuffle=4\n",
      "  [01/10] batch_00.csv déposé\n",
      "  [02/10] batch_01.csv déposé\n",
      "  [03/10] batch_02.csv déposé\n",
      "  [04/10] batch_03.csv déposé\n",
      "  [05/10] batch_04.csv déposé\n",
      "  [06/10] batch_05.csv déposé\n",
      "  [07/10] batch_06.csv déposé\n",
      "  [08/10] batch_07.csv déposé\n",
      "  [09/10] batch_08.csv déposé\n",
      "  [10/10] batch_09.csv déposé\n",
      "[Run1-baseline] Terminé — 20 fichiers Parquet écrits\n",
      "\n",
      "=== Métriques Run 1 ===\n",
      "{\n",
      "  \"id\": \"2ffa0eea-9df6-4495-a4e1-aafb42919d27\",\n",
      "  \"runId\": \"fd3896df-e4d6-45ee-a4aa-833e350b652f\",\n",
      "  \"name\": null,\n",
      "  \"timestamp\": \"2026-05-24T21:12:00.000Z\",\n",
      "  \"batchId\": 13,\n",
      "  \"batchDuration\": 1549,\n",
      "  \"durationMs\": {\n",
      "    \"triggerExecution\": 1549,\n",
      "    \"queryPlanning\": 15,\n",
      "    \"getBatch\": 0,\n",
      "    \"commitOffsets\": 300,\n",
      "    \"latestOffset\": 16,\n",
      "    \"addBatch\": 878,\n",
      "    \"walCommit\": 340\n",
      "  },\n",
      "  \"eventTime\": {\n",
      "    \"watermark\": \"2026-04-30T21:49:26.524Z\"\n",
      "  },\n",
      "  \"stateOperators\": [\n",
      "    {\n",
      "      \"operatorName\": \"stateStoreSave\",\n",
      "      \"numRowsTotal\": 10,\n",
      "      \"numRowsUpdated\": 0,\n",
      "      \"numRowsRemoved\": 0,\n",
      "      \"allUpdatesTimeMs\": 0,\n",
      "      \"allRemovalsTimeMs\": 0,\n",
      "      \"commitTimeMs\": 1443,\n",
      "      \"memoryUsedBytes\": 9424,\n",
      "      \"numRowsDroppedByWatermark\": 0,\n",
      "      \"numShufflePartitions\": 4,\n",
      "      \"numStateStoreInstances\": 4,\n",
      "      \"customMetrics\": {\n",
      "        \"stateOnCurrentVersionSizeBytes\": 7656,\n",
      "        \"loadedMapCacheHitCount\": 104,\n",
      "        \"loadedMapCacheMissCount\": 0\n",
      "      }\n",
      "    }\n",
      "  ],\n",
      "  \"sources\": [\n",
      "    {\n",
      "      \"description\": \"FileStreamSource[file:/C:/Users/yoyod/Downloads/data/landing]\",\n",
      "      \"startOffset\": \"{\\\"logOffset\\\":9}\",\n",
      "      \"endOffset\": \"{\\\"logOffset\\\":9}\",\n",
      "      \"latestOffset\": \"None\",\n",
      "      \"numInputRows\": 0,\n",
      "      \"inputRowsPerSecond\": 0.0,\n",
      "      \"processedRowsPerSecond\": 0.0,\n",
      "      \"metrics\": {}\n",
      "    }\n",
      "  ],\n",
      "  \"sink\": {\n",
      "    \"description\": \"FileSink[file:/C:/Users/yoyod/Downloads/outputs/project/streaming]\",\n",
      "    \"numOutputRows\": -1,\n",
      "    \"metrics\": {}\n",
      "  },\n",
      "  \"numInputRows\": 0,\n",
      "  \"inputRowsPerSecond\": 0.0,\n",
      "  \"processedRowsPerSecond\": 0.0,\n",
      "  \"observedMetrics\": {}\n",
      "}\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── RUN 1 — Baseline ─────────────────────────────────────────────────────────\n",
    "SINK_R1 = CFG[\"paths\"][\"streaming_sink\"]\n",
    "CKPT_R1 = CFG[\"paths\"][\"streaming_ckpt\"]\n",
    "\n",
    "p1, nf1 = run_streaming_query(\n",
    "    watermark=\"10 minutes\", trigger_interval=\"10 seconds\",\n",
    "    shuffle_parts=4, sink_dir=SINK_R1, ckpt_dir=CKPT_R1,\n",
    "    label=\"Run1-baseline\"\n",
    ")\n",
    "\n",
    "# Sauvegarde preuve Run 1\n",
    "with open(f\"{CFG['paths']['proof']}/query_progress_before.json\", \"w\") as f:\n",
    "    json.dump(p1, f, indent=2, default=str)\n",
    "\n",
    "if p1:\n",
    "    log_metric(\"r1\", \"streaming\", \"run1_baseline\", \"inputRowsPerSecond\",\n",
    "               p1.get(\"inputRowsPerSecond\", 0), \"trigger=10s watermark=10min\")\n",
    "    log_metric(\"r1\", \"streaming\", \"run1_baseline\", \"durationMs_trigger\",\n",
    "               (p1.get(\"durationMs\") or {}).get(\"triggerExecution\", 0), \"\")\n",
    "    log_metric(\"r1\", \"streaming\", \"run1_baseline\", \"stateRows\",\n",
    "               (p1.get(\"stateOperators\") or [{}])[0].get(\"numRowsTotal\", 0), \"\")\n",
    "    log_metric(\"r1\", \"streaming\", \"run1_baseline\", \"parquet_files\", nf1, \"\")\n",
    "\n",
    "print(\"\\n=== Métriques Run 1 ===\")\n",
    "print(json.dumps(p1, indent=2, default=str))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "23bf154f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[Run2-optimized] Query démarrée — trigger=5 seconds, watermark=5 minutes, shuffle=2\n",
      "  [01/10] batch_00.csv déposé\n",
      "  [02/10] batch_01.csv déposé\n",
      "  [03/10] batch_02.csv déposé\n",
      "  [04/10] batch_03.csv déposé\n",
      "  [05/10] batch_04.csv déposé\n",
      "  [06/10] batch_05.csv déposé\n",
      "  [07/10] batch_06.csv déposé\n",
      "  [08/10] batch_07.csv déposé\n",
      "  [09/10] batch_08.csv déposé\n",
      "  [10/10] batch_09.csv déposé\n",
      "[Run2-optimized] Terminé — 16 fichiers Parquet écrits\n",
      "\n",
      "======= COMPARAISON STREAMING =======\n",
      "Métrique                            Run 1 (baseline)       Run 2 (optimisé)\n",
      "------------------------------------------------------------------------------\n",
      "inputRowsPerSecond                  0.0                    0.0\n",
      "processedRowsPerSecond              0.0                    0.0\n",
      "numInputRows                        0                      0\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── RUN 2 — Optimisé ─────────────────────────────────────────────────────────\n",
    "SINK_R2 = \"outputs/project/streaming_r2\"\n",
    "CKPT_R2 = \"outputs/project/streaming_checkpoint_r2\"\n",
    "\n",
    "p2, nf2 = run_streaming_query(\n",
    "    watermark=\"5 minutes\", trigger_interval=\"5 seconds\",\n",
    "    shuffle_parts=2, sink_dir=SINK_R2, ckpt_dir=CKPT_R2,\n",
    "    label=\"Run2-optimized\"\n",
    ")\n",
    "\n",
    "# Sauvegarde preuve Run 2\n",
    "with open(f\"{CFG['paths']['proof']}/query_progress_after.json\", \"w\") as f:\n",
    "    json.dump(p2, f, indent=2, default=str)\n",
    "\n",
    "if p2:\n",
    "    log_metric(\"r2\", \"streaming\", \"run2_optimized\", \"inputRowsPerSecond\",\n",
    "               p2.get(\"inputRowsPerSecond\", 0), \"trigger=5s watermark=5min\")\n",
    "    log_metric(\"r2\", \"streaming\", \"run2_optimized\", \"durationMs_trigger\",\n",
    "               (p2.get(\"durationMs\") or {}).get(\"triggerExecution\", 0), \"\")\n",
    "    log_metric(\"r2\", \"streaming\", \"run2_optimized\", \"stateRows\",\n",
    "               (p2.get(\"stateOperators\") or [{}])[0].get(\"numRowsTotal\", 0), \"\")\n",
    "    log_metric(\"r2\", \"streaming\", \"run2_optimized\", \"parquet_files\", nf2, \"\")\n",
    "\n",
    "# ── Comparaison ───────────────────────────────────────────────────────────────\n",
    "print(\"\\n======= COMPARAISON STREAMING =======\")\n",
    "print(f\"{'Métrique':<35} {'Run 1 (baseline)':<22} {'Run 2 (optimisé)'}\")\n",
    "print(\"-\" * 78)\n",
    "for k in [\"inputRowsPerSecond\", \"processedRowsPerSecond\", \"numInputRows\"]:\n",
    "    v1 = (p1 or {}).get(k, \"N/A\")\n",
    "    v2 = (p2 or {}).get(k, \"N/A\")\n",
    "    print(f\"{k:<35} {str(v1):<22} {str(v2)}\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "44f31fe9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Plan streaming sauvegardé → proof/plan_streaming.txt\n",
      "== Physical Plan ==\n",
      "* HashAggregate (12)\n",
      "+- StateStoreSave (11)\n",
      "   +- * HashAggregate (10)\n",
      "      +- StateStoreRestore (9)\n",
      "         +- * HashAggregate (8)\n",
      "            +- Exchange (7)\n",
      "               +- * HashAggregate (6)\n",
      "                  +- * Project (5)\n",
      "                     +- * Filter (4)\n",
      "                        +- EventTimeWatermark (3)\n",
      "                           +- * Project (2)\n",
      "                              +- StreamingRelation (1)\n",
      "\n",
      "\n",
      "(1) StreamingRelation\n",
      "Output [13]: [ride_id#53697, rideable_type#53698, started_at#53699, ended_at#53700, start_station_name#53701, start_station_id#53702, end_station_name#53703, end_station_id#53704, start_lat#53705, start_lng#53706, end_lat#53707, end_lng#53708, member_casual#53709]\n",
      "Arguments: FileSource[data/landing/], [ride_id#53697, rideable_type#53698, started_at#53699, ended_at#53700, start_station_name#53701, start_station_id#53702, end_station_name#53703, end_station_id#53704, start_lat#53705, start_lng#53706, end_lat#53707, end_lng#53708, member_casual#53709]\n",
      "\n",
      "(2) Project [codegen id : 1]\n",
      "Output [5]: [rideable_type#53698, started_at#53699, start_station_id#53702, member_casual#53709, cast((unix_timestamp(ended_at#53700, yyyy-MM-dd HH:mm:ss, Some(Europe/Paris), true) - unix_timestamp(started_at#53699, yyyy-MM-dd HH:mm:ss, Some(Europe/Paris), true)) as double) AS duration_sec#53711]\n",
      "Input [13]: [ride_id#53697, rideable_type#53698, started_at#53699, ended_at#53700, start_station_name#53701, start_station_id#53702, end_station_name#53703, end_station_id#53704, start_lat#53705, start_lng#53706, end_lat#53707, end_lng#53708, member_casual#53709]\n",
      "\n",
      "(3) EventTimeWatermark\n",
      "Input [5]: [rideable_type#53698, started_at#53699, start_station_id#53702, member_casual#53709, duration_sec#53711]\n",
      "Arguments: 4fda02d2-4778-46c7-ac16-c22ae1e269be, started_at#53699: timestamp, 10 minutes\n",
      "\n",
      "(4) Filter [codegen id : 2]\n",
      "Input [5]: [rideable_type#53698, started_at#53699-T600000ms, start_station_id#53702, member_casual#53709, duration_sec#53711]\n",
      "Condition : isnotnull(started_at#53699-T600000ms)\n",
      "\n",
      "(5) Project [codegen id : 2]\n",
      "Output [5]: [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) < 0) THEN (((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) + 900000000) ELSE ((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) < 0) THEN (((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) + 900000000) ELSE ((precisetimestampconversion(started_at#53699-T600000ms, TimestampType, LongType) - 0) % 900000000) END) - 0) + 900000000), LongType, TimestampType))) AS window#53733-T600000ms, rideable_type#53698, start_station_id#53702, member_casual#53709, duration_sec#53711]\n",
      "Input [5]: [rideable_type#53698, started_at#53699-T600000ms, start_station_id#53702, member_casual#53709, duration_sec#53711]\n",
      "\n",
      "(6) HashAggregate [codegen id : 2]\n",
      "Input [5]: [window#53733-T600000ms, rideable_type#53698, start_station_id#53702, member_casual#53709, duration_sec#53711]\n",
      "Keys [4]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698]\n",
      "Functions [2]: [partial_count(1), partial_avg(duration_sec#53711)]\n",
      "Aggregate Attributes [2]: [count(1)#53731L, avg(duration_sec#53711)#53732]\n",
      "Results [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "\n",
      "(7) Exchange\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Arguments: hashpartitioning(window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, 4), REQUIRED_BY_STATEFUL_OPERATOR, [plan_id=5944]\n",
      "\n",
      "(8) HashAggregate [codegen id : 3]\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Keys [4]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698]\n",
      "Functions [2]: [merge_count(1), merge_avg(duration_sec#53711)]\n",
      "Aggregate Attributes [2]: [count(1)#53731L, avg(duration_sec#53711)#53732]\n",
      "Results [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "\n",
      "(9) StateStoreRestore\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Arguments: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698], state info [ checkpoint = <unknown>, runId = 8622fa46-aab6-420d-be34-4d7fdcf1f15a, opId = 0, ver = 0, numPartitions = 4] stateStoreCkptIds = None, 2\n",
      "\n",
      "(10) HashAggregate [codegen id : 4]\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Keys [4]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698]\n",
      "Functions [2]: [merge_count(1), merge_avg(duration_sec#53711)]\n",
      "Aggregate Attributes [2]: [count(1)#53731L, avg(duration_sec#53711)#53732]\n",
      "Results [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "\n",
      "(11) StateStoreSave\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Arguments: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698], state info [ checkpoint = <unknown>, runId = 8622fa46-aab6-420d-be34-4d7fdcf1f15a, opId = 0, ver = 0, numPartitions = 4] stateStoreCkptIds = None, Append, -9223372036854775808, -9223372036854775808, 2\n",
      "\n",
      "(12) HashAggregate [codegen id : 5]\n",
      "Input [7]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count#53735L, sum#53738, count#53739L]\n",
      "Keys [4]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698]\n",
      "Functions [2]: [count(1), avg(duration_sec#53711)]\n",
      "Aggregate Attributes [2]: [count(1)#53731L, avg(duration_sec#53711)#53732]\n",
      "Results [6]: [window#53733-T600000ms, start_station_id#53702, member_casual#53709, rideable_type#53698, count(1)#53731L AS trip_count#53712L, avg(duration_sec#53711)#53732 AS avg_duration_sec#53713]\n",
      "\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# ── Plan streaming (sauvegarde) ───────────────────────────────────────────────\n",
    "# Reconstruit le plan depuis le readStream pour la preuve\n",
    "spark.conf.set(\"spark.sql.shuffle.partitions\", \"4\")\n",
    "\n",
    "df_plan = (\n",
    "    spark.readStream.schema(stream_schema)\n",
    "    .option(\"header\", \"true\").csv(LANDING_DIR)\n",
    "    .withColumn(\"duration_sec\",\n",
    "        (F.unix_timestamp(\"ended_at\") - F.unix_timestamp(\"started_at\"))\n",
    "        .cast(DoubleType()))\n",
    "    .withWatermark(EVENT_TIME_COL, \"10 minutes\")\n",
    "    .groupBy(\n",
    "        F.window(EVENT_TIME_COL, WINDOW_DURATION),\n",
    "        F.col(\"start_station_id\"), F.col(\"member_casual\"), F.col(\"rideable_type\")\n",
    "    )\n",
    "    .agg(F.count(\"*\").alias(\"trip_count\"), F.avg(\"duration_sec\").alias(\"avg_duration_sec\"))\n",
    ")\n",
    "\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buf = io.StringIO()\n",
    "df_plan.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "plan_streaming = buf.getvalue()\n",
    "\n",
    "with open(f\"{CFG['paths']['proof']}/plan_streaming.txt\", \"w\") as f:\n",
    "    f.write(plan_streaming)\n",
    "\n",
    "print(\"Plan streaming sauvegardé → proof/plan_streaming.txt\")\n",
    "print(plan_streaming)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fdae6f23",
   "metadata": {},
   "source": [
    "## 5. Text Pipeline — Index Inversé sur noms de stations\n",
    "\n",
    "Réutilisé et amélioré depuis Lab 2 :\n",
    "- **Corpus** : `start_station_name + \" to \" + end_station_name`\n",
    "- **Normalisation** : lower → split sur `[\\s\\W]+` → filtre longueur > 1 → stop-words\n",
    "- **Index** : `token → (doc_ids[], freq)`\n",
    "- **Benchmark** : latence par terme, comparaison Parquet vs CSV\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "36314fdc",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Corpus : 81,703 documents\n",
      "Longueur moyenne du texte : 42 caractères\n",
      "+----------------+-------------+-------------+------------------------------------------------------------+-------+\n",
      "|          doc_id|rideable_type|member_casual|                                                        text|doc_len|\n",
      "+----------------+-------------+-------------+------------------------------------------------------------+-------+\n",
      "|00128E15579DB07F|electric_bike|       member|Hoboken Terminal - Hudson St & Hudson Pl to Clinton St & ...|     61|\n",
      "|0013548B2CDDD4EB|electric_bike|       member|                        Dr. Lena Edwards Park to Exchange Pl|     36|\n",
      "|001FA20CFAFCF7F5|electric_bike|       casual|                                      Exchange Pl to Hilltop|     22|\n",
      "|0029161076C04D0A|electric_bike|       member|                                Grove St PATH to Exchange Pl|     28|\n",
      "|002EDCD1C32440AE|electric_bike|       casual|        Grand St & 14 St to Church Sq Park - 5 St & Park Ave|     52|\n",
      "+----------------+-------------+-------------+------------------------------------------------------------+-------+\n",
      "only showing top 5 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Chargement Silver ─────────────────────────────────────────────────────────\n",
    "df_s = spark.read.parquet(CFG[\"paths\"][\"silver\"])\n",
    "\n",
    "# ── Construction du corpus ────────────────────────────────────────────────────\n",
    "df_corpus = (\n",
    "    df_s\n",
    "    .filter(F.col(\"start_station_name\").isNotNull() &\n",
    "            F.col(\"end_station_name\").isNotNull())\n",
    "    .withColumn(\n",
    "        \"text\",\n",
    "        F.concat_ws(\" to \", F.col(\"start_station_name\"), F.col(\"end_station_name\"))\n",
    "    )\n",
    "    .withColumnRenamed(\"ride_id\", \"doc_id\")\n",
    "    .select(\"doc_id\", \"rideable_type\", \"member_casual\", \"text\")\n",
    "    .withColumn(\"doc_len\", F.length(\"text\"))\n",
    ")\n",
    "\n",
    "n_docs = df_corpus.count()\n",
    "avg_len = df_corpus.select(F.avg(\"doc_len\")).first()[0]\n",
    "\n",
    "print(f\"Corpus : {n_docs:,} documents\")\n",
    "print(f\"Longueur moyenne du texte : {avg_len:.0f} caractères\")\n",
    "df_corpus.show(5, truncate=60)\n",
    "\n",
    "log_metric(\"r1\", \"text\", \"corpus_build\", \"n_docs\",   n_docs,            \"trajets avec stations\")\n",
    "log_metric(\"r1\", \"text\", \"corpus_build\", \"avg_len\",  round(avg_len, 1), \"chars\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "00fa038e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Tokens AVANT stop-words : 624,183\n",
      "Tokens APRÈS stop-words : 538,254\n",
      "Mots vides supprimés    : 85,929 (13.8%)\n",
      "+----------------+--------+\n",
      "|          doc_id|   token|\n",
      "+----------------+--------+\n",
      "|00128E15579DB07F| hoboken|\n",
      "|00128E15579DB07F|terminal|\n",
      "|00128E15579DB07F|  hudson|\n",
      "|00128E15579DB07F|      st|\n",
      "|00128E15579DB07F|  hudson|\n",
      "|00128E15579DB07F|      pl|\n",
      "|00128E15579DB07F| clinton|\n",
      "|00128E15579DB07F|      st|\n",
      "|00128E15579DB07F|      st|\n",
      "|0013548B2CDDD4EB|      dr|\n",
      "+----------------+--------+\n",
      "only showing top 10 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Normalisation : minuscules → tokenisation → filtre → stop-words ───────────\n",
    "STOP_WORDS = {\n",
    "    \"to\", \"the\", \"a\", \"an\", \"and\", \"or\", \"of\", \"in\", \"at\",\n",
    "    \"for\", \"by\", \"on\", \"is\", \"it\", \"be\", \"as\", \"with\",\n",
    "    \"from\", \"this\", \"that\", \"are\", \"was\", \"not\", \"but\",\n",
    "    \"have\", \"has\", \"had\", \"do\", \"did\", \"so\", \"up\", \"if\",\n",
    "    \"no\", \"my\", \"we\", \"he\", \"she\", \"they\", \"its\",\n",
    "}\n",
    "\n",
    "df_tokens = df_corpus.withColumn(\n",
    "    \"tokens\", F.split(F.lower(F.col(\"text\")), r\"[\\s\\W]+\")\n",
    ")\n",
    "\n",
    "df_exploded = (\n",
    "    df_tokens\n",
    "    .select(\"doc_id\", F.explode(\"tokens\").alias(\"token\"))\n",
    "    .filter(F.length(\"token\") > CFG[\"text\"][\"min_token_length\"] - 1)\n",
    "    .filter(~F.col(\"token\").isin(STOP_WORDS))\n",
    ")\n",
    "\n",
    "total_avant = df_corpus.withColumn(\n",
    "    \"tokens\", F.split(F.lower(F.col(\"text\")), r\"[\\s\\W]+\")\n",
    ").select(F.explode(\"tokens\").alias(\"t\")).filter(F.length(\"t\") > 1).count()\n",
    "\n",
    "total_apres = df_exploded.count()\n",
    "supprimes   = total_avant - total_apres\n",
    "\n",
    "print(f\"Tokens AVANT stop-words : {total_avant:,}\")\n",
    "print(f\"Tokens APRÈS stop-words : {total_apres:,}\")\n",
    "print(f\"Mots vides supprimés    : {supprimes:,} ({supprimes/total_avant*100:.1f}%)\")\n",
    "df_exploded.show(10)\n",
    "\n",
    "log_metric(\"r1\", \"text\", \"tokenization\", \"tokens_before\", total_avant, \"\")\n",
    "log_metric(\"r1\", \"text\", \"tokenization\", \"tokens_after\",  total_apres, \"\")\n",
    "log_metric(\"r1\", \"text\", \"tokenization\", \"filter_pct\",\n",
    "           round(supprimes/total_avant*100, 2), \"% stop-words supprimés\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "6a5a26f9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Termes uniques : 243  (construit en 0.6s)\n",
      "+----------+------------------------------------------------------------+------+\n",
      "|     token|                                                     doc_ids|  freq|\n",
      "+----------+------------------------------------------------------------+------+\n",
      "|        st|[0A4C5125C54C947F, 0A4C5125C54C947F, 0A540415F6C88B6F, 0A...|156323|\n",
      "|       ave|[0A8416D136527273, 0A8416D136527273, 0A8416D136527273, 0B...| 29090|\n",
      "|      park|[0A5C3503C292F811, 0A704E3C91F79249, 0A7B8B7049A2734F, 0A...| 21431|\n",
      "|washington|[0A540415F6C88B6F, 0A6C6FCF0EBE1C0C, 0A774DAE6ACFF7AC, 0A...| 14907|\n",
      "|     river|[0A6C6FCF0EBE1C0C, 0A701A1B68CD4115, 0A774DAE6ACFF7AC, 0A...| 13567|\n",
      "|    newark|[0A6C6FCF0EBE1C0C, 0A701A1B68CD4115, 0A956D68B17EE431, 0A...| 12490|\n",
      "|      path|[0A540415F6C88B6F, 0A58B2C6F5312AA7, 0AD9FE86EB59DC7B, 0B...| 10184|\n",
      "|    hudson|[0B4281F0489EE04B, 0B4281F0489EE04B, 0B562B0F2376BBE6, 0B...|  8666|\n",
      "|        14|[0B598D15ED77F12F, 0B598D15ED77F12F, 0BF7340A85D7186A, 0B...|  7692|\n",
      "|        pl|[0AB34FE2A80744A4, 0B4281F0489EE04B, 0B562B0F2376BBE6, 0B...|  7636|\n",
      "|   clinton|[0B4214D015B7AD36, 0C1CFFC7F81A22F1, 0C6906AD6E282DD7, 0C...|  7570|\n",
      "|     grove|[0A58B2C6F5312AA7, 0B335C2BB521D5D6, 0BD0BC87D4C79CF8, 0C...|  7324|\n",
      "|   hoboken|[0B4281F0489EE04B, 0B4281F0489EE04B, 0B562B0F2376BBE6, 0B...|  7101|\n",
      "|        dr|[0A8416D136527273, 0AF25117968817F6, 0AF4BB499174B9FE, 0B...|  7085|\n",
      "|     grand|[0A4C5125C54C947F, 0A5C3503C292F811, 0B12FB948777A476, 0B...|  6937|\n",
      "|     light|[0A58B2C6F5312AA7, 0B335C2BB521D5D6, 0C51D70B4B82528C, 0C...|  5920|\n",
      "|      rail|[0A58B2C6F5312AA7, 0B335C2BB521D5D6, 0C51D70B4B82528C, 0C...|  5920|\n",
      "|   sinatra|[0AF25117968817F6, 0AF4BB499174B9FE, 0B58590136E51F18, 0C...|  5720|\n",
      "|   newport|[0A4C5125C54C947F, 0A540415F6C88B6F, 0AD9FE86EB59DC7B, 0C...|  5466|\n",
      "|        12|[0AF4BB499174B9FE, 0AF77E7A323DF9F2, 0AF77E7A323DF9F2, 0B...|  5445|\n",
      "+----------+------------------------------------------------------------+------+\n",
      "only showing top 20 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Construction de l'index inversé ─────────────────────────────────────────\n",
    "t0 = time.time()\n",
    "\n",
    "index_inverse = (\n",
    "    df_exploded\n",
    "    .groupBy(\"token\")\n",
    "    .agg(\n",
    "        F.collect_list(\"doc_id\").alias(\"doc_ids\"),\n",
    "        F.count(\"*\").alias(\"freq\")\n",
    "    )\n",
    "    .orderBy(F.desc(\"freq\"))\n",
    ")\n",
    "\n",
    "nb_termes = index_inverse.count()\n",
    "t_index = time.time() - t0\n",
    "\n",
    "print(f\"Termes uniques : {nb_termes:,}  (construit en {t_index:.1f}s)\")\n",
    "index_inverse.show(20, truncate=60)\n",
    "\n",
    "log_metric(\"r1\", \"text\", \"index_build\", \"unique_terms\",  nb_termes,        \"\")\n",
    "log_metric(\"r1\", \"text\", \"index_build\", \"elapsed_sec\",   round(t_index,2), \"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "281d1e51",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Parquet écrit → outputs/project/text/inverted_index\n",
      "CSV écrit     → outputs/project/text/inverted_index_csv\n",
      "\n",
      "Format              Octets         Ko\n",
      "--------------------------------------\n",
      "Parquet          6,778,177     6619.3\n",
      "CSV              9,224,983     9008.8\n",
      "Ratio Parquet/CSV : 0.7348 (plus petit de 26.5%)\n",
      "SLO Parquet ≤ 60% CSV : ❌ KO (ratio=0.73)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Écriture Parquet + CSV ────────────────────────────────────────────────────\n",
    "TEXT_OUT = CFG[\"paths\"][\"text\"]\n",
    "\n",
    "index_inverse.write.mode(\"overwrite\").parquet(f\"{TEXT_OUT}/inverted_index\")\n",
    "print(f\"Parquet écrit → {TEXT_OUT}/inverted_index\")\n",
    "\n",
    "(\n",
    "    index_inverse\n",
    "    .withColumn(\"doc_ids\", F.concat_ws(\",\", \"doc_ids\"))\n",
    "    .write.mode(\"overwrite\")\n",
    "    .option(\"header\", \"true\")\n",
    "    .csv(f\"{TEXT_OUT}/inverted_index_csv\")\n",
    ")\n",
    "print(f\"CSV écrit     → {TEXT_OUT}/inverted_index_csv\")\n",
    "\n",
    "# ── Comparaison empreinte disque ──────────────────────────────────────────────\n",
    "def dir_size(path):\n",
    "    return sum(f.stat().st_size\n",
    "               for f in pathlib.Path(path).rglob(\"*\") if f.is_file())\n",
    "\n",
    "sz_parquet = dir_size(f\"{TEXT_OUT}/inverted_index\")\n",
    "sz_csv     = dir_size(f\"{TEXT_OUT}/inverted_index_csv\")\n",
    "ratio      = sz_parquet / sz_csv if sz_csv > 0 else 0\n",
    "\n",
    "print(f\"\\n{'Format':<10} {'Octets':>15} {'Ko':>10}\")\n",
    "print(\"-\" * 38)\n",
    "print(f\"{'Parquet':<10} {sz_parquet:>15,} {sz_parquet/1024:>10.1f}\")\n",
    "print(f\"{'CSV':<10} {sz_csv:>15,} {sz_csv/1024:>10.1f}\")\n",
    "print(f\"Ratio Parquet/CSV : {ratio:.4f} \"\n",
    "      f\"({'plus petit' if ratio < 1 else 'plus grand'} de {abs(1-ratio)*100:.1f}%)\")\n",
    "\n",
    "slo_ok = ratio <= CFG[\"slos\"][\"parquet_ratio_max\"]\n",
    "print(f\"SLO Parquet ≤ 60% CSV : {'✅ OK' if slo_ok else '❌ KO'} (ratio={ratio:.2f})\")\n",
    "\n",
    "log_metric(\"r1\", \"text\", \"footprint\", \"parquet_bytes\", sz_parquet, \"\")\n",
    "log_metric(\"r1\", \"text\", \"footprint\", \"csv_bytes\",     sz_csv,     \"\")\n",
    "log_metric(\"r1\", \"text\", \"footprint\", \"ratio\",         round(ratio,4), \"parquet/csv\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "29d3b3dd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Index mis en cache.\n",
      "\n",
      "Terme                    Fréq   Postings         ms  SLO  Statut\n",
      "--------------------------------------------------------------\n",
      "grove                   7,324      7,324      217.7  ✅  [TROUVÉ]\n",
      "hamilton                3,587      3,587       98.8  ✅  [TROUVÉ]\n",
      "newport                 5,466      5,466       66.9  ✅  [TROUVÉ]\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "jersey                  2,689      2,689       54.6  ✅  [TROUVÉ]\n",
      "path                   10,184     10,184       51.2  ✅  [TROUVÉ]\n",
      "park                   21,431     21,431       68.3  ✅  [TROUVÉ]\n",
      "xzqintrouvable              0          0       52.3  ✅  [ABSENT]\n",
      "\n",
      "Plans sauvegardés : proof/plan_index_build.txt, proof/plan_query.txt\n"
     ]
    }
   ],
   "source": [
    "# ── Benchmark de latence des requêtes ─────────────────────────────────────────\n",
    "idx = spark.read.parquet(f\"{TEXT_OUT}/inverted_index\")\n",
    "idx.cache()\n",
    "idx.count()  # matérialisation du cache\n",
    "print(\"Index mis en cache.\\n\")\n",
    "\n",
    "print(f\"{'Terme':<20} {'Fréq':>8} {'Postings':>10} {'ms':>10}  SLO  Statut\")\n",
    "print(\"-\" * 62)\n",
    "\n",
    "for terme in CFG[\"text\"][\"query_terms\"]:\n",
    "    t0 = time.time()\n",
    "    rows = idx.filter(F.col(\"token\") == terme).collect()\n",
    "    lat  = (time.time() - t0) * 1000\n",
    "\n",
    "    if rows:\n",
    "        freq, n_post = rows[0][\"freq\"], len(rows[0][\"doc_ids\"])\n",
    "        statut = \"TROUVÉ\"\n",
    "    else:\n",
    "        freq, n_post = 0, 0\n",
    "        statut = \"ABSENT\"\n",
    "\n",
    "    slo_ok = lat <= CFG[\"slos\"][\"text_query_latency_ms\"]\n",
    "    print(f\"{terme:<20} {freq:>8,} {n_post:>10,} {lat:>10.1f}  \"\n",
    "          f\"{'✅' if slo_ok else '❌'}  [{statut}]\")\n",
    "\n",
    "    log_metric(\"r1\", \"text\", f\"query_{terme}\", \"latency_ms\", round(lat,2),\n",
    "               f\"freq={freq} postings={n_post}\")\n",
    "\n",
    "# ── Sauvegarde plan index ─────────────────────────────────────────────────────\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buf = io.StringIO()\n",
    "index_inverse.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "with open(f\"{CFG['paths']['proof']}/plan_index_build.txt\", \"w\") as f:\n",
    "    f.write(buf.getvalue())\n",
    "\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buf = io.StringIO()\n",
    "idx.filter(F.col(\"token\") == \"grove\").explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "with open(f\"{CFG['paths']['proof']}/plan_query.txt\", \"w\") as f:\n",
    "    f.write(buf.getvalue())\n",
    "\n",
    "print(\"\\nPlans sauvegardés : proof/plan_index_build.txt, proof/plan_query.txt\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "61614cd7",
   "metadata": {},
   "source": [
    "## 6. Clustering — KMeans sur features stations\n",
    "\n",
    "Réutilisé et amélioré depuis Lab 3 :\n",
    "- Features par station : `avg_lat`, `avg_lng`, `num_departures`, `avg_duration`, `member_ratio`, `num_arrivals`\n",
    "- StandardScaler (mean=0, std=1)\n",
    "- Sweep KMeans k=2..8, puis comparaison partitionnement before/after\n",
    "- Analyse de stabilité sur 7 seeds\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "9982035f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Stations : 108\n",
      "+----------------+------------------+------------------+--------------+-----------------+------------+------------+------------------+------------+\n",
      "|start_station_id|           avg_lat|           avg_lng|num_departures|     avg_duration|member_trips|casual_trips|      member_ratio|num_arrivals|\n",
      "+----------------+------------------+------------------+--------------+-----------------+------------+------------+------------------+------------+\n",
      "|           HB302| 40.74439783309563|-74.03450086712837|           627|391.5103668261563|         472|         155|0.7527910685793416|         621|\n",
      "|           JC034|40.734785818000034|-74.05044363600003|           477|552.5639412997904|         384|          93|0.8050314465391928|         478|\n",
      "|           JC103| 40.73367000000002|          -74.0625|           613|508.7438825448613|         494|         119|0.8058727569318012|         608|\n",
      "|           HB202|40.752960630465054|-74.02435272932053|          1545|610.6252427184467|        1115|         430| 0.721682847895973|        1506|\n",
      "|           JC082| 40.72165072487998|-74.04288411140442|          1081|400.8788159111933|         949|         132|0.8778908418123239|        1072|\n",
      "+----------------+------------------+------------------+--------------+-----------------+------------+------------+------------------+------------+\n",
      "only showing top 5 rows\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import VectorAssembler, StandardScaler\n",
    "from pyspark.ml.clustering import KMeans\n",
    "from pyspark.ml.evaluation import ClusteringEvaluator\n",
    "\n",
    "df_s = spark.read.parquet(CFG[\"paths\"][\"silver\"])\n",
    "\n",
    "# ── Features par station ──────────────────────────────────────────────────────\n",
    "trips = df_s  # déjà filtré en Silver\n",
    "\n",
    "station_features = trips.groupBy(\"start_station_id\").agg(\n",
    "    F.avg(\"start_lat\").alias(\"avg_lat\"),\n",
    "    F.avg(\"start_lng\").alias(\"avg_lng\"),\n",
    "    F.count(\"*\").alias(\"num_departures\"),\n",
    "    F.avg(\"duration_sec\").alias(\"avg_duration\"),\n",
    "    F.sum(F.when(F.col(\"member_casual\") == \"member\", 1).otherwise(0)).alias(\"member_trips\"),\n",
    "    F.sum(F.when(F.col(\"member_casual\") == \"casual\", 1).otherwise(0)).alias(\"casual_trips\"),\n",
    ").withColumn(\"member_ratio\",\n",
    "    F.col(\"member_trips\") / (F.col(\"num_departures\") + 1e-9))\n",
    "\n",
    "arrivals = trips.filter(F.col(\"end_station_id\").isNotNull())     .groupBy(\"end_station_id\")     .agg(F.count(\"*\").alias(\"num_arrivals\"))\n",
    "\n",
    "station_features = station_features.join(\n",
    "    arrivals.withColumnRenamed(\"end_station_id\", \"start_station_id\"),\n",
    "    on=\"start_station_id\", how=\"left\"\n",
    ").fillna({\"num_arrivals\": 0})\n",
    "\n",
    "n_stations = station_features.count()\n",
    "print(f\"Stations : {n_stations}\")\n",
    "station_features.show(5)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "0399859a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Features normalisées et mises en cache.\n",
      "+----------------+------------------------------------------------------------------------------------------------------------------------+\n",
      "|start_station_id|features                                                                                                                |\n",
      "+----------------+------------------------------------------------------------------------------------------------------------------------+\n",
      "|HB302           |[1.18592864962337,0.8883630558763127,-0.2131585015866534,-1.0453048099319142,0.08519701621730572,-0.20957682864179122]  |\n",
      "|JC034           |[0.571256110426038,0.1282208125994531,-0.45533310890159717,-0.3736012961066673,0.5505941898202337,-0.43642744130956695] |\n",
      "|JC103           |[0.49990138651737825,-0.4466198346232357,-0.23576146493604813,-0.5563608997845928,0.5580892251575282,-0.230199611611589]|\n",
      "+----------------+------------------------------------------------------------------------------------------------------------------------+\n",
      "only showing top 3 rows\n"
     ]
    }
   ],
   "source": [
    "# ── Assemblage et normalisation ───────────────────────────────────────────────\n",
    "FEATURE_COLS = CFG[\"clustering\"][\"feature_cols\"]\n",
    "\n",
    "assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol=\"raw_features\",\n",
    "                            handleInvalid=\"skip\")\n",
    "scaler = StandardScaler(inputCol=\"raw_features\", outputCol=\"features\",\n",
    "                        withStd=True, withMean=True)\n",
    "\n",
    "assembled    = assembler.transform(station_features)\n",
    "scaler_model = scaler.fit(assembled)\n",
    "feature_df   = scaler_model.transform(assembled).cache()\n",
    "feature_df.count()\n",
    "\n",
    "print(\"Features normalisées et mises en cache.\")\n",
    "feature_df.select(\"start_station_id\", \"features\").show(3, truncate=False)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "a25925fe",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "   k |   Silhouette |          WSSSE |   Time(s)\n",
      "------------------------------------------------\n",
      "   2 |     0.676834 |         326.04 |      2.62\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "   3 |     0.509840 |         234.30 |      1.14\n",
      "   4 |     0.557434 |         218.54 |      1.46\n",
      "   5 |     0.481583 |         167.14 |      1.17\n",
      "   6 |     0.453344 |         136.67 |      1.15\n",
      "   7 |     0.349234 |         130.76 |      1.11\n",
      "   8 |     0.422512 |         115.87 |      1.05\n",
      "\n",
      "→ Meilleur k = 2  |  Silhouette = 0.676834\n",
      "SLO Silhouette ≥ 0.25 : ✅ OK\n"
     ]
    }
   ],
   "source": [
    "# ── Sweep KMeans k=2..8 ───────────────────────────────────────────────────────\n",
    "evaluator = ClusteringEvaluator(featuresCol=\"features\", metricName=\"silhouette\",\n",
    "                                distanceMeasure=\"squaredEuclidean\")\n",
    "\n",
    "spark.conf.set(\"spark.sql.shuffle.partitions\",\n",
    "               str(CFG[\"spark\"][\"shuffle_partitions_default\"]))\n",
    "\n",
    "best_k, best_sil, best_model, best_preds = 2, -1, None, None\n",
    "k_range = range(CFG[\"clustering\"][\"k_range\"][0],\n",
    "                CFG[\"clustering\"][\"k_range\"][1] + 1)\n",
    "\n",
    "print(f\"{'k':>4} | {'Silhouette':>12} | {'WSSSE':>14} | {'Time(s)':>9}\")\n",
    "print(\"-\" * 48)\n",
    "\n",
    "for k in k_range:\n",
    "    t0 = time.time()\n",
    "    km = KMeans(k=k, seed=42, featuresCol=\"features\",\n",
    "                predictionCol=\"prediction\",\n",
    "                maxIter=CFG[\"clustering\"][\"max_iter\"])\n",
    "    model = km.fit(feature_df)\n",
    "    preds = model.transform(feature_df)\n",
    "    sil   = evaluator.evaluate(preds)\n",
    "    wssse = model.summary.trainingCost\n",
    "    elapsed = time.time() - t0\n",
    "\n",
    "    print(f\"{k:>4} | {sil:>12.6f} | {wssse:>14.2f} | {elapsed:>9.2f}\")\n",
    "\n",
    "    log_metric(\"r1\", \"clustering\", f\"kmeans_k{k}\", \"silhouette\",  round(sil,6),   \"\")\n",
    "    log_metric(\"r1\", \"clustering\", f\"kmeans_k{k}\", \"wssse\",       round(wssse,4), \"\")\n",
    "    log_metric(\"r1\", \"clustering\", f\"kmeans_k{k}\", \"elapsed_sec\", round(elapsed,3), \"\")\n",
    "\n",
    "    if sil > best_sil:\n",
    "        best_sil, best_k = sil, k\n",
    "        best_model, best_preds = model, preds\n",
    "\n",
    "print(f\"\\n→ Meilleur k = {best_k}  |  Silhouette = {best_sil:.6f}\")\n",
    "slo_ok = best_sil >= CFG[\"slos\"][\"silhouette_min\"]\n",
    "print(f\"SLO Silhouette ≥ {CFG['slos']['silhouette_min']} : {'✅ OK' if slo_ok else '❌ KO'}\")\n",
    "\n",
    "log_metric(\"r1\", \"clustering\", \"sweep_result\", \"best_k\",         best_k,          \"\")\n",
    "log_metric(\"r1\", \"clustering\", \"sweep_result\", \"best_silhouette\", round(best_sil,6), \"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "1046ba3d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "BEFORE (200 partitions) — sil=0.676834 wssse=326.04 time=0.96s\n",
      "AFTER  (12 partitions) — sil=0.676834 wssse=326.04 time=1.53s\n",
      "Speed-up : 0.63x\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "# ── Partitionnement BEFORE / AFTER ───────────────────────────────────────────\n",
    "# BEFORE : 200 partitions (défaut)\n",
    "spark.conf.set(\"spark.sql.shuffle.partitions\",\n",
    "               str(CFG[\"spark\"][\"shuffle_partitions_default\"]))\n",
    "\n",
    "t0 = time.time()\n",
    "km_b = KMeans(k=best_k, seed=42, featuresCol=\"features\",\n",
    "              predictionCol=\"prediction\",\n",
    "              maxIter=CFG[\"clustering\"][\"max_iter\"])\n",
    "m_b  = km_b.fit(feature_df)\n",
    "p_b  = m_b.transform(feature_df)\n",
    "sil_b   = evaluator.evaluate(p_b)\n",
    "wssse_b = m_b.summary.trainingCost\n",
    "t_before = time.time() - t0\n",
    "\n",
    "plan_before = p_b._jdf.queryExecution().toString()\n",
    "(pathlib.Path(CFG[\"paths\"][\"proof\"]) / \"plan_before.txt\").write_text(plan_before)\n",
    "\n",
    "# AFTER : partitions réduites + repartition par station_id\n",
    "tuned = CFG[\"spark\"][\"shuffle_partitions_tuned\"]\n",
    "spark.conf.set(\"spark.sql.shuffle.partitions\", str(tuned))\n",
    "\n",
    "feature_df_r = feature_df.repartition(tuned, \"start_station_id\").cache()\n",
    "feature_df_r.count()\n",
    "\n",
    "t0 = time.time()\n",
    "km_a = KMeans(k=best_k, seed=42, featuresCol=\"features\",\n",
    "              predictionCol=\"prediction\",\n",
    "              maxIter=CFG[\"clustering\"][\"max_iter\"])\n",
    "m_a  = km_a.fit(feature_df_r)\n",
    "p_a  = m_a.transform(feature_df_r)\n",
    "sil_a   = evaluator.evaluate(p_a)\n",
    "wssse_a = m_a.summary.trainingCost\n",
    "t_after = time.time() - t0\n",
    "\n",
    "plan_after = p_a._jdf.queryExecution().toString()\n",
    "(pathlib.Path(CFG[\"paths\"][\"proof\"]) / \"plan_after.txt\").write_text(plan_after)\n",
    "\n",
    "speedup = t_before / t_after if t_after > 0 else 0\n",
    "\n",
    "print(f\"BEFORE (200 partitions) — sil={sil_b:.6f} wssse={wssse_b:.2f} time={t_before:.2f}s\")\n",
    "print(f\"AFTER  ({tuned} partitions) — sil={sil_a:.6f} wssse={wssse_a:.2f} time={t_after:.2f}s\")\n",
    "print(f\"Speed-up : {speedup:.2f}x\")\n",
    "\n",
    "log_metric(\"r1\", \"clustering\", \"partition_before\", \"elapsed_sec\", round(t_before,3), \"200 partitions\")\n",
    "log_metric(\"r1\", \"clustering\", \"partition_before\", \"silhouette\",  round(sil_b,6),   \"\")\n",
    "log_metric(\"r1\", \"clustering\", \"partition_after\",  \"elapsed_sec\", round(t_after,3), f\"{tuned} partitions\")\n",
    "log_metric(\"r1\", \"clustering\", \"partition_after\",  \"silhouette\",  round(sil_a,6),   \"\")\n",
    "log_metric(\"r1\", \"clustering\", \"partition_after\",  \"speedup\",     round(speedup,2), \"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "d132c948",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Stabilité (k=2, 7 seeds) :\n",
      "  Seed |   Silhouette |          WSSSE |   Time(s)\n",
      "--------------------------------------------------\n",
      "     0 |     0.676834 |         326.04 |      1.14\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "     7 |     0.676834 |         326.04 |      1.36\n",
      "    42 |     0.676834 |         326.04 |      1.52\n",
      "    99 |     0.676834 |         326.04 |      1.26\n",
      "   123 |     0.676834 |         326.04 |      1.07\n",
      "   256 |     0.676834 |         326.04 |      1.11\n",
      "   512 |     0.676834 |         326.04 |      1.47\n",
      "\n",
      "Silhouette — mean=0.676834  std=0.000000\n",
      "\n",
      "Assignations sauvegardées → outputs/project/gold/cluster_assignments\n",
      "+----------+-----+\n",
      "|prediction|count|\n",
      "+----------+-----+\n",
      "|         0|   80|\n",
      "|         1|   28|\n",
      "+----------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# ── Stabilité des seeds ───────────────────────────────────────────────────────\n",
    "SEEDS = CFG[\"clustering\"][\"seeds\"]\n",
    "sil_scores = []\n",
    "\n",
    "print(f\"Stabilité (k={best_k}, {len(SEEDS)} seeds) :\")\n",
    "print(f\"{'Seed':>6} | {'Silhouette':>12} | {'WSSSE':>14} | {'Time(s)':>9}\")\n",
    "print(\"-\" * 50)\n",
    "\n",
    "for seed in SEEDS:\n",
    "    t0 = time.time()\n",
    "    km_s = KMeans(k=best_k, seed=seed, featuresCol=\"features\",\n",
    "                  predictionCol=\"prediction\",\n",
    "                  maxIter=CFG[\"clustering\"][\"max_iter\"])\n",
    "    m_s  = km_s.fit(feature_df_r)\n",
    "    p_s  = m_s.transform(feature_df_r)\n",
    "    sil_s   = evaluator.evaluate(p_s)\n",
    "    wssse_s = m_s.summary.trainingCost\n",
    "    elapsed = time.time() - t0\n",
    "    sil_scores.append(sil_s)\n",
    "\n",
    "    print(f\"{seed:>6} | {sil_s:>12.6f} | {wssse_s:>14.2f} | {elapsed:>9.2f}\")\n",
    "    log_metric(\"r1\", \"clustering\", f\"seed_{seed}\", \"silhouette\",  round(sil_s,6),   \"\")\n",
    "    log_metric(\"r1\", \"clustering\", f\"seed_{seed}\", \"wssse\",       round(wssse_s,4), \"\")\n",
    "    log_metric(\"r1\", \"clustering\", f\"seed_{seed}\", \"elapsed_sec\", round(elapsed,3), \"\")\n",
    "\n",
    "mean_sil = statistics.mean(sil_scores)\n",
    "std_sil  = statistics.stdev(sil_scores)\n",
    "print(f\"\\nSilhouette — mean={mean_sil:.6f}  std={std_sil:.6f}\")\n",
    "\n",
    "log_metric(\"r1\", \"clustering\", \"seed_stability\", \"sil_mean\", round(mean_sil,6), \"\")\n",
    "log_metric(\"r1\", \"clustering\", \"seed_stability\", \"sil_std\",  round(std_sil,6),  \"\")\n",
    "\n",
    "# ── Sauvegarde des clusters ───────────────────────────────────────────────────\n",
    "cluster_out = p_a.select(\n",
    "    \"start_station_id\", \"avg_lat\", \"avg_lng\",\n",
    "    \"num_departures\", \"num_arrivals\", \"avg_duration\", \"member_ratio\",\n",
    "    \"prediction\"\n",
    ")\n",
    "cluster_out.write.mode(\"overwrite\").csv(\n",
    "    f\"{CFG['paths']['gold']}/cluster_assignments\", header=True)\n",
    "print(f\"\\nAssignations sauvegardées → {CFG['paths']['gold']}/cluster_assignments\")\n",
    "cluster_out.groupBy(\"prediction\").count().orderBy(\"prediction\").show()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "77835211",
   "metadata": {},
   "source": [
    "## 7. LLM Data Readiness\n",
    "\n",
    "Préparation d'un dataset curé pour RAG (Retrieval-Augmented Generation)  \n",
    "sur les noms de stations Citi Bike de Jersey City.\n",
    "\n",
    "**Filtres qualité appliqués :**\n",
    "- Suppression des `start_station_name` ou `end_station_name` nuls\n",
    "- Longueur minimale du texte ≥ 10 caractères\n",
    "- Déduplication par hash de contenu (`xxhash64`)\n",
    "- Métadonnées : `source`, `version`, `curated_at`\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "894a1930",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "LLM-ready dataset : 6,454 documents (sur 81,975 en Silver)\n",
      "Taux qualité      : 7.9%\n",
      "SLO qualité ≥ 80% : ❌ KO\n",
      "Temps             : 1.3s\n",
      "Chemin            : outputs/project/llm_ready\n",
      "root\n",
      " |-- doc_id: string (nullable = true)\n",
      " |-- text: string (nullable = false)\n",
      " |-- metadata: struct (nullable = false)\n",
      " |    |-- rideable_type: string (nullable = true)\n",
      " |    |-- member_casual: string (nullable = true)\n",
      " |    |-- started_at: timestamp (nullable = true)\n",
      " |    |-- start_station_id: string (nullable = true)\n",
      " |    |-- end_station_id: string (nullable = true)\n",
      " |-- content_hash: long (nullable = false)\n",
      " |-- source: string (nullable = false)\n",
      " |-- version: string (nullable = false)\n",
      " |-- curated_at: timestamp (nullable = false)\n",
      "\n",
      "+----------------+----------------------------------------------------------+------------------------------------------------------------+--------------------+---------------------------+-------+--------------------------+\n",
      "|          doc_id|                                                      text|                                                    metadata|        content_hash|                     source|version|                curated_at|\n",
      "+----------------+----------------------------------------------------------+------------------------------------------------------------+--------------------+---------------------------+-------+--------------------------+\n",
      "|7775C4DBF514CDD2|                      12 St & Sinatra Dr N to Jersey & 3rd|{electric_bike, casual, 2026-04-12 19:16:15.716, HB201, J...|-9216639952707537945|JC-202604-citibike-tripdata|   v1.0|2026-05-24 23:16:19.595997|\n",
      "|9B312D078C6099EF|                        Madison St & 10 St to Manila & 1st|{electric_bike, member, 2026-04-23 19:12:28.129, HB503, J...|-9203552489675778638|JC-202604-citibike-tripdata|   v1.0|2026-05-24 23:16:19.595997|\n",
      "|F899D1AC9010A0C3|Newark St & Washington St to 9 St HBLR - Jackson St & 8 St|{electric_bike, casual, 2026-04-23 13:06:07.773, HB612, H...|-9189194007855265133|JC-202604-citibike-tripdata|   v1.0|2026-05-24 23:16:19.595997|\n",
      "|EF40805C89D29893|                     Heights Elevator to JC Medical Center|{electric_bike, member, 2026-04-24 10:06:11.518, JC059, J...|-9181444170804136639|JC-202604-citibike-tripdata|   v1.0|2026-05-24 23:16:19.595997|\n",
      "|8AA3D63B4663A3DB|                         Columbus Drive to 2 St & Park Ave|{electric_bike, member, 2026-04-16 14:58:14.901, JC014, H...|-9170513796671708817|JC-202604-citibike-tripdata|   v1.0|2026-05-24 23:16:19.595997|\n",
      "+----------------+----------------------------------------------------------+------------------------------------------------------------+--------------------+---------------------------+-------+--------------------------+\n",
      "only showing top 5 rows\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\4028596838.py:17: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"timestamp\":    datetime.datetime.utcnow().isoformat()\n"
     ]
    }
   ],
   "source": [
    "LLM_DIR = CFG[\"paths\"][\"llm_ready\"]\n",
    "llm_cfg = CFG[\"llm\"]\n",
    "\n",
    "t0 = time.time()\n",
    "\n",
    "df_s = spark.read.parquet(CFG[\"paths\"][\"silver\"])\n",
    "\n",
    "# ── Pipeline LLM ──────────────────────────────────────────────────────────────\n",
    "df_llm = (\n",
    "    df_s\n",
    "    # Filtre 1 : stations non nulles\n",
    "    .filter(F.col(\"start_station_name\").isNotNull() &\n",
    "            F.col(\"end_station_name\").isNotNull())\n",
    "    # Construction du texte\n",
    "    .withColumn(\n",
    "        \"text\",\n",
    "        F.concat_ws(\" to \",\n",
    "                    F.col(\"start_station_name\"),\n",
    "                    F.col(\"end_station_name\"))\n",
    "    )\n",
    "    # Filtre 2 : longueur minimale\n",
    "    .filter(F.length(\"text\") >= llm_cfg[\"min_text_length\"])\n",
    "    # Déduplication par hash de contenu\n",
    "    .withColumn(\"content_hash\", F.xxhash64(\"text\"))\n",
    "    .dropDuplicates([\"content_hash\"])\n",
    "    # Schéma final : doc_id, text, metadata\n",
    "    .select(\n",
    "        F.col(\"ride_id\").alias(\"doc_id\"),\n",
    "        F.col(\"text\"),\n",
    "        F.struct(\n",
    "            F.col(\"rideable_type\"),\n",
    "            F.col(\"member_casual\"),\n",
    "            F.col(\"started_at\"),\n",
    "            F.col(\"start_station_id\"),\n",
    "            F.col(\"end_station_id\"),\n",
    "        ).alias(\"metadata\"),\n",
    "        F.col(\"content_hash\"),\n",
    "        F.lit(llm_cfg[\"source\"]).alias(\"source\"),\n",
    "        F.lit(llm_cfg[\"version\"]).alias(\"version\"),\n",
    "        F.current_timestamp().alias(\"curated_at\"),\n",
    "    )\n",
    ")\n",
    "\n",
    "n_llm   = df_llm.count()\n",
    "n_input = df_s.filter(\n",
    "    F.col(\"start_station_name\").isNotNull() &\n",
    "    F.col(\"end_station_name\").isNotNull()\n",
    ").count()\n",
    "\n",
    "quality_pct = n_llm / n_silver * 100\n",
    "\n",
    "df_llm.write.mode(\"overwrite\").parquet(LLM_DIR)\n",
    "\n",
    "t_llm = time.time() - t0\n",
    "\n",
    "print(f\"LLM-ready dataset : {n_llm:,} documents (sur {n_silver:,} en Silver)\")\n",
    "print(f\"Taux qualité      : {quality_pct:.1f}%\")\n",
    "slo_ok = quality_pct/100 >= CFG[\"slos\"][\"llm_quality_pct\"]\n",
    "print(f\"SLO qualité ≥ {CFG['slos']['llm_quality_pct']*100:.0f}% : {'✅ OK' if slo_ok else '❌ KO'}\")\n",
    "print(f\"Temps             : {t_llm:.1f}s\")\n",
    "print(f\"Chemin            : {LLM_DIR}\")\n",
    "df_llm.printSchema()\n",
    "df_llm.show(5, truncate=60)\n",
    "\n",
    "log_metric(\"r1\", \"llm_prep\", \"curation\", \"n_docs\",       n_llm,             \"après dédup\")\n",
    "log_metric(\"r1\", \"llm_prep\", \"curation\", \"quality_pct\",  round(quality_pct,2), \"% Silver passant les filtres\")\n",
    "log_metric(\"r1\", \"llm_prep\", \"curation\", \"elapsed_sec\",  round(t_llm,2),    \"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "848a895b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "=== DATA CARD ===\n",
      "{\n",
      "  \"source\": \"JC-202604-citibike-tripdata\",\n",
      "  \"version\": \"v1.0\",\n",
      "  \"curated_at\": \"2026-05-24T21:16:20.024838\",\n",
      "  \"n_documents\": 6454,\n",
      "  \"schema\": {\n",
      "    \"doc_id\": \"string \\u2014 ride_id unique Citi Bike\",\n",
      "    \"text\": \"string \\u2014 'start_station_name to end_station_name'\",\n",
      "    \"metadata\": \"struct \\u2014 rideable_type, member_casual, started_at, station_ids\",\n",
      "    \"content_hash\": \"long   \\u2014 xxhash64 du texte (d\\u00e9duplication)\",\n",
      "    \"source\": \"string \\u2014 identifiant du dataset source\",\n",
      "    \"version\": \"string \\u2014 version du pipeline de curation\",\n",
      "    \"curated_at\": \"timestamp \\u2014 date de production\"\n",
      "  },\n",
      "  \"quality_filters\": [\n",
      "    \"start_station_name IS NOT NULL\",\n",
      "    \"end_station_name IS NOT NULL\",\n",
      "    \"length(text) >= 10\",\n",
      "    \"dropDuplicates(['content_hash'])\"\n",
      "  ],\n",
      "  \"intended_use\": \"RAG over Jersey City Citi Bike station names\",\n",
      "  \"known_issues\": [\n",
      "    \"V\\u00e9los \\u00e9lectriques sans ancrage (dockless) exclus car pas de station associ\\u00e9e\",\n",
      "    \"Vocabulaire limit\\u00e9 aux noms de stations JC (~108 stations)\",\n",
      "    \"Dataset mensuel \\u2014 saisonnalit\\u00e9 non captur\\u00e9e\"\n",
      "  ],\n",
      "  \"size_records\": 6454,\n",
      "  \"format\": \"Parquet (compression Snappy)\",\n",
      "  \"language\": \"en\"\n",
      "}\n",
      "\n",
      "Sauvegardée → proof/data_card.json\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "C:\\Users\\yoyod\\AppData\\Local\\Temp\\ipykernel_22608\\3680592781.py:5: DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).\n",
      "  \"curated_at\":    datetime.datetime.utcnow().isoformat(),\n"
     ]
    }
   ],
   "source": [
    "# ── Data Card ─────────────────────────────────────────────────────────────────\n",
    "data_card = {\n",
    "    \"source\":        llm_cfg[\"source\"],\n",
    "    \"version\":       llm_cfg[\"version\"],\n",
    "    \"curated_at\":    datetime.datetime.utcnow().isoformat(),\n",
    "    \"n_documents\":   n_llm,\n",
    "    \"schema\": {\n",
    "        \"doc_id\":       \"string — ride_id unique Citi Bike\",\n",
    "        \"text\":         \"string — 'start_station_name to end_station_name'\",\n",
    "        \"metadata\":     \"struct — rideable_type, member_casual, started_at, station_ids\",\n",
    "        \"content_hash\": \"long   — xxhash64 du texte (déduplication)\",\n",
    "        \"source\":       \"string — identifiant du dataset source\",\n",
    "        \"version\":      \"string — version du pipeline de curation\",\n",
    "        \"curated_at\":   \"timestamp — date de production\",\n",
    "    },\n",
    "    \"quality_filters\": [\n",
    "        \"start_station_name IS NOT NULL\",\n",
    "        \"end_station_name IS NOT NULL\",\n",
    "        f\"length(text) >= {llm_cfg['min_text_length']}\",\n",
    "        \"dropDuplicates(['content_hash'])\",\n",
    "    ],\n",
    "    \"intended_use\":  llm_cfg[\"intended_use\"],\n",
    "    \"known_issues\":  [\n",
    "        \"Vélos électriques sans ancrage (dockless) exclus car pas de station associée\",\n",
    "        \"Vocabulaire limité aux noms de stations JC (~108 stations)\",\n",
    "        \"Dataset mensuel — saisonnalité non capturée\",\n",
    "    ],\n",
    "    \"size_records\": n_llm,\n",
    "    \"format\":        \"Parquet (compression Snappy)\",\n",
    "    \"language\":      \"en\",\n",
    "}\n",
    "\n",
    "data_card_path = f\"{CFG['paths']['proof']}/data_card.json\"\n",
    "with open(data_card_path, \"w\") as f:\n",
    "    json.dump(data_card, f, indent=2, default=str)\n",
    "\n",
    "print(\"=== DATA CARD ===\")\n",
    "print(json.dumps(data_card, indent=2, default=str))\n",
    "print(f\"\\nSauvegardée → {data_card_path}\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "9f35d4ea",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Plan LLM sauvegardé → proof/plan_llm_curation.txt\n"
     ]
    }
   ],
   "source": [
    "# ── Plan LLM curation ─────────────────────────────────────────────────────────\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buf = io.StringIO()\n",
    "df_llm.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "with open(f\"{CFG['paths']['proof']}/plan_llm_curation.txt\", \"w\") as f:\n",
    "    f.write(buf.getvalue())\n",
    "print(\"Plan LLM sauvegardé → proof/plan_llm_curation.txt\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "27e5d00e",
   "metadata": {},
   "source": [
    "## 8. Evidence — Plans & Métriques consolidées\n",
    "\n",
    "Sauvegarde de `project_metrics_log.csv` avec toutes les métriques collectées  \n",
    "tout au long du pipeline. Résumé final des SLOs.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "95696d22",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "project_metrics_log.csv écrit — 88 entrées\n",
      "\n",
      "Entrées par stage :\n",
      "stage\n",
      "clustering    51\n",
      "etl            9\n",
      "llm_prep       3\n",
      "streaming      8\n",
      "text          17\n",
      "\n",
      "run_id      stage                 task        metric_name  metric_value                        notes                  timestamp\n",
      "    r1        etl       bronze_landing          row_count  8.227200e+04                 raw CSV rows 2026-05-24T21:08:24.459161\n",
      "    r1        etl       bronze_landing        elapsed_sec  6.100000e+00                          NaN 2026-05-24T21:08:24.459336\n",
      "    r1        etl         silver_clean          row_count  8.197500e+04                après filtres 2026-05-24T21:08:32.458399\n",
      "    r1        etl         silver_clean       rows_dropped  2.970000e+02                          NaN 2026-05-24T21:08:32.458510\n",
      "    r1        etl         silver_clean      retention_pct  9.964000e+01                          NaN 2026-05-24T21:08:32.458607\n",
      "    r1        etl         silver_clean        elapsed_sec  6.610000e+00                          NaN 2026-05-24T21:08:32.458677\n",
      "    r1        etl         gold_station          row_count  1.080000e+02             stations uniques 2026-05-24T21:08:40.081511\n",
      "    r1        etl           gold_daily          row_count  3.000000e+01              jours distincts 2026-05-24T21:08:40.081566\n",
      "    r1        etl           gold_build        elapsed_sec  5.110000e+00                          NaN 2026-05-24T21:08:40.081622\n",
      "    r1  streaming        run1_baseline inputRowsPerSecond  0.000000e+00  trigger=10s watermark=10min 2026-05-24T21:12:11.918811\n",
      "    r1  streaming        run1_baseline durationMs_trigger  1.549000e+03                          NaN 2026-05-24T21:12:11.918823\n",
      "    r1  streaming        run1_baseline          stateRows  1.000000e+01                          NaN 2026-05-24T21:12:11.918829\n",
      "    r1  streaming        run1_baseline      parquet_files  2.000000e+01                          NaN 2026-05-24T21:12:11.918832\n",
      "    r2  streaming       run2_optimized inputRowsPerSecond  0.000000e+00    trigger=5s watermark=5min 2026-05-24T21:15:42.505597\n",
      "    r2  streaming       run2_optimized durationMs_trigger  0.000000e+00                          NaN 2026-05-24T21:15:42.505619\n",
      "    r2  streaming       run2_optimized          stateRows  1.000000e+01                          NaN 2026-05-24T21:15:42.505626\n",
      "    r2  streaming       run2_optimized      parquet_files  1.600000e+01                          NaN 2026-05-24T21:15:42.505630\n",
      "    r1       text         corpus_build             n_docs  8.170300e+04        trajets avec stations 2026-05-24T21:15:43.494426\n",
      "    r1       text         corpus_build            avg_len  4.170000e+01                        chars 2026-05-24T21:15:43.494556\n",
      "    r1       text         tokenization      tokens_before  6.241830e+05                          NaN 2026-05-24T21:15:45.772024\n",
      "    r1       text         tokenization       tokens_after  5.382540e+05                          NaN 2026-05-24T21:15:45.772174\n",
      "    r1       text         tokenization         filter_pct  1.377000e+01       % stop-words supprimés 2026-05-24T21:15:45.772264\n",
      "    r1       text          index_build       unique_terms  2.430000e+02                          NaN 2026-05-24T21:15:48.734413\n",
      "    r1       text          index_build        elapsed_sec  5.900000e-01                          NaN 2026-05-24T21:15:48.734551\n",
      "    r1       text            footprint      parquet_bytes  6.778177e+06                          NaN 2026-05-24T21:15:51.417547\n",
      "    r1       text            footprint          csv_bytes  9.224983e+06                          NaN 2026-05-24T21:15:51.417606\n",
      "    r1       text            footprint              ratio  7.348000e-01                  parquet/csv 2026-05-24T21:15:51.417652\n",
      "    r1       text          query_grove         latency_ms  2.177400e+02      freq=7324 postings=7324 2026-05-24T21:15:52.257593\n",
      "    r1       text       query_hamilton         latency_ms  9.882000e+01      freq=3587 postings=3587 2026-05-24T21:15:52.356549\n",
      "    r1       text        query_newport         latency_ms  6.690000e+01      freq=5466 postings=5466 2026-05-24T21:15:52.423566\n",
      "    r1       text         query_jersey         latency_ms  5.465000e+01      freq=2689 postings=2689 2026-05-24T21:15:52.478441\n",
      "    r1       text           query_path         latency_ms  5.119000e+01    freq=10184 postings=10184 2026-05-24T21:15:52.529761\n",
      "    r1       text           query_park         latency_ms  6.829000e+01    freq=21431 postings=21431 2026-05-24T21:15:52.598167\n",
      "    r1       text query_xzqintrouvable         latency_ms  5.227000e+01            freq=0 postings=0 2026-05-24T21:15:52.650594\n",
      "    r1 clustering            kmeans_k2         silhouette  6.768340e-01                          NaN 2026-05-24T21:15:59.130364\n",
      "    r1 clustering            kmeans_k2              wssse  3.260415e+02                          NaN 2026-05-24T21:15:59.130378\n",
      "    r1 clustering            kmeans_k2        elapsed_sec  2.623000e+00                          NaN 2026-05-24T21:15:59.130384\n",
      "    r1 clustering            kmeans_k3         silhouette  5.098400e-01                          NaN 2026-05-24T21:16:00.269274\n",
      "    r1 clustering            kmeans_k3              wssse  2.343005e+02                          NaN 2026-05-24T21:16:00.269283\n",
      "    r1 clustering            kmeans_k3        elapsed_sec  1.139000e+00                          NaN 2026-05-24T21:16:00.269287\n",
      "    r1 clustering            kmeans_k4         silhouette  5.574340e-01                          NaN 2026-05-24T21:16:01.724431\n",
      "    r1 clustering            kmeans_k4              wssse  2.185406e+02                          NaN 2026-05-24T21:16:01.724443\n",
      "    r1 clustering            kmeans_k4        elapsed_sec  1.455000e+00                          NaN 2026-05-24T21:16:01.724447\n",
      "    r1 clustering            kmeans_k5         silhouette  4.815830e-01                          NaN 2026-05-24T21:16:02.895198\n",
      "    r1 clustering            kmeans_k5              wssse  1.671396e+02                          NaN 2026-05-24T21:16:02.895212\n",
      "    r1 clustering            kmeans_k5        elapsed_sec  1.171000e+00                          NaN 2026-05-24T21:16:02.895216\n",
      "    r1 clustering            kmeans_k6         silhouette  4.533440e-01                          NaN 2026-05-24T21:16:04.043894\n",
      "    r1 clustering            kmeans_k6              wssse  1.366671e+02                          NaN 2026-05-24T21:16:04.043902\n",
      "    r1 clustering            kmeans_k6        elapsed_sec  1.149000e+00                          NaN 2026-05-24T21:16:04.043906\n",
      "    r1 clustering            kmeans_k7         silhouette  3.492340e-01                          NaN 2026-05-24T21:16:05.153624\n",
      "    r1 clustering            kmeans_k7              wssse  1.307607e+02                          NaN 2026-05-24T21:16:05.153635\n",
      "    r1 clustering            kmeans_k7        elapsed_sec  1.110000e+00                          NaN 2026-05-24T21:16:05.153640\n",
      "    r1 clustering            kmeans_k8         silhouette  4.225120e-01                          NaN 2026-05-24T21:16:06.206511\n",
      "    r1 clustering            kmeans_k8              wssse  1.158728e+02                          NaN 2026-05-24T21:16:06.206520\n",
      "    r1 clustering            kmeans_k8        elapsed_sec  1.053000e+00                          NaN 2026-05-24T21:16:06.206524\n",
      "    r1 clustering         sweep_result             best_k  2.000000e+00                          NaN 2026-05-24T21:16:06.206789\n",
      "    r1 clustering         sweep_result    best_silhouette  6.768340e-01                          NaN 2026-05-24T21:16:06.206830\n",
      "    r1 clustering     partition_before        elapsed_sec  9.610000e-01               200 partitions 2026-05-24T21:16:08.927365\n",
      "    r1 clustering     partition_before         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:08.927438\n",
      "    r1 clustering      partition_after        elapsed_sec  1.526000e+00                12 partitions 2026-05-24T21:16:08.927507\n",
      "    r1 clustering      partition_after         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:08.927559\n",
      "    r1 clustering      partition_after            speedup  6.300000e-01                          NaN 2026-05-24T21:16:08.927609\n",
      "    r1 clustering               seed_0         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:10.080684\n",
      "    r1 clustering               seed_0              wssse  3.260415e+02                          NaN 2026-05-24T21:16:10.080698\n",
      "    r1 clustering               seed_0        elapsed_sec  1.139000e+00                          NaN 2026-05-24T21:16:10.080704\n",
      "    r1 clustering               seed_7         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:11.443468\n",
      "    r1 clustering               seed_7              wssse  3.260415e+02                          NaN 2026-05-24T21:16:11.443480\n",
      "    r1 clustering               seed_7        elapsed_sec  1.363000e+00                          NaN 2026-05-24T21:16:11.443485\n",
      "    r1 clustering              seed_42         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:12.965106\n",
      "    r1 clustering              seed_42              wssse  3.260415e+02                          NaN 2026-05-24T21:16:12.965117\n",
      "    r1 clustering              seed_42        elapsed_sec  1.521000e+00                          NaN 2026-05-24T21:16:12.965122\n",
      "    r1 clustering              seed_99         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:14.230054\n",
      "    r1 clustering              seed_99              wssse  3.260415e+02                          NaN 2026-05-24T21:16:14.230063\n",
      "    r1 clustering              seed_99        elapsed_sec  1.265000e+00                          NaN 2026-05-24T21:16:14.230067\n",
      "    r1 clustering             seed_123         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:15.303091\n",
      "    r1 clustering             seed_123              wssse  3.260415e+02                          NaN 2026-05-24T21:16:15.303102\n",
      "    r1 clustering             seed_123        elapsed_sec  1.073000e+00                          NaN 2026-05-24T21:16:15.303105\n",
      "    r1 clustering             seed_256         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:16.415628\n",
      "    r1 clustering             seed_256              wssse  3.260415e+02                          NaN 2026-05-24T21:16:16.415642\n",
      "    r1 clustering             seed_256        elapsed_sec  1.112000e+00                          NaN 2026-05-24T21:16:16.415650\n",
      "    r1 clustering             seed_512         silhouette  6.768340e-01                          NaN 2026-05-24T21:16:17.882094\n",
      "    r1 clustering             seed_512              wssse  3.260415e+02                          NaN 2026-05-24T21:16:17.882104\n",
      "    r1 clustering             seed_512        elapsed_sec  1.466000e+00                          NaN 2026-05-24T21:16:17.882108\n",
      "    r1 clustering       seed_stability           sil_mean  6.768340e-01                          NaN 2026-05-24T21:16:17.882582\n",
      "    r1 clustering       seed_stability            sil_std  0.000000e+00                          NaN 2026-05-24T21:16:17.882638\n",
      "    r1   llm_prep             curation             n_docs  6.454000e+03                  après dédup 2026-05-24T21:16:20.015600\n",
      "    r1   llm_prep             curation        quality_pct  7.870000e+00 % Silver passant les filtres 2026-05-24T21:16:20.015669\n",
      "    r1   llm_prep             curation        elapsed_sec  1.270000e+00                          NaN 2026-05-24T21:16:20.015714\n"
     ]
    }
   ],
   "source": [
    "# ── Écriture project_metrics_log.csv ─────────────────────────────────────────\n",
    "METRICS_PATH = \"project_metrics_log.csv\"\n",
    "fieldnames = [\"run_id\", \"stage\", \"task\", \"metric_name\",\n",
    "              \"metric_value\", \"notes\", \"timestamp\"]\n",
    "\n",
    "with open(METRICS_PATH, \"w\", newline=\"\", encoding=\"utf-8\") as f:\n",
    "    writer = csv.DictWriter(f, fieldnames=fieldnames)\n",
    "    writer.writeheader()\n",
    "    writer.writerows(METRICS)\n",
    "\n",
    "print(f\"project_metrics_log.csv écrit — {len(METRICS)} entrées\")\n",
    "\n",
    "# ── Preview ───────────────────────────────────────────────────────────────────\n",
    "import pandas as pd\n",
    "df_metrics = pd.read_csv(METRICS_PATH)\n",
    "print(f\"\\nEntrées par stage :\")\n",
    "print(df_metrics.groupby(\"stage\")[\"metric_name\"].count().to_string())\n",
    "print()\n",
    "print(df_metrics.to_string(index=False))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "c04c5c65",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "=================================================================\n",
      "RÉSUMÉ DES SLOs — DE2 Final Project Track C\n",
      "=================================================================\n",
      "  Parquet ≤ 60% CSV (text)                        0.7348  (seuil ≤0.6)  ❌ KO\n",
      "  Silhouette ≥ 0.25 (clustering)                  0.6768  (seuil ≥0.25)  ✅ OK\n",
      "  LLM qualité ≥ 80%                               0.0787  (seuil ≥0.8)  ❌ KO\n",
      "=================================================================\n",
      "\n",
      "Livrables proof/ :\n",
      "  data_card.json\n",
      "  plan_after.txt\n",
      "  plan_before.txt\n",
      "  plan_etl.txt\n",
      "  plan_index_build.txt\n",
      "  plan_llm_curation.txt\n",
      "  plan_query.txt\n",
      "  plan_streaming.txt\n",
      "  query_progress_after.json\n",
      "  query_progress_before.json\n"
     ]
    }
   ],
   "source": [
    "# ── Résumé des SLOs ───────────────────────────────────────────────────────────\n",
    "print(\"=\" * 65)\n",
    "print(\"RÉSUMÉ DES SLOs — DE2 Final Project Track C\")\n",
    "print(\"=\" * 65)\n",
    "\n",
    "slos_check = [\n",
    "    (\"Parquet ≤ 60% CSV (text)\",\n",
    "     ratio,\n",
    "     CFG[\"slos\"][\"parquet_ratio_max\"],\n",
    "     \"≤\"),\n",
    "    (\"Silhouette ≥ 0.25 (clustering)\",\n",
    "     best_sil,\n",
    "     CFG[\"slos\"][\"silhouette_min\"],\n",
    "     \"≥\"),\n",
    "    (\"LLM qualité ≥ 80%\",\n",
    "     quality_pct / 100,\n",
    "     CFG[\"slos\"][\"llm_quality_pct\"],\n",
    "     \"≥\"),\n",
    "]\n",
    "\n",
    "for label, val, threshold, op in slos_check:\n",
    "    ok = (val <= threshold) if op == \"≤\" else (val >= threshold)\n",
    "    status = \"✅ OK\" if ok else \"❌ KO\"\n",
    "    print(f\"  {label:<45} {val:>8.4f}  (seuil {op}{threshold})  {status}\")\n",
    "\n",
    "print(\"=\" * 65)\n",
    "print(\"\\nLivrables proof/ :\")\n",
    "for f in sorted(pathlib.Path(CFG[\"paths\"][\"proof\"]).iterdir()):\n",
    "    print(f\"  {f.name}\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2b8dc26b",
   "metadata": {},
   "source": [
    "## 9. Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "5cca289a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession arrêtée.\n",
      "DE2 Final Project — Track C — Pipeline complet ✅\n"
     ]
    }
   ],
   "source": [
    "spark.stop()\n",
    "print(\"SparkSession arrêtée.\")\n",
    "print(\"DE2 Final Project — Track C — Pipeline complet ✅\")\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python [conda env:base] *",
   "language": "python",
   "name": "conda-base-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.13.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
