{"cells":[{"cell_type":"markdown","metadata":{"id":"RUq_c0p7F6nP"},"source":["# DE1 — Lab 2: PostgreSQL → Star Schema ETL\n","> Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026\n","---\n","\n","\n","Execute all cells. Attach evidence and fill metrics."],"id":"RUq_c0p7F6nP"},{"cell_type":"code","source":["from google.colab import drive\n","\n","MOUNT_POINT = \"/content/drive\"  # where Colab exposes Google Drive\n","drive.mount(MOUNT_POINT)"],"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"Ny9sItjuJ8P6","executionInfo":{"status":"ok","timestamp":1767094659929,"user_tz":-180,"elapsed":42363,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"8e9c4d8f-460c-4001-daa1-e57d603fd8cd"},"id":"Ny9sItjuJ8P6","execution_count":1,"outputs":[{"output_type":"stream","name":"stdout","text":["Mounted at /content/drive\n"]}]},{"cell_type":"code","source":["# Exact Drive folder holding the lab2_*.csv exports\n","base = \"/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/\""],"metadata":{"id":"Jj8ZmdySKkvu","executionInfo":{"status":"ok","timestamp":1767094659947,"user_tz":-180,"elapsed":35,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}}},"id":"Jj8ZmdySKkvu","execution_count":2,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"J6Z2ZCgKF6nR"},"source":["## 0. 
Setup and schemas"],"id":"J6Z2ZCgKF6nR"},{"cell_type":"code","source":["# Pin the PySpark version that produced the recorded outputs (4.0.1); %pip targets this kernel\n","%pip install -q pyspark==4.0.1"],"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"S6IG39XYK1s8","executionInfo":{"status":"ok","timestamp":1767094664562,"user_tz":-180,"elapsed":4640,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"189d164e-924b-4251-e57e-2c5be8a4c3d7"},"id":"S6IG39XYK1s8","execution_count":3,"outputs":[{"output_type":"stream","name":"stdout","text":["Requirement already satisfied: pyspark in /usr/local/lib/python3.12/dist-packages (4.0.1)\n","Requirement already satisfied: py4j==0.10.9.9 in /usr/local/lib/python3.12/dist-packages (from pyspark) (0.10.9.9)\n"]}]},{"cell_type":"code","execution_count":4,"metadata":{"id":"dMvDZ7PlF6nS","executionInfo":{"status":"ok","timestamp":1767094685889,"user_tz":-180,"elapsed":21314,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}}},"outputs":[],"source":["from pyspark.sql import SparkSession, functions as F, types as T\n","spark = SparkSession.builder.appName(\"de1-lab2\").getOrCreate()\n","# Explicit schemas\n","customers_schema = T.StructType([\n","    T.StructField(\"customer_id\", T.IntegerType(), False),\n","    T.StructField(\"name\", T.StringType(), True),\n","    T.StructField(\"email\", T.StringType(), True),\n","    T.StructField(\"created_at\", T.TimestampType(), True),\n","])\n","brands_schema = T.StructType([\n","    T.StructField(\"brand_id\", T.IntegerType(), False),\n","    T.StructField(\"brand_name\", T.StringType(), True),\n","])\n","categories_schema = T.StructType([\n","    T.StructField(\"category_id\", T.IntegerType(), False),\n","    T.StructField(\"category_name\", T.StringType(), True),\n","])\n","products_schema = T.StructType([\n","    T.StructField(\"product_id\", T.IntegerType(), False),\n","    T.StructField(\"product_name\", T.StringType(), True),\n","    T.StructField(\"brand_id\", T.IntegerType(), True),\n","    T.StructField(\"category_id\", 
T.IntegerType(), True),\n","    T.StructField(\"price\", T.DoubleType(), True),\n","])\n","orders_schema = T.StructType([\n","    T.StructField(\"order_id\", T.IntegerType(), False),\n","    T.StructField(\"customer_id\", T.IntegerType(), True),\n","    T.StructField(\"order_date\", T.TimestampType(), True),\n","])\n","order_items_schema = T.StructType([\n","    T.StructField(\"order_item_id\", T.IntegerType(), False),\n","    T.StructField(\"order_id\", T.IntegerType(), True),\n","    T.StructField(\"product_id\", T.IntegerType(), True),\n","    T.StructField(\"quantity\", T.IntegerType(), True),\n","    T.StructField(\"unit_price\", T.DoubleType(), True),\n","])\n"],"id":"dMvDZ7PlF6nS"},{"cell_type":"markdown","metadata":{"id":"ItX8WnD0F6nT"},"source":["## 1. Ingest operational tables (from CSV exports)"],"id":"ItX8WnD0F6nT"},{"cell_type":"code","execution_count":5,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"6s953FcWF6nT","executionInfo":{"status":"ok","timestamp":1767094702083,"user_tz":-180,"elapsed":16204,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"e23f64b2-9e83-4898-c3a5-c08dd1fb02c2"},"outputs":[{"output_type":"stream","name":"stdout","text":["customers: 24 lignes chargées\n","brands: 8 lignes chargées\n","categories: 9 lignes chargées\n","products: 60 lignes chargées\n","orders: 220 lignes chargées\n","order_items: 638 lignes chargées\n"]}],"source":["# Same Drive folder as `base`; restated so this cell does not depend on the previous one\n","base_drive = \"/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/\"\n","\n","def read_table(table, schema):\n","    \"\"\"Read the lab2_<table>.csv export (header row) with an explicit schema.\"\"\"\n","    reader = spark.read.schema(schema).option(\"header\", \"true\")\n","    return reader.csv(base_drive + f\"lab2_{table}.csv\")\n","\n","customers = read_table(\"customers\", customers_schema)\n","brands = read_table(\"brands\", brands_schema)\n","categories = read_table(\"categories\", categories_schema)\n","products = read_table(\"products\", products_schema)\n","orders = read_table(\"orders\", orders_schema)\n","order_items = read_table(\"order_items\", order_items_schema)\n","\n","loaded = {\n","    \"customers\": customers, \"brands\": brands, \"categories\": categories,\n","    \"products\": products, \"orders\": orders, \"order_items\": order_items,\n","}\n","for name, df in loaded.items():\n","    print(f\"{name}: {df.count()} lignes chargées\")"],"id":"6s953FcWF6nT"},
{"cell_type":"markdown","metadata":{"id":"NsCKI9qGF6nU"},"source":["### Evidence: ingestion plan"],"id":"NsCKI9qGF6nU"},{"cell_type":"code","execution_count":6,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"MWneAYFFF6nU","executionInfo":{"status":"ok","timestamp":1767094702470,"user_tz":-180,"elapsed":378,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"71b165c5-d44e-4f73-b2cc-0837f7241dc3"},"outputs":[{"output_type":"stream","name":"stdout","text":["== Physical Plan ==\n","AdaptiveSparkPlan (11)\n","+- HashAggregate (10)\n","   +- Exchange (9)\n","      +- HashAggregate (8)\n","         +- Project (7)\n","            +- BroadcastHashJoin Inner BuildLeft (6)\n","               :- BroadcastExchange (3)\n","               :  +- Filter (2)\n","               :     +- Scan csv  (1)\n","               +- Filter (5)\n","                  +- Scan csv  (4)\n","\n","\n","(1) Scan csv \n","Output [1]: [order_id#13]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_orders.csv]\n","PushedFilters: [IsNotNull(order_id)]\n","ReadSchema: struct<order_id:int>\n","\n","(2) Filter\n","Input [1]: [order_id#13]\n","Condition : isnotnull(order_id#13)\n","\n","(3) BroadcastExchange\n","Input [1]: [order_id#13]\n","Arguments: 
HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=221]\n","\n","(4) Scan csv \n","Output [1]: [order_id#17]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_order_items.csv]\n","PushedFilters: [IsNotNull(order_id)]\n","ReadSchema: struct<order_id:int>\n","\n","(5) Filter\n","Input [1]: [order_id#17]\n","Condition : isnotnull(order_id#17)\n","\n","(6) BroadcastHashJoin\n","Left keys [1]: [order_id#13]\n","Right keys [1]: [order_id#17]\n","Join type: Inner\n","Join condition: None\n","\n","(7) Project\n","Output [1]: [order_id#13]\n","Input [2]: [order_id#13, order_id#17]\n","\n","(8) HashAggregate\n","Input [1]: [order_id#13]\n","Keys [1]: [order_id#13]\n","Functions: []\n","Aggregate Attributes: []\n","Results [1]: [order_id#13]\n","\n","(9) Exchange\n","Input [1]: [order_id#13]\n","Arguments: hashpartitioning(order_id#13, 200), ENSURE_REQUIREMENTS, [plan_id=226]\n","\n","(10) HashAggregate\n","Input [1]: [order_id#13]\n","Keys [1]: [order_id#13]\n","Functions: []\n","Aggregate Attributes: []\n","Results [1]: [order_id#13]\n","\n","(11) AdaptiveSparkPlan\n","Output [1]: [order_id#13]\n","Arguments: isFinalPlan=false\n","\n","\n","Saved proof/plan_ingest.txt\n"]}],"source":["import pathlib\n","from datetime import datetime as _dt\n","\n","# Physical plan of the orders/order_items join, kept as ingestion evidence\n","ingest = orders.join(order_items, \"order_id\").select(\"order_id\").distinct()\n","ingest.explain(\"formatted\")\n","\n","proof_dir = pathlib.Path(\"proof\")\n","proof_dir.mkdir(exist_ok=True)\n","with open(proof_dir / \"plan_ingest.txt\", \"w\") as f:\n","    f.write(f\"{_dt.now()}\\n\")  # capture-time header line\n","    f.write(ingest._jdf.queryExecution().executedPlan().toString())\n","print(\"Saved proof/plan_ingest.txt\")\n"],"id":"MWneAYFFF6nU"},{"cell_type":"markdown","metadata":{"id":"FcAS_UGlF6nU"},"source":["## 2. 
Surrogate key function"],"id":"FcAS_UGlF6nU"},{"cell_type":"code","execution_count":7,"metadata":{"id":"juIAt2exF6nU","executionInfo":{"status":"ok","timestamp":1767094702678,"user_tz":-180,"elapsed":8,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}}},"outputs":[],"source":["def sk(cols):\n","    \"\"\"Stable 64-bit surrogate key derived from one or more natural-key columns.\n","\n","    cols: a column name, or a list of column names hashed in order.\n","    Returns the Column abs(xxhash64(cols...)): the same natural key always maps to\n","    the same value, so dimensions and facts can compute it independently.\n","    Raises ValueError when no column is given.\n","\n","    Caveats (Spark xxhash64 semantics): NULL inputs are skipped, so an all-NULL key\n","    yields the seed value 42 rather than NULL; abs() of the single hash -2**63\n","    overflows (an error under ANSI mode) - astronomically unlikely, but not impossible.\n","    \"\"\"\n","    names = [cols] if isinstance(cols, str) else list(cols)\n","    if not names:\n","        raise ValueError(\"sk() needs at least one natural-key column\")\n","    return F.abs(F.xxhash64(*[F.col(c) for c in names]))\n"],"id":"juIAt2exF6nU"},{"cell_type":"markdown","metadata":{"id":"Y3lOeL3tF6nV"},"source":["## 3. Build dimensions"],"id":"Y3lOeL3tF6nV"},{"cell_type":"code","execution_count":8,"metadata":{"id":"gKu3fd9XF6nV","executionInfo":{"status":"ok","timestamp":1767094702693,"user_tz":-180,"elapsed":14,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}}},"outputs":[],"source":["# Each dimension = surrogate key from sk() + natural key(s) + descriptive attributes\n","dim_customer = customers.select(\n","    sk([\"customer_id\"]).alias(\"customer_sk\"),\n","    \"customer_id\",\"name\",\"email\",\"created_at\"\n",")\n","\n","dim_brand = brands.select(\n","    sk([\"brand_id\"]).alias(\"brand_sk\"),\n","    \"brand_id\",\"brand_name\"\n",")\n","\n","dim_category = categories.select(\n","    sk([\"category_id\"]).alias(\"category_sk\"),\n","    \"category_id\",\"category_name\"\n",")\n","\n","dim_product = products.select(\n","    sk([\"product_id\"]).alias(\"product_sk\"),\n","    \"product_id\",\"product_name\",\n","    # FK surrogates recomputed with the same sk() so they match dim_brand / dim_category\n","    sk([\"brand_id\"]).alias(\"brand_sk\"),\n","    sk([\"category_id\"]).alias(\"category_sk\"),\n","    \"price\"\n",")\n"],"id":"gKu3fd9XF6nV"},{"cell_type":"markdown","source":["La cellule transforme tes tables de base en tables de dimension (avec la colonne _sk). C'est cette étape qui crée la structure en étoile."],"metadata":{"id":"bYCvDzGXMxSf"},"id":"bYCvDzGXMxSf"},{"cell_type":"markdown","metadata":{"id":"F1mFP8EoF6nV"},"source":["## 4. 
Build date dimension"],"id":"F1mFP8EoF6nV"},{"cell_type":"code","execution_count":9,"metadata":{"id":"a2Lhmfp6F6nV","executionInfo":{"status":"ok","timestamp":1767094702768,"user_tz":-180,"elapsed":73,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}}},"outputs":[],"source":["# One row per distinct day that appears in orders (not a gap-free calendar)\n","dates = orders.select(F.to_date(\"order_date\").alias(\"date\")).distinct()\n","dim_date = dates.select(\n","    sk([\"date\"]).alias(\"date_sk\"),\n","    F.col(\"date\"),\n","    F.year(\"date\").alias(\"year\"),\n","    F.month(\"date\").alias(\"month\"),\n","    F.dayofmonth(\"date\").alias(\"day\"),\n","    F.date_format(\"date\",\"E\").alias(\"dow\")  # pattern E = abbreviated day name, e.g. Mon\n",")\n"],"id":"a2Lhmfp6F6nV"},{"cell_type":"markdown","metadata":{"id":"G9TQ4iniF6nV"},"source":["## 5. Build fact_sales with broadcast joins where appropriate"],"id":"G9TQ4iniF6nV"},{"cell_type":"code","execution_count":10,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"q1F86lSFF6nV","executionInfo":{"status":"ok","timestamp":1767094703487,"user_tz":-180,"elapsed":716,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"76351f03-bc81-432f-a0f9-f22104ef9547"},"outputs":[{"output_type":"stream","name":"stdout","text":["== Physical Plan ==\n","AdaptiveSparkPlan (19)\n","+- Project (18)\n","   +- Project (17)\n","      +- BroadcastHashJoin Inner BuildRight (16)\n","         :- Project (12)\n","         :  +- BroadcastHashJoin Inner BuildRight (11)\n","         :     :- Project (7)\n","         :     :  +- BroadcastHashJoin Inner BuildRight (6)\n","         :     :     :- Filter (2)\n","         :     :     :  +- Scan csv  (1)\n","         :     :     +- BroadcastExchange (5)\n","         :     :        +- Filter (4)\n","         :     :           +- Scan csv  (3)\n","         :     +- BroadcastExchange (10)\n","         :        +- Filter (9)\n","         :           +- Scan csv  (8)\n","         +- BroadcastExchange (15)\n","            
+- Filter (14)\n","               +- Scan csv  (13)\n","\n","\n","(1) Scan csv \n","Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_order_items.csv]\n","PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]\n","ReadSchema: struct<order_id:int,product_id:int,quantity:int,unit_price:double>\n","\n","(2) Filter\n","Input [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]\n","Condition : (isnotnull(product_id#18) AND isnotnull(order_id#17))\n","\n","(3) Scan csv \n","Output [1]: [product_id#8]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_products.csv]\n","PushedFilters: [IsNotNull(product_id)]\n","ReadSchema: struct<product_id:int>\n","\n","(4) Filter\n","Input [1]: [product_id#8]\n","Condition : isnotnull(product_id#8)\n","\n","(5) BroadcastExchange\n","Input [1]: [product_id#8]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=295]\n","\n","(6) BroadcastHashJoin\n","Left keys [1]: [product_id#18]\n","Right keys [1]: [product_id#8]\n","Join type: Inner\n","Join condition: None\n","\n","(7) Project\n","Output [4]: [product_id#18, order_id#17, quantity#19, unit_price#20]\n","Input [5]: [order_id#17, product_id#18, quantity#19, unit_price#20, product_id#8]\n","\n","(8) Scan csv \n","Output [3]: [order_id#13, customer_id#14, order_date#15]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_orders.csv]\n","PushedFilters: [IsNotNull(order_id), IsNotNull(customer_id)]\n","ReadSchema: struct<order_id:int,customer_id:int,order_date:timestamp>\n","\n","(9) Filter\n","Input [3]: [order_id#13, customer_id#14, order_date#15]\n","Condition : (isnotnull(order_id#13) AND 
isnotnull(customer_id#14))\n","\n","(10) BroadcastExchange\n","Input [3]: [order_id#13, customer_id#14, order_date#15]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=299]\n","\n","(11) BroadcastHashJoin\n","Left keys [1]: [order_id#17]\n","Right keys [1]: [order_id#13]\n","Join type: Inner\n","Join condition: None\n","\n","(12) Project\n","Output [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15]\n","Input [7]: [product_id#18, order_id#17, quantity#19, unit_price#20, order_id#13, customer_id#14, order_date#15]\n","\n","(13) Scan csv \n","Output [1]: [customer_id#0]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_customers.csv]\n","PushedFilters: [IsNotNull(customer_id)]\n","ReadSchema: struct<customer_id:int>\n","\n","(14) Filter\n","Input [1]: [customer_id#0]\n","Condition : isnotnull(customer_id#0)\n","\n","(15) BroadcastExchange\n","Input [1]: [customer_id#0]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=303]\n","\n","(16) BroadcastHashJoin\n","Left keys [1]: [customer_id#14]\n","Right keys [1]: [customer_id#0]\n","Join type: Inner\n","Join condition: None\n","\n","(17) Project\n","Output [6]: [customer_id#14, order_id#17, product_id#18, quantity#19, unit_price#20, cast(order_date#15 as date) AS date#84]\n","Input [7]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15, customer_id#0]\n","\n","(18) Project\n","Output [9]: [order_id#17, abs(xxhash64(date#84, 42)) AS date_sk#85L, abs(xxhash64(customer_id#14, 42)) AS customer_sk#86L, abs(xxhash64(product_id#18, 42)) AS product_sk#87L, quantity#19, unit_price#20, (cast(quantity#19 as double) * unit_price#20) AS subtotal#90, year(date#84) AS year#91, month(date#84) AS month#92]\n","Input [6]: [customer_id#14, order_id#17, product_id#18, 
quantity#19, unit_price#20, date#84]\n","\n","(19) AdaptiveSparkPlan\n","Output [9]: [order_id#17, date_sk#85L, customer_sk#86L, product_sk#87L, quantity#19, unit_price#20, subtotal#90, year#91, month#92]\n","Arguments: isFinalPlan=false\n","\n","\n","Saved proof/plan_fact_join.txt\n"]}],"source":["import pathlib\n","from datetime import datetime as _dt\n","\n","# 1. Joins on shared column names keep a single copy of each key (no ambiguity)\n","df_fact = (order_items\n","    .join(products, \"product_id\")\n","    .join(orders, \"order_id\")\n","    .join(customers, \"customer_id\")\n","    .withColumn(\"date\", F.to_date(\"order_date\"))\n",")\n","\n","# 2. Attach surrogate keys (same sk() as the dimensions) and the measures.\n","#    quantity / unit_price are already int / double via order_items_schema.\n","df_fact = (df_fact\n","    .withColumn(\"date_sk\", sk([\"date\"]))\n","    .withColumn(\"customer_sk\", sk([\"customer_id\"]))\n","    .withColumn(\"product_sk\", sk([\"product_id\"]))\n","    .withColumn(\"subtotal\", F.col(\"quantity\") * F.col(\"unit_price\"))\n","    .withColumn(\"year\", F.year(\"date\"))\n","    .withColumn(\"month\", F.month(\"date\"))\n","    .select(\"order_id\", \"date_sk\", \"customer_sk\", \"product_sk\", \"quantity\", \"unit_price\", \"subtotal\", \"year\", \"month\")\n",")\n","\n","# 3. Show and save the physical plan as evidence\n","df_fact.explain(\"formatted\")\n","pathlib.Path(\"proof\").mkdir(exist_ok=True)\n","with open(\"proof/plan_fact_join.txt\",\"w\") as f:\n","    f.write(str(_dt.now())+\"\\n\")\n","    f.write(df_fact._jdf.queryExecution().executedPlan().toString())\n","print(\"Saved proof/plan_fact_join.txt\")\n","\n","# 4. Inner joins silently drop order lines whose product, order or customer is missing\n","#    (and would duplicate lines if a dimension key were not unique): reconcile counts.\n","n_items, n_fact = order_items.count(), df_fact.count()\n","print(f\"fact_sales rows: {n_fact} / order_items rows: {n_items} (difference: {n_items - n_fact})\")"],"id":"q1F86lSFF6nV"},{"cell_type":"markdown","metadata":{"id":"GJPTKG3QF6nW"},"source":["## 6. 
Write Parquet outputs (partitioned by year, month)"],"id":"GJPTKG3QF6nW"},{"cell_type":"code","execution_count":11,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"levq-qpvF6nW","executionInfo":{"status":"ok","timestamp":1767094709703,"user_tz":-180,"elapsed":6212,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"8f922b77-a209-459c-e290-252a4862ec95"},"outputs":[{"output_type":"stream","name":"stdout","text":["Parquet written under outputs/lab2/\n"]}],"source":["base_out = \"outputs/lab2\"\n","\n","# Dimensions: one unpartitioned Parquet directory each, written in this order\n","dimensions = {\n","    \"dim_customer\": dim_customer,\n","    \"dim_brand\": dim_brand,\n","    \"dim_category\": dim_category,\n","    \"dim_product\": dim_product,\n","    \"dim_date\": dim_date,\n","}\n","for table_name, dim_df in dimensions.items():\n","    dim_df.write.mode(\"overwrite\").parquet(f\"{base_out}/{table_name}\")\n","\n","# Fact table: partitioned on disk into year=/month= directories\n","df_fact.write.mode(\"overwrite\").partitionBy(\"year\", \"month\").parquet(f\"{base_out}/fact_sales\")\n","print(\"Parquet written under outputs/lab2/\")\n"],"id":"levq-qpvF6nW"},{"cell_type":"markdown","metadata":{"id":"7_1LpBt8F6nW"},"source":["## 7. 
Plan comparison: projection and layout"],"id":"7_1LpBt8F6nW"},{"cell_type":"code","execution_count":12,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"jrZ7A7fJF6nW","executionInfo":{"status":"ok","timestamp":1767094712408,"user_tz":-180,"elapsed":2700,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"c850f62a-dc13-4666-cc7b-cb632910a0f7"},"outputs":[{"output_type":"stream","name":"stdout","text":["== Physical Plan ==\n","AdaptiveSparkPlan (16)\n","+- HashAggregate (15)\n","   +- Exchange (14)\n","      +- HashAggregate (13)\n","         +- Project (12)\n","            +- BroadcastHashJoin Inner BuildRight (11)\n","               :- Project (7)\n","               :  +- BroadcastHashJoin Inner BuildLeft (6)\n","               :     :- BroadcastExchange (3)\n","               :     :  +- Filter (2)\n","               :     :     +- Scan csv  (1)\n","               :     +- Filter (5)\n","               :        +- Scan csv  (4)\n","               +- BroadcastExchange (10)\n","                  +- Filter (9)\n","                     +- Scan csv  (8)\n","\n","\n","(1) Scan csv \n","Output [2]: [order_id#13, order_date#15]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_orders.csv]\n","PushedFilters: [IsNotNull(order_id)]\n","ReadSchema: struct<order_id:int,order_date:timestamp>\n","\n","(2) Filter\n","Input [2]: [order_id#13, order_date#15]\n","Condition : isnotnull(order_id#13)\n","\n","(3) BroadcastExchange\n","Input [2]: [order_id#13, order_date#15]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=820]\n","\n","(4) Scan csv \n","Output [3]: [order_id#17, product_id#18, quantity#19]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_order_items.csv]\n","PushedFilters: [IsNotNull(order_id), 
IsNotNull(product_id)]\n","ReadSchema: struct<order_id:int,product_id:int,quantity:int>\n","\n","(5) Filter\n","Input [3]: [order_id#17, product_id#18, quantity#19]\n","Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))\n","\n","(6) BroadcastHashJoin\n","Left keys [1]: [order_id#13]\n","Right keys [1]: [order_id#17]\n","Join type: Inner\n","Join condition: None\n","\n","(7) Project\n","Output [3]: [order_date#15, product_id#18, quantity#19]\n","Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]\n","\n","(8) Scan csv \n","Output [2]: [product_id#8, price#12]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_products.csv]\n","PushedFilters: [IsNotNull(product_id)]\n","ReadSchema: struct<product_id:int,price:double>\n","\n","(9) Filter\n","Input [2]: [product_id#8, price#12]\n","Condition : isnotnull(product_id#8)\n","\n","(10) BroadcastExchange\n","Input [2]: [product_id#8, price#12]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=824]\n","\n","(11) BroadcastHashJoin\n","Left keys [1]: [product_id#18]\n","Right keys [1]: [product_id#8]\n","Join type: Inner\n","Join condition: None\n","\n","(12) Project\n","Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#130]\n","Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]\n","\n","(13) HashAggregate\n","Input [3]: [quantity#19, price#12, _groupingexpression#130]\n","Keys [1]: [_groupingexpression#130]\n","Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]\n","Aggregate Attributes [1]: [sum#131]\n","Results [2]: [_groupingexpression#130, sum#132]\n","\n","(14) Exchange\n","Input [2]: [_groupingexpression#130, sum#132]\n","Arguments: hashpartitioning(_groupingexpression#130, 200), ENSURE_REQUIREMENTS, [plan_id=829]\n","\n","(15) HashAggregate\n","Input [2]: 
[_groupingexpression#130, sum#132]\n","Keys [1]: [_groupingexpression#130]\n","Functions [1]: [sum((cast(quantity#19 as double) * price#12))]\n","Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#129]\n","Results [2]: [_groupingexpression#130 AS d#116, sum((cast(quantity#19 as double) * price#12))#129 AS gmv#117]\n","\n","(16) AdaptiveSparkPlan\n","Output [2]: [d#116, gmv#117]\n","Arguments: isFinalPlan=false\n","\n","\n","== Physical Plan ==\n","AdaptiveSparkPlan (16)\n","+- HashAggregate (15)\n","   +- Exchange (14)\n","      +- HashAggregate (13)\n","         +- Project (12)\n","            +- BroadcastHashJoin Inner BuildRight (11)\n","               :- Project (7)\n","               :  +- BroadcastHashJoin Inner BuildLeft (6)\n","               :     :- BroadcastExchange (3)\n","               :     :  +- Filter (2)\n","               :     :     +- Scan csv  (1)\n","               :     +- Filter (5)\n","               :        +- Scan csv  (4)\n","               +- BroadcastExchange (10)\n","                  +- Filter (9)\n","                     +- Scan csv  (8)\n","\n","\n","(1) Scan csv \n","Output [2]: [order_id#13, order_date#15]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_orders.csv]\n","PushedFilters: [IsNotNull(order_id)]\n","ReadSchema: struct<order_id:int,order_date:timestamp>\n","\n","(2) Filter\n","Input [2]: [order_id#13, order_date#15]\n","Condition : isnotnull(order_id#13)\n","\n","(3) BroadcastExchange\n","Input [2]: [order_id#13, order_date#15]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=1195]\n","\n","(4) Scan csv \n","Output [3]: [order_id#17, product_id#18, quantity#19]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_order_items.csv]\n","PushedFilters: [IsNotNull(order_id), 
IsNotNull(product_id)]\n","ReadSchema: struct<order_id:int,product_id:int,quantity:int>\n","\n","(5) Filter\n","Input [3]: [order_id#17, product_id#18, quantity#19]\n","Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))\n","\n","(6) BroadcastHashJoin\n","Left keys [1]: [order_id#13]\n","Right keys [1]: [order_id#17]\n","Join type: Inner\n","Join condition: None\n","\n","(7) Project\n","Output [3]: [order_date#15, product_id#18, quantity#19]\n","Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]\n","\n","(8) Scan csv \n","Output [2]: [product_id#8, price#12]\n","Batched: false\n","Location: InMemoryFileIndex [file:/content/drive/MyDrive/ESIEE/E4FD/Cours/P1/Data_Engineering/LAB_DE/Data/lab2_products.csv]\n","PushedFilters: [IsNotNull(product_id)]\n","ReadSchema: struct<product_id:int,price:double>\n","\n","(9) Filter\n","Input [2]: [product_id#8, price#12]\n","Condition : isnotnull(product_id#8)\n","\n","(10) BroadcastExchange\n","Input [2]: [product_id#8, price#12]\n","Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=1199]\n","\n","(11) BroadcastHashJoin\n","Left keys [1]: [product_id#18]\n","Right keys [1]: [product_id#8]\n","Join type: Inner\n","Join condition: None\n","\n","(12) Project\n","Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#153]\n","Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]\n","\n","(13) HashAggregate\n","Input [3]: [quantity#19, price#12, _groupingexpression#153]\n","Keys [1]: [_groupingexpression#153]\n","Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]\n","Aggregate Attributes [1]: [sum#154]\n","Results [2]: [_groupingexpression#153, sum#155]\n","\n","(14) Exchange\n","Input [2]: [_groupingexpression#153, sum#155]\n","Arguments: hashpartitioning(_groupingexpression#153, 200), ENSURE_REQUIREMENTS, [plan_id=1204]\n","\n","(15) HashAggregate\n","Input [2]: 
[_groupingexpression#153, sum#155]\n","Keys [1]: [_groupingexpression#153]\n","Functions [1]: [sum((cast(quantity#19 as double) * price#12))]\n","Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#152]\n","Results [2]: [_groupingexpression#153 AS d#145, sum((cast(quantity#19 as double) * price#12))#152 AS gmv#146]\n","\n","(16) AdaptiveSparkPlan\n","Output [2]: [d#145, gmv#146]\n","Arguments: isFinalPlan=false\n","\n","\n","Record Spark UI metrics for both runs in lab2_metrics_log.csv\n"]}],"source":["def daily_gmv(order_lines):\n","    \"\"\"Sum quantity * price per order day (column d) into a gmv column.\"\"\"\n","    # NOTE(review): uses products.price, while fact_sales uses order_items.unit_price.\n","    # Both cases share it, so the plan comparison stays fair - confirm the intended GMV.\n","    return (order_lines\n","            .groupBy(F.to_date(\"order_date\").alias(\"d\"))\n","            .agg(F.sum(F.col(\"quantity\") * F.col(\"price\")).alias(\"gmv\")))\n","\n","# Case A: join full rows, then aggregate (column pruning left to the optimizer)\n","a = daily_gmv(orders.join(order_items, \"order_id\").join(products, \"product_id\"))\n","a.explain(\"formatted\")\n","_ = a.count()\n","\n","# Case B: project only the needed columns before joining\n","orders_slim = orders.select(\"order_id\", \"order_date\")\n","items_slim = order_items.select(\"order_id\", \"product_id\", \"quantity\")\n","products_slim = products.select(\"product_id\", \"price\")\n","b = daily_gmv(orders_slim.join(items_slim, \"order_id\").join(products_slim, \"product_id\"))\n","b.explain(\"formatted\")\n","_ = b.count()\n","\n","print(\"Record Spark UI metrics for both runs in lab2_metrics_log.csv\")\n"],"id":"jrZ7A7fJF6nW"},{"cell_type":"markdown","metadata":{"id":"L0bqPD0aF6nW"},"source":["## 8. 
Cleanup"],"id":"L0bqPD0aF6nW"},{"cell_type":"code","execution_count":13,"metadata":{"id":"LGx7hoGwF6nW","colab":{"base_uri":"https://localhost:8080/"},"executionInfo":{"status":"ok","timestamp":1767094713147,"user_tz":-180,"elapsed":730,"user":{"displayName":"Yannick PRAT","userId":"15153610905305463248"}},"outputId":"42884939-2b8d-4ffb-aa6c-43832192bb0a"},"outputs":[{"output_type":"stream","name":"stdout","text":["Spark session stopped.\n"]}],"source":["# Release the Spark session once all evidence and Parquet outputs have been written\n","spark.stop()\n","print(\"Spark session stopped.\")\n"],"id":"LGx7hoGwF6nW"}],"metadata":{"kernelspec":{"display_name":"Python 3","name":"python3"},"colab":{"provenance":[]}},"nbformat":4,"nbformat_minor":5}