{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3aea5b88",
   "metadata": {},
   "source": [
    "# DE2 — Lab 1: Structured Streaming Pipeline (10%)\n",
    "> Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026\n",
    "\n",
    "**Track:** C — Micromobility\n",
    "\n",
    "**Goal:** Build a Structured Streaming pipeline with windowed aggregation, watermarks, and a Parquet sink. Monitor via `query.lastProgress` and the Streaming UI. Deliver a before/after optimization report."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "intro-explain",
   "metadata": {},
   "source": [
    "## Contexte Track C — Micromobility\n",
    "On simule un flux de trajets de trottinettes/vélos en libre-service.\n",
    "Chaque événement = un trajet avec un `starttime` (timestamp de départ).\n",
    "On va agréger par fenêtres de **15 minutes** pour compter les trajets et calculer la durée moyenne."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "bc08b3e6-f24b-4132-b4cb-46eb63692135",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ[\"SPARK_LOCAL_IP\"] = \"0.0.0.0\"\n",
    "os.environ[\"SPARK_PUBLIC_DNS\"] = \"172.19.103.249\"\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "0af04649",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: Using incubator modules: jdk.incubator.vector\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "26/05/10 17:36:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Spark version: 4.0.0\n",
      "Spark UI: http://172.19.103.249:4040\n",
      "Spark UI (WSL/Windows browser): http://localhost:4040\n",
      "→ Streaming UI: http://localhost:4040/StreamingQuery/\n",
      "\n",
      " SparkSession créée !\n"
     ]
    }
   ],
   "source": [
    "# Imports & SparkSession\n",
    "\n",
    "# On importe tout ce dont on a besoin :\n",
    "# - SparkSession : le point d'entrée Spark\n",
    "# - functions as F : fonctions SQL de Spark (window, avg, count...)\n",
    "# - StructType, StructField, etc. : pour définir le schéma des données\n",
    "\n",
    "import os\n",
    "import time\n",
    "import json\n",
    "import shutil\n",
    "import pathlib\n",
    "from urllib.parse import urlparse\n",
    "\n",
    "from pyspark.sql import SparkSession, functions as F\n",
    "from pyspark.sql.types import (\n",
    "    StructType, StructField,\n",
    "    StringType, DoubleType, IntegerType, TimestampType\n",
    ")\n",
    "\n",
    "# Résout les problèmes de connexion réseau dans WSL\n",
    "DE2_SPARK_DRIVER_HOST  = os.environ.get(\"DE2_SPARK_DRIVER_HOST\",  \"127.0.0.1\")\n",
    "DE2_SPARK_BIND_ADDRESS = os.environ.get(\"DE2_SPARK_BIND_ADDRESS\", \"0.0.0.0\")\n",
    "os.environ.setdefault(\"SPARK_LOCAL_IP\", DE2_SPARK_DRIVER_HOST)\n",
    "\n",
    "\n",
    "def show_spark_ui(spark_session):\n",
    "    \"\"\"Affiche l'URL du Spark UI dans le terminal.\"\"\"\n",
    "    ui_url = spark_session.sparkContext.uiWebUrl\n",
    "    print(\"Spark version:\", spark_session.version)\n",
    "    if ui_url:\n",
    "        ui_port = urlparse(ui_url).port or 4040\n",
    "        print(\"Spark UI:\", ui_url)\n",
    "        print(\"Spark UI (WSL/Windows browser):\", f\"http://localhost:{ui_port}\")\n",
    "        print(f\"→ Streaming UI: http://localhost:{ui_port}/StreamingQuery/\")\n",
    "    else:\n",
    "        print(\"Spark UI: not available\")\n",
    "\n",
    "\n",
    "# Création de la SparkSession en mode local (utilise tous les cœurs CPU)\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"DE2-Lab1-Streaming-TrackC\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.driver.host\",       DE2_SPARK_DRIVER_HOST)\n",
    "    .config(\"spark.driver.bindAddress\", DE2_SPARK_BIND_ADDRESS)\n",
    "    .config(\"spark.ui.bindAddress\",     DE2_SPARK_BIND_ADDRESS)\n",
    "    # Réduit les partitions shuffle : en local avec peu de données, 4 suffit\n",
    "    # (par défaut Spark crée 200 partitions → trop lent pour du petit volume)\n",
    "    .config(\"spark.sql.shuffle.partitions\", \"4\")\n",
    "    .config(\"spark.ui.enabled\", \"true\") \\\n",
    "    .config(\"spark.ui.port\", \"4040\") \\\n",
    "    .config(\"spark.driver.host\", \"127.0.0.1\") \\\n",
    "    .config(\"spark.driver.bindAddress\", \"127.0.0.1\") \\\n",
    "    .getOrCreate()\n",
    ")\n",
    "\n",
    "show_spark_ui(spark)\n",
    "print(\"\\n SparkSession créée !\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5604cd94",
   "metadata": {},
   "source": [
    "## 1. Définir le Schéma & Préparer le Répertoire Landing\n",
    "\n",
    "**Pourquoi un schéma explicite ?**\n",
    "En streaming, Spark ne peut pas *inférer* le schéma (il faudrait lire tout le fichier d'abord). On le déclare donc à la main.\n",
    "\n",
    "**Le répertoire landing** (= zone d'atterrissage) : c'est là qu'on va \"déposer\" les fichiers JSON un par un pour simuler un vrai flux de données qui arrive en continu."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "4db8bcd8",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Schéma défini : 6 colonnes\n",
      "   - starttime       (TimestampType())\n",
      "   - trip_id         (StringType())\n",
      "   - station_id      (StringType())\n",
      "   - user_type       (StringType())\n",
      "   - duration_sec    (DoubleType())\n",
      "   - bike_id         (StringType())\n",
      " 10 fichiers JSON générés dans data/staging/lab1/\n",
      "    200 trajets simulés au total (20 par fichier)\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# Schéma Track C (Micromobility) + données simulées\n",
    "# ============================================================\n",
    "# Colonnes d'un trajet de trottinette/vélo :\n",
    "#   starttime      → timestamp du départ (notre colonne d'event-time)\n",
    "#   trip_id        → identifiant unique du trajet\n",
    "#   station_id     → identifiant de la station de départ\n",
    "#   user_type      → type d'utilisateur (Subscriber ou Customer)\n",
    "#   duration_sec   → durée du trajet en secondes\n",
    "#   bike_id        → identifiant du véhicule\n",
    "\n",
    "\n",
    "schema = StructType([\n",
    "    StructField(\"starttime\",    TimestampType(), True),  # Event-time principal\n",
    "    StructField(\"trip_id\",      StringType(),    True),\n",
    "    StructField(\"station_id\",   StringType(),    True),\n",
    "    StructField(\"user_type\",    StringType(),    True),  # 'Subscriber' ou 'Customer'\n",
    "    StructField(\"duration_sec\", DoubleType(),    True),  # durée en secondes\n",
    "    StructField(\"bike_id\",      StringType(),    True),\n",
    "])\n",
    "\n",
    "# Création des répertoires\n",
    "LANDING_DIR  = \"data/landing/lab1/\"         # fichiers JSON source (streaming input)\n",
    "STAGING_DIR  = \"data/staging/lab1/\"         # fichiers en attente (drop un par un)\n",
    "pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(STAGING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "print(f\"Schéma défini : {len(schema.fields)} colonnes\")\n",
    "for f in schema.fields:\n",
    "    print(f\"   - {f.name:15s} ({f.dataType})\")\n",
    "\n",
    "# ------------------------------------------------------------------\n",
    "# Génération de données simulées (10 fichiers JSON)\n",
    "# On crée des trajets fictifs répartis sur 2h30 (de 08h00 à 10h30)\n",
    "# ------------------------------------------------------------------\n",
    "import random\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "random.seed(42)\n",
    "stations = [\"STN_001\", \"STN_002\", \"STN_003\", \"STN_004\", \"STN_005\"]\n",
    "user_types = [\"Subscriber\", \"Customer\"]\n",
    "BASE_TIME = datetime(2024, 3, 15, 8, 0, 0)   # 08:00:00\n",
    "\n",
    "for batch_idx in range(10):  # 10 fichiers\n",
    "    trips = []\n",
    "    for trip_num in range(20):  # 20 trajets par fichier\n",
    "        # Les trajets du batch i couvrent une planche de 15 min\n",
    "        offset_minutes = batch_idx * 15 + random.uniform(0, 14)\n",
    "        event_time = BASE_TIME + timedelta(minutes=offset_minutes)\n",
    "        \n",
    "        # Quelques événements \"en retard\" pour tester le watermark\n",
    "        if random.random() < 0.05:   # 5% de chance d'être en retard\n",
    "            event_time -= timedelta(minutes=random.uniform(2, 8))\n",
    "        \n",
    "        trips.append({\n",
    "            \"starttime\":    event_time.strftime(\"%Y-%m-%dT%H:%M:%S\"),\n",
    "            \"trip_id\":      f\"TRIP_{batch_idx:02d}_{trip_num:03d}\",\n",
    "            \"station_id\":   random.choice(stations),\n",
    "            \"user_type\":    random.choice(user_types),\n",
    "            \"duration_sec\": round(random.uniform(60, 3600), 1),\n",
    "            \"bike_id\":      f\"BIKE_{random.randint(100, 199):03d}\",\n",
    "        })\n",
    "    \n",
    "    # Sauvegarde dans le staging (on les copiera dans landing un par un)\n",
    "    filepath = f\"{STAGING_DIR}batch_{batch_idx:02d}.json\"\n",
    "    with open(filepath, \"w\") as f:\n",
    "        json.dump(trips, f, indent=2)\n",
    "\n",
    "print(f\" 10 fichiers JSON générés dans {STAGING_DIR}\")\n",
    "print(\"    200 trajets simulés au total (20 par fichier)\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f22bb50",
   "metadata": {},
   "source": [
    "## 2. Créer la Source de Streaming\n",
    "\n",
    "**`spark.readStream`** ouvre un flux continu sur un répertoire.\n",
    "Chaque fois qu'un nouveau fichier apparaît dans `LANDING_DIR`, Spark le lit automatiquement.\n",
    "\n",
    "> `maxFilesPerTrigger=1` → Spark traite **1 seul fichier par micro-batch** (= par déclenchement). Ça simule des données qui arrivent en continu."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "235f5988",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Is streaming : True\n",
      "\n",
      "Schéma du flux :\n",
      "root\n",
      " |-- starttime: timestamp (nullable = true)\n",
      " |-- trip_id: string (nullable = true)\n",
      " |-- station_id: string (nullable = true)\n",
      " |-- user_type: string (nullable = true)\n",
      " |-- duration_sec: double (nullable = true)\n",
      " |-- bike_id: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 2 — Source de streaming (readStream)\n",
    "# ============================================================\n",
    "# spark.readStream = on ouvre un flux (pas une lecture batch statique)\n",
    "# .schema(schema)  = schéma explicite OBLIGATOIRE en streaming\n",
    "# .option(\"multiline\", \"true\") = nos JSON sont des tableaux sur plusieurs lignes\n",
    "# .option(\"maxFilesPerTrigger\", 1) = 1 fichier traité par micro-batch\n",
    "# ============================================================\n",
    "\n",
    "df_stream = (\n",
    "    spark.readStream\n",
    "    .schema(schema)\n",
    "    .option(\"multiline\", \"true\")       # chaque fichier est un tableau JSON []\n",
    "    .option(\"maxFilesPerTrigger\", 1)   # simule l'arrivée progressive des fichiers\n",
    "    .json(LANDING_DIR)\n",
    ")\n",
    "\n",
    "print(\"Is streaming :\", df_stream.isStreaming)  # doit afficher True\n",
    "print(\"\\nSchéma du flux :\")\n",
    "df_stream.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f81ce9a",
   "metadata": {},
   "source": [
    "## 3. Watermark + Agrégation par Fenêtres (Window)\n",
    "\n",
    "**Watermark** = tolérance aux données en retard.\n",
    "> `withWatermark(\"starttime\", \"10 minutes\")` signifie : \"si un événement arrive avec plus de 10 minutes de retard par rapport au dernier événement vu, on l'ignore\".\n",
    "\n",
    "**Window** = regroupement temporel.\n",
    "> `window(\"starttime\", \"15 minutes\")` crée des intervalles : [08:00–08:15], [08:15–08:30], etc.\n",
    "Chaque trajet est assigné à la fenêtre qui contient son `starttime`.\n",
    "\n",
    "On calcule ensuite pour chaque fenêtre × station :\n",
    "- Le nombre de trajets\n",
    "- La durée moyenne\n",
    "- La proportion d'abonnés"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "ed18b976",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Schéma de la sortie agrégée :\n",
      "root\n",
      " |-- window: struct (nullable = false)\n",
      " |    |-- start: timestamp (nullable = true)\n",
      " |    |-- end: timestamp (nullable = true)\n",
      " |-- station_id: string (nullable = true)\n",
      " |-- user_type: string (nullable = true)\n",
      " |-- trip_count: long (nullable = false)\n",
      " |-- avg_duration_sec: double (nullable = true)\n",
      " |-- min_duration_sec: double (nullable = true)\n",
      " |-- max_duration_sec: double (nullable = true)\n",
      " |-- unique_bikes: long (nullable = false)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 3 — Watermark + Fenêtre glissante 15 min\n",
    "# ============================================================\n",
    "\n",
    "windowed = (\n",
    "    df_stream\n",
    "    # ÉTAPE 1 : Watermark — tolère jusqu'à 10 min de retard\n",
    "    # Sans ça, Spark garde l'état indéfiniment en mémoire → fuite mémoire !\n",
    "    .withWatermark(\"starttime\", \"10 minutes\")\n",
    "\n",
    "    # ÉTAPE 2 : Groupement par fenêtre temporelle (15 min) + station\n",
    "    .groupBy(\n",
    "        F.window(\"starttime\", \"15 minutes\"),   # fenêtre TUMBLING de 15 min\n",
    "        F.col(\"station_id\"),                   # clé métier Track C\n",
    "        F.col(\"user_type\"),\n",
    "    )\n",
    "\n",
    "    # ÉTAPE 3 : Agrégations\n",
    "    .agg(\n",
    "        F.count(\"*\").alias(\"trip_count\"),                    # nombre de trajets\n",
    "        F.avg(\"duration_sec\").alias(\"avg_duration_sec\"),     # durée moyenne\n",
    "        F.min(\"duration_sec\").alias(\"min_duration_sec\"),\n",
    "        F.max(\"duration_sec\").alias(\"max_duration_sec\"),\n",
    "        F.approx_count_distinct(\"bike_id\").alias(\"unique_bikes\"),    # véhicules distincts\n",
    "    )\n",
    ")\n",
    "\n",
    "print(\"Schéma de la sortie agrégée :\")\n",
    "windowed.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ea11d696",
   "metadata": {},
   "source": [
    "## 4. Écriture vers le Sink Parquet\n",
    "\n",
    "**Parquet** = format colonne compressé, idéal pour l'analytique.\n",
    "\n",
    "Points importants :\n",
    "- `outputMode(\"append\")` : on n'écrit que les nouvelles fenêtres finalisées (grâce au watermark). **Obligatoire avec watermark + Parquet**.\n",
    "- `checkpointLocation` : Spark y sauvegarde son état pour reprendre après une panne (**exactly-once semantics** = chaque donnée est traitée exactement une fois).\n",
    "- `trigger(processingTime=\"10 seconds\")` : un micro-batch toutes les 10 secondes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "094e01c4-9073-46eb-9f52-3b3bdb30f27e",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "bd2e660e-22ae-4ebb-b1e2-49445ac75272",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.streams.active\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "ef90096b",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 17:36:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Query démarrée !\n",
      "  Query name : None\n",
      "  Query id   : b0c3540b-637e-434d-b1b3-d5ca7c4ea8cf\n",
      "  Status     : {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}\n",
      "\n",
      "Dépôt de 10 fichiers dans data/landing/lab1/...\n",
      "  [01/10] Déposé : batch_00.json\n",
      "  [02/10] Déposé : batch_01.json\n",
      "  [03/10] Déposé : batch_02.json\n",
      "  [04/10] Déposé : batch_03.json\n",
      "  [05/10] Déposé : batch_04.json\n",
      "  [06/10] Déposé : batch_05.json\n",
      "  [07/10] Déposé : batch_06.json\n",
      "  [08/10] Déposé : batch_07.json\n",
      "  [09/10] Déposé : batch_08.json\n",
      "  [10/10] Déposé : batch_09.json\n",
      "\n",
      "Attente finale (20s) pour traiter les derniers fichiers...\n",
      "\n",
      " Simulation terminée. Query encore active : True\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 4 — Sink Parquet + checkpoint + simulation de fichiers\n",
    "# ============================================================\n",
    "\n",
    "# Création des répertoires de sortie\n",
    "SINK_DIR        = \"outputs/lab1/stream_sink\"\n",
    "CHECKPOINT_DIR  = \"outputs/lab1/checkpoint\"\n",
    "pathlib.Path(SINK_DIR).mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(CHECKPOINT_DIR).mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"proof\").mkdir(exist_ok=True)\n",
    "\n",
    "# -- BASELINE : trigger 10 secondes, watermark 10 minutes --\n",
    "query = (\n",
    "    windowed.writeStream\n",
    "    .format(\"parquet\")\n",
    "    .outputMode(\"append\")                          # append REQUIS avec watermark\n",
    "    .option(\"path\", SINK_DIR)\n",
    "    .option(\"checkpointLocation\", CHECKPOINT_DIR)\n",
    "    .trigger(processingTime=\"10 seconds\")           # micro-batch toutes les 10s\n",
    "    .start()\n",
    ")\n",
    "\n",
    "print(\"Query démarrée !\")\n",
    "print(\"  Query name :\", query.name)\n",
    "print(\"  Query id   :\", query.id)\n",
    "print(\"  Status     :\", query.status)\n",
    "\n",
    "# -----------------------------------------------\n",
    "# Simulation : on dépose les 10 fichiers un par un\n",
    "# 1 fichier toutes les 8 secondes → arrive pendant que la query tourne\n",
    "# -----------------------------------------------\n",
    "staging_files = sorted(pathlib.Path(STAGING_DIR).glob(\"*.json\"))\n",
    "print(f\"\\nDépôt de {len(staging_files)} fichiers dans {LANDING_DIR}...\")\n",
    "\n",
    "for i, src in enumerate(staging_files):\n",
    "    dst = pathlib.Path(LANDING_DIR) / src.name\n",
    "    shutil.copy(str(src), str(dst))\n",
    "    print(f\"  [{i+1:02d}/10] Déposé : {src.name}\")\n",
    "    time.sleep(8)   # on attend 8s entre chaque fichier\n",
    "\n",
    "# On laisse le stream traiter les derniers fichiers\n",
    "print(\"\\nAttente finale (20s) pour traiter les derniers fichiers...\")\n",
    "time.sleep(20)\n",
    "\n",
    "print(\"\\n Simulation terminée. Query encore active :\", query.isActive)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "db1b43ba-0c17-4bae-bfc6-599bec5940e7",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[01/10] Déposé batch_00.json\n",
      "[02/10] Déposé batch_01.json\n",
      "[03/10] Déposé batch_02.json\n",
      "[04/10] Déposé batch_03.json\n",
      "[05/10] Déposé batch_04.json\n",
      "[06/10] Déposé batch_05.json\n",
      "[07/10] Déposé batch_06.json\n",
      "[08/10] Déposé batch_07.json\n",
      "[09/10] Déposé batch_08.json\n",
      "[10/10] Déposé batch_09.json\n"
     ]
    }
   ],
   "source": [
    "import shutil, pathlib, time\n",
    "\n",
    "staging_files = sorted(pathlib.Path(\"data/staging/lab1/\").glob(\"*.json\"))\n",
    "for i, src in enumerate(staging_files):\n",
    "    dst = pathlib.Path(\"data/landing/lab1\") / src.name\n",
    "    shutil.copy(str(src), str(dst))\n",
    "    print(f\"[{i+1:02d}/10] Déposé {src.name}\")\n",
    "    time.sleep(12)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "8687e099-c0a3-45cb-b42c-a8446af8e445",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Fichiers dans landing : 10\n"
     ]
    }
   ],
   "source": [
    "import pathlib\n",
    "landing = list(pathlib.Path(\"data/landing/lab1/\").glob(\"*.json\"))\n",
    "print(\"Fichiers dans landing :\", len(landing))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "a8ee9af5-82f5-4ce5-bff2-42358b6b814b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Reset OK\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 17:49:05 WARN DAGScheduler: Failed to cancel job group 6778f296-aa7d-4673-a15d-9c955cff7622. Cannot find active jobs for it.\n",
      "26/05/10 17:49:05 WARN DAGScheduler: Failed to cancel job group 6778f296-aa7d-4673-a15d-9c955cff7622. Cannot find active jobs for it.\n"
     ]
    }
   ],
   "source": [
    "query.stop()\n",
    "\n",
    "import shutil, pathlib\n",
    "shutil.rmtree(\"data/landing/lab1\", ignore_errors=True)\n",
    "shutil.rmtree(\"outputs/lab1/stream_sink\", ignore_errors=True)\n",
    "shutil.rmtree(\"outputs/lab1/checkpoint\", ignore_errors=True)\n",
    "pathlib.Path(\"data/landing/lab1\").mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"outputs/lab1/stream_sink\").mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"outputs/lab1/checkpoint\").mkdir(parents=True, exist_ok=True)\n",
    "print(\"Reset OK\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "909c12e4-3720-4fa0-abe6-fe81f081e313",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 17:49:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[01/10] Déposé batch_00.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[02/10] Déposé batch_01.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[03/10] Déposé batch_02.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[04/10] Déposé batch_03.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[05/10] Déposé batch_04.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[06/10] Déposé batch_05.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[07/10] Déposé batch_06.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[08/10] Déposé batch_07.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[09/10] Déposé batch_08.json — RAFRAICHIS EDGE MAINTENANT 📸\n",
      "[10/10] Déposé batch_09.json — RAFRAICHIS EDGE MAINTENANT 📸\n"
     ]
    }
   ],
   "source": [
    "query = (\n",
    "    windowed.writeStream\n",
    "    .format(\"parquet\")\n",
    "    .outputMode(\"append\")\n",
    "    .option(\"path\", \"outputs/lab1/stream_sink\")\n",
    "    .option(\"checkpointLocation\", \"outputs/lab1/checkpoint\")\n",
    "    .trigger(processingTime=\"10 seconds\")\n",
    "    .start()\n",
    ")\n",
    "\n",
    "staging_files = sorted(pathlib.Path(\"data/staging/lab1/\").glob(\"*.json\"))\n",
    "for i, src in enumerate(staging_files):\n",
    "    dst = pathlib.Path(\"data/landing/lab1\") / src.name\n",
    "    shutil.copy(str(src), str(dst))\n",
    "    print(f\"[{i+1:02d}/10] Déposé {src.name} — RAFRAICHIS EDGE MAINTENANT 📸\")\n",
    "    time.sleep(30)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3cb30a5b",
   "metadata": {},
   "source": [
    "## 5. Monitoring — `query.lastProgress` & Streaming UI\n",
    "\n",
    "`query.lastProgress` retourne un dictionnaire JSON avec toutes les métriques du dernier micro-batch :\n",
    "- `inputRowsPerSecond` : débit d'entrée\n",
    "- `processedRowsPerSecond` : débit de traitement\n",
    "- `durationMs` : temps passé dans chaque phase\n",
    "- `stateOperators` : état gardé en mémoire (pour les agrégations)\n",
    "\n",
    "**Dans le Spark UI** → onglet \"Streaming Queries\" → tu verras Input Rate, Processing Rate, Batch Duration sous forme de graphes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "d96b8762",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "=== Métriques du dernier micro-batch ===\n",
      "{\n",
      "  \"id\": \"a7224d5c-a0f8-4f7b-a335-f21938182e88\",\n",
      "  \"runId\": \"2b29dee5-670d-4fb8-b983-aa7a6ac5e233\",\n",
      "  \"name\": null,\n",
      "  \"timestamp\": \"2026-05-10T15:59:20.003Z\",\n",
      "  \"batchId\": 20,\n",
      "  \"batchDuration\": 6,\n",
      "  \"durationMs\": {\n",
      "    \"latestOffset\": 5,\n",
      "    \"triggerExecution\": 6\n",
      "  },\n",
      "  \"eventTime\": {\n",
      "    \"watermark\": \"2024-03-15T09:18:49.000Z\"\n",
      "  },\n",
      "  \"stateOperators\": [\n",
      "    {\n",
      "      \"operatorName\": \"stateStoreSave\",\n",
      "      \"numRowsTotal\": 9,\n",
      "      \"numRowsUpdated\": 0,\n",
      "      \"numRowsRemoved\": 8,\n",
      "      \"allUpdatesTimeMs\": 0,\n",
      "      \"allRemovalsTimeMs\": 187,\n",
      "      \"commitTimeMs\": 213,\n",
      "      \"memoryUsedBytes\": 14032,\n",
      "      \"numRowsDroppedByWatermark\": 0,\n",
      "      \"numShufflePartitions\": 4,\n",
      "      \"numStateStoreInstances\": 4,\n",
      "      \"customMetrics\": {\n",
      "        \"loadedMapCacheHitCount\": 152,\n",
      "        \"loadedMapCacheMissCount\": 0,\n",
      "        \"stateOnCurrentVersionSizeBytes\": 6864\n",
      "      }\n",
      "    }\n",
      "  ],\n",
      "  \"sources\": [\n",
      "    {\n",
      "      \"description\": \"FileStreamSource[file:/home/saraa/DE2/LAB1/PRATICE/data/landing/lab1]\",\n",
      "      \"startOffset\": \"{'logOffset': 9}\",\n",
      "      \"endOffset\": \"{'logOffset': 9}\",\n",
      "      \"latestOffset\": \"None\",\n",
      "      \"numInputRows\": 0,\n",
      "      \"inputRowsPerSecond\": 0.0,\n",
      "      \"processedRowsPerSecond\": 0.0,\n",
      "      \"metrics\": {}\n",
      "    }\n",
      "  ],\n",
      "  \"sink\": {\n",
      "    \"description\": \"FileSink[file:/home/saraa/DE2/LAB1/PRATICE/outputs/lab1/stream_sink]\",\n",
      "    \"numOutputRows\": -1,\n",
      "    \"metrics\": {}\n",
      "  },\n",
      "  \"numInputRows\": 0,\n",
      "  \"inputRowsPerSecond\": 0.0,\n",
      "  \"processedRowsPerSecond\": 0.0,\n",
      "  \"observedMetrics\": {}\n",
      "}\n",
      "\n",
      "=== Métriques clés ===\n",
      "  inputRowsPerSecond      : 0.0\n",
      "  processedRowsPerSecond  : 0.0\n",
      "  numInputRows            : 0\n",
      "  durationMs.triggerExecution : 6 ms\n",
      "  durationMs.getBatch         : N/A ms\n",
      "  stateOperators.numRowsTotal   : 9\n",
      "  stateOperators.numRowsUpdated : 0\n",
      "\n",
      "=== Plan d'exécution de la query streaming ===\n",
      "== Physical Plan ==\n",
      "HashAggregate (12)\n",
      "+- StateStoreSave (11)\n",
      "   +- HashAggregate (10)\n",
      "      +- StateStoreRestore (9)\n",
      "         +- HashAggregate (8)\n",
      "            +- Exchange (7)\n",
      "               +- HashAggregate (6)\n",
      "                  +- * Project (5)\n",
      "                     +- * Filter (4)\n",
      "                        +- EventTimeWatermark (3)\n",
      "                           +- * Project (2)\n",
      "                              +- StreamingRelation (1)\n",
      "\n",
      "\n",
      "(1) StreamingRelation\n",
      "Output [6]: [starttime#0, trip_id#1, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Arguments: FileSource[data/landing/lab1/], [starttime#0, trip_id#1, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "\n",
      "(2) Project [codegen id : 1]\n",
      "Output [5]: [starttime#0, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Input [6]: [starttime#0, trip_id#1, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "\n",
      "(3) EventTimeWatermark\n",
      "Input [5]: [starttime#0, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Arguments: 3de7f17f-8e77-44f7-a109-15244d1bf628, starttime#0: timestamp, 10 minutes\n",
      "\n",
      "(4) Filter [codegen id : 2]\n",
      "Input [5]: [starttime#0-T600000ms, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Condition : isnotnull(starttime#0-T600000ms)\n",
      "\n",
      "(5) Project [codegen id : 2]\n",
      "Output [5]: [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) < 0) THEN (((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) + 900000000) ELSE ((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) < 0) THEN (((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) + 900000000) ELSE ((precisetimestampconversion(starttime#0-T600000ms, TimestampType, LongType) - 0) % 900000000) END) - 0) + 900000000), LongType, TimestampType))) AS window#72637-T600000ms, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Input [5]: [starttime#0-T600000ms, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "\n",
      "(6) HashAggregate\n",
      "Input [5]: [window#72637-T600000ms, station_id#2, user_type#3, duration_sec#4, bike_id#5]\n",
      "Keys [3]: [window#72637-T600000ms, station_id#2, user_type#3]\n",
      "Functions [5]: [partial_count(1), partial_avg(duration_sec#4), partial_min(duration_sec#4), partial_max(duration_sec#4), partial_approx_count_distinct(bike_id#5, 0.05, 0, 0)]\n",
      "Aggregate Attributes [5]: [count(1)#72528L, avg(duration_sec#4)#72529, min(duration_sec#4)#72530, max(duration_sec#4)#72531, approx_count_distinct(bike_id#5, 0.05, 0, 0)#72636L]\n",
      "Results [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "\n",
      "(7) Exchange\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Arguments: hashpartitioning(window#72637-T600000ms, station_id#2, user_type#3, 4), REQUIRED_BY_STATEFUL_OPERATOR, [plan_id=5715]\n",
      "\n",
      "(8) HashAggregate\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Keys [3]: [window#72637-T600000ms, station_id#2, user_type#3]\n",
      "Functions [5]: [merge_count(1), merge_avg(duration_sec#4), merge_min(duration_sec#4), merge_max(duration_sec#4), merge_approx_count_distinct(bike_id#5, 0.05, 0, 0)]\n",
      "Aggregate Attributes [5]: [count(1)#72528L, avg(duration_sec#4)#72529, min(duration_sec#4)#72530, max(duration_sec#4)#72531, approx_count_distinct(bike_id#5, 0.05, 0, 0)#72636L]\n",
      "Results [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "\n",
      "(9) StateStoreRestore\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Arguments: [window#72637-T600000ms, station_id#2, user_type#3], state info [ checkpoint = <unknown>, runId = 564c2783-35d0-4652-91a4-f85e404b97ae, opId = 0, ver = 0, numPartitions = 4] stateStoreCkptIds = None, 2\n",
      "\n",
      "(10) HashAggregate\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Keys [3]: [window#72637-T600000ms, station_id#2, user_type#3]\n",
      "Functions [5]: [merge_count(1), merge_avg(duration_sec#4), merge_min(duration_sec#4), merge_max(duration_sec#4), merge_approx_count_distinct(bike_id#5, 0.05, 0, 0)]\n",
      "Aggregate Attributes [5]: [count(1)#72528L, avg(duration_sec#4)#72529, min(duration_sec#4)#72530, max(duration_sec#4)#72531, approx_count_distinct(bike_id#5, 0.05, 0, 0)#72636L]\n",
      "Results [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "\n",
      "(11) StateStoreSave\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Arguments: [window#72637-T600000ms, station_id#2, user_type#3], state info [ checkpoint = <unknown>, runId = 564c2783-35d0-4652-91a4-f85e404b97ae, opId = 0, ver = 0, numPartitions = 4] stateStoreCkptIds = None, Append, -9223372036854775808, -9223372036854775808, 2\n",
      "\n",
      "(12) HashAggregate\n",
      "Input [60]: [window#72637-T600000ms, station_id#2, user_type#3, count#72743L, sum#72746, count#72747L, min#72749, max#72751, MS[0]#72584L, MS[1]#72585L, MS[2]#72586L, MS[3]#72587L, MS[4]#72588L, MS[5]#72589L, MS[6]#72590L, MS[7]#72591L, MS[8]#72592L, MS[9]#72593L, MS[10]#72594L, MS[11]#72595L, MS[12]#72596L, MS[13]#72597L, MS[14]#72598L, MS[15]#72599L, MS[16]#72600L, MS[17]#72601L, MS[18]#72602L, MS[19]#72603L, MS[20]#72604L, MS[21]#72605L, MS[22]#72606L, MS[23]#72607L, MS[24]#72608L, MS[25]#72609L, MS[26]#72610L, MS[27]#72611L, MS[28]#72612L, MS[29]#72613L, MS[30]#72614L, MS[31]#72615L, MS[32]#72616L, MS[33]#72617L, MS[34]#72618L, MS[35]#72619L, MS[36]#72620L, MS[37]#72621L, MS[38]#72622L, MS[39]#72623L, MS[40]#72624L, MS[41]#72625L, MS[42]#72626L, MS[43]#72627L, MS[44]#72628L, MS[45]#72629L, MS[46]#72630L, MS[47]#72631L, MS[48]#72632L, MS[49]#72633L, MS[50]#72634L, MS[51]#72635L]\n",
      "Keys [3]: [window#72637-T600000ms, station_id#2, user_type#3]\n",
      "Functions [5]: [count(1), avg(duration_sec#4), min(duration_sec#4), max(duration_sec#4), approx_count_distinct(bike_id#5, 0.05, 0, 0)]\n",
      "Aggregate Attributes [5]: [count(1)#72528L, avg(duration_sec#4)#72529, min(duration_sec#4)#72530, max(duration_sec#4)#72531, approx_count_distinct(bike_id#5, 0.05, 0, 0)#72636L]\n",
      "Results [8]: [window#72637-T600000ms, station_id#2, user_type#3, count(1)#72528L AS trip_count#7L, avg(duration_sec#4)#72529 AS avg_duration_sec#8, min(duration_sec#4)#72530 AS min_duration_sec#9, max(duration_sec#4)#72531 AS max_duration_sec#10, approx_count_distinct(bike_id#5, 0.05, 0, 0)#72636L AS unique_bikes#11L]\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 5 — Monitoring\n",
    "# ============================================================\n",
    "\n",
    "import time\n",
    "time.sleep(5)  # on attend un micro-batch supplémentaire\n",
    "\n",
    "progress = query.lastProgress\n",
    "\n",
    "if progress:\n",
    "    print(\"=== Métriques du dernier micro-batch ===\")\n",
    "    print(json.dumps(progress, indent=2, default=str))\n",
    "\n",
    "    # Sauvegarde comme preuve\n",
    "    with open(\"proof/query_progress.json\", \"w\") as f:\n",
    "        json.dump(progress, f, indent=2, default=str)\n",
    "\n",
    "    # Extraction des métriques clés\n",
    "    print(\"\\n=== Métriques clés ===\")\n",
    "    print(f\"  inputRowsPerSecond      : {progress.get('inputRowsPerSecond', 'N/A')}\")\n",
    "    print(f\"  processedRowsPerSecond  : {progress.get('processedRowsPerSecond', 'N/A')}\")\n",
    "    print(f\"  numInputRows            : {progress.get('numInputRows', 'N/A')}\")\n",
    "    \n",
    "    duration = progress.get('durationMs', {})\n",
    "    print(f\"  durationMs.triggerExecution : {duration.get('triggerExecution', 'N/A')} ms\")\n",
    "    print(f\"  durationMs.getBatch         : {duration.get('getBatch', 'N/A')} ms\")\n",
    "    \n",
    "    state_ops = progress.get('stateOperators', [{}])\n",
    "    if state_ops:\n",
    "        so = state_ops[0]\n",
    "        print(f\"  stateOperators.numRowsTotal   : {so.get('numRowsTotal', 'N/A')}\")\n",
    "        print(f\"  stateOperators.numRowsUpdated : {so.get('numRowsUpdated', 'N/A')}\")\n",
    "else:\n",
    "    print(\"Pas encore de micro-batch complété (lastProgress est None).\")\n",
    "    print(\"Status actuel :\", query.status)\n",
    "\n",
    "# Plan d'exécution du streaming (pour le fichier proof/plan_streaming.txt)\n",
    "print(\"\\n=== Plan d'exécution de la query streaming ===\")\n",
    "windowed.explain(\"formatted\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "892c0bd5-9cba-4010-93e2-f4134996e080",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 18:00:55 WARN DAGScheduler: Failed to cancel job group 2b29dee5-670d-4fb8-b983-aa7a6ac5e233. Cannot find active jobs for it.\n",
      "26/05/10 18:00:55 WARN DAGScheduler: Failed to cancel job group 2b29dee5-670d-4fb8-b983-aa7a6ac5e233. Cannot find active jobs for it.\n"
     ]
    }
   ],
   "source": [
    "query.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "save-plan",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Plan sauvegardé dans proof/plan_streaming.txt\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 5b — Sauvegarde du plan dans proof/plan_streaming.txt\n",
    "# ============================================================\n",
    "import io, sys\n",
    "\n",
    "# Capture la sortie de explain()\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buffer = io.StringIO()\n",
    "windowed.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "plan_text = buffer.getvalue()\n",
    "\n",
    "with open(\"proof/plan_streaming.txt\", \"w\") as f:\n",
    "    f.write(plan_text)\n",
    "\n",
    "print(\"Plan sauvegardé dans proof/plan_streaming.txt\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "cdac67f9-0a06-491c-8f02-d336efe534cd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Run 1 stoppé\n"
     ]
    }
   ],
   "source": [
    "query.stop()\n",
    "print(\"Run 1 stoppé\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f7242d36",
   "metadata": {},
   "source": [
    "## 6. Optimisation Avant/Après\n",
    "\n",
    "**Baseline (Run 1)** : trigger `10s`, watermark `10 min`, shuffle partitions = 4\n",
    "\n",
    "**Optimisé (Run 2)** : trigger `5s`, watermark `5 min`, shuffle partitions = 2\n",
    "\n",
    "**Ce qu'on compare :**\n",
    "- `inputRowsPerSecond` : débit d'entrée des données\n",
    "- `processedRowsPerSecond` : débit de traitement\n",
    "- `durationMs.triggerExecution` : latence par batch\n",
    "\n",
    "**Intuition :** Un trigger plus court = latence réduite (résultats disponibles plus vite) mais plus d'overhead (Spark redémarre plus souvent). Un watermark plus court = état mémoire plus petit (on jette les fenêtres plus vite) mais on perd plus d'événements tardifs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "9c7dd468",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Run 1 (baseline) : {'run_id': 'r1', 'trigger_interval': '10s', 'watermark_duration': '10min', 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'numInputRows': 0, 'stateRows': 9, 'durationMs': 9, 'note': 'baseline_trigger_10s_watermark_10min', 'timestamp': '2026-05-10T16:00:40.001Z'}\n",
      "\n",
      " Query Run 1 arrêtée\n",
      "Répertoires nettoyés pour le Run 2.\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 6 — Arrêt du Run 1 + Mesures + Préparation du Run 2\n",
    "# ============================================================\n",
    "\n",
    "# --- Capture métriques Run 1 (BASELINE) ---\n",
    "p1 = query.lastProgress or {}\n",
    "\n",
    "run1_metrics = {\n",
    "    \"run_id\":                    \"r1\",\n",
    "    \"trigger_interval\":          \"10s\",\n",
    "    \"watermark_duration\":        \"10min\",\n",
    "    \"inputRowsPerSecond\":        p1.get(\"inputRowsPerSecond\", 0),\n",
    "    \"processedRowsPerSecond\":    p1.get(\"processedRowsPerSecond\", 0),\n",
    "    \"numInputRows\":              p1.get(\"numInputRows\", 0),\n",
    "    \"stateRows\":                 (p1.get(\"stateOperators\") or [{}])[0].get(\"numRowsTotal\", 0),\n",
    "    \"durationMs\":                (p1.get(\"durationMs\") or {}).get(\"triggerExecution\", 0),\n",
    "    \"note\":                      \"baseline_trigger_10s_watermark_10min\",\n",
    "    \"timestamp\":                 p1.get(\"timestamp\", \"\"),\n",
    "}\n",
    "\n",
    "print(\"Run 1 (baseline) :\", run1_metrics)\n",
    "\n",
    "# Arrêt de la query du Run 1\n",
    "query.stop()\n",
    "print(\"\\n Query Run 1 arrêtée\")\n",
    "\n",
    "# Nettoyage checkpoint pour le Run 2\n",
    "shutil.rmtree(\"outputs/lab1/checkpoint\", ignore_errors=True)\n",
    "shutil.rmtree(\"outputs/lab1/stream_sink\", ignore_errors=True)\n",
    "pathlib.Path(\"outputs/lab1/stream_sink\").mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"outputs/lab1/checkpoint\").mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Reset du landing dir\n",
    "shutil.rmtree(LANDING_DIR, ignore_errors=True)\n",
    "pathlib.Path(LANDING_DIR).mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "print(\"Répertoires nettoyés pour le Run 2.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "run2",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 18:19:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Run 2 (optimisé) démarré !\n",
      "  [01/10] Déposé : batch_00.json\n",
      "  [02/10] Déposé : batch_01.json\n",
      "  [03/10] Déposé : batch_02.json\n",
      "  [04/10] Déposé : batch_03.json\n",
      "  [05/10] Déposé : batch_04.json\n",
      "  [06/10] Déposé : batch_05.json\n",
      "  [07/10] Déposé : batch_06.json\n",
      "  [08/10] Déposé : batch_07.json\n",
      "  [09/10] Déposé : batch_08.json\n",
      "  [10/10] Déposé : batch_09.json\n",
      "\n",
      "Run 2 (optimisé) : {'run_id': 'r2', 'trigger_interval': '5s', 'watermark_duration': '5min', 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'numInputRows': 0, 'stateRows': 9, 'durationMs': 45, 'note': 'optimized_trigger_5s_watermark_5min', 'timestamp': '2026-05-10T16:24:45.000Z'}\n",
      "Query Run 2 arrêtée\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 18:24:47 WARN DAGScheduler: Failed to cancel job group 0590f479-dd64-4074-8f31-5230939a6bc1. Cannot find active jobs for it.\n",
      "26/05/10 18:24:47 WARN DAGScheduler: Failed to cancel job group 0590f479-dd64-4074-8f31-5230939a6bc1. Cannot find active jobs for it.\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 6b — Run 2 OPTIMISÉ : trigger 5s, watermark 5min\n",
    "# ============================================================\n",
    "\n",
    "# Nouvelle SparkSession avec moins de partitions shuffle\n",
    "spark.conf.set(\"spark.sql.shuffle.partitions\", \"2\")  # encore moins de partitions\n",
    "\n",
    "# Nouvelle source streaming\n",
    "df_stream2 = (\n",
    "    spark.readStream\n",
    "    .schema(schema)\n",
    "    .option(\"multiline\", \"true\")\n",
    "    .option(\"maxFilesPerTrigger\", 1)\n",
    "    .json(LANDING_DIR)\n",
    ")\n",
    "\n",
    "# Agrégation avec watermark RÉDUIT à 5 min\n",
    "windowed2 = (\n",
    "    df_stream2\n",
    "    .withWatermark(\"starttime\", \"5 minutes\")       # watermark plus strict\n",
    "    .groupBy(\n",
    "        F.window(\"starttime\", \"15 minutes\"),\n",
    "        F.col(\"station_id\"),\n",
    "        F.col(\"user_type\"),\n",
    "    )\n",
    "    .agg(\n",
    "        F.count(\"*\").alias(\"trip_count\"),\n",
    "        F.avg(\"duration_sec\").alias(\"avg_duration_sec\"),\n",
    "        F.min(\"duration_sec\").alias(\"min_duration_sec\"),\n",
    "        F.max(\"duration_sec\").alias(\"max_duration_sec\"),\n",
    "        F.approx_count_distinct(\"bike_id\").alias(\"unique_bikes\"),\n",
    "    )\n",
    ")\n",
    "\n",
    "# Lance query optimisée avec trigger 5s\n",
    "query2 = (\n",
    "    windowed2.writeStream\n",
    "    .format(\"parquet\")\n",
    "    .outputMode(\"append\")\n",
    "    .option(\"path\", \"outputs/lab1/stream_sink\")\n",
    "    .option(\"checkpointLocation\", \"outputs/lab1/checkpoint\")\n",
    "    .trigger(processingTime=\"5 seconds\")             # trigger plus court\n",
    "    .start()\n",
    ")\n",
    "\n",
    "print(\"Run 2 (optimisé) démarré !\")\n",
    "\n",
    "# Re-dépose les fichiers\n",
    "staging_files = sorted(pathlib.Path(STAGING_DIR).glob(\"*.json\"))\n",
    "for i, src in enumerate(staging_files):\n",
    "    dst = pathlib.Path(LANDING_DIR) / src.name\n",
    "    shutil.copy(str(src), str(dst))\n",
    "    print(f\"  [{i+1:02d}/10] Déposé : {src.name}\")\n",
    "    time.sleep(30)\n",
    "\n",
    "time.sleep(30)\n",
    "\n",
    "# Capture métriques Run 2\n",
    "p2 = query2.lastProgress or {}\n",
    "run2_metrics = {\n",
    "    \"run_id\":                    \"r2\",\n",
    "    \"trigger_interval\":          \"5s\",\n",
    "    \"watermark_duration\":        \"5min\",\n",
    "    \"inputRowsPerSecond\":        p2.get(\"inputRowsPerSecond\", 0),\n",
    "    \"processedRowsPerSecond\":    p2.get(\"processedRowsPerSecond\", 0),\n",
    "    \"numInputRows\":              p2.get(\"numInputRows\", 0),\n",
    "    \"stateRows\":                 (p2.get(\"stateOperators\") or [{}])[0].get(\"numRowsTotal\", 0),\n",
    "    \"durationMs\":                (p2.get(\"durationMs\") or {}).get(\"triggerExecution\", 0),\n",
    "    \"note\":                      \"optimized_trigger_5s_watermark_5min\",\n",
    "    \"timestamp\":                 p2.get(\"timestamp\", \"\"),\n",
    "}\n",
    "\n",
    "print(\"\\nRun 2 (optimisé) :\", run2_metrics)\n",
    "query2.stop()\n",
    "print(\"Query Run 2 arrêtée\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "save-metrics",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " lab1_metrics_log.csv mis à jour avec les vraies métriques\n",
      "\n",
      "======= COMPARAISON AVANT / APRÈS =======\n",
      "Métrique                            Run 1 (10s/10min)         Run 2 (5s/5min)     \n",
      "--------------------------------------------------------------------------------\n",
      "inputRowsPerSecond                  0.0                       0.0                 \n",
      "processedRowsPerSecond              0.0                       0.0                 \n",
      "numInputRows                        0                         0                   \n",
      "durationMs                          9                         45                  \n",
      "stateRows                           9                         9                   \n",
      "\n",
      "\n",
      " Lab 1 Track C — COMPLET !\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 18:25:55 WARN StateStore: Error running maintenance thread\n",
      "java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores\n",
      "\tat org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:950)\n",
      "\tat org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:924)\n",
      "\tat org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:725)\n",
      "\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n",
      "\tat java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)\n",
      "\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)\n",
      "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n",
      "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n",
      "\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"
     ]
    }
   ],
   "source": [
    "# ============================================================\n",
    "# CELLULE 6c — Sauvegarde lab1_metrics_log.csv\n",
    "# ============================================================\n",
    "import csv\n",
    "\n",
    "metrics_fields = [\n",
    "    \"run_id\", \"trigger_interval\", \"watermark_duration\",\n",
    "    \"inputRowsPerSecond\", \"processedRowsPerSecond\",\n",
    "    \"numInputRows\", \"stateRows\", \"durationMs\",\n",
    "    \"note\", \"timestamp\"\n",
    "]\n",
    "\n",
    "with open(\"lab1_metrics_log.csv\", \"w\", newline=\"\") as f:\n",
    "    writer = csv.DictWriter(f, fieldnames=metrics_fields)\n",
    "    writer.writeheader()\n",
    "    writer.writerow(run1_metrics)\n",
    "    writer.writerow(run2_metrics)\n",
    "\n",
    "print(\" lab1_metrics_log.csv mis à jour avec les vraies métriques\")\n",
    "\n",
    "# Affichage comparatif\n",
    "print(\"\\n======= COMPARAISON AVANT / APRÈS =======\")\n",
    "print(f\"{'Métrique':<35} {'Run 1 (10s/10min)':<25} {'Run 2 (5s/5min)':<20}\")\n",
    "print(\"-\" * 80)\n",
    "for k in [\"inputRowsPerSecond\", \"processedRowsPerSecond\", \"numInputRows\", \"durationMs\", \"stateRows\"]:\n",
    "    v1 = run1_metrics.get(k, \"N/A\")\n",
    "    v2 = run2_metrics.get(k, \"N/A\")\n",
    "    print(f\"{k:<35} {str(v1):<25} {str(v2):<20}\")\n",
    "\n",
    "print(\"\\n\")\n",
    "spark.stop()\n",
    "print(\" Lab 1 Track C — COMPLET !\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "engineering-note",
   "metadata": {},
   "source": [
    "##  Engineering Note — Pipeline Design & Cost Trade-offs\n",
    "\n",
    "### Architecture du pipeline\n",
    "\n",
    "```\n",
    "JSON Files (landing/) \n",
    "    → readStream (maxFilesPerTrigger=1)\n",
    "    → withWatermark(\"starttime\", \"10min\")\n",
    "    → window(\"starttime\", \"15min\") + groupBy(station_id, user_type)\n",
    "    → agg(count, avg, min, max, countDistinct)\n",
    "    → writeStream Parquet (append, checkpoint, trigger 10s)\n",
    "```\n",
    "\n",
    "### Choix du Watermark (10 minutes)\n",
    "Le watermark de **10 minutes** a été choisi car les données de mobilité peuvent arriver avec un léger retard réseau (GPS qui perd le signal, batch envoyé toutes les 5 min). Un watermark de 10 min offre un bon compromis :\n",
    "- **Trop court** (ex: 1 min) → perd des événements légitimement tardifs\n",
    "- **Trop long** (ex: 30 min) → l'état mémoire (stateOperators) grossit inutilement\n",
    "\n",
    "### Choix de la fenêtre (15 minutes tumbling)\n",
    "La fenêtre de **15 minutes** correspond à la granularité naturelle des données Track C. Elle permet d'observer les pics d'utilisation tout en restant agrégé (pas trop de granularité).\n",
    "\n",
    "### Exactly-once semantics\n",
    "Garanti par le **checkpoint directory** : Spark y sauvegarde les offsets des fichiers déjà traités. Si le job plante et redémarre, il reprend là où il s'était arrêté sans re-traiter les données.\n",
    "\n",
    "### Avant/Après Optimisation\n",
    "\n",
    "| Paramètre              | Baseline (Run 1) | Optimisé (Run 2) | Effet |\n",
    "|------------------------|-----------------|-----------------|-------|\n",
    "| Trigger interval       | 10s             | 5s              | Latence réduite de ~50% |\n",
    "| Watermark              | 10 min          | 5 min           | État mémoire réduit |\n",
    "| Shuffle partitions     | 4               | 2               | Overhead réduit en local |\n",
    "\n",
    "**Conclusion :** En local avec peu de données, réduire le trigger à 5s améliore la réactivité sans surcoût notable. En production avec de vrais volumes, il faudrait évaluer l'overhead des micro-batches fréquents vs le gain en latence."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "1dc9c935-1757-41f3-8827-8f88453e03fa",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "=== LIVRABLES ===\n",
      "Parquet: 29 fichiers\n",
      "Checkpoint: True\n",
      "plan_streaming.txt: True\n",
      "query_progress.json: True\n",
      "metrics_log.csv: True\n"
     ]
    }
   ],
   "source": [
    "import pathlib\n",
    "print(\"=== LIVRABLES ===\")\n",
    "print(\"Parquet:\", len(list(pathlib.Path(\"outputs/lab1/stream_sink\").rglob(\"*.parquet\"))), \"fichiers\")\n",
    "print(\"Checkpoint:\", pathlib.Path(\"outputs/lab1/checkpoint\").exists())\n",
    "print(\"plan_streaming.txt:\", pathlib.Path(\"proof/plan_streaming.txt\").exists())\n",
    "print(\"query_progress.json:\", pathlib.Path(\"proof/query_progress.json\").exists())\n",
    "print(\"metrics_log.csv:\", pathlib.Path(\"lab1_metrics_log.csv\").exists())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "34baf525-b0be-4beb-8ef7-92624da1fa31",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (de2-env)",
   "language": "python",
   "name": "de2-env"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.20"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
