{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "2744ce5c",
   "metadata": {},
   "source": [
    "# DE2 — Lab 2: Text Processing — Inverted Index Pipeline (15%)\n",
    "> Author : Badr TAJINI - Data Engineering II (Data-Intensive Workloads) - ESIEE 2025-2026\n",
    "\n",
    "**Track:** C — Micromobility \n",
    "\n",
    "**Corpus:** descriptions de stations Citi Bike combinées avec des champs textuels provenant de NYC Open Data\n",
    "\n",
    "**Document unit:** Une station ou un enregistrement de données (un document = une station).\n",
    "\n",
    "**Goal:** Ingérer un corpus textuel, Tokeniser et normaliser le texte, Construire un index inversé, Mesurer la latence des requêtes, Comparer les performances de stockage Parquet vs CSV"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "a6816221",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Spark version: 4.0.0\n",
      "Spark UI: http://127.0.0.1:4040\n",
      "Spark UI (WSL/Windows browser): http://localhost:4040\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "from urllib.parse import urlparse\n",
    "from pyspark.sql import SparkSession, functions as F\n",
    "from pyspark.sql.types import StructType, StructField, StringType, LongType\n",
    "import time, pathlib, csv as csv_module, io, sys\n",
    "\n",
    "DE2_SPARK_DRIVER_HOST = os.environ.get(\"DE2_SPARK_DRIVER_HOST\", \"127.0.0.1\")\n",
    "DE2_SPARK_BIND_ADDRESS = os.environ.get(\"DE2_SPARK_BIND_ADDRESS\", \"0.0.0.0\")\n",
    "os.environ.setdefault(\"SPARK_LOCAL_IP\", DE2_SPARK_DRIVER_HOST)\n",
    "\n",
    "\n",
    "def show_spark_ui(spark_session):\n",
    "    ui_url = spark_session.sparkContext.uiWebUrl\n",
    "    print(\"Spark version:\", spark_session.version)\n",
    "    if ui_url:\n",
    "        ui_port = urlparse(ui_url).port or 4040\n",
    "        print(\"Spark UI:\", ui_url)\n",
    "        print(\"Spark UI (WSL/Windows browser):\", f\"http://localhost:{ui_port}\")\n",
    "    else:\n",
    "        print(\"Spark UI: not available\")\n",
    "\n",
    "\n",
    "spark = SparkSession.builder \\\n",
    "    .appName(\"DE2-Lab2-TextProcessing-TrackC\") \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .config(\"spark.driver.host\", DE2_SPARK_DRIVER_HOST) \\\n",
    "    .config(\"spark.driver.bindAddress\", DE2_SPARK_BIND_ADDRESS) \\\n",
    "    .config(\"spark.ui.bindAddress\", DE2_SPARK_BIND_ADDRESS) \\\n",
    "    .getOrCreate()\n",
    "\n",
    "show_spark_ui(spark)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "corpus_gen_md",
   "metadata": {},
   "source": [
    "## 0. Génération du corpus — Citi Bike + NYC Open Data (Track C)\n",
    "\n",
    "Nous construisons un corpus de 40 enregistrements combinant :\n",
    "\n",
    "- Descriptions des stations Citi Bike :\n",
    "nom de la station, borough, contexte du quartier, points d’intérêt à proximité\n",
    "\n",
    "- Champs textuels de NYC Open Data :\n",
    "descriptions de quartiers, notes sur les pistes cyclables, informations d’accessibilité\n",
    "\n",
    "Chaque ligne = un document représentant une station / un enregistrement.\n",
    "Cela reflète ce que l’on obtient depuis le flux GBFS de Citi Bike + l’API NYC Open Data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "corpus_gen",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Track C corpus saved → data/corpus/citibike_corpus.csv\n",
      "Total documents: 40\n",
      "Breakdown:\n",
      "  Bronx: 1 docs\n",
      "  Brooklyn: 11 docs\n",
      "  Citywide: 12 docs\n",
      "  Manhattan: 12 docs\n",
      "  Queens: 4 docs\n"
     ]
    }
   ],
   "source": [
    "pathlib.Path(\"data/corpus\").mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Track C corpus: Citi Bike station descriptions + NYC Open Data text records\n",
    "# Each record: (doc_id, station_name, borough, text)\n",
    "citibike_corpus = [\n",
    "    # Manhattan stations\n",
    "    (\"stn_001\", \"W 21 St & 6 Ave\",              \"Manhattan\",\n",
    "     \"Citi Bike station located at West 21st Street and 6th Avenue in the Chelsea neighbourhood of Manhattan. \"\n",
    "     \"This high-traffic docking station serves commuters and tourists visiting the nearby High Line elevated park. \"\n",
    "     \"The station has 39 docking points and is accessible from the F and M subway lines. \"\n",
    "     \"Bike lanes connect this station to the Hudson River Greenway cycling path.\"),\n",
    "\n",
    "    (\"stn_002\", \"E 47 St & Park Ave\",            \"Manhattan\",\n",
    "     \"Station situated at East 47th Street and Park Avenue in Midtown Manhattan near Grand Central Terminal. \"\n",
    "     \"This is one of the busiest Citi Bike stations in the network due to heavy commuter foot traffic. \"\n",
    "     \"The docking station provides 51 bike slots and connects riders to multiple subway lines. \"\n",
    "     \"Surrounding streets feature protected bike lanes along 2nd Avenue corridor.\"),\n",
    "\n",
    "    (\"stn_003\", \"Broadway & W 56 St\",            \"Manhattan\",\n",
    "     \"Citi Bike docking station at Broadway and West 56th Street near Columbus Circle and Central Park South. \"\n",
    "     \"The station is heavily used by tourists exploring Central Park and the Theater District. \"\n",
    "     \"It offers 27 docking slots and connects riders to the A C B D subway lines at Columbus Circle. \"\n",
    "     \"The Broadway protected lane allows safe cycling northward toward the park.\"),\n",
    "\n",
    "    (\"stn_004\", \"Pershing Square North\",         \"Manhattan\",\n",
    "     \"Located adjacent to Grand Central Terminal at 42nd Street and Park Avenue. \"\n",
    "     \"One of the original Citi Bike stations launched in 2013, this dock serves thousands of daily riders. \"\n",
    "     \"The station provides 39 docking points in a prime midtown location surrounded by office towers. \"\n",
    "     \"Connectivity to the 4 5 6 7 and S trains makes this station a major multimodal hub.\"),\n",
    "\n",
    "    (\"stn_005\", \"W 41 St & 8 Ave\",              \"Manhattan\",\n",
    "     \"Station at West 41st Street and 8th Avenue in the Hell's Kitchen neighbourhood near the Port Authority Bus Terminal. \"\n",
    "     \"This station supports last-mile commuters arriving at the bus terminal or Penn Station. \"\n",
    "     \"Surrounding infrastructure includes a protected bike lane on 8th Avenue running south to Chelsea. \"\n",
    "     \"The station has 39 docking slots and is accessible to people with mobility impairments.\"),\n",
    "\n",
    "    (\"stn_006\", \"Lafayette St & E 8 St\",        \"Manhattan\",\n",
    "     \"Docking station in NoHo at Lafayette Street and East 8th Street near NYU and the East Village. \"\n",
    "     \"Popular with university students commuting between campuses and local residents. \"\n",
    "     \"The station connects to the Bowery protected bike lane heading south into Chinatown and Lower Manhattan. \"\n",
    "     \"27 docking slots available. Nearby subway access via N R W 4 5 6 lines at Astor Place.\"),\n",
    "\n",
    "    (\"stn_007\", \"W 13 St & 7 Ave\",              \"Manhattan\",\n",
    "     \"Located in Greenwich Village at West 13th Street and 7th Avenue South near the Meatpacking District. \"\n",
    "     \"This station serves a dense residential and commercial neighbourhood with active cycling culture. \"\n",
    "     \"Protected bike lanes on 7th Avenue South connect southward to Tribeca and the Financial District. \"\n",
    "     \"High weekend usage from visitors to the Whitney Museum and the High Line park.\"),\n",
    "\n",
    "    (\"stn_008\", \"Fulton St & William St\",       \"Manhattan\",\n",
    "     \"Financial District station near Fulton Center and the World Trade Center site in Lower Manhattan. \"\n",
    "     \"This station caters primarily to weekday business commuters and weekend tourists. \"\n",
    "     \"Connectivity to the A C J Z 2 3 4 5 subway lines at Fulton Street makes this a critical multimodal node. \"\n",
    "     \"The station links to the East River waterfront cycling path via dedicated infrastructure.\"),\n",
    "\n",
    "    (\"stn_009\", \"Spruce St & Nassau St\",        \"Manhattan\",\n",
    "     \"Station in Lower Manhattan near Spruce Street and Nassau Street adjacent to Pace University and City Hall Park. \"\n",
    "     \"Serves a mixed population of students, government workers, and commuters heading to the Financial District. \"\n",
    "     \"The station is integrated with the broader Lower Manhattan bike network. 27 docking slots available.\"),\n",
    "\n",
    "    (\"stn_010\", \"1 Ave & E 18 St\",              \"Manhattan\",\n",
    "     \"Station on 1st Avenue at East 18th Street in the Gramercy Park neighbourhood of Manhattan. \"\n",
    "     \"The 1st Avenue protected bike lane running the full length of Manhattan passes directly by this station. \"\n",
    "     \"High usage from residents of Stuyvesant Town and commuters heading to Midtown East. \"\n",
    "     \"The station provides 39 docking points and connects to the L train at 14th Street.\"),\n",
    "\n",
    "    # Brooklyn stations\n",
    "    (\"stn_011\", \"Degraw St & Smith St\",         \"Brooklyn\",\n",
    "     \"Citi Bike station in Cobble Hill Brooklyn at Degraw Street and Smith Street. \"\n",
    "     \"This residential neighbourhood station serves families and commuters heading to Manhattan via bike or subway. \"\n",
    "     \"The station sits near the F and G train lines at Bergen Street. \"\n",
    "     \"Smith Street is part of a designated bicycle route connecting Red Hook to Downtown Brooklyn.\"),\n",
    "\n",
    "    (\"stn_012\", \"Myrtle Ave & Classon Ave\",     \"Brooklyn\",\n",
    "     \"Station at Myrtle Avenue and Classon Avenue in Clinton Hill Brooklyn near Pratt Institute. \"\n",
    "     \"High usage from Pratt students and faculty commuting between the campus and the G train station. \"\n",
    "     \"The Myrtle Avenue bike lane connects this station toward Williamsburg and the Brooklyn waterfront. \"\n",
    "     \"39 docking slots. The station supports the City's Vision Zero safety initiative for cyclists.\"),\n",
    "\n",
    "    (\"stn_013\", \"Columbia St & Kane St\",        \"Brooklyn\",\n",
    "     \"Located in Columbia Street Waterfront District Brooklyn near Red Hook. \"\n",
    "     \"This station is popular with cyclists riding to Brooklyn Bridge Park and the waterfront esplanade. \"\n",
    "     \"Limited subway access in the area makes Citi Bike a critical transportation option for residents. \"\n",
    "     \"The station integrates with the Brooklyn waterfront greenway cycling path.\"),\n",
    "\n",
    "    (\"stn_014\", \"Grand Army Plaza\",             \"Brooklyn\",\n",
    "     \"Major Citi Bike station at Grand Army Plaza in Prospect Heights Brooklyn near Prospect Park. \"\n",
    "     \"This station serves as a gateway to the Prospect Park loop, a popular recreational cycling route. \"\n",
    "     \"High weekend usage from residents and tourists visiting the Brooklyn Museum and Botanic Garden. \"\n",
    "     \"The station is accessible from the 2 and 3 trains and has 51 docking slots.\"),\n",
    "\n",
    "    (\"stn_015\", \"Bedford Ave & Bergen St\",      \"Brooklyn\",\n",
    "     \"Station in Crown Heights Brooklyn at Bedford Avenue and Bergen Street. \"\n",
    "     \"Serves a diverse residential neighbourhood with growing cycling infrastructure. \"\n",
    "     \"Proximity to the 2 3 4 5 subway lines at Franklin Avenue and Eastern Parkway. \"\n",
    "     \"The Eastern Parkway bike lane passes nearby offering a protected east-west cycling corridor.\"),\n",
    "\n",
    "    (\"stn_016\", \"Wythe Ave & Metropolitan Ave\", \"Brooklyn\",\n",
    "     \"Station in Williamsburg Brooklyn at Wythe Avenue and Metropolitan Avenue near the East River waterfront. \"\n",
    "     \"One of the most active Citi Bike stations in Brooklyn serving the dense young professional population. \"\n",
    "     \"The station connects to the Kent Avenue waterfront bike path and the Williamsburg Bridge approach. \"\n",
    "     \"Subway access via the L train at Bedford Avenue makes this a major cycling-transit interchange.\"),\n",
    "\n",
    "    (\"stn_017\", \"Atlantic Ave & Fort Greene Pl\",\"Brooklyn\",\n",
    "     \"Station in Fort Greene Brooklyn at Atlantic Avenue and Fort Greene Place near Barclays Center. \"\n",
    "     \"Heavy event-driven usage during concerts, NBA games, and other Barclays Center events. \"\n",
    "     \"The station connects commuters to multiple subway lines: B D N R Q 2 3 4 5 at Atlantic Terminal. \"\n",
    "     \"Fort Greene Park is a short ride away along designated cycling streets.\"),\n",
    "\n",
    "    (\"stn_018\", \"Nostrand Ave & Fulton St\",     \"Brooklyn\",\n",
    "     \"Station in Bedford-Stuyvesant Brooklyn at Nostrand Avenue and Fulton Street. \"\n",
    "     \"Serves a historically significant neighbourhood experiencing active transportation investment. \"\n",
    "     \"The NYC DOT has installed sharrows and signage to improve cyclist safety in this area. \"\n",
    "     \"The station connects to the A and C lines at Nostrand Avenue.\"),\n",
    "\n",
    "    # Queens stations\n",
    "    (\"stn_019\", \"Jackson Ave & 46 Rd\",          \"Queens\",\n",
    "     \"Citi Bike station in Long Island City Queens at Jackson Avenue and 46th Road. \"\n",
    "     \"This station is part of the Queens expansion of the Citi Bike network launched in 2019. \"\n",
    "     \"Long Island City has seen rapid development and the station serves thousands of new residents. \"\n",
    "     \"The 7 train at Hunters Point Avenue and the E M G R trains at Court Square are nearby.\"),\n",
    "\n",
    "    (\"stn_020\", \"Queensboro Plaza\",             \"Queens\",\n",
    "     \"Station near Queensboro Plaza in Long Island City Queens serving commuters crossing to Manhattan. \"\n",
    "     \"The Ed Koch Queensboro Bridge shared cycling path connects this station to Manhattan's 2nd Avenue corridor. \"\n",
    "     \"The N W and 7 trains at Queensboro Plaza provide direct multimodal connectivity. \"\n",
    "     \"51 docking slots. One of the busiest stations in Queens.\"),\n",
    "\n",
    "    # NYC Open Data neighbourhood records\n",
    "    (\"nyc_001\", \"Chelsea Neighbourhood Profile\", \"Manhattan\",\n",
    "     \"Chelsea is a neighbourhood on the west side of the borough of Manhattan in New York City. \"\n",
    "     \"The area is bounded roughly by 14th Street to the south, 34th Street to the north, the Hudson River to the west, and 6th Avenue to the east. \"\n",
    "     \"Chelsea is known for its art galleries, the High Line elevated park, and Hudson Yards development. \"\n",
    "     \"The neighbourhood has extensive cycling infrastructure including protected lanes on 8th and 9th Avenues and strong Citi Bike ridership.\"),\n",
    "\n",
    "    (\"nyc_002\", \"Williamsburg Neighbourhood Profile\", \"Brooklyn\",\n",
    "     \"Williamsburg is a neighbourhood in the northern part of Brooklyn bordered by Greenpoint to the north, Bushwick to the east, and the East River to the west. \"\n",
    "     \"The neighbourhood is home to a large young professional and artist population and has seen dramatic gentrification since the early 2000s. \"\n",
    "     \"Cycling infrastructure includes the Kent Avenue waterfront greenway and protected lanes on Bedford Avenue. \"\n",
    "     \"The Williamsburg Bridge provides a major cycling connection to Manhattan's Lower East Side.\"),\n",
    "\n",
    "    (\"nyc_003\", \"Long Island City Development Profile\", \"Queens\",\n",
    "     \"Long Island City is the westernmost neighbourhood of Queens located directly across the East River from Midtown Manhattan. \"\n",
    "     \"The neighbourhood has experienced rapid residential and commercial development over the past decade. \"\n",
    "     \"Active transportation infrastructure includes the Queens Plaza protected bike lane and connections to the Queensboro Bridge cycling path. \"\n",
    "     \"Citi Bike expanded into Long Island City in 2019 to serve the growing population and reduce car dependency.\"),\n",
    "\n",
    "    (\"nyc_004\", \"NYC Bike Lane Infrastructure Report\", \"Citywide\",\n",
    "     \"New York City has over 1400 miles of bike lanes as of 2024 including protected lanes, painted lanes, and shared bicycle routes. \"\n",
    "     \"The NYC DOT continues to expand the cycling network under the NYC Streets Plan which aims to add 250 miles of protected bike lanes by 2026. \"\n",
    "     \"Citi Bike stations are strategically placed at major transit hubs, residential areas, and commercial corridors. \"\n",
    "     \"Protected intersections and daylighting improvements support safer cycling across all five boroughs.\"),\n",
    "\n",
    "    (\"nyc_005\", \"Vision Zero Cycling Safety Initiative\", \"Citywide\",\n",
    "     \"Vision Zero is New York City's initiative to eliminate all traffic fatalities and serious injuries by improving street design. \"\n",
    "     \"Cycling safety measures include protected bike lanes, parking protected lanes, improved signal timing for cyclists, and daylighting at intersections. \"\n",
    "     \"The city tracks cycling injuries and fatalities by borough and neighbourhood to prioritize safety investments. \"\n",
    "     \"Citi Bike ridership data is used to identify high-demand corridors requiring infrastructure upgrades.\"),\n",
    "\n",
    "    (\"nyc_006\", \"Citi Bike System Overview 2024\", \"Citywide\",\n",
    "     \"Citi Bike is New York City's official bike share system operated by Lyft. \"\n",
    "     \"The system operates over 2000 stations and 40000 bikes including classic pedal bikes and electric bikes across Manhattan, Brooklyn, Queens, and the Bronx. \"\n",
    "     \"Annual membership provides unlimited 45-minute classic bike rides and discounted electric bike minutes. \"\n",
    "     \"The system processes over 100000 trips on peak summer days. Real-time station availability is provided via the GBFS API.\"),\n",
    "\n",
    "    (\"nyc_007\", \"Hudson River Greenway Cycling Path\", \"Manhattan\",\n",
    "     \"The Hudson River Greenway is a 13-mile protected cycling and pedestrian path running along the western edge of Manhattan from Battery Park to Inwood. \"\n",
    "     \"The greenway is the busiest cycling path in the United States with over 5 million cyclists annually. \"\n",
    "     \"Multiple Citi Bike stations are located adjacent to the greenway providing direct access for bike share users. \"\n",
    "     \"The path connects major recreational destinations including Riverside Park, Hudson Yards, the High Line, and the World Trade Center.\"),\n",
    "\n",
    "    (\"nyc_008\", \"Brooklyn Bridge Cycling Access\", \"Brooklyn\",\n",
    "     \"The Brooklyn Bridge shared path provides a cycling and pedestrian connection between Lower Manhattan and DUMBO Brooklyn. \"\n",
    "     \"The path is popular with commuters and tourists but is relatively narrow requiring careful navigation during peak hours. \"\n",
    "     \"The NYC DOT has studied options for expanding cycling capacity on the bridge corridor. \"\n",
    "     \"Citi Bike stations on both sides of the bridge support first and last mile connectivity for cyclists.\"),\n",
    "\n",
    "    (\"nyc_009\", \"Electric Bike Expansion Program\", \"Citywide\",\n",
    "     \"Citi Bike began deploying electric pedal-assist bikes in 2018 and has since grown the e-bike fleet to over 10000 bikes. \"\n",
    "     \"Electric bikes allow riders to travel longer distances and tackle hills more easily increasing cycling accessibility. \"\n",
    "     \"E-bike rides cost more per minute than classic bikes but are popular for longer commutes across boroughs. \"\n",
    "     \"The expansion of e-bikes has particularly benefited outer borough riders in Queens and Brooklyn who face longer distances to Manhattan.\"),\n",
    "\n",
    "    (\"nyc_010\", \"Docking Station Capacity Planning\", \"Citywide\",\n",
    "     \"Citi Bike station capacity planning is based on trip origin and destination data, population density, and proximity to transit. \"\n",
    "     \"High-demand stations near transit hubs such as Grand Central, Penn Station, and Atlantic Terminal require larger docking capacity. \"\n",
    "     \"The rebalancing operations team moves bikes between stations using cargo vehicles to ensure availability. \"\n",
    "     \"Station expansion is coordinated with NYC DOT sidewalk permits and community board approval processes.\"),\n",
    "\n",
    "    (\"nyc_011\", \"Prospect Park Cycling Loop\", \"Brooklyn\",\n",
    "     \"Prospect Park in Brooklyn features a 3.35-mile car-free loop road that is one of the most popular cycling destinations in New York City. \"\n",
    "     \"The loop is closed to motor traffic since 2018 and accommodates cyclists, pedestrians, and runners throughout the day. \"\n",
    "     \"Multiple Citi Bike stations around the park perimeter at Grand Army Plaza, Vanderbilt Street, and 9th Avenue provide convenient access. \"\n",
    "     \"Weekend ridership to the park spikes significantly particularly during summer months.\"),\n",
    "\n",
    "    (\"nyc_012\", \"Queens Cycling Infrastructure\", \"Queens\",\n",
    "     \"Queens has seen significant cycling infrastructure investment since 2019 with new protected lanes in Long Island City, Astoria, and Jackson Heights. \"\n",
    "     \"The Queens Plaza redesign created protected bike lanes with dedicated signal phases improving cyclist safety at a historically dangerous intersection. \"\n",
    "     \"Citi Bike expansion into Queens has been accompanied by bike lane construction to ensure safe station access. \"\n",
    "     \"The borough's diverse and dense neighbourhoods offer strong potential for cycling growth as infrastructure improves.\"),\n",
    "\n",
    "    (\"nyc_013\", \"Multimodal Transit Integration\", \"Citywide\",\n",
    "     \"New York City's transportation strategy emphasizes multimodal connectivity between cycling, subway, bus, and commuter rail. \"\n",
    "     \"Citi Bike stations are deliberately sited within 100 metres of subway entrances to facilitate first and last mile cycling. \"\n",
    "     \"The Lyft and MTA partnership allows Citi Bike annual members to receive discounted subway fares in some programmes. \"\n",
    "     \"Bike parking facilities at select LIRR and Metro-North stations further extend multimodal reach.\"),\n",
    "\n",
    "    (\"nyc_014\", \"Cycling Data and Analytics\", \"Citywide\",\n",
    "     \"Citi Bike publishes monthly trip data as open data including start and end station, duration, bike type, and user type. \"\n",
    "     \"This data is widely used by researchers, urban planners, and data engineers for transport analysis and modelling. \"\n",
    "     \"The GBFS real-time feed provides current docking station status, available bikes, and available docks. \"\n",
    "     \"NYC Open Data portal hosts historical station information, bike lane geometries, and Vision Zero safety metrics.\"),\n",
    "\n",
    "    (\"nyc_015\", \"Bronx Citi Bike Expansion\", \"Bronx\",\n",
    "     \"Citi Bike began expanding into the South Bronx in 2023 adding stations in Mott Haven, Port Morris, and Hunts Point. \"\n",
    "     \"The expansion prioritizes environmental justice by bringing bike share to historically underserved communities. \"\n",
    "     \"Community engagement sessions with local residents shaped station placement and programme design. \"\n",
    "     \"The South Bronx greenway and waterfront cycling paths provide key cycling infrastructure in the area.\"),\n",
    "\n",
    "    (\"nyc_016\", \"Cargo Bike Pilot Programme\", \"Citywide\",\n",
    "     \"NYC DOT operates a cargo bike pilot programme supporting last-mile delivery businesses in dense urban areas. \"\n",
    "     \"Electric cargo bikes can replace delivery vans for small package delivery reducing congestion and emissions. \"\n",
    "     \"Several programme participants operate from Citi Bike-adjacent zones in Midtown and Lower Manhattan. \"\n",
    "     \"The programme aligns with the city's goals of reducing vehicle miles traveled and improving air quality.\"),\n",
    "\n",
    "    (\"nyc_017\", \"Accessibility and Adaptive Cycling\", \"Citywide\",\n",
    "     \"NYC provides adaptive cycling programmes for people with disabilities through organisations like Bike New York and the NYC Parks Department. \"\n",
    "     \"Citi Bike stations are required to meet ADA accessibility standards for pavement and approach paths. \"\n",
    "     \"Hand cycles and other adaptive equipment are available at selected locations in Central Park and Prospect Park. \"\n",
    "     \"The city tracks accessible cycling infrastructure as part of its disability inclusion reporting.\"),\n",
    "\n",
    "    (\"nyc_018\", \"Station Rebalancing Operations\", \"Citywide\",\n",
    "     \"Citi Bike rebalancing involves redistributing bikes between stations to match supply with demand throughout the day. \"\n",
    "     \"Demand patterns show strong morning flows from outer boroughs to Manhattan and reverse evening flows. \"\n",
    "     \"Rebalancing is performed by a fleet of electric cargo vans and bike trailers operating 24 hours a day. \"\n",
    "     \"Predictive algorithms using historical trip data and weather forecasts optimise rebalancing routes and timing.\"),\n",
    "\n",
    "    (\"nyc_019\", \"Climate and Seasonal Ridership Patterns\", \"Citywide\",\n",
    "     \"Citi Bike ridership follows strong seasonal patterns with peak usage in summer months and lower ridership in winter. \"\n",
    "     \"Temperature, precipitation, and wind speed are the strongest predictors of daily trip volume. \"\n",
    "     \"The introduction of electric bikes has reduced but not eliminated the winter ridership drop. \"\n",
    "     \"Weather data from NOAA is used alongside trip data in transport demand modelling by urban researchers.\"),\n",
    "\n",
    "    (\"nyc_020\", \"Cycling and Public Health Benefits\", \"Citywide\",\n",
    "     \"Research shows that regular cycling provides significant public health benefits including reduced cardiovascular disease risk and improved mental health. \"\n",
    "     \"NYC health agencies track active transport as a public health indicator alongside other lifestyle factors. \"\n",
    "     \"Citi Bike ridership in low-income neighbourhoods has been shown to increase access to jobs and healthcare facilities. \"\n",
    "     \"The environmental benefits include reduced carbon emissions and improved air quality compared to car trips.\"),\n",
    "]\n",
    "\n",
    "corpus_path = \"data/corpus/citibike_corpus.csv\"\n",
    "with open(corpus_path, \"w\", newline=\"\", encoding=\"utf-8\") as f:\n",
    "    writer = csv_module.writer(f)\n",
    "    writer.writerow([\"doc_id\", \"station_name\", \"borough\", \"text\"])\n",
    "    writer.writerows(citibike_corpus)\n",
    "\n",
    "print(f\"Track C corpus saved → {corpus_path}\")\n",
    "print(f\"Total documents: {len(citibike_corpus)}\")\n",
    "print(\"Breakdown:\")\n",
    "boroughs = {}\n",
    "for row in citibike_corpus:\n",
    "    boroughs[row[2]] = boroughs.get(row[2], 0) + 1\n",
    "for b, c in sorted(boroughs.items()):\n",
    "    print(f\"  {b}: {c} docs\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9762386a",
   "metadata": {},
   "source": [
    "## 1. Ingestion du corpus\n",
    "\n",
    "Charger le corpus Citi Bike + NYC Open Data avec un schéma explicite.\n",
    "Afficher :\n",
    "\n",
    "- le nombre de lignes\n",
    "\n",
    "- quelques exemples de lignes\n",
    "\n",
    "- la longueur moyenne des documents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "36c9123b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total documents loaded: 40\n",
      "Avg document length : 410 chars\n",
      "Min document length : 320 chars\n",
      "Max document length : 526 chars\n",
      "\n",
      "--- Sample documents ---\n",
      "+-------+----------------------+---------+-------+\n",
      "| doc_id|          station_name|  borough|doc_len|\n",
      "+-------+----------------------+---------+-------+\n",
      "|stn_001|       W 21 St & 6 Ave|Manhattan|    370|\n",
      "|stn_002|    E 47 St & Park Ave|Manhattan|    364|\n",
      "|stn_003|    Broadway & W 56 St|Manhattan|    362|\n",
      "|stn_004| Pershing Square North|Manhattan|    355|\n",
      "|stn_005|       W 41 St & 8 Ave|Manhattan|    390|\n",
      "|stn_006| Lafayette St & E 8 St|Manhattan|    367|\n",
      "|stn_007|       W 13 St & 7 Ave|Manhattan|    375|\n",
      "|stn_008|Fulton St & William St|Manhattan|    374|\n",
      "|stn_009| Spruce St & Nassau St|Manhattan|    320|\n",
      "|stn_010|       1 Ave & E 18 St|Manhattan|    362|\n",
      "+-------+----------------------+---------+-------+\n",
      "only showing top 10 rows\n",
      "--- Documents per borough ---\n",
      "+---------+-----+\n",
      "|  borough|count|\n",
      "+---------+-----+\n",
      "| Citywide|   12|\n",
      "|Manhattan|   12|\n",
      "| Brooklyn|   11|\n",
      "|   Queens|    4|\n",
      "|    Bronx|    1|\n",
      "+---------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Explicit schema — no schema inference to guarantee types\n",
    "schema = StructType([\n",
    "    StructField(\"doc_id\",       StringType(), False),\n",
    "    StructField(\"station_name\", StringType(), True),\n",
    "    StructField(\"borough\",      StringType(), True),\n",
    "    StructField(\"text\",         StringType(), True),\n",
    "])\n",
    "\n",
    "df_corpus = (\n",
    "    spark.read\n",
    "         .schema(schema)\n",
    "         .option(\"header\", \"true\")\n",
    "         .option(\"multiLine\", \"true\")   # handles text fields with embedded newlines\n",
    "         .option(\"escape\", '\"')\n",
    "         .csv(\"data/corpus/citibike_corpus.csv\")\n",
    ")\n",
    "\n",
    "n_docs = df_corpus.count()\n",
    "print(f\"Total documents loaded: {n_docs}\")\n",
    "\n",
    "# Add document length column (character count of the text field)\n",
    "df_corpus = df_corpus.withColumn(\"doc_len\", F.length(\"text\"))\n",
    "\n",
    "avg_len = df_corpus.select(F.avg(\"doc_len\")).first()[0]\n",
    "min_len = df_corpus.select(F.min(\"doc_len\")).first()[0]\n",
    "max_len = df_corpus.select(F.max(\"doc_len\")).first()[0]\n",
    "\n",
    "print(f\"Avg document length : {avg_len:.0f} chars\")\n",
    "print(f\"Min document length : {min_len} chars\")\n",
    "print(f\"Max document length : {max_len} chars\")\n",
    "\n",
    "print(\"\\n--- Sample documents ---\")\n",
    "df_corpus.select(\"doc_id\", \"station_name\", \"borough\", \"doc_len\").show(10, truncate=40)\n",
    "\n",
    "print(\"--- Documents per borough ---\")\n",
    "df_corpus.groupBy(\"borough\").count().orderBy(F.desc(\"count\")).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c829620c",
   "metadata": {},
   "source": [
    "## 2. Normalisation du texte — Minuscules, Tokenisation, Suppression des stop‑words\n",
    "\n",
    "Pipeline :\n",
    "\n",
    "- Mettre le champ texte en minuscules\n",
    "\n",
    "- Tokeniser en découpant selon les espaces + la ponctuation via une seule expression régulière\n",
    "\n",
    "- Exploser en paires (doc_id, token)\n",
    "\n",
    "- Filtrer :\n",
    "\n",
    "    - les chaînes vides\n",
    "\n",
    "    - les caractères uniques\n",
    "\n",
    "- Supprimer les stop‑words en utilisant un ensemble Python diffusé (broadcast)\n",
    "\n",
    "- Reporter le nombre de tokens avant et après filtrage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "3c5a3d63",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total tokens before stop-word removal: 2,443\n",
      "Total tokens after  stop-word removal: 1,759\n",
      "Stop-words removed : 684 (28.0% of all tokens)\n",
      "\n",
      "--- Sample (doc_id, token) pairs after normalization ---\n",
      "+-------+---------+-------------+\n",
      "| doc_id|  borough|        token|\n",
      "+-------+---------+-------------+\n",
      "|stn_001|Manhattan|         citi|\n",
      "|stn_001|Manhattan|         bike|\n",
      "|stn_001|Manhattan|      station|\n",
      "|stn_001|Manhattan|      located|\n",
      "|stn_001|Manhattan|         west|\n",
      "|stn_001|Manhattan|         21st|\n",
      "|stn_001|Manhattan|       street|\n",
      "|stn_001|Manhattan|          6th|\n",
      "|stn_001|Manhattan|       avenue|\n",
      "|stn_001|Manhattan|      chelsea|\n",
      "|stn_001|Manhattan|neighbourhood|\n",
      "|stn_001|Manhattan|    manhattan|\n",
      "|stn_001|Manhattan|         high|\n",
      "|stn_001|Manhattan|      traffic|\n",
      "|stn_001|Manhattan|      docking|\n",
      "+-------+---------+-------------+\n",
      "only showing top 15 rows\n"
     ]
    }
   ],
   "source": [
    "# Step 1 & 2: Lowercase + tokenize in one pass\n",
    "# [\\s\\W]+ splits on any whitespace or non-word character simultaneously\n",
    "df_tokens = df_corpus.withColumn(\n",
    "    \"tokens\",\n",
    "    F.split(F.lower(F.col(\"text\")), r\"[\\s\\W]+\")\n",
    ")\n",
    "\n",
    "# Step 3: Explode — one row per (doc_id, token)\n",
    "df_exploded = df_tokens.select(\n",
    "    \"doc_id\",\n",
    "    \"borough\",\n",
    "    F.explode(\"tokens\").alias(\"token\")\n",
    ")\n",
    "\n",
    "# Step 4: Remove empty strings and single-character tokens (noise)\n",
    "df_exploded = df_exploded.filter(F.length(\"token\") > 1)\n",
    "\n",
    "total_before = df_exploded.count()\n",
    "print(f\"Total tokens before stop-word removal: {total_before:,}\")\n",
    "\n",
    "# Step 5: Remove English stop-words\n",
    "# Selected for relevance to the NYC/cycling domain\n",
    "STOP_WORDS = {\n",
    "    \"the\", \"a\", \"an\", \"is\", \"are\", \"was\", \"were\", \"in\", \"on\", \"at\",\n",
    "    \"to\", \"for\", \"of\", \"and\", \"or\", \"not\", \"it\", \"this\", \"that\",\n",
    "    \"with\", \"as\", \"by\", \"be\", \"from\", \"its\", \"also\", \"but\", \"you\",\n",
    "    \"can\", \"has\", \"have\", \"so\", \"into\", \"than\", \"more\", \"all\", \"your\",\n",
    "    \"their\", \"they\", \"which\", \"been\", \"has\", \"via\", \"across\", \"over\",\n",
    "    \"between\", \"along\", \"near\", \"such\", \"both\", \"each\", \"while\",\n",
    "    \"including\", \"since\", \"per\", \"up\", \"about\", \"other\", \"who\",\n",
    "    \"within\", \"without\", \"no\", \"any\", \"new\"\n",
    "}\n",
    "\n",
    "# ~isin filters OUT the stop-words\n",
    "df_filtered = df_exploded.filter(~F.col(\"token\").isin(STOP_WORDS))\n",
    "\n",
    "total_after = df_filtered.count()\n",
    "removed     = total_before - total_after\n",
    "\n",
    "print(f\"Total tokens after  stop-word removal: {total_after:,}\")\n",
    "print(f\"Stop-words removed : {removed:,} ({removed/total_before*100:.1f}% of all tokens)\")\n",
    "\n",
    "print(\"\\n--- Sample (doc_id, token) pairs after normalization ---\")\n",
    "df_filtered.show(15)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6cdc0a14",
   "metadata": {},
   "source": [
    "## 3. Construction de l’index inversé\n",
    "\n",
    "Regrouper par token → collecter la liste de tous les identifiants de documents (postings list) et la fréquence totale du terme (freq)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "ca230228",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Unique terms in inverted index: 685\n",
      "\n",
      "--- Top 20 most frequent terms (Track C corpus) ---\n",
      "+--------------+------------------------------------------------------------+----+\n",
      "|         token|                                                     doc_ids|freq|\n",
      "+--------------+------------------------------------------------------------+----+\n",
      "|          bike|[stn_006, stn_007, stn_009, stn_010, stn_011, stn_011, st...|  61|\n",
      "|       station|[stn_007, stn_008, stn_008, stn_008, stn_009, stn_009, st...|  58|\n",
      "|       cycling|[stn_007, stn_008, stn_013, stn_014, stn_015, stn_015, st...|  34|\n",
      "|          citi|[stn_011, stn_013, stn_014, stn_016, stn_019, stn_019, ny...|  30|\n",
      "|        avenue|[stn_007, stn_007, stn_010, stn_010, stn_012, stn_012, st...|  29|\n",
      "|        street|[stn_007, stn_008, stn_009, stn_009, stn_010, stn_010, st...|  24|\n",
      "|          park|[stn_007, stn_009, stn_010, stn_013, stn_014, stn_014, st...|  21|\n",
      "|      brooklyn|[stn_011, stn_011, stn_012, stn_012, stn_013, stn_013, st...|  20|\n",
      "|     manhattan|[stn_006, stn_008, stn_009, stn_009, stn_010, stn_010, st...|  20|\n",
      "|     protected|[stn_006, stn_007, stn_010, stn_015, nyc_001, nyc_002, ny...|  18|\n",
      "|          city|[stn_009, stn_012, stn_019, stn_019, stn_020, nyc_001, ny...|  17|\n",
      "|       docking|[stn_006, stn_009, stn_010, stn_012, stn_014, stn_020, ny...|  16|\n",
      "|      stations|[stn_016, stn_020, nyc_004, nyc_006, nyc_007, nyc_008, ny...|  16|\n",
      "| neighbourhood|[stn_007, stn_010, stn_011, stn_015, stn_018, nyc_001, ny...|  14|\n",
      "|         bikes|[nyc_006, nyc_006, nyc_006, nyc_009, nyc_009, nyc_009, ny...|  13|\n",
      "|         lanes|[stn_007, nyc_001, nyc_002, nyc_004, nyc_004, nyc_004, ny...|  13|\n",
      "|          east|[stn_008, stn_010, stn_010, stn_015, stn_016, nyc_001, ny...|  13|\n",
      "|        subway|[stn_006, stn_008, stn_011, stn_013, stn_015, stn_016, st...|  13|\n",
      "|          path|[stn_008, stn_013, stn_016, stn_020, nyc_003, nyc_007, ny...|  11|\n",
      "|infrastructure|[stn_008, stn_015, nyc_001, nyc_002, nyc_003, nyc_005, ny...|  11|\n",
      "+--------------+------------------------------------------------------------+----+\n",
      "only showing top 20 rows\n"
     ]
    }
   ],
   "source": [
    "\n",
    "inverted_index = (\n",
    "    df_filtered\n",
    "    .groupBy(\"token\")\n",
    "    .agg(\n",
    "        F.collect_list(\"doc_id\").alias(\"doc_ids\"),\n",
    "        F.count(\"*\").alias(\"freq\")\n",
    "    )\n",
    "    .orderBy(F.desc(\"freq\"))\n",
    ")\n",
    "\n",
    "unique_terms = inverted_index.count()\n",
    "print(f\"Unique terms in inverted index: {unique_terms:,}\")\n",
    "\n",
    "print(\"\\n--- Top 20 most frequent terms (Track C corpus) ---\")\n",
    "inverted_index.show(20, truncate=60)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "af94cdbf",
   "metadata": {},
   "source": [
    "## 4. Écriture de l’index inversé — Parquet et CSV\n",
    "\n",
    "Écrire l’index dans les deux formats (Parquet et CSV), puis sauvegarder le plan de construction de l’index dans le dossier proof/."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "9b5dbac7",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                "
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Parquet index written → outputs/lab2/inverted_index\n",
      "CSV index written    → outputs/lab2/inverted_index_csv\n",
      "\n",
      "Index build plan saved → proof/plan_index_build.txt\n",
      "== Physical Plan ==\n",
      "AdaptiveSparkPlan (11)\n",
      "+- Sort (10)\n",
      "   +- Exchange (9)\n",
      "      +- ObjectHashAggregate (8)\n",
      "         +- Exchange (7)\n",
      "            +- ObjectHashAggregate (6)\n",
      "               +- Filter (5)\n",
      "                  +- Generate (4)\n",
      "                     +- Project (3)\n",
      "                        +- Filter (2)\n",
      "                           +- Scan csv  (1)\n",
      "\n",
      "\n",
      "(1) Scan csv \n",
      "Output [2]: [doc_id#219, text#222]\n",
      "Batched: false\n",
      "Location: InMemoryFileIndex [file:/home/saraa/DE2/LAB2/PRATICE/data/corpus/citibike_corpus.csv]\n",
      "ReadSchema: struct<doc_id:string,text:string>\n",
      "\n",
      "(2) Filter\n",
      "Input [2]: [doc_id#219, text#222]\n",
      "Condition : ((size(split(lower(text#222), [\\s\\W]+, -1), false) > 0) AND isnotnull(split(lower(text#222), [\\s\\W]+, -1)))\n",
      "\n",
      "(3) Project\n",
      "Output [2]: [doc_id#219, split(lower(text#222), [\\s\\W]+, -1) AS tokens#292]\n",
      "Input [2]: [doc_id#219, text#222]\n",
      "\n",
      "(4) Generate\n",
      "Input [2]: [doc_id#219, tokens#292]\n",
      "Arguments: explode(tokens#292), [doc_id#219], false, [token#294]\n",
      "\n",
      "(5) Filter\n",
      "Input [2]: [doc_id#219, token#294]\n",
      "Condition : ((length(token#294) > 1) AND NOT token#294 INSET a, about, across, all, along, also, an, and, any, are, as, at, be, been, between, both, but, by, can, each, for, from, has, have, in, including, into, is, it, its, more, near, new, no, not, of, on, or, other, over, per, since, so, such, than, that, the, their, they, this, to, up, via, was, were, which, while, who, with, within, without, you, your)\n",
      "\n",
      "(6) ObjectHashAggregate\n",
      "Input [2]: [doc_id#219, token#294]\n",
      "Keys [1]: [token#294]\n",
      "Functions [2]: [partial_collect_list(doc_id#219, 0, 0), partial_count(1)]\n",
      "Aggregate Attributes [2]: [buf#342, count#343L]\n",
      "Results [3]: [token#294, buf#344, count#345L]\n",
      "\n",
      "(7) Exchange\n",
      "Input [3]: [token#294, buf#344, count#345L]\n",
      "Arguments: hashpartitioning(token#294, 200), ENSURE_REQUIREMENTS, [plan_id=1699]\n",
      "\n",
      "(8) ObjectHashAggregate\n",
      "Input [3]: [token#294, buf#344, count#345L]\n",
      "Keys [1]: [token#294]\n",
      "Functions [2]: [collect_list(doc_id#219, 0, 0), count(1)]\n",
      "Aggregate Attributes [2]: [collect_list(doc_id#219, 0, 0)#330, count(1)#329L]\n",
      "Results [3]: [token#294, collect_list(doc_id#219, 0, 0)#330 AS doc_ids#324, count(1)#329L AS freq#325L]\n",
      "\n",
      "(9) Exchange\n",
      "Input [3]: [token#294, doc_ids#324, freq#325L]\n",
      "Arguments: rangepartitioning(freq#325L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1702]\n",
      "\n",
      "(10) Sort\n",
      "Input [3]: [token#294, doc_ids#324, freq#325L]\n",
      "Arguments: [freq#325L DESC NULLS LAST], true, 0\n",
      "\n",
      "(11) AdaptiveSparkPlan\n",
      "Output [3]: [token#294, doc_ids#324, freq#325L]\n",
      "Arguments: isFinalPlan=false\n",
      "\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pathlib.Path(\"outputs/lab2/inverted_index\").mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"outputs/lab2/inverted_index_csv\").mkdir(parents=True, exist_ok=True)\n",
    "pathlib.Path(\"proof\").mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# --- Parquet ---\n",
    "# Native ArrayType column for doc_ids → efficient columnar storage + Snappy compression\n",
    "inverted_index.write.mode(\"overwrite\").parquet(\"outputs/lab2/inverted_index\")\n",
    "print(\"Parquet index written → outputs/lab2/inverted_index\")\n",
    "\n",
    "# --- CSV ---\n",
    "# Must flatten the ArrayType to a comma-separated string (CSV has no array type)\n",
    "(\n",
    "    inverted_index\n",
    "    .withColumn(\"doc_ids\", F.concat_ws(\",\", \"doc_ids\"))\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .option(\"header\", \"true\")\n",
    "    .csv(\"outputs/lab2/inverted_index_csv\")\n",
    ")\n",
    "print(\"CSV index written    → outputs/lab2/inverted_index_csv\")\n",
    "\n",
    "# --- Save index build plan ---\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buffer = io.StringIO()\n",
    "inverted_index.explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "plan_build = buffer.getvalue()\n",
    "\n",
    "with open(\"proof/plan_index_build.txt\", \"w\") as f:\n",
    "    f.write(plan_build)\n",
    "\n",
    "print(\"\\nIndex build plan saved → proof/plan_index_build.txt\")\n",
    "print(plan_build)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d5bbac4b",
   "metadata": {},
   "source": [
    "## 5. Mesure de la latence des requêtes\n",
    "\n",
    "Lire l’index Parquet, le mettre en cache, puis rechercher 8 termes pertinents pour le domaine (Track C — micromobilité / NYC).\n",
    "Mesurer la latence en temps réel (wall‑clock) avec time.time() autour de chaque .collect().\n",
    "Sauvegarder le plan de requête."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "63fc5a55",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "26/05/10 20:53:18 WARN CacheManager: Asked to cache already cached data.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Index loaded from Parquet and cached.\n",
      "\n",
      "Term                   Freq  Postings   Latency (ms)\n",
      "-------------------------------------------------------\n",
      "cycling                  34        34           56.9  [HIT]\n",
      "station                  58        58           52.9  [HIT]\n",
      "bike                     61        61           54.1  [HIT]\n",
      "protected                18        18           68.4  [HIT]\n",
      "subway                   13        13           69.1  [HIT]\n",
      "brooklyn                 20        20           50.0  [HIT]\n",
      "electric                  7         7           49.4  [HIT]\n",
      "rebalancing               4         4           48.1  [HIT]\n",
      "xzqwerty999               0         0           38.5  [MISS]\n",
      "\n",
      "Query plan saved → proof/plan_query.txt\n",
      "== Physical Plan ==\n",
      "* Filter (5)\n",
      "+- InMemoryTableScan (1)\n",
      "      +- InMemoryRelation (2)\n",
      "            +- * ColumnarToRow (4)\n",
      "               +- Scan parquet  (3)\n",
      "\n",
      "\n",
      "(1) InMemoryTableScan\n",
      "Output [3]: [token#1088, doc_ids#1089, freq#1090L]\n",
      "Arguments: [token#1088, doc_ids#1089, freq#1090L], [isnotnull(token#1088), (token#1088 = cycling)]\n",
      "\n",
      "(2) InMemoryRelation\n",
      "Arguments: [token#1088, doc_ids#1089, freq#1090L], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
      "\n",
      "(3) Scan parquet \n",
      "Output [3]: [token#402, doc_ids#403, freq#404L]\n",
      "Batched: true\n",
      "Location: InMemoryFileIndex [file:/home/saraa/DE2/LAB2/PRATICE/outputs/lab2/inverted_index]\n",
      "ReadSchema: struct<token:string,doc_ids:array<string>,freq:bigint>\n",
      "\n",
      "(4) ColumnarToRow [codegen id : 1]\n",
      "Input [3]: [token#402, doc_ids#403, freq#404L]\n",
      "\n",
      "(5) Filter [codegen id : 1]\n",
      "Input [3]: [token#1088, doc_ids#1089, freq#1090L]\n",
      "Condition : (isnotnull(token#1088) AND (token#1088 = cycling))\n",
      "\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Read back from Parquet for a fair latency measurement\n",
    "idx = spark.read.parquet(\"outputs/lab2/inverted_index\")\n",
    "\n",
    "# Cache the index to memory (simulates a running search service)\n",
    "idx.cache()\n",
    "idx.count()  # trigger materialization\n",
    "print(\"Index loaded from Parquet and cached.\\n\")\n",
    "\n",
    "# Track C — domain-relevant query terms\n",
    "query_terms = [\n",
    "    \"cycling\",       # very common in corpus\n",
    "    \"station\",       # core noun\n",
    "    \"bike\",          # key domain term\n",
    "    \"protected\",     # infrastructure descriptor\n",
    "    \"subway\",        # multimodal term\n",
    "    \"brooklyn\",      # borough name\n",
    "    \"electric\",      # e-bike related\n",
    "    \"rebalancing\",   # operational term\n",
    "    \"xzqwerty999\",   # MISS — term that does not exist\n",
    "]\n",
    "\n",
    "results_log = []\n",
    "\n",
    "print(f\"{'Term':<20} {'Freq':>6} {'Postings':>9} {'Latency (ms)':>14}\")\n",
    "print(\"-\" * 55)\n",
    "\n",
    "for term in query_terms:\n",
    "    t0     = time.time()\n",
    "    result = idx.filter(F.col(\"token\") == term).collect()\n",
    "    lat_ms = (time.time() - t0) * 1000\n",
    "\n",
    "    if result:\n",
    "        freq   = result[0][\"freq\"]\n",
    "        n_docs = len(result[0][\"doc_ids\"])\n",
    "    else:\n",
    "        freq, n_docs = 0, 0\n",
    "\n",
    "    status = \"HIT\" if result else \"MISS\"\n",
    "    print(f\"{term:<20} {freq:>6} {n_docs:>9} {lat_ms:>14.1f}  [{status}]\")\n",
    "    results_log.append((term, freq, n_docs, round(lat_ms, 2), status))\n",
    "\n",
    "# --- Save query plan ---\n",
    "old_stdout = sys.stdout\n",
    "sys.stdout = buffer = io.StringIO()\n",
    "idx.filter(F.col(\"token\") == \"cycling\").explain(\"formatted\")\n",
    "sys.stdout = old_stdout\n",
    "plan_query = buffer.getvalue()\n",
    "\n",
    "with open(\"proof/plan_query.txt\", \"w\") as f:\n",
    "    f.write(plan_query)\n",
    "\n",
    "print(\"\\nQuery plan saved → proof/plan_query.txt\")\n",
    "print(plan_query)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "footprint_md",
   "metadata": {},
   "source": [
    "## 6. Storage Footprint Comparison — Parquet vs CSV\n",
    "\n",
    "Compare on-disk size of both formats for the inverted index."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "49457d01",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "=============================================\n",
      "  Storage Footprint Comparison\n",
      "=============================================\n",
      "  Parquet :      8,171 bytes  (8.0 KB)\n",
      "  CSV     :     21,637 bytes  (21.1 KB)\n",
      "  Ratio   : 0.3776  (Parquet / CSV)\n",
      "=============================================\n",
      "  → Parquet is 62.2% smaller than CSV.\n"
     ]
    }
   ],
   "source": [
    "def dir_size(path):\n",
    "    \"\"\"Total size in bytes of all files under path.\"\"\"\n",
    "    return sum(\n",
    "        f.stat().st_size\n",
    "        for f in pathlib.Path(path).rglob(\"*\")\n",
    "        if f.is_file()\n",
    "    )\n",
    "\n",
    "parquet_bytes = dir_size(\"outputs/lab2/inverted_index\")\n",
    "csv_bytes     = dir_size(\"outputs/lab2/inverted_index_csv\")\n",
    "ratio         = parquet_bytes / csv_bytes if csv_bytes > 0 else 0\n",
    "\n",
    "print(\"=\" * 45)\n",
    "print(\"  Storage Footprint Comparison\")\n",
    "print(\"=\" * 45)\n",
    "print(f\"  Parquet : {parquet_bytes:>10,} bytes  ({parquet_bytes/1024:.1f} KB)\")\n",
    "print(f\"  CSV     : {csv_bytes:>10,} bytes  ({csv_bytes/1024:.1f} KB)\")\n",
    "print(f\"  Ratio   : {ratio:.4f}  (Parquet / CSV)\")\n",
    "print(\"=\" * 45)\n",
    "\n",
    "if ratio < 1:\n",
    "    saving = (1 - ratio) * 100\n",
    "    print(f\"  → Parquet is {saving:.1f}% smaller than CSV.\")\n",
    "else:\n",
    "    overhead = (ratio - 1) * 100\n",
    "    print(f\"  → Parquet is {overhead:.1f}% larger than CSV on this small corpus.\")\n",
    "    print(\"    (Parquet metadata overhead dominates at small scale;\")\n",
    "    print(\"     Parquet wins decisively at millions of records.)\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "metrics_md",
   "metadata": {},
   "source": [
    "## 7. Metrics Log — `lab2_metrics_log.csv`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "metrics_code",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Metrics log written → lab2_metrics_log.csv\n"
     ]
    }
   ],
   "source": [
    "metrics_path = \"lab2_metrics_log.csv\"\n",
    "\n",
    "with open(metrics_path, \"w\", newline=\"\", encoding=\"utf-8\") as mf:\n",
    "    writer = csv_module.writer(mf)\n",
    "    writer.writerow([\"metric_type\", \"key\", \"value\", \"unit\"])\n",
    "\n",
    "    # Corpus\n",
    "    writer.writerow([\"corpus\", \"track\",                  \"C_micromobility\",     \"-\"])\n",
    "    writer.writerow([\"corpus\", \"documents_count\",         len(citibike_corpus),  \"docs\"])\n",
    "    writer.writerow([\"corpus\", \"avg_doc_length_chars\",    round(avg_len, 0),      \"chars\"])\n",
    "    writer.writerow([\"corpus\", \"min_doc_length_chars\",    min_len,               \"chars\"])\n",
    "    writer.writerow([\"corpus\", \"max_doc_length_chars\",    max_len,               \"chars\"])\n",
    "\n",
    "    # Tokenization\n",
    "    writer.writerow([\"tokens\", \"total_before_stopwords\",  total_before,          \"tokens\"])\n",
    "    writer.writerow([\"tokens\", \"total_after_stopwords\",   total_after,           \"tokens\"])\n",
    "    writer.writerow([\"tokens\", \"stopwords_removed\",       total_before-total_after, \"tokens\"])\n",
    "    writer.writerow([\"tokens\", \"stopword_filter_rate_pct\",\n",
    "                     round((total_before-total_after)/total_before*100, 2),       \"%\"])\n",
    "\n",
    "    # Index\n",
    "    writer.writerow([\"index\",  \"unique_terms\",             unique_terms,          \"terms\"])\n",
    "\n",
    "    # Storage\n",
    "    writer.writerow([\"storage\", \"parquet_bytes\",           parquet_bytes,         \"bytes\"])\n",
    "    writer.writerow([\"storage\", \"csv_bytes\",               csv_bytes,             \"bytes\"])\n",
    "    writer.writerow([\"storage\", \"parquet_csv_ratio\",       round(ratio, 4),       \"ratio\"])\n",
    "\n",
    "    # Query latency\n",
    "    for (term, freq, n_docs, lat_ms, status) in results_log:\n",
    "        writer.writerow([\"query_latency\",  f\"term:{term}\", lat_ms,  \"ms\"])\n",
    "        writer.writerow([\"query_freq\",     f\"term:{term}\", freq,    \"occurrences\"])\n",
    "        writer.writerow([\"query_docs\",     f\"term:{term}\", n_docs,  \"postings\"])\n",
    "        writer.writerow([\"query_status\",   f\"term:{term}\", status,  \"-\"])\n",
    "\n",
    "print(f\"Metrics log written → {metrics_path}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "4d12fba4",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Lab 2 — Track C complete. \n"
     ]
    }
   ],
   "source": [
    "spark.stop()\n",
    "print(\"Lab 2 — Track C complete. \")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a0e028d7-ff1f-407a-977d-a58d32add32a",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (de2-env)",
   "language": "python",
   "name": "de2-env"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.20"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
