{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "3fccb597",
   "metadata": {},
   "source": [
    "# ESIEE Paris — Data Engineering I — Assignment 1\n",
    "> Author : Badr TAJINI\n",
    "\n",
    "**Academic year:** 2025–2026  \n",
    "**Program:** Data & Applications - Engineering - (FD)   \n",
    "**Course:** Data Engineering I  \n",
    "\n",
    "---\n",
    "\n",
    "In this assignment, you'll make sure that you've correctly set up your local Spark environment.\n",
    "You'll then complete a classic \"Word Count\" task on the `description` column of the `a1-brand.csv` file.\n",
    "\n",
    "You can think of \"Word Count\" as the \"Hello World!\" of Hadoop, Spark, etc.\n",
    "The task is simple: We want to count the total number of times each word occurs (in a potentially large collection of text).\n",
    "Typically, we want to sort by the counts in descending order so we can examine the most frequently occurring words."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a0ac1598",
   "metadata": {},
   "source": [
    "## Learning goals\n",
    "- Confirm local Spark environment in JupyterLab.\n",
    "- Implement word-count using **RDD** and **DataFrame** APIs.\n",
    "- Produce top-10 tokens with and without stopwords.\n",
    "- Record brief performance notes and environment details.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2853064b",
   "metadata": {},
   "source": [
    "## 1. Setup\n",
    "\n",
    "The following code snippet should \"just work\" to initialize Spark.\n",
    "If it doesn't, consult the **helper and Lab 0 with installation and setup guide**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "70dd3289-ab06-481c-82cf-7953412f7d17",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Python version: 3.10.19 (main, Oct 21 2025, 16:43:05) [GCC 11.2.0]\n",
      "PySpark version: 4.0.1\n"
     ]
    }
   ],
   "source": [
    "import sys, pyspark\n",
    "print(\"Python version:\", sys.version)\n",
    "print(\"PySpark version:\", pyspark.__version__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "d15624cc-d5b7-4ff5-94df-c8fcc9729d59",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "findspark: 2.0.1\n"
     ]
    }
   ],
   "source": [
    "import findspark\n",
    "print(\"findspark:\", findspark.__version__)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "e10d1601-4445-4fa7-8ca9-656dd87679c8",
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark, os\n",
    "os.environ[\"SPARK_HOME\"] = \"/home/bazic/spark-4.0.0-bin-hadoop3\"\n",
    "findspark.init()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "17950740",
   "metadata": {},
   "source": [
    "Edit the path below to point to your local copy of `a1-brand.csv`. \n",
    "\n",
    "Examples:\n",
    "- macOS/Linux: `/Users/yourname/data/a1-brand.csv`\n",
    "- Windows: `C:\\\\Users\\\\yourname\\\\data\\\\a1-brand.csv`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "578fd221",
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO: Set the path to a1-brand.csv\n",
    "DATA_PATH = \"/home/bazic/de1/lab1/a1-brand.csv\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "d498be30-4f08-4982-8904-4a24d7141c52",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Fichier existe ? True\n"
     ]
    }
   ],
   "source": [
    "\n",
    "# Vérification rapide\n",
    "import os\n",
    "print(\"Fichier existe ?\", os.path.exists(DATA_PATH))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "77fdc36a",
   "metadata": {},
   "source": [
    "Import PySpark:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "636b041f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "tout fonctionne\n"
     ]
    }
   ],
   "source": [
    "import sys, re\n",
    "from pyspark.sql import SparkSession, functions as F, types as T\n",
    "from pyspark.sql.functions import col\n",
    "print(\"tout fonctionne\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1eaebd90",
   "metadata": {},
   "source": [
    "Set up to measure wall time and memory. (Don't worry about the details, just run the cell)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "980f90dc",
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.magic import register_cell_magic\n",
    "import time, os, platform\n",
    "import psutil, resource\n",
    "\n",
    "def _rss_bytes():\n",
    "    return psutil.Process(os.getpid()).memory_info().rss\n",
    "\n",
    "def _ru_maxrss_bytes():\n",
    "    # ru_maxrss: bytes on macOS; kilobytes on Linux\n",
    "    ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss\n",
    "    if platform.system() == \"Darwin\":\n",
    "        return int(ru)  # bytes\n",
    "    else:\n",
    "        return int(ru) * 1024  # KB -> bytes\n",
    "\n",
    "@register_cell_magic\n",
    "def timemem(line, cell):\n",
    "    \"\"\"\n",
    "    Measure wall time and memory around the execution of this cell.\n",
    "    Usage:\n",
    "        %%timemem\n",
    "        <your code>\n",
    "    \"\"\"\n",
    "    ip = get_ipython()\n",
    "    rss_before = _rss_bytes()\n",
    "    peak_before = _ru_maxrss_bytes()\n",
    "    t0 = time.perf_counter()\n",
    "\n",
    "    # Execute the cell body\n",
    "    result = ip.run_cell(cell)\n",
    "\n",
    "    t1 = time.perf_counter()\n",
    "    rss_after = _rss_bytes()\n",
    "    peak_after = _ru_maxrss_bytes()\n",
    "\n",
    "    wall = t1 - t0\n",
    "    rss_delta_mb = (rss_after - rss_before) / (1024*1024)\n",
    "    peak_delta_mb = (peak_after - peak_before) / (1024*1024)\n",
    "\n",
    "    print(\"======================================\")\n",
    "    print(f\"Wall time: {wall:.3f} s\")\n",
    "    print(f\"RSS Δ: {rss_delta_mb:+.2f} MB\")\n",
    "    print(f\"Peak memory Δ: {peak_delta_mb:+.2f} MB (OS-dependent)\")\n",
    "    print(\"======================================\")\n",
    "\n",
    "    return result"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bbdbf067",
   "metadata": {},
   "source": [
    "Start a local Spark session (i.e., a `SparkContext`):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "f3506b62",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: Using incubator modules: jdk.incubator.vector\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
      "25/10/23 10:57:42 WARN Utils: Your hostname, BaziCComputer, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)\n",
      "25/10/23 10:57:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "25/10/23 10:57:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "\n",
       "            <div>\n",
       "                <p><b>SparkSession - in-memory</b></p>\n",
       "                \n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://10.255.255.254:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v4.0.0</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>local[*]</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>Assignment1</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        \n",
       "            </div>\n",
       "        "
      ],
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7d46c00ba260>"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "======================================\n",
      "Wall time: 9.171 s\n",
      "RSS Δ: +2.50 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d46c0f863b0, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d46c0f86350, raw_cell=\"\n",
       "spark = (\n",
       "    SparkSession.builder\n",
       "    .appName(\"..\" store_history=False silent=False shell_futures=True cell_id=None> result=<pyspark.sql.session.SparkSession object at 0x7d46c00ba260>>"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"Assignment1\")\n",
    "    .master(\"local[*]\")            # Use all local cores\n",
    "    .config(\"spark.ui.showConsoleProgress\", \"true\")\n",
    "    .getOrCreate()\n",
    ")\n",
    "\n",
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "150dde24",
   "metadata": {},
   "source": [
    "If you've gotten to here, congrats! Everything seems to have been set up and initialized properly!"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2d60c051-264c-4c9d-88ef-d9a24c241d8e",
   "metadata": {},
   "source": [
    "## 2. Word Count with RDDs"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d4d740f5-5dfd-42b0-905e-3fd3a7f20134",
   "metadata": {},
   "source": [
    "First, let's read the `a1-brand.csv` file into an RDD.\n",
    "\n",
    "**write some code here**\n",
    "\n",
    "**Hints:**\n",
    "\n",
    "- You'll want to fetch the `SparkContext` from the `SparkSession`.\n",
    "- There's a method of the `SparkContext` for reading in text files.\n",
    "- This simple exercise should only take two lines. If you find yourself writing more code, you're doing something wrong..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "42c7ef3f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Nombre de lignes: 7262\n",
      "======================================\n",
      "Wall time: 0.256 s\n",
      "RSS Δ: +0.00 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d46d003f640, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d46d003ed40, raw_cell=\"\n",
       "# On récupère le SparkContext\n",
       "sc = spark.sparkCon..\" store_history=False silent=False shell_futures=True cell_id=None> result=None>"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "# On récupère le SparkContext\n",
    "sc = spark.sparkContext\n",
    "\n",
    "# On lit le fichier CSV dans un RDD\n",
    "lines = sc.textFile(DATA_PATH)\n",
    "\n",
    "# Compter le nombre de lignes\n",
    "print(\"Nombre de lignes:\", lines.count())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1cb387ef",
   "metadata": {},
   "source": [
    "Next, clean and tokenize text, and then find the 10 most common words.\n",
    "\n",
    "**write some code here**\n",
    "\n",
    "**Required Steps:**\n",
    "\n",
    "- Lowercase all text.\n",
    "- Replace non-letter characters (`[^a-z]`) with spaces.\n",
    "- Split on whitespace into tokens.\n",
    "- Remove tokens with length < 2.\n",
    "\n",
    "**Hints:**\n",
    "\n",
    "- You _must_ use `flatMap` and other RDD operations in this step. If you're not, you're doing something wrong...\n",
    "- At the end, you'll need to `collect` the output.\n"
   ]
  },
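  {
   "cell_type": "markdown",
   "id": "b7e2a4c1",
   "metadata": {},
   "source": [
    "Before moving to Spark, the required steps above can be sanity-checked on a single string. The next cell is a minimal plain-Python sketch: the helper name `tokenize` and the sample sentence are illustrative assumptions, not part of the assignment. A function like this could be passed to `flatMap`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b7e2a4c2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import re\n",
    "\n",
    "def tokenize(text):\n",
    "    \"\"\"Lowercase, replace non-letters with spaces, split, drop tokens shorter than 2.\"\"\"\n",
    "    cleaned = re.sub(r\"[^a-z]\", \" \", text.lower())\n",
    "    return [token for token in cleaned.split() if len(token) >= 2]\n",
    "\n",
    "# Hypothetical sample line, for illustration only\n",
    "sample = \"Acme-Co is a brand, known for 3 things!\"\n",
    "print(tokenize(sample))\n",
    "# ['acme', 'co', 'is', 'brand', 'known', 'for', 'things']"
   ]
  },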
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "806a5885",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "and: 16150\n",
      "the: 9612\n",
      "in: 7958\n",
      "is: 7814\n",
      "for: 6789\n",
      "brand: 6476\n",
      "its: 4241\n",
      "to: 4026\n",
      "of: 3382\n",
      "with: 3099\n",
      "======================================\n",
      "Wall time: 1.540 s\n",
      "RSS Δ: +1.75 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d46c00bbe80, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d46c00bbeb0, raw_cell=\"\n",
       "import re\n",
       "\n",
       "# Transformation RDD : nettoyage et to..\" store_history=False silent=False shell_futures=True cell_id=None> result=None>"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "import re\n",
    "\n",
    "# Transformation RDD : nettoyage et tokenisation\n",
    "words = (\n",
    "    lines\n",
    "    .flatMap(lambda line: re.sub(r'[^a-z]', ' ', line.lower()).split())\n",
    "    .filter(lambda w: len(w) >= 2)\n",
    ")\n",
    "\n",
    "# Compter les occurrences de chaque mot\n",
    "word_counts = (\n",
    "    words\n",
    "    .map(lambda w: (w, 1))\n",
    "    .reduceByKey(lambda a, b: a + b)\n",
    "    .sortBy(lambda x: x[1], ascending=False)\n",
    "    .collect()\n",
    ")\n",
    "\n",
    "# Afficher les 10 mots les plus fréquents\n",
    "for word, count in word_counts[:10]:\n",
    "    print(f\"{word}: {count}\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d34d215c",
   "metadata": {},
   "source": [
    "## 3. Word Count with DataFrames"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f5672f0-63ec-4f0e-b784-1684e1c7a7e8",
   "metadata": {},
   "source": [
    "### 3.1 Again, Just with DataFrames\n",
    "\n",
    "Now, we're going to do the same thing, but with DataFrames instead of RDDs.\n",
    "\n",
    "What's the difference, you ask? We'll cover it in lecture soon enough!\n",
    "\n",
    "**write some code here**\n",
    "\n",
    "**Hints:**\n",
    "\n",
    "- Here, you'll use the `SparkSession`.\n",
    "- Loading a DataFrame is a single method call. If you find yourself writing more code, you're doing something wrong...\n",
    "- When loading the CSV file, be aware of your escape character; use something like `.option(\"escape\", ...)`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "e99b288e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Rows: 7261\n",
      "root\n",
      " |-- brand: string (nullable = true)\n",
      " |-- description: string (nullable = true)\n",
      "\n",
      "+--------------------------------------------------------------------------------+\n",
      "|                                                                     description|\n",
      "+--------------------------------------------------------------------------------+\n",
      "|a-case is a brand specializing in protective accessories for electronic devic...|\n",
      "|A-Derma is a French dermatological skincare brand specializing in products fo...|\n",
      "| a patented ingredient derived from oat plants cultivated under organic farmi...|\n",
      "|                                                                       cleansers|\n",
      "|           A-Derma emphasizes clinical efficacy and hypoallergenic formulations.|\n",
      "+--------------------------------------------------------------------------------+\n",
      "only showing top 5 rows\n",
      "======================================\n",
      "Wall time: 0.361 s\n",
      "RSS Δ: +0.00 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d469f6aeb00, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d469f6af5b0, raw_cell=\"\n",
       "from pyspark.sql import functions as F\n",
       "from pyspa..\" store_history=False silent=False shell_futures=True cell_id=None> result=None>"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql.functions import col, explode, split, lower, regexp_replace\n",
    "\n",
    "# Charger le CSV en DataFrame\n",
    "df = spark.read.option(\"header\", True).csv(\"a1-brand.csv\")\n",
    "\n",
    "# Inspection rapide\n",
    "print(\"Rows:\", df.count())\n",
    "df.printSchema()\n",
    "df.select(\"description\").show(5, truncate=80)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4882eacc",
   "metadata": {},
   "source": [
    "Next, clean and tokenize text, and then find the 10 most common (i.e., frequently occurring) words.\n",
    "This attempts the same processing as word count with RDDs above, except here you're using a DataFrame.\n",
    "\n",
    "**write some code here**\n",
    "\n",
    "**Required Steps:** (Exactly the same as above.)\n",
    "\n",
    "- Lowercase all text.\n",
    "- Replace non-letter characters (`[^a-z]`) with spaces.\n",
    "- Split on whitespace into tokens.\n",
    "- Remove tokens with length < 2.\n",
    "\n",
    "**Hints:**\n",
    "\n",
    "- You _must_ use `explode` and other Spark DataFrame operations in this exercise.\n",
    "- This exercise shouldn't take more than (roughly) a dozen lines. If you find yourself writing more code, you're doing something wrong..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "dec84df4",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "| word|count|\n",
      "+-----+-----+\n",
      "|  and|13535|\n",
      "|  the| 7371|\n",
      "|   is| 6747|\n",
      "|   in| 6632|\n",
      "|  for| 5696|\n",
      "|brand| 5407|\n",
      "|  its| 3438|\n",
      "|   to| 3394|\n",
      "|   of| 2798|\n",
      "| with| 2622|\n",
      "+-----+-----+\n",
      "\n",
      "======================================\n",
      "Wall time: 0.439 s\n",
      "RSS Δ: +0.00 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d469f6af700, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d469f6af730, raw_cell=\"\n",
       "from pyspark.sql.functions import explode, split,..\" store_history=False silent=False shell_futures=True cell_id=None> result=None>"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "from pyspark.sql.functions import explode, split, lower, regexp_replace\n",
    "\n",
    "# Nettoyer, tokenizer et exploser les mots\n",
    "words_df = df.select(\n",
    "    explode(\n",
    "        split(\n",
    "            regexp_replace(lower(col(\"description\")), \"[^a-z]\", \" \"),  # lowercase + nettoyage\n",
    "            \"\\s+\"\n",
    "        )\n",
    "    ).alias(\"word\")\n",
    ")\n",
    "\n",
    "# Filtrer les tokens trop courts (<2 caractères)\n",
    "words_df = words_df.filter(F.length(\"word\") > 1)\n",
    "\n",
    "# Compter les mots et trier par fréquence\n",
    "word_counts = words_df.groupBy(\"word\").count().orderBy(F.desc(\"count\"))\n",
    "\n",
    "# Afficher les 10 mots les plus fréquents\n",
    "top10 = word_counts.limit(10)\n",
    "top10.show()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fede1ca2",
   "metadata": {},
   "source": [
    "**Questions to reflect on**:\n",
    "\n",
    "- What is conceptually different about how Spark executes `flatMap` and `explode`?\n",
    "- What are the advantages or disadvantages of using each of them? \n",
    "- Are there cases where you may prefer one over the other?\n",
    "\n",
    "(No need to write answers in the assignment submission. Just think about it...)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8505e64e",
   "metadata": {},
   "source": [
    "**Question to actually answer**:\n",
    "\n",
    "Does the RDD approach and the DataFrame approach give the same answers? Explain why or why not."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "060a1265-558b-464f-8c85-e9c553ba9fad",
   "metadata": {},
   "source": [
    "**Write your answer to the above question!**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "2730656f-2752-4c43-a301-2670f7baf1da",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'\\nOui, l’approche RDD et l’approche DataFrame donnent les mêmes résultats pour le comptage de mots. Elles font toutes les deux les mêmes opérations : mettre en minuscules, enlever les caractères non lettres, découper en mots et compter.\\n\\nLa différence est surtout dans la façon dont Spark exécute les calculs :\\n\\nLes RDDs travaillent sur des collections d’objets et appliquent les transformations étape par étape.\\n\\nLes DataFrames utilisent un moteur d’optimisation qui planifie les calculs de façon plus efficace.\\n'"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\"\"\"\n",
    "Oui, l’approche RDD et l’approche DataFrame donnent les mêmes résultats pour le comptage de mots. Elles font toutes les deux les mêmes opérations : mettre en minuscules, enlever les caractères non lettres, découper en mots et compter.\n",
    "\n",
    "La différence est surtout dans la façon dont Spark exécute les calculs :\n",
    "\n",
    "Les RDDs travaillent sur des collections d’objets et appliquent les transformations étape par étape.\n",
    "\n",
    "Les DataFrames utilisent un moteur d’optimisation qui planifie les calculs de façon plus efficace.\n",
    "\"\"\""
   ]
  },
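  {
   "cell_type": "markdown",
   "id": "c3d9f0a1",
   "metadata": {},
   "source": [
    "One way to see why tokenizing raw lines and tokenizing a parsed column can disagree is to compare the two on a tiny in-memory CSV. The next cell is a sketch using only the Python standard library; the two sample rows and the helper `count_tokens` are made up for illustration and are not taken from `a1-brand.csv`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c3d9f0a2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import io\n",
    "import re\n",
    "from collections import Counter\n",
    "\n",
    "def count_tokens(texts):\n",
    "    \"\"\"Apply the assignment's cleaning steps and count the resulting tokens.\"\"\"\n",
    "    counts = Counter()\n",
    "    for text in texts:\n",
    "        cleaned = re.sub(r\"[^a-z]\", \" \", text.lower())\n",
    "        counts.update(t for t in cleaned.split() if len(t) >= 2)\n",
    "    return counts\n",
    "\n",
    "# Hypothetical two-row CSV with the same column names as the assignment file\n",
    "raw = 'brand,description\\nAcme,\"Acme is a brand, known for tools\"\\nBolt,Bolt is a tools brand\\n'\n",
    "\n",
    "# Raw-line view (plain text read): the header and the brand column are tokenized too\n",
    "raw_counts = count_tokens(raw.splitlines())\n",
    "\n",
    "# Parsed view (CSV reader): only the description field is tokenized\n",
    "rows = csv.DictReader(io.StringIO(raw))\n",
    "parsed_counts = count_tokens(row[\"description\"] for row in rows)\n",
    "\n",
    "print(\"raw lines:\", raw_counts[\"brand\"], raw_counts[\"acme\"])      # raw lines: 3 2\n",
    "print(\"parsed   :\", parsed_counts[\"brand\"], parsed_counts[\"acme\"])  # parsed   : 2 1"
   ]
  },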
  {
   "cell_type": "markdown",
   "id": "479ca759",
   "metadata": {},
   "source": [
    "### 3.1 Removing Stopwords\n",
    "\n",
    "You've probably noticed that many of the most frequently occurring words are not providing us any indication about the content because they are words like \"in\", \"the\", \"for\", etc.\n",
    "These are called stopwords.\n",
    "\n",
    "Let's remove stopwords and count again!\n",
    "\n",
    "**write some code here**\n",
    "\n",
    "**Hints:**\n",
    "\n",
    "- Filter out all stopwords from the DataFrame before counting.\n",
    "- Use `StopWordsRemover` from `pyspark.ml.feature`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "341c7d28",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-----+\n",
      "|        word|count|\n",
      "+------------+-----+\n",
      "|       brand| 5112|\n",
      "|       known| 2504|\n",
      "|   primarily| 2114|\n",
      "|  recognized| 1543|\n",
      "|    products| 1511|\n",
      "|   including| 1482|\n",
      "|specializing| 1407|\n",
      "|       often| 1298|\n",
      "|       range| 1210|\n",
      "|     company| 1075|\n",
      "+------------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import StopWordsRemover\n",
    "from pyspark.sql.functions import split, col, explode, lower\n",
    "\n",
    "# 1. Tokenisation : transformer la colonne 'description' en tableau de mots\n",
    "df_tokens = df.withColumn(\"words\", split(col(\"description\"), \" \"))\n",
    "\n",
    "# 1b. Filtrer les lignes où 'words' est null\n",
    "df_tokens = df_tokens.filter(col(\"words\").isNotNull())\n",
    "\n",
    "# 2. Supprimer les stopwords\n",
    "remover = StopWordsRemover(inputCol=\"words\", outputCol=\"filtered\")\n",
    "df_filtered = remover.transform(df_tokens)\n",
    "\n",
    "# 3. Compter les mots après suppression des stopwords\n",
    "word_counts_noStopWords = (\n",
    "    df_filtered\n",
    "    .select(explode(col(\"filtered\")).alias(\"word\"))\n",
    "    .withColumn(\"word\", lower(col(\"word\")))\n",
    "    .filter(col(\"word\") != \"\")         # supprimer les tokens vides\n",
    "    .groupBy(\"word\")\n",
    "    .count()\n",
    "    .orderBy(col(\"count\").desc())\n",
    ")\n",
    "\n",
    "# 4. Afficher les top-10 mots sans stopwords\n",
    "top10_noStopWords = word_counts_noStopWords.limit(10)\n",
    "top10_noStopWords.show()\n"
   ]
  },
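  {
   "cell_type": "markdown",
   "id": "d4a8e1b1",
   "metadata": {},
   "source": [
    "The cell above tokenizes with a plain split on spaces, so its tokens are not cleaned the same way as in the earlier word count. A possible alternative, sketched in the next cell under assumptions, reuses the lowercase / `[^a-z]` / length-filter steps and then drops the default English stopwords shipped with `StopWordsRemover`. The function name `top_words_no_stopwords` is hypothetical, and the sketch has not been run against `a1-brand.csv`, so its counts may differ from the table above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d4a8e1b2",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StopWordsRemover\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "def top_words_no_stopwords(df, text_col=\"description\", n=10):\n",
    "    \"\"\"Return the n most frequent cleaned tokens that are not English stopwords.\"\"\"\n",
    "    # Default English list bundled with Spark ML (needs an active Spark session)\n",
    "    stopwords = StopWordsRemover.loadDefaultStopWords(\"english\")\n",
    "    cleaned = F.regexp_replace(F.lower(F.col(text_col)), \"[^a-z]\", \" \")\n",
    "    words = df.select(F.explode(F.split(cleaned, r\"\\s+\")).alias(\"word\"))\n",
    "    return (\n",
    "        words\n",
    "        .filter(F.length(\"word\") >= 2)           # same length rule as before\n",
    "        .filter(~F.col(\"word\").isin(stopwords))  # drop stopwords\n",
    "        .groupBy(\"word\")\n",
    "        .count()\n",
    "        .orderBy(F.desc(\"count\"))\n",
    "        .limit(n)\n",
    "    )\n",
    "\n",
    "# Usage (assumes `df` is the DataFrame loaded earlier):\n",
    "# top_words_no_stopwords(df).show()"
   ]
  },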
  {
   "cell_type": "markdown",
   "id": "f623f34a",
   "metadata": {},
   "source": [
    "### 3.2 Saving Results to CSV\n",
    "\n",
    "+ Save the results of the top-10 most frequently occurring words _with stopwords_, as a CSV file, to `top10_words.csv`.\n",
    "+ Save the results of the top-10 frequently occurring words _discarding stopwords_, as a CSV file, to `top10_noStopWords.csv`.\n",
    "\n",
    "**write some code here**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "c5e082af",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "======================================\n",
      "Wall time: 1.494 s\n",
      "RSS Δ: +0.00 MB\n",
      "Peak memory Δ: +0.00 MB (OS-dependent)\n",
      "======================================\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<ExecutionResult object at 7d46703bb610, execution_count=None error_before_exec=None error_in_exec=None info=<ExecutionInfo object at 7d46703b98d0, raw_cell=\"\n",
       "# Sauvegarder le top-10 avec stopwords\n",
       "top10.coal..\" store_history=False silent=False shell_futures=True cell_id=None> result=None>"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%timemem\n",
    "\n",
    "# Sauvegarder le top-10 avec stopwords\n",
    "top10.coalesce(1).write.mode(\"overwrite\").option(\"header\", True).csv(\"top10_words.csv\")\n",
    "\n",
    "# Sauvegarder le top-10 sans stopwords\n",
    "top10_noStopWords.coalesce(1).write.mode(\"overwrite\").option(\"header\", True).csv(\"top10_noStopWords.csv\")\n",
    "\n"
   ]
  },
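  {
   "cell_type": "markdown",
   "id": "e5b7c2d1",
   "metadata": {},
   "source": [
    "Note that Spark's `DataFrameWriter.csv` treats the given path as a directory and writes one or more `part-*` files inside it, so `top10_words.csv` above ends up being a folder. If a single plain file is needed, one option for a result this small is to collect the rows to the driver and write them with Python's `csv` module. The next cell is a sketch: the helper name `write_rows_to_csv` and the output file name in the usage comment are assumptions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e5b7c2d2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "\n",
    "def write_rows_to_csv(rows, header, path):\n",
    "    \"\"\"Write an iterable of row tuples to a single CSV file with a header line.\"\"\"\n",
    "    with open(path, \"w\", newline=\"\", encoding=\"utf-8\") as f:\n",
    "        writer = csv.writer(f)\n",
    "        writer.writerow(header)\n",
    "        writer.writerows(rows)\n",
    "\n",
    "# Usage (assumes `top10` is the 10-row DataFrame built earlier):\n",
    "# rows = [(r[\"word\"], r[\"count\"]) for r in top10.collect()]\n",
    "# write_rows_to_csv(rows, [\"word\", \"count\"], \"top10_words_single.csv\")"
   ]
  },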
  {
   "cell_type": "markdown",
   "id": "f89fac7f",
   "metadata": {},
   "source": [
    "## 4. Assignment Submission and Cleanup\n",
    "\n",
    "Details about the Submission of this assignment are outlined in the helper. Please read carefully the instructions.\n",
    "\n",
    "Finally, clean up!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "3b071ae2",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "96409bb6",
   "metadata": {},
   "source": [
    "## Performance notes"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2d4c5299",
   "metadata": {},
   "source": [
    "- Prefer DataFrame built-ins; avoid Python UDFs for tokenization where possible.\n",
    "- Keep shuffle partitions modest on local runs.\n",
    "- Cache wisely and avoid unnecessary actions.\n"
   ]
  },
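  {
   "cell_type": "markdown",
   "id": "f6c1d3e1",
   "metadata": {},
   "source": [
    "As a sketch of the second and third notes: `spark.sql.shuffle.partitions` defaults to 200, which is usually more than a small dataset on one machine needs, and a DataFrame reused by several actions can be cached once and released afterwards. The helper name `build_local_session` and the value `8` in the next cell are assumptions to tune for your machine."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f6c1d3e2",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "\n",
    "def build_local_session(app_name=\"Assignment1\", shuffle_partitions=8):\n",
    "    \"\"\"Create (or reuse) a local SparkSession with a small shuffle partition count.\"\"\"\n",
    "    return (\n",
    "        SparkSession.builder\n",
    "        .appName(app_name)\n",
    "        .master(\"local[*]\")\n",
    "        .config(\"spark.sql.shuffle.partitions\", str(shuffle_partitions))  # assumed value\n",
    "        .getOrCreate()\n",
    "    )\n",
    "\n",
    "# Usage (assumes `df` is a DataFrame that several actions reuse):\n",
    "# spark = build_local_session()\n",
    "# df.cache()      # mark for caching; materialized by the first action\n",
    "# df.count()\n",
    "# df.unpersist()  # release the cached data when done"
   ]
  },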
  {
   "cell_type": "markdown",
   "id": "257f18f3",
   "metadata": {},
   "source": [
    "## Reproducibility checklist"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "64031975",
   "metadata": {},
   "source": [
    "- Record Python/Java/Spark versions.\n",
    "- Fix timezone to UTC.\n",
    "- Provide exact run command and paths to input/output files.\n"
   ]
  }
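  ,
  {
   "cell_type": "markdown",
   "id": "a9d2b4f1",
   "metadata": {},
   "source": [
    "The first two checklist items could be captured in code along the lines of the next cell. This is a sketch under assumptions: the helper name `environment_report` is hypothetical, and it assumes a `java` executable is on the `PATH`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a9d2b4f2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import platform\n",
    "import subprocess\n",
    "import sys\n",
    "\n",
    "import pyspark\n",
    "\n",
    "def environment_report():\n",
    "    \"\"\"Collect version details worth recording alongside the results.\"\"\"\n",
    "    try:\n",
    "        # `java -version` writes its report to stderr, not stdout\n",
    "        proc = subprocess.run([\"java\", \"-version\"], capture_output=True, text=True)\n",
    "        java_lines = proc.stderr.strip().splitlines()\n",
    "        java = java_lines[0] if java_lines else \"unknown\"\n",
    "    except FileNotFoundError:\n",
    "        java = \"java not found on PATH\"\n",
    "    return {\n",
    "        \"python\": sys.version.split()[0],\n",
    "        \"pyspark\": pyspark.__version__,\n",
    "        \"java\": java,\n",
    "        \"platform\": platform.platform(),\n",
    "    }\n",
    "\n",
    "print(environment_report())\n",
    "\n",
    "# With an active session, the SQL session time zone can be pinned to UTC:\n",
    "# spark.conf.set(\"spark.sql.session.timeZone\", \"UTC\")"
   ]
  }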
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.19"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
