diff --git a/analysis/rowgroup_analysis.py b/analysis/rowgroup_analysis.py new file mode 100644 index 00000000..0e393ff5 --- /dev/null +++ b/analysis/rowgroup_analysis.py @@ -0,0 +1,63 @@ +import os + +import pyarrow.parquet as pq +import polars as pl + + +def get_rowgroup_statistics(parquet_path: str) -> pl.DataFrame: + """ + Extract row group statistics from a Parquet file and return as a Polars DataFrame. + + Args: + parquet_path: Path to the parquet file + + Returns: + Polars DataFrame with row group statistics + """ + parquet_file = pq.ParquetFile(parquet_path) + file_metadata = parquet_file.metadata + + records = [] + for i in range(file_metadata.num_row_groups): + row_group_metadata = file_metadata.row_group(i) + + for col_i in range(row_group_metadata.num_columns): + column_chunk_metadata = row_group_metadata.column(col_i) + stats = column_chunk_metadata.statistics + + records.append( + { + "row_group": i, + "num_rows": row_group_metadata.num_rows, + "total_byte_size": row_group_metadata.total_byte_size, + "column_index": col_i, + "column_path": column_chunk_metadata.path_in_schema, + "compressed_size": column_chunk_metadata.total_compressed_size, + "uncompressed_size": column_chunk_metadata.total_uncompressed_size, + "min_value": str(stats.min) if stats and stats.min is not None else None, + "max_value": str(stats.max) if stats and stats.max is not None else None, + "null_count": stats.null_count if stats else None, + "num_values": stats.num_values if stats else None, + } + ) + + return pl.DataFrame(records) + + +if __name__ == "__main__": + # df = get_rowgroup_statistics("/tmp/2026_2_1.parquet") + # print(df) + + parquet_file = pq.ParquetFile(file) + file_metadata = parquet_file.metadata + for file in os.listdir("/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/"): + print(file) + df = get_rowgroup_statistics(f"/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/{file}") + breakpoint() + ts = df.filter(pl.col("column_path") == "feed_timestamp") + parquet_file = pq.ParquetFile(f"/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/{file}") + file_metadata = parquet_file.metadata + + ts.select("min_value", "max_value").with_columns( + pl.lit(f"/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/{file}").alias("filename") + ) diff --git a/analysis/write_local_pq_test.py b/analysis/write_local_pq_test.py new file mode 100644 index 00000000..68ec6a90 --- /dev/null +++ b/analysis/write_local_pq_test.py @@ -0,0 +1,134 @@ +# pylint: disable=R0914 +# pylint too many local variables (more than 15) +from importlib.resources import files + +import os +import tempfile +from typing import ( + List, + Tuple, +) + +from pyarrow import fs + +import pyarrow.compute as pc +import pyarrow.parquet as pq +import pyarrow.dataset as pd + +from lamp_py.aws.s3 import ( + upload_file, +) + +from lamp_py.runtime_utils.remote_files import ( + S3_SPRINGBOARD, +) +import time + + +# with pq.ParquetWriter(self.local_parquet_path, schema=self.output_processed_schema) as writer: +# for batch in ds.to_batches( +# batch_size=500_000, +# columns=[col for col in ds.schema.names if col != "lamp_record_hash"], +# filter=self.parquet_filter, +# batch_readahead=1, +# fragment_readahead=0, +# ): +# # don't check empty batch if no rows +# if batch.num_rows == 0: +# continue + + +def partition_column() -> str: + return "trip_update.trip.route_id" + + +def table_sort_order() -> List[Tuple[str, str]]: + return [ + ("feed_timestamp", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), + ] + + +def table_sort_order_pl() -> Tuple[List[str], List[bool]]: + sort_order = table_sort_order() + return ( + [item[0] for item in sort_order], + [item[1] == "ascending" for item in sort_order], + ) + + +def write_local_pq(local_path: str, partition_column: str, in_partition_sort: List[Tuple[str, str]]) -> None: + + ds_paths = [s.replace("s3://", "") for s in files] + + # s3_uris = file_list_from_s3( + # bucket_name=self.remote_input_location.bucket, + # file_prefix=self.remote_input_location.prefix, + # ) + + ds_paths = os.listdir(local_path) + ds_paths = [os.path.join(local_path, s) for s in ds_paths] + + ds = pd.dataset( + ds_paths, + format="parquet", + filesystem=fs.LocalFileSystem(), + ) + + with tempfile.TemporaryDirectory(delete=False) as temp_dir: + rail_full_set_path = os.path.join(temp_dir, "rail_full_set.parquet") + + print(rail_full_set_path) + # include the hash column for debug + rail_full_set_writer = pq.ParquetWriter( + rail_full_set_path, schema=ds.schema, compression="zstd", compression_level=3 + ) + + partitions = pc.unique(ds.to_table(columns=[partition_column]).column(partition_column)) + + partitions = sorted(partitions.to_pylist()) + + print(f"Found {len(partitions)} unique partitions based on column {partition_column}") + + debug = True + if debug: + start_time = time.time() + + # col, order = table_sort_order_pl() + for part in partitions: + write_table = ds.to_table(filter=((pc.field(partition_column) == part))).sort_by(in_partition_sort) + + # write_table = pl.from_arrow(ds.to_table( + # filter=( + # (pc.field(partition_column) == part) + # ) + # )).sort(col, descending=order).to_arrow().cast(ds.schema) + + rail_full_set_writer.write_table(write_table) + + if debug: + elapsed = time.time() - start_time + print(f"Processed partition {part}: {elapsed:.2f}s elapsed, total rows: {write_table.num_rows}") + + rail_full_set_writer.close() + + if False: + # upload the upload_path file (without hash) to s3 + # replace the first part of the path with the s3 path + upload_file( + upload_path, + local_path.replace(self.tmp_folder, S3_SPRINGBOARD), + ) + + +if __name__ == "__main__": + write_local_pq( + "/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/", partition_column(), table_sort_order() + ) + write_local_pq( + "s3://mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=4/day=1/2026-04-01T00:00:00.parquet", + partition_column(), + table_sort_order(), + ) diff --git a/poetry.lock b/poetry.lock index e663b0ed..7c5f00fa 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1992,41 +1992,42 @@ testing = ["pytest"] [[package]] name = "marimo" -version = "0.16.5" +version = "0.23.6" description = "A library for making reactive notebooks and apps" optional = false -python-versions = ">=3.9" -groups = ["investigation"] +python-versions = ">=3.10" files = [ - {file = "marimo-0.16.5-py3-none-any.whl", hash = "sha256:1f98c0ee0fed9337e26c895c662f92cc578cdd03502c194eac9ceeb434bf479b"}, - {file = "marimo-0.16.5.tar.gz", hash = "sha256:8f5939d3c4e67ff25f6cfeefe731971ed7f3346c20098034b923a24a0d7770d6"}, + {file = "marimo-0.23.6-py3-none-any.whl", hash = "sha256:e8d19d875b0212600faa80eaaed0fd3f34dfb7b5241e630cf2b1cedd8dd14509"}, + {file = "marimo-0.23.6.tar.gz", hash = "sha256:d63aeeee1e9ea7cac79bf2530daba915199153dce4d156fade7546474679d3ca"}, ] [package.dependencies] click = ">=8.0,<9" docutils = ">=0.16.0" itsdangerous = ">=2.0.0" -jedi = ">=0.18.0" -loro = {version = ">=1.5.0", markers = "python_full_version >= \"3.11.0\""} +jedi = ">=0.18.0,<0.20.0" +loro = ">=1.10.0" markdown = ">=3.6,<4" -msgspec = ">=0.19.0" +msgspec = ">=0.20.0" narwhals = ">=2.0.0" packaging = "*" psutil = ">=5.0" -pygments = ">=2.13,<3" -pymdown-extensions = ">=10.15,<11" -pyyaml = ">=6.0" -starlette = ">=0.35.0,<0.36.0 || >0.36.0" +pygments = ">=2.19,<3" +pymdown-extensions = ">=10.21.2,<11" +pyyaml = ">=6.0.1" +pyzmq = {version = ">=27.1.0", markers = "python_full_version < \"3.15\""} +starlette = ">=0.37.2" tomlkit = ">=0.12.0" -uvicorn = ">=0.22.0,<0.36.0" +uvicorn = ">=0.22.0" websockets = ">=14.2.0" [package.extras] -dev = ["black (>=23.12.1,<23.13.0)", "click (>=8.0,<9)", "duckdb (>=1.0.0)", "openai (>=1.55.3)", "opentelemetry-api (>=1.26.0,<1.27.0)", "opentelemetry-sdk (>=1.26.0,<1.27.0)", "ruff (>=0.13.2)", "sqlglot[rs] (>=26.2.0)"] lsp = ["python-lsp-ruff (>=2.0.0)", "python-lsp-server (>=1.13.0)"] -mcp = ["mcp (>=1.0.0) ; python_full_version >= \"3.10.0\"", "pydantic (>2) ; python_full_version >= \"3.10.0\""] -recommended = ["altair (>=5.4.0)", "duckdb (>=1.0.0)", "nbformat (>=5.7.0)", "openai (>=1.55.3)", "polars[pyarrow] (>=1.9.0)", "ruff", "sqlglot[rs] (>=26.2.0)"] -sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[rs] (>=26.2.0)"] +mcp = ["mcp (>=1.0.0)", "pydantic (>2)"] +otel = ["opentelemetry-api (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-grpc (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-http (>=1.28.0,<1.29.0)", "opentelemetry-sdk (>=1.28.0,<1.29.0)"] +recommended = ["altair (>=5.4.0)", "marimo[sandbox]", "marimo[sql]", "nbformat (>=5.7.0)", "pydantic-ai-slim[openai] (>=1.52.0)", "ruff"] +sandbox = ["pyzmq (>=27.1.0)", "uv (>=0.9.21)"] +sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[c] (>=26.8.0)"] [[package]] name = "markdown" @@ -3325,14 +3326,14 @@ testutils = ["gitpython (>3)"] [[package]] name = "pymdown-extensions" -version = "10.21" +version = "10.21.3" description = "Extension pack for Python Markdown." optional = false python-versions = ">=3.9" groups = ["investigation"] files = [ - {file = "pymdown_extensions-10.21-py3-none-any.whl", hash = "sha256:91b879f9f864d49794c2d9534372b10150e6141096c3908a455e45ca72ad9d3f"}, - {file = "pymdown_extensions-10.21.tar.gz", hash = "sha256:39f4a020f40773f6b2ff31d2cd2546c2c04d0a6498c31d9c688d2be07e1767d5"}, + {file = "pymdown_extensions-10.21.3-py3-none-any.whl", hash = "sha256:d7a5d08014fc571e80ca21dd6f854e31f94c489800350564d55d15b3c41e76b6"}, + {file = "pymdown_extensions-10.21.3.tar.gz", hash = "sha256:72cfcf55f07aea0d4af2c4f11dd4e52466ddfb1bb819673146398e0bd3a77354"}, ] [package.dependencies] @@ -4502,4 +4503,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "92a0fd856642cb354f15e8c22a84bc23972365b7f4cac91c2301e5cf4d7feb45" +content-hash = "453e3d5378b2505a1183687b8b1d9cfd14d7122e097fc7921b6a7fed611ebf0f" diff --git a/pyproject.toml b/pyproject.toml index 47a05f96..2c5fcd2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ ipykernel = "^7.1.0" matplotlib = "^3.9.0" seaborn = "^0.13.2" tabulate = "^0.9.0" -marimo = "^0.16" +marimo = "^0.23" +plotly = "^6.5" [tool.poetry.group.dev.dependencies] black = "^24.3.0" diff --git a/runners/run_ingest_s3_files.py b/runners/run_ingest_s3_files.py new file mode 100644 index 00000000..74cf9ec2 --- /dev/null +++ b/runners/run_ingest_s3_files.py @@ -0,0 +1,7 @@ +from lamp_py.ingestion.ingest_gtfs import ingest_s3_files + +# from lamp_py.postgres.postgres_utils import start_rds_writer_process + +# metadata_queue, rds_process = start_rds_writer_process() + +ingest_s3_files(None, bucket_filter="lamp") diff --git a/src/lamp_py/ad_hoc/backfill_runner.py b/src/lamp_py/ad_hoc/backfill_runner.py index cc4cb252..c3d3af0c 100644 --- a/src/lamp_py/ad_hoc/backfill_runner.py +++ b/src/lamp_py/ad_hoc/backfill_runner.py @@ -1,382 +1,107 @@ -# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 - -from concurrent.futures import ThreadPoolExecutor -import logging -import os -from datetime import date, datetime, timedelta -from pathlib import Path -from queue import Queue -from typing import Dict, Iterable, List, Optional -import pyarrow -import pyarrow.dataset as pd -import pyarrow.parquet as pq -import dataframely as dy -import polars as pl - -from lamp_py.ingestion.config_rt_trip import RtTripDetail -from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData -from lamp_py.ingestion.converter import ConfigType - -from lamp_py.aws.s3 import file_list_from_s3, upload_file -from lamp_py.runtime_utils.remote_files import springboard_rt_vehicle_positions -from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3Location - -from lamp_py.tableau.conversions import convert_gtfs_rt_vehicle_position -from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob -from lamp_py.tableau.jobs.lamp_jobs import GTFS_RT_TABLEAU_PROJECT -from lamp_py.utils.filter_bank import FilterBankRtVehiclePositions - -# read everything from a day in archive -# parse it json like before...rerun processing, spit out tmp file -# ensure the file name is in different partition -# output back to springboard - -# is this a good way to do it? -# kind of want...merge sort or something. -# process files is running 1 worker for each type. so this will be slow. -# want to run N workers for all of the files. - -# test gz to pyarrow vs gz to polars - time it - -# specialize GtfsRtConverter - - -# pylint disable=too-many-arguments -class GtfsRtTripsAdHocConverter(GtfsRtConverter): - """ - Converter that handles GTFS Real Time JSON data - - https_cdn.mbta.com_realtime_TripUpdates_enhanced.json.gz - """ - - def __init__( - self, - config_type: ConfigType, - metadata_queue: Queue[Optional[str]], - output_location: str, - polars_filter: pl.Expr | None = None, # default to true - which will essentially not filter - max_workers: int = 4, - ) -> None: - GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) - - self.detail = RtTripDetail() - - self.tmp_folder = output_location - - self.data_parts: Dict[datetime, TableData] = {} - - self.error_files: List[str] = [] - self.archive_files: List[str] = [] - - if polars_filter is None: - polars_filter = pl.lit(None) - self.filter = polars_filter - - def convert(self) -> None: - - process_logger = ProcessLogger( - "parquet_table_creator", - table_type="gtfs-rt", - config_type=str(self.config_type), - file_count=len(self.files), - ) - process_logger.log_start() - - table_count = 0 - try: - for table in self.process_files(): - if table.num_rows == 0: - continue - partition_dt = self.partition_dt(table) - - local_path = os.path.join( - self.tmp_folder, - LAMP, - str(self.config_type), - f"year={partition_dt.year}", - f"month={partition_dt.month}", - f"day={partition_dt.day}", - f"{partition_dt.isoformat()}_part_{str(table_count)}.parquet", - ) - os.makedirs(Path(local_path).parent, exist_ok=True) - - self.write_local_pq(table, local_path) - - pool = pyarrow.default_memory_pool() - pool.release_unused() - table_count += 1 - process_logger.add_metadata(table_count=table_count) - - except Exception as exception: - process_logger.log_failure(exception) - else: - process_logger.log_complete() - finally: - self.data_parts = {} - # self.move_s3_files() - # self.clean_local_folders() - - def write_local_pq(self, table: pyarrow.Table, local_path: str) -> None: - """ - just write the file out.. - """ - print("running GtfsRtTripUpdatesConverter::write_local_pq") - - writer = pq.ParquetWriter(local_path, schema=table.schema, compression="zstd", compression_level=3) - writer.write_table(table) - writer.close() - - def process_files(self) -> Iterable[pyarrow.table]: - """ - iterate through all of the files to be converted - apply a polars filter to narrow results at the source to reduce write churn - filter at json.gz input level - - only yield a new table when table size crosses over min_rows of yield_check - """ - - process_logger = ProcessLogger( - "create_pyarrow_tables", - config_type=str(self.config_type), - ) - process_logger.log_start() - - with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.thread_init) as pool: - for result_dt, result_filename, rt_data in pool.map(self.gz_to_pyarrow, self.files): - # errors in gtfs_rt conversions are handled in the gz_to_pyarrow - # function. if one is encountered, the datetime will be none. log - # the error and move on to the next file. - if result_dt is None: - logging.error( - "skipping processing: %s", - result_filename, - ) - continue - - # create key for self.data_parts dictionary - dt_part = datetime( - year=result_dt.year, - month=result_dt.month, - day=result_dt.day, - ) - # create new self.table_groups entry for key if it doesn't exist - if dt_part not in self.data_parts: - self.data_parts[dt_part] = TableData() - tmp = self.detail.transform_for_write(rt_data) - df = pl.from_arrow(tmp) - self.data_parts[dt_part].table = df.filter(self.filter).to_arrow() # type: ignore - - else: - self.data_parts[dt_part].table = pyarrow.concat_tables( - [ - self.data_parts[dt_part].table, - pl.from_arrow(self.detail.transform_for_write(rt_data)).filter(self.filter).to_arrow(), # type: ignore - ] - ) - - self.data_parts[dt_part].files.append(result_filename) - - yield from self.yield_check(process_logger) - - # yield any remaining tables - yield from self.yield_check(process_logger, min_rows=-1) - - process_logger.add_metadata(file_count=0, number_of_rows=0) - process_logger.log_complete() - - -def delta_reingestion_runner( - start_date: date, - end_date: date, - final_output_path: S3Location, - polars_filter: pl.Expr | None = None, - max_workers: int = 4, - local_output_location: str = "/tmp/gtfs-rt-continuous/", -) -> None: - """ - Docstring for delta_reingestion_runner - - :param start_date: start of reingestion range - :type start_date: date - :param end_date: end of reingestion range - :type end_date: date - :param final_output_base: final resting prefix-path of output artifacts - :type final_output_base: S3Location - :param polars_filter: optional polars expression filter to be applied - :type polars_filter: pl.Expr | None - :param max_workers: number of worker threads to ingest delta files with - default is 4 - :type max_workers: int - :param local_output_location: temporary working directory to place outputs - :type local_output_location: str - - """ - logger = ProcessLogger("backfiller") - logger.log_start() - - cur_date = start_date - - while cur_date <= end_date: - prefix = ( - os.path.join( - LAMP, - "delta", - cur_date.strftime("%Y"), - cur_date.strftime("%m"), - cur_date.strftime("%d"), - ) - + "/" - ) - - file_list = file_list_from_s3( - S3_ARCHIVE, - prefix, - in_filter="mbta.com_realtime_TripUpdates_enhanced.json.gz", - ) - - print(len(file_list)) - - #### Stage 1: local to local (MANY to many) - - # construct and run converter once per day - converter = GtfsRtTripsAdHocConverter( - config_type=ConfigType.RT_TRIP_UPDATES, - metadata_queue=Queue(), - output_location=local_output_location, - polars_filter=polars_filter, - max_workers=max_workers, - ) - converter.add_files(file_list) - # this outputs to local output_location=tmp_output_location - converter.convert() - - ## Stage 2: local to local (many to 1) - - # Define the path to your input Parquet files (can use a glob pattern) - converter_output_path = f"{local_output_location}/lamp/RT_TRIP_UPDATES/year={cur_date.year}/month={cur_date.month}/day={cur_date.day}/" - consolidated_parquet_output_file = f"/tmp/{cur_date.year}_{cur_date.month}_{cur_date.day}.parquet" - - # Create a dataset from the input files - ds = pd.dataset(converter_output_path, format="parquet") - - with pq.ParquetWriter( - consolidated_parquet_output_file, schema=ds.schema, compression="zstd", compression_level=3 - ) as writer: - for batch in ds.to_batches(batch_size=512 * 1024): - writer.write_batch(batch) - - #### Stage 3: local to remote (one to one) - - # upload local to remote - upload_file( - consolidated_parquet_output_file, - consolidated_parquet_output_file.replace( - "/tmp/", f"{final_output_path.s3_uri}/year={cur_date.year}/month={cur_date.month}/day={cur_date.day}/" - ), - ) - - cur_date = cur_date + timedelta(days=1) - - class MinimalSchema(dy.Schema): - "Intersection of descendant rail schemas." - trip_id = dy.String(nullable=True, alias="trip_update.trip.trip_id") - start_date = dy.String(nullable=True, alias="trip_update.trip.start_date") - feed_timestamp = dy.Datetime(nullable=True) - departure_time = dy.Datetime(nullable=True, alias="trip_update.stop_time_update.departure.time") - - final_output_base_tu = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_TRIP_UPDATES_20251024_20251124.parquet") - - hyperjob = FilteredHyperJob( - remote_input_location=final_output_path, - remote_output_location=final_output_base_tu, - start_date=start_date, - end_date=end_date, - processed_schema=MinimalSchema.to_pyarrow_schema(), - dataframe_filter=adhoc_convert_tz_filter_revenue_only, - parquet_filter=None, - tableau_project_name=GTFS_RT_TABLEAU_PROJECT, - ) - - hyperjob.run_parquet() - hyperjob.create_local_hyper() - - -def adhoc_convert_tz_filter_revenue_only(df: pl.DataFrame) -> pl.DataFrame: - """ - Docstring for adhoc_convert_tz_filter_revenue_only - - :param df: trip_updates dataframe - :type df: polars DataFrame - :return: trip_updates with timezones converted and filtered to revenue only - :rtype: polars DataFrame - """ - # Filter data to = TRUE, trip_update.stop_time_update.schedule_relationship != SKIPPED, and trip_update.trip.schedule_relationship != CANCELED - df = df.with_columns( - pl.from_epoch(pl.col("trip_update.stop_time_update.departure.time"), time_unit="s") - .dt.convert_time_zone(time_zone="US/Eastern") - .dt.replace_time_zone(None), - pl.from_epoch(pl.col("trip_update.stop_time_update.arrival.time"), time_unit="s") - .dt.convert_time_zone(time_zone="US/Eastern") - .dt.replace_time_zone(None), - pl.from_epoch(pl.col("trip_update.timestamp"), time_unit="s") - .dt.convert_time_zone(time_zone="US/Eastern") - .dt.replace_time_zone(None), - pl.from_epoch(pl.col("feed_timestamp"), time_unit="s") - .dt.convert_time_zone(time_zone="US/Eastern") - .dt.replace_time_zone(None), - ) - - df = df.filter( - pl.col("trip_update.trip.revenue"), - ).select( - [ - "trip_update.trip.trip_id", - "trip_update.trip.start_date", - "trip_update.stop_time_update.departure.time", # - Convert from Epoch format to regular datetime - "feed_timestamp", # - Convert from Epoch format to regular datetime - ] - ) - - return df - - -def run_backfill() -> None: - """ - Full encapsulated method to call all of this backfill job - """ - local_tmp_output = "/tmp/gtfs-rt-continuous" - - start = datetime(2025, 10, 24, 0, 0, 0) - end = datetime(2025, 11, 24, 0, 0, 0) - - polars_filter = pl.col("trip_update.trip.route_id").is_in( - ["Red", "Orange", "Blue", "Green-B", "Green-C", "Green-D", "Green-E", "Mattapan"] - ) - - final_output_path = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_TRIP_UPDATES_20251024_20251124") - - final_output_base_vp = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_VEHICLE_POSITION_20251024_20251124.parquet") - - rt_vp_unfiltered_hyperjob = FilteredHyperJob( - remote_input_location=springboard_rt_vehicle_positions, - remote_output_location=final_output_base_vp, - start_date=start, - end_date=end, - processed_schema=convert_gtfs_rt_vehicle_position.VehiclePositions.to_pyarrow_schema(), - dataframe_filter=convert_gtfs_rt_vehicle_position.apply_gtfs_rt_vehicle_positions_timezone_conversions, - parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.rail, - tableau_project_name=GTFS_RT_TABLEAU_PROJECT, - ) - - rt_vp_unfiltered_hyperjob.run_parquet() - rt_vp_unfiltered_hyperjob.create_local_hyper() - - delta_reingestion_runner( - start_date=start, - end_date=end, - local_output_location=local_tmp_output, - final_output_path=final_output_path, - polars_filter=polars_filter, - ) - - -if __name__ == "__main__": - run_backfill() +# # pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +# import os +# from datetime import date, datetime, timedelta +# from queue import Queue + +# import dataframely as dy +# import polars as pl + +# from lamp_py.ingestion.backfill.delta_reingestion import delta_reingestion_runner +# from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter +# from lamp_py.ingestion.converter import ConfigType + +# from lamp_py.aws.s3 import file_list_from_s3 +# from lamp_py.runtime_utils.remote_files import springboard_rt_vehicle_positions +# from lamp_py.runtime_utils.process_logger import ProcessLogger +# from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3Location + +# from lamp_py.tableau.conversions import convert_gtfs_rt_vehicle_position +# from lamp_py.tableau.jobs.filtered_hyper import FilteredHyperJob +# from lamp_py.tableau.jobs.lamp_jobs import GTFS_RT_TABLEAU_PROJECT +# from lamp_py.utils.filter_bank import FilterBankRtVehiclePositions + + +# def adhoc_convert_tz_filter_revenue_only(df: pl.DataFrame) -> pl.DataFrame: +# """ +# Docstring for adhoc_convert_tz_filter_revenue_only + +# :param df: trip_updates dataframe +# :type df: polars DataFrame +# :return: trip_updates with timezones converted and filtered to revenue only +# :rtype: polars DataFrame +# """ +# # Filter data to = TRUE, trip_update.stop_time_update.schedule_relationship != SKIPPED, and trip_update.trip.schedule_relationship != CANCELED +# df = df.with_columns( +# pl.from_epoch(pl.col("trip_update.stop_time_update.departure.time"), time_unit="s") +# .dt.convert_time_zone(time_zone="US/Eastern") +# .dt.replace_time_zone(None), +# pl.from_epoch(pl.col("trip_update.stop_time_update.arrival.time"), time_unit="s") +# .dt.convert_time_zone(time_zone="US/Eastern") +# .dt.replace_time_zone(None), +# pl.from_epoch(pl.col("trip_update.timestamp"), time_unit="s") +# .dt.convert_time_zone(time_zone="US/Eastern") +# .dt.replace_time_zone(None), +# pl.from_epoch(pl.col("feed_timestamp"), time_unit="s") +# .dt.convert_time_zone(time_zone="US/Eastern") +# .dt.replace_time_zone(None), +# ) + +# df = df.filter( +# pl.col("trip_update.trip.revenue"), +# ).select( +# [ +# "trip_update.trip.trip_id", +# "trip_update.trip.start_date", +# "trip_update.stop_time_update.departure.time", # - Convert from Epoch format to regular datetime +# "feed_timestamp", # - Convert from Epoch format to regular datetime +# ] +# ) + +# return df + + +# def run_backfill_gtfs_tu() -> None: +# """ +# Full encapsulated method to call all of this backfill job + +# Note: this is disabled and inoperable for now - will be reimplemented in follow-on backfill work +# """ +# local_tmp_output = "/tmp/gtfs-rt-continuous" + +# start = datetime(2025, 10, 24, 0, 0, 0) +# end = datetime(2025, 11, 24, 0, 0, 0) + +# polars_filter = pl.col("trip_update.trip.route_id").is_in( +# ["Red", "Orange", "Blue", "Green-B", "Green-C", "Green-D", "Green-E", "Mattapan"] +# ) + +# final_output_path = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_TRIP_UPDATES_20251024_20251124") + +# final_output_base_vp = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_VEHICLE_POSITION_20251024_20251124.parquet") + +# rt_vp_unfiltered_hyperjob = FilteredHyperJob( +# remote_input_location=springboard_rt_vehicle_positions, +# remote_output_location=final_output_base_vp, +# start_date=start, +# end_date=end, +# processed_schema=convert_gtfs_rt_vehicle_position.VehiclePositions.to_pyarrow_schema(), +# dataframe_filter=convert_gtfs_rt_vehicle_position.apply_gtfs_rt_vehicle_positions_timezone_conversions, +# parquet_filter=FilterBankRtVehiclePositions.ParquetFilter.rail, +# tableau_project_name=GTFS_RT_TABLEAU_PROJECT, +# ) + +# rt_vp_unfiltered_hyperjob.run_parquet() +# rt_vp_unfiltered_hyperjob.create_local_hyper() + + +# # delta_reingestion_runner( +# # start_date=start, +# # end_date=end, +# # local_output_location=local_tmp_output, +# # final_output_path=final_output_path, +# # ) + + +# if __name__ == "__main__": +# run_backfill_gtfs_tu() diff --git a/src/lamp_py/ad_hoc/backfill_runner_terminal_predictions.py b/src/lamp_py/ad_hoc/backfill_runner_terminal_predictions.py new file mode 100644 index 00000000..266c63d8 --- /dev/null +++ b/src/lamp_py/ad_hoc/backfill_runner_terminal_predictions.py @@ -0,0 +1,45 @@ +# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +import os +from datetime import datetime +from queue import Queue + +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter +from lamp_py.ingestion.backfill.delta_reingestion import delta_reingestion_runner +from lamp_py.ingestion.converter import ConfigType + +from lamp_py.runtime_utils.remote_files import S3_ARCHIVE, S3Location + +if __name__ == "__main__": + LOCAL_TMP_OUTPUT = "/tmp/gtfs-rt-continuous/" + + if not os.path.exists(LOCAL_TMP_OUTPUT): + os.makedirs(LOCAL_TMP_OUTPUT) + + start = datetime(2026, 3, 3, 0, 0, 0) + end = datetime(2026, 3, 3, 0, 0, 0) + config = ConfigType.RT_TRIP_UPDATES + + final_output_path = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_TRIP_UPDATES_FULLSET") + final_output_path_daily = S3Location(S3_ARCHIVE, "lamp/adhoc/RT_TRIP_UPDATES") + + # construct and run converter once per day + converter = GtfsRtFullPartitionConverter( + config_type=config, + metadata_queue=Queue(), + local_output_location=LOCAL_TMP_OUTPUT, + # remote_output_location=final_output_path_daily, + max_workers=16, + time_chunk_minutes=15, + move_source_on_completion=True, + ) + + delta_reingestion_runner( + start_date=start.date(), + end_date=end.date(), + local_output_location=LOCAL_TMP_OUTPUT, + final_output_path=final_output_path, + converter=converter, + in_filter="mbta.com_realtime_TripUpdates_enhanced.json.gz", + bucket=S3_ARCHIVE, + ) diff --git a/src/lamp_py/ingestion/backfill/__init__.py b/src/lamp_py/ingestion/backfill/__init__.py new file mode 100644 index 00000000..a3507758 --- /dev/null +++ b/src/lamp_py/ingestion/backfill/__init__.py @@ -0,0 +1 @@ +"""Tools for backfilling and reprocessing data""" diff --git a/src/lamp_py/ingestion/backfill/delta_reingestion.py b/src/lamp_py/ingestion/backfill/delta_reingestion.py new file mode 100644 index 00000000..88bdd28a --- /dev/null +++ b/src/lamp_py/ingestion/backfill/delta_reingestion.py @@ -0,0 +1,161 @@ +# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +import os +from datetime import date, timedelta +import time +import tempfile +from typing import List, Tuple +import polars as pl +from pyarrow import fs +import pyarrow.dataset as pd +import pyarrow.parquet as pq +import pyarrow.compute as pc + +from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter + +from lamp_py.aws.s3 import file_list_from_s3, object_exists +from lamp_py.runtime_utils.remote_files import S3_INCOMING +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3Location + + +def write_dataset_to_single_parquet_partitioned_and_sorted( + local_path: str, + output_parquet_path: str, + partition_column: str, + in_partition_sort: List[Tuple[str, str]], + debug_flag: bool = False, +) -> None: + """Write partitioned and sorted parquet file from pyarrow dataset.""" + logger = ProcessLogger("write_dataset_to_single_parquet_partitioned_and_sorted") + ds_paths = os.listdir(local_path) + ds_paths = [os.path.join(local_path, s) for s in ds_paths] + + ds = pd.dataset( + ds_paths, + format="parquet", + filesystem=fs.LocalFileSystem(), + ) + + with tempfile.TemporaryDirectory(delete=False): + # include the hash column for debug + writer = pq.ParquetWriter(output_parquet_path, schema=ds.schema, compression="zstd", compression_level=3) + + # Get unique partition values and sort them + unique_column = ds.to_table(columns=[partition_column]).column(partition_column).unique() + + partitions = sorted(pl.from_arrow(unique_column).to_list()) + logger.add_metadata(unique_partitions=len(partitions)) + + if debug_flag: + start_time = time.time() + + for part in partitions: + try: + write_table = ds.to_table(filter=pc.field(partition_column) == part).sort_by(in_partition_sort) + + writer.write_table(write_table) + + if debug_flag: + elapsed = time.time() - start_time + logger.add_metadata(partition_id=part, elapsed=elapsed, total_rows=write_table.num_rows) + except Exception as e: + logger.log_warning(e) + logger.add_metadata(partition_id=part, status="warning") + continue + + writer.close() + + logger.log_complete() + + +def delta_reingestion_runner( + start_date: date, + end_date: date, + final_output_path: S3Location, + converter: GtfsRtConverter, + in_filter: str | None = None, + local_output_location: str = "/tmp/gtfs-rt-continuous/", + bucket: str = S3_ARCHIVE, +) -> None: + """ + Docstring for delta_reingestion_runner + + :param start_date: start of reingestion range + :type start_date: date + :param end_date: end of reingestion range + :type end_date: date + :param final_output_base: final resting prefix-path of output artifacts + :type final_output_base: S3Location + :param local_output_location: temporary working directory to place outputs + :type local_output_location: str + :param bucket: S3 bucket to read from (S3_ARCHIVE or S3_INCOMING) + :type bucket: str + + """ + logger = ProcessLogger("delta_reingestion_runner") + logger.log_start() + + if bucket not in (S3_ARCHIVE, S3_INCOMING): + raise ValueError(f"bucket must be either S3_ARCHIVE or S3_INCOMING, got {bucket}") + + cur_date = start_date + + while cur_date <= end_date: + # setup input and output locations + local_combined_file = f"{local_output_location}/{cur_date.year}_{cur_date.month}_{cur_date.day}.parquet" + s3_combined_file = local_combined_file.replace( + f"{local_output_location}/", + f"{final_output_path.s3_uri}/year={cur_date.year}/month={cur_date.month}/day={cur_date.day}/", + ) + + # skip if output already exists - need a more robust version of this eventually + if object_exists(s3_combined_file): + logger.add_metadata(skipping_date=cur_date, reason="output already exists") + cur_date = cur_date + timedelta(days=1) + continue + + # otherwise, continue processing + prefix = ( + os.path.join( + LAMP, + "delta", + cur_date.strftime("%Y"), + cur_date.strftime("%m"), + cur_date.strftime("%d"), + ) + + "/" + ) + file_list = file_list_from_s3(bucket, prefix, in_filter=in_filter) + + logger.add_metadata(file_list_length=len(file_list)) + + #### Stage 1: local to local (MANY to many) + # converter = copy.deepcopy(converter_template_instance) + + converter.add_files(file_list) + converter.convert() + + # ## Stage 2: local to local (many to 1) + + # write_dataset_to_single_parquet_partitioned_and_sorted( + # local_converter_partition_path, + # local_combined_file, + # partition_column=converter.partition_column(), + # in_partition_sort=converter.table_sort_order(), + # debug_flag=True, + # ) + + # #### Stage 3: local to remote (one to one) + + # # upload local to remote + # upload_file( + # local_combined_file, + # s3_combined_file, + # ) + + cur_date = cur_date + timedelta(days=1) + + # cleanup + converter.clean_local_folders() # clear local output folders after each day to manage disk space + converter.reset_files() diff --git a/src/lamp_py/ingestion/compress_gtfs/gtfs_schema_map.py b/src/lamp_py/ingestion/compress_gtfs/gtfs_schema_map.py index 1f36a4c4..df751206 100644 --- a/src/lamp_py/ingestion/compress_gtfs/gtfs_schema_map.py +++ b/src/lamp_py/ingestion/compress_gtfs/gtfs_schema_map.py @@ -344,7 +344,7 @@ def gtfs_schema(gtfs_table_file: str) -> Dict[str, pl.DataType]: """ - get schema of gtfs table file with polars datatypes + Get schema of gtfs table file with polars datatypes :param gtfs_table_file: (ie. stop_times.txt) @@ -359,7 +359,7 @@ def gtfs_schema(gtfs_table_file: str) -> Dict[str, pl.DataType]: def gtfs_schema_list() -> List[str]: """ - create list of all expected gtfs table files, with feed_info.txt at the end + Create list of all expected gtfs table files, with feed_info.txt at the end :return List[gtfs table files (ie. stop_times.txt)] """ diff --git a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py index 77cbbb4a..1f1f556b 100644 --- a/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py +++ b/src/lamp_py/ingestion/compress_gtfs/gtfs_to_parquet.py @@ -118,7 +118,8 @@ def merge_frame_with_parquet(merge_df: pl.DataFrame, export_path: str, filter_da def compress_gtfs_file(gtfs_table_file: str, schedule_details: ScheduleDetails) -> None: """ - Compress an indivdual gtfs_table_file (ie. stop_times.txt) into yearly parquet partitioned parquet file(s) + Compress an indivdual gtfs_table_file (ie. stop_times.txt) into yearly parquet + partitioned parquet file(s) yearly partition is based on ScheduleDetals.active_from_int value (1 day after published_dt) diff --git a/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py b/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py index bddf4ea6..eb569663 100644 --- a/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py +++ b/src/lamp_py/ingestion/compress_gtfs/pq_to_sqlite.py @@ -10,7 +10,7 @@ def sqlite_type(pq_type: str) -> str: """ - return SQLITE type from pyarrow Field type + Return SQLITE type from pyarrow Field type """ if "int" in pq_type: return "INTEGER" @@ -25,7 +25,7 @@ def sqlite_type(pq_type: str) -> str: def sqlite_table_query(table_name: str, schema: pyarrow.Schema) -> str: """ - return CREATE TABLE query for sqlite table from pyarrow schema + Return CREATE TABLE query for sqlite table from pyarrow schema """ logger = ProcessLogger("sqlite_create_table") logger.log_start() @@ -44,7 +44,7 @@ def sqlite_table_query(table_name: str, schema: pyarrow.Schema) -> str: def pq_folder_to_sqlite(year_path: str) -> None: """ - load all files from year_path folder into SQLITE3 db file + Load all files from year_path folder into SQLITE3 db file """ logger = ProcessLogger("pq_to_sqlite", year_path=year_path) logger.log_start() diff --git a/src/lamp_py/ingestion/compress_gtfs/schedule_details.py b/src/lamp_py/ingestion/compress_gtfs/schedule_details.py index 0f5457ba..89cf04b2 100644 --- a/src/lamp_py/ingestion/compress_gtfs/schedule_details.py +++ b/src/lamp_py/ingestion/compress_gtfs/schedule_details.py @@ -68,7 +68,7 @@ def __post_init__(self) -> None: def headers_from_file(self, gtfs_table_file: str) -> List[str]: """ - extract header columns from gtfs_table_file + Extract header columns from gtfs_table_file :param gtfs_table_file (ie. stop_times.txt) @@ -85,7 +85,7 @@ def headers_from_file(self, gtfs_table_file: str) -> List[str]: def gtfs_to_frame(self, gtfs_table_file: str) -> pl.DataFrame: """ - create frame from .txt gtfs table + Create frame from .txt gtfs table dataframe will include all columns that are defined in polars_schema_map for gtfs_table_file if defined columns are missing from .txt table they are added with all NULL values @@ -183,7 +183,7 @@ def gtfs_to_frame(self, gtfs_table_file: str) -> pl.DataFrame: def schedules_to_compress(tmp_folder: str) -> pl.DataFrame: """ - compare already compressed schedule files to schedules available in the MBTA + Compare already compressed schedule files to schedules available in the MBTA feed archive (https://cdn.mbta.com/archive/archived_feeds.txt) to determine which schedules need to be compressed diff --git a/src/lamp_py/ingestion/config_busloc_trip.py b/src/lamp_py/ingestion/config_busloc_trip.py index ee376ce5..897be141 100644 --- a/src/lamp_py/ingestion/config_busloc_trip.py +++ b/src/lamp_py/ingestion/config_busloc_trip.py @@ -17,7 +17,7 @@ class RtBusTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/config_rt_trip.py b/src/lamp_py/ingestion/config_rt_trip.py index 2cf53f07..07af4190 100644 --- a/src/lamp_py/ingestion/config_rt_trip.py +++ b/src/lamp_py/ingestion/config_rt_trip.py @@ -17,7 +17,7 @@ class RtTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/convert_gtfs.py b/src/lamp_py/ingestion/convert_gtfs.py index 85464cf6..ed5c5f69 100644 --- a/src/lamp_py/ingestion/convert_gtfs.py +++ b/src/lamp_py/ingestion/convert_gtfs.py @@ -26,7 +26,7 @@ def gtfs_files_to_convert() -> List[Tuple[str, int]]: """ - create list of Tuple[GTFS url, version_key] for GtfsConverter + Create list of Tuple[GTFS url, version_key] for GtfsConverter version_key is based on published_dt """ @@ -75,7 +75,7 @@ def convert(self) -> None: def process_schedule(self, url: str, version_key: int) -> None: """ - convert a schedule gtfs zip file into tables. the zip file is + Convert a schedule gtfs zip file into tables. the zip file is essentially a small database with each contained file (outside of feed info) acting as its own table. info on the gtfs scheduling standard can be found at http://gtfs.org/schedule/ @@ -99,7 +99,7 @@ def process_schedule(self, url: str, version_key: int) -> None: def create_table(self, gtfs_zip: zipfile.ZipFile, table_filename: str, version_key: int) -> None: """ - read a csv table out of a gtfs static schedule file, add a timestamp + Read a csv table out of a gtfs static schedule file, add a timestamp column to each row, and write it as a parquet file on s3, partitioned by the timestamp diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index d1d6cb18..634100dd 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -136,7 +136,12 @@ class GtfsRtConverter(Converter): https_mbta_integration.mybluemix.net_vehicleCount.gz """ - def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]], max_workers: int = 4) -> None: + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + max_workers: int = 8, + ) -> None: Converter.__init__(self, config_type, metadata_queue) # Depending on filename, assign self.details to correct implementation @@ -204,7 +209,7 @@ def convert(self) -> None: def thread_init(self) -> None: """ - initialize the filesystem in each convert thread + Initialize the filesystem in each convert thread update the active fs to use the s3 filesystem for all loading if the first file starts with s3 @@ -217,11 +222,10 @@ def thread_init(self) -> None: def process_files(self) -> Iterable[pyarrow.table]: """ - iterate through all of the files to be converted + Iterate through all of the files to be converted only yield a new table when table size crosses over min_rows of yield_check """ - process_logger = ProcessLogger( "create_pyarrow_tables", config_type=str(self.config_type), @@ -272,7 +276,7 @@ def process_files(self) -> Iterable[pyarrow.table]: def yield_check(self, process_logger: ProcessLogger, min_rows: int = 2_000_000) -> Iterable[pyarrow.table]: """ - yield all tables in the data_parts map that have been sufficiently + Yield all tables in the data_parts map that have been sufficiently processed. @min_rows - how many rows the table must have to be yielded @@ -367,7 +371,7 @@ def gz_to_pyarrow(self, filename: str) -> Tuple[Optional[datetime], str, Optiona def partition_dt(self, table: pyarrow.Table) -> datetime: """ - verify partition structure of pyarrow Table + Verify partition structure of pyarrow Table :param table: pyarrow Table to verify @@ -419,7 +423,7 @@ def sync_with_s3(self, local_path: str) -> bool: def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset: """ - create dataset, with hash column, that will be written to parquet file + Create dataset, with hash column, that will be written to parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -451,9 +455,9 @@ def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset # pylint: disable=R0914 # pylint too many local variables (more than 15) - def write_local_pq(self, table: pyarrow.Table, local_path: str) -> None: + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: """ - merge pyarrow Table with existing local_path parquet file + Merge pyarrow Table with existing local_path parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -586,7 +590,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: log.add_metadata(local_path=local_path) - self.write_local_pq(table, local_path) + self.write_local_pq_partition(table, local_path) self.send_metadata(local_path.replace(self.tmp_folder, S3_SPRINGBOARD)) # record the number of rows in the final parquet file for logging @@ -610,7 +614,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: def clean_local_folders(self) -> None: """ - clean local temp folders + Clean local temp folders """ days_to_keep = 2 root_folder = os.path.join( @@ -630,7 +634,7 @@ def clean_local_folders(self) -> None: def move_s3_files(self) -> None: """ - move archive and error files to their respective s3 buckets. + Move archive and error files to their respective s3 buckets. """ if len(self.error_files) > 0: self.error_files = move_s3_objects( diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py new file mode 100644 index 00000000..474e866d --- /dev/null +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -0,0 +1,297 @@ +# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +from concurrent.futures import Future, ThreadPoolExecutor +import logging +import os +from datetime import datetime +from pathlib import Path +from queue import Queue +from typing import Dict, Iterable, List, Optional, Tuple +import pyarrow +import polars as pl + +from lamp_py.aws.s3 import move_s3_objects, upload_file +from lamp_py.ingestion.config_rt_trip import RtTripDetail +from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData +from lamp_py.ingestion.converter import ConfigType + +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3_ERROR, S3Location + + +# pylint: disable=too-many-arguments,too-many-instance-attributes +class GtfsRtFullPartitionConverter(GtfsRtConverter): + """ + Converter that handles GTFS Real Time JSON data + + https_cdn.mbta.com_realtime_TripUpdates_enhanced.json.gz + """ + + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + local_output_location: str = "/tmp/gtfs-rt-fullset/", + remote_output_location: S3Location | None = None, + polars_filter: pl.Expr = pl.lit(True), # default to true - which will essentially not filter + max_workers: int = 8, + time_chunk_minutes: int = 15, + move_source_on_completion: bool = False, + ) -> None: + """ + Initialize GTFS-RT fullset converter with time-chunked partitioning. + + Args: + config_type: Type of GTFS-RT configuration (trip updates, vehicle positions, alerts). + metadata_queue: Queue for metadata communication. + local_output_location: Local directory for temporary parquet files. + remote_output_location: S3 location for final output; if None, files stay local. + polars_filter: Polars expression to filter data at conversion time; defaults to no filtering. + max_workers: Number of worker threads for parallel processing. + time_chunk_minutes: Minutes for time-based partitioning (e.g., 15 min intervals). + move_source_on_completion: If True, move source files to archive after completion. + """ + GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) + + if time_chunk_minutes < 5: + raise ValueError( + "time_chunk_minutes must be at least 5 to ensure proper partitioning and avoid too many small files" + ) + self.detail = RtTripDetail() + self.tmp_folder = os.path.join(local_output_location, LAMP, str(self.config_type)) + self.remote_output_location = remote_output_location + self.data_parts: Dict[datetime, TableData] = {} + self.filter = polars_filter + self.move_source_on_completion = move_source_on_completion + self.time_chunk_minutes = time_chunk_minutes + + def convert(self) -> None: + """ + Convert all files in self.files to time-chunked parquet partitions. + + Specialization over GtfsRtConverter::convert() - this converter does not + apply unique() on the incoming GTFS-RT data, and will partition files by + day or by time chunk (e.g. 15 min intervals) depending on the + time_chunk_minutes parameter. It will also apply a polars filter to the + data at the point of conversion from json.gz to pyarrow table, which + should help reduce the amount of data being written out and speed up the + conversion process. This converter is applicable for live ingestion and + backfill tasks. + """ + process_logger = ProcessLogger( + "fullset_gtfs_parquet_table_creator", + table_type="gtfs-rt", + config_type=str(self.config_type), + file_count=len(self.files), + ) + process_logger.log_start() + + table_count = 0 + move_futures: List[Future] = [] + try: + for table, partition_dt in self.process_files(): + if table.num_rows == 0: + continue + + path_suffix = os.path.join( + f"year={partition_dt.year}", + f"month={partition_dt.month}", + f"day={partition_dt.day}", + f"{partition_dt.isoformat()}.parquet", + ) + + local_path = os.path.join(self.tmp_folder, path_suffix) + + os.makedirs(Path(local_path).parent, exist_ok=True) + + self.write_local_pq_partition(table, local_path) + + # in backfill mode, we don't want to move files around in s3 - + # we just want to write the converted files to the output location + # (which could be a temp location or the final archive location). + # in non-backfill mode, we want to move the original gtfs-rt files from + # incoming to archive after successful conversion. + if self.move_source_on_completion: + self.move_s3_files() + + # mirror on s3 if remote output location is provided + if self.remote_output_location is not None: + s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) + upload_file(local_path, s3_path) + + pool = pyarrow.default_memory_pool() + pool.release_unused() + table_count += 1 + process_logger.add_metadata(table_count=table_count) + + # wait for all background s3 moves to finish + for future in move_futures: + future.result() + + except Exception as exception: + process_logger.log_failure(exception) + else: + process_logger.log_complete() + finally: + self.data_parts = {} + if self.move_source_on_completion: + self.move_s3_files() + self.clean_local_folders() + + @staticmethod + def _move_s3_files_async(archive_files: List[str], error_files: List[str]) -> None: + """Move archive and error files to S3 in a background thread.""" + if error_files: + move_s3_objects(error_files, os.path.join(S3_ERROR, LAMP)) + if archive_files: + move_s3_objects(archive_files, os.path.join(S3_ARCHIVE, LAMP)) + + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: + """ + Just write the file out.. + + this should be already sorted by timestamp based on how self.files is yielded. + """ + df: pl.DataFrame = pl.from_arrow(table).filter(self.filter) # type: ignore[arg-type, assignment] + + # read local_path if exists and concat with table + # this handles the case where a prior iteration yielded an incomplete time chunk, + # and we are now filling in the remaining record that is part of that chunk + if os.path.exists(local_path): + existing_table: pl.DataFrame = pl.read_parquet(local_path) + df = pl.concat([existing_table, df]) + + if not self.move_source_on_completion: + df = ( + df.unique() + ) # unique is appropriate here to ensure we don't write duplicates when files are NOT moved for backfill usecase + df.write_parquet(local_path, compression="zstd", compression_level=3) + + def process_files(self) -> Iterable[pyarrow.table]: + """ + Yield time-chunked parquet tables from all input files. + + Applies a polars filter to narrow results at the source to reduce write + churn. Converts json.gz input files to pyarrow tables, groups into time + chunk intervals, and yields tables for completed intervals as we go. + """ + process_logger = ProcessLogger( + "fullset_create_pyarrow_tables", + config_type=str(self.config_type), + ) + process_logger.log_start() + + with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.thread_init) as pool: + # for file in self.files: + # result_dt, result_filename, rt_data = self.gz_to_pyarrow(file) + + for result_dt, result_filename, rt_data in pool.map(self.gz_to_pyarrow, self.files): + # errors in gtfs_rt conversions are handled in the gz_to_pyarrow + # function. if one is encountered, the datetime will be none. log + # the error and move on to the next file. + + if result_dt is None: + logging.error( + "skipping processing: %s", + result_filename, + ) + continue + + # create key for self.data_parts dictionary that bins based on the chunk interval + dt_part = self._interval_key(result_dt) + + # create new self.table_groups entry for key if it doesn't exist + if dt_part not in self.data_parts: + self.data_parts[dt_part] = TableData() + self.data_parts[dt_part].table = self.detail.transform_for_write(rt_data) + else: + self.data_parts[dt_part].table = pyarrow.concat_tables( + [self.data_parts[dt_part].table, self.detail.transform_for_write(rt_data)] + ) + + self.data_parts[dt_part].files.append(result_filename) + + # we're mapping each gz to a range dt_part. when we have > 1 dt_parts in the map, + # that means we've processed files from at least 2 different time intervals and + # can start yielding tables for the intervals that are complete. + # this relies on self.files being sorted - enforced by add_files(), + # and ThreadPoolExecutor/map yields __next__ iterator, i.e. returns in the right order + if len(self.data_parts) > 1: + yield from self.yield_check_periodic(process_logger, result_dt) + + yield from self.yield_check_periodic(process_logger, flush=True) + + process_logger.add_metadata(file_count=0, number_of_rows=0) + process_logger.log_complete() + + def partition_column(self) -> str: + """Return the column name for partitioning output parquet files.""" + return "trip_update.trip.route_id" + + def table_sort_order(self) -> List[Tuple[str, str]]: + """Return sort order specification for output parquet tables.""" + return [ + ("feed_timestamp", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), + ] + + def _interval_key(self, ts: datetime) -> datetime: + """ + Truncate a UTC datetime to its wall-clock-aligned interval start. + + For time_chunk_minutes=15: + 01:07 -> 01:00, 01:15 -> 01:15, 01:29 -> 01:15, 23:59 -> 23:45 + + Returns a naive datetime (no timezone) matching the existing data_parts key convention. + """ + total_minutes = ts.hour * 60 + ts.minute + aligned_minutes = (total_minutes // self.time_chunk_minutes) * self.time_chunk_minutes + return datetime( + year=ts.year, + month=ts.month, + day=ts.day, + hour=aligned_minutes // 60, + minute=aligned_minutes % 60, + ) + + def yield_check_periodic( + self, + process_logger: ProcessLogger, + current_ts: datetime = datetime.now(), + flush: bool = False, + ) -> Iterable[Tuple[pyarrow.table, datetime]]: + """ + Yield completed time-chunk intervals from data_parts. + + When flush=True, yield all remaining intervals regardless of current_ts. + + @current_ts - the feed timestamp of the file just processed + @flush - if True, yield everything remaining + """ + current_interval = self._interval_key(current_ts) + for iter_ts in list(self.data_parts.keys()): + table = self.data_parts[iter_ts].table + if table is None: + continue + + # yield if we've moved past this interval + # or if flushing all remaining data + if flush or current_interval > iter_ts: + # only populate archive_files if we're in live ingestion mode + if self.move_source_on_completion: + self.archive_files += self.data_parts[iter_ts].files + + process_logger.add_metadata( + file_count=len(self.data_parts[iter_ts].files), + number_of_rows=table.num_rows, + interval_start=iter_ts.isoformat(), + ) + process_logger.log_complete() + process_logger.add_metadata(file_count=0, number_of_rows=0, print_log=False) + process_logger.log_start() + + yield (table, iter_ts) + + del self.data_parts[iter_ts] diff --git a/src/lamp_py/ingestion/converter.py b/src/lamp_py/ingestion/converter.py index 9388cf1c..fb6d0362 100644 --- a/src/lamp_py/ingestion/converter.py +++ b/src/lamp_py/ingestion/converter.py @@ -113,16 +113,20 @@ def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]] self.metadata_queue: Queue[Optional[str]] = metadata_queue def add_files(self, files: List[str]) -> None: - """add files to this converter""" + """Add files to this converter""" self.files += files + def reset_files(self) -> None: + """Remove files from this converter""" + self.files = [] + def send_metadata(self, written_file: str) -> None: - """send metadata path to rds writer process""" + """Send metadata path to rds writer process""" self.metadata_queue.put(written_file) @abstractmethod def convert(self) -> None: """ - convert files to pyarrow tables, write them to s3 as parquete, and move + Convert files to pyarrow tables, write them to s3 as parquete, and move files from incoming to archive (or error) """ diff --git a/src/lamp_py/ingestion/daily/__init__.py b/src/lamp_py/ingestion/daily/__init__.py new file mode 100644 index 00000000..bc18ef55 --- /dev/null +++ b/src/lamp_py/ingestion/daily/__init__.py @@ -0,0 +1 @@ +"""Tools to compress various gtfs after a whole day""" diff --git a/src/lamp_py/ingestion/daily/config.py b/src/lamp_py/ingestion/daily/config.py new file mode 100644 index 00000000..55108cac --- /dev/null +++ b/src/lamp_py/ingestion/daily/config.py @@ -0,0 +1,2 @@ +START_HOUR = 1 # 1 am UTC, 9 pm EST - after rollover of ingestion dataset. +END_HOUR = 7 # 7 am UTC, 3 am EST - before start of next day. diff --git a/src/lamp_py/ingestion/daily/trip_updates.py b/src/lamp_py/ingestion/daily/trip_updates.py new file mode 100644 index 00000000..5673fbfb --- /dev/null +++ b/src/lamp_py/ingestion/daily/trip_updates.py @@ -0,0 +1,75 @@ +from datetime import datetime, timezone +import polars as pl +from lamp_py.ingestion.daily.config import END_HOUR, START_HOUR +from lamp_py.utils.filter_bank import HeavyRailFilter, LightRailFilter + + +def within_daily_processing_window() -> bool: + """Check if current time is within the daily processing window.""" + now = datetime.now(timezone.utc) + hour = now.hour + return START_HOUR <= hour < END_HOUR + + +def reprocess_trip_updates_terminal_prediction() -> bool: + """Filter fullset trip updates for heavy/light rail terminal predictions.""" + # Stub for future implementation + all_terminal_stops = LightRailFilter.terminal_stop_ids + HeavyRailFilter.terminal_stop_ids + _polars_filter = pl.col("trip_update.trip.route_id").is_in( + ["Red", "Orange", "Blue", "Green-B", "Green-C", "Green-D", "Green-E", "Mattapan"] + ) & pl.col("trip_update.stop_time_update.stop_id").is_in(all_terminal_stops) + + return False + + +# def consolidate_partitions_for_archive(local_converter_partition_path: date) -> bool: + +# write_dataset_to_single_parquet_partitioned_and_sorted( +# local_converter_partition_path, +# local_combined_file, +# partition_column=converter.partition_column(), +# in_partition_sort=converter.table_sort_order(), +# debug_flag=True, +# ) + +# #### Stage 3: local to remote (one to one) + +# # upload local to remote +# upload_file( +# local_combined_file, +# s3_combined_file, +# ) + + +# ## Stage 2: local to local (many to 1) + +# # Define the path to your input Parquet files (can use a glob pattern) +# converter_output_path = f"{local_output_location}/lamp/RT_TRIP_UPDATES/year={cur_date.year}/month={cur_date.month}/day={cur_date.day}/" +# consolidated_parquet_output_file = ( +# f"{local_output_location}/{cur_date.year}_{cur_date.month}_{cur_date.day}.parquet" +# ) + +# # Create a dataset from the input files +# ds = pd.dataset(converter_output_path, format="parquet") + +# with pq.ParquetWriter( +# consolidated_parquet_output_file, schema=ds.schema, compression="zstd", compression_level=3 +# ) as writer: +# for batch in ds.to_batches(batch_size=512 * 1024): +# writer.write_batch(batch) + +# #### Stage 3: local to remote (one to one) + +# # upload local to remote +# upload_file( +# consolidated_parquet_output_file, +# consolidated_parquet_output_file.replace( +# f"{local_output_location}/", +# f"{final_output_path.s3_uri}/year={cur_date.year}/month={cur_date.month}/day={cur_date.day}/", +# ), +# ) + + +# def reprocess_delta_backfill(config: ConfigType, start_date: date, end_date: date) -> bool: + +# return True diff --git a/src/lamp_py/ingestion/glides.py b/src/lamp_py/ingestion/glides.py index 6eea7939..1df91ea1 100644 --- a/src/lamp_py/ingestion/glides.py +++ b/src/lamp_py/ingestion/glides.py @@ -213,7 +213,7 @@ def unique_key(self) -> str: """Key in record['data'] that is unique to this event type""" def download_remote(self) -> None: - """download the remote parquet path for appending""" + """Download the remote parquet path for appending""" if os.path.exists(self.local_path): os.remove(self.local_path) @@ -405,7 +405,7 @@ def ingest_glides_events( kinesis_reader: KinesisReader, metadata_queue: Queue[Optional[str]], upload: bool = False ) -> None: """ - ingest glides records from the kinesis stream and add them to parquet files + Ingest glides records from the kinesis stream and add them to parquet files """ process_logger = ProcessLogger(process_name="ingest_glides_events") process_logger.log_start() diff --git a/src/lamp_py/ingestion/gtfs_rt_detail.py b/src/lamp_py/ingestion/gtfs_rt_detail.py index 99718434..0e17259f 100644 --- a/src/lamp_py/ingestion/gtfs_rt_detail.py +++ b/src/lamp_py/ingestion/gtfs_rt_detail.py @@ -16,7 +16,7 @@ class GTFSRTDetail(ABC): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(table) @property diff --git a/src/lamp_py/ingestion/ingest_gtfs.py b/src/lamp_py/ingestion/ingest_gtfs.py index 81aa1728..2f95f79b 100644 --- a/src/lamp_py/ingestion/ingest_gtfs.py +++ b/src/lamp_py/ingestion/ingest_gtfs.py @@ -11,10 +11,10 @@ move_s3_objects, file_list_from_s3, ) +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.ingestion.convert_gtfs import GtfsConverter -from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter from lamp_py.ingestion.converter import ( ConfigType, Converter, @@ -24,7 +24,7 @@ NoImplException, IgnoreIngestion, ) -from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING +from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING, S3_SPRINGBOARD, S3Location from lamp_py.ingestion.utils import group_sort_file_list from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet @@ -52,7 +52,7 @@ def run_converter(converter: Converter) -> None: def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: """ - ingest gtfs schedules from MBTA GTFS schedule archive + Ingest gtfs schedules from MBTA GTFS schedule archive """ logger = ProcessLogger(process_name="ingest_gtfs") logger.log_start() @@ -65,7 +65,7 @@ def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - get all of the filepaths currently in the incoming bucket, sort them into + Get all of the filepaths currently in the incoming bucket, sort them into batches of similar gtfs-rt files, convert each batch into tables, write the tables to parquet files in the springboard bucket, add the parquet filepaths to the metadata table as unprocessed, and move gtfs files to the @@ -75,7 +75,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L logger.log_start() try: - files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=10000) + files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=50000) grouped_files = group_sort_file_list(files) @@ -94,7 +94,15 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L try: config_type = ConfigType.from_filename(file_group[0]) if config_type not in converters: - converters[config_type] = GtfsRtConverter(config_type, metadata_queue) + if config_type in (ConfigType.RT_ALERTS, ConfigType.VEHICLE_COUNT, ConfigType.SCHEDULE): + converters[config_type] = GtfsConverter(config_type, metadata_queue) + else: # all TripUpdates, VehiclePositions + converters[config_type] = GtfsRtFullPartitionConverter( + config_type, + metadata_queue, + remote_output_location=S3Location(S3_SPRINGBOARD, os.path.join(LAMP, str(config_type))), + move_source_on_completion=True, + ) converters[config_type].add_files(file_group) except IgnoreIngestion: continue @@ -123,6 +131,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L process_count = os.cpu_count() if process_count is None: process_count = 4 + if len(converters) > 0: with get_context("spawn").Pool(processes=process_count, maxtasksperchild=1) as pool: pool.map_async(run_converter, converters.values()) @@ -134,7 +143,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L def ingest_gtfs(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - ingest all gtfs file types + Ingest all gtfs file types static schedule files should be ingested first """ diff --git a/src/lamp_py/ingestion/pipeline.py b/src/lamp_py/ingestion/pipeline.py index 6ed15ec9..bcd645d3 100755 --- a/src/lamp_py/ingestion/pipeline.py +++ b/src/lamp_py/ingestion/pipeline.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - import os import time import logging @@ -24,7 +23,7 @@ def main() -> None: """ - run the ingestion pipeline + Run the ingestion pipeline * setup metadata queue metadata writer process * setup a glides kinesis reader @@ -40,23 +39,41 @@ def main() -> None: # connect to the glides kinesis stream glides_reader = KinesisReader(stream_name="ctd-glides-prod") + # today = datetime.now(timezone.utc).date() + # run the event loop every 30 seconds while True: process_logger = ProcessLogger(process_name="main") process_logger.log_start() bucket_filter = LAMP check_for_sigterm(metadata_queue, rds_process) + ingest_gtfs(metadata_queue, bucket_filter=bucket_filter) ingest_glides_events(glides_reader, metadata_queue, upload=True) check_for_sigterm(metadata_queue, rds_process) + # # if datetime has moved on (i.e. it's the next day) + # if today != datetime.now(timezone.utc).date(): + + # in_processing_window=within_daily_processing_window() + # process_logger.add_metadata(in_processing_window=in_processing_window) + + # # this doesn't make use of the processing window fully. we need to have other parts of ingestion + # # populate a queue. this queue can be written to disk occasionally to pick back up...todo + # if in_processing_window: + # # it is next day, do repack and reset + # yesterday = today + # today = datetime.now(timezone.utc).date() + # status = consolidate_partitions_for_archive(yesterday, config=[ConfigType.TRIP_UPDATES]) + # process_logger.add_metadata(consolidate_day=yesterday,consolidation_status=status) + process_logger.log_complete() time.sleep(30) def start() -> None: - """configure and start the ingestion process""" + """Configure and start the ingestion process""" clear_folder("/tmp") # setup handling shutdown commands signal.signal(signal.SIGTERM, handle_ecs_sigterm) diff --git a/src/lamp_py/ingestion/utils.py b/src/lamp_py/ingestion/utils.py index 4f261d20..060e88ff 100644 --- a/src/lamp_py/ingestion/utils.py +++ b/src/lamp_py/ingestion/utils.py @@ -22,7 +22,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: """ - group and sort list of filepaths by filename + Group and sort list of filepaths by filename expects s3 file paths that can be split on timestamp: @@ -41,7 +41,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: def strip_timestamp(fileobject: str) -> str: """ - utility for sorting pulling timestamp string out of file path. + Utility for sorting pulling timestamp string out of file path. assumption is that the objects will have a bunch of "directories" that pathlib can parse out, and the filename will start with a timestamp "YYY-MM-DDTHH:MM:SSZ" (20 char) format. @@ -108,7 +108,7 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: def ordered_schedule_frame() -> pl.DataFrame: """ - create de-duplicated and ordered frame of all MBTA gtfs schedules from + Create de-duplicated and ordered frame of all MBTA gtfs schedules from https://cdn.mbta.com/archive/archived_feeds.txt de-duplicated on: published_date @@ -177,7 +177,7 @@ def file_as_bytes_buf(file: str) -> BytesIO: def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: - """flatten pyarrow table if struct column type exists""" + """Flatten pyarrow table if struct column type exists""" for field in table.schema: if str(field.type).startswith("struct"): return flatten_table_schema(table.flatten()) @@ -185,7 +185,7 @@ def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: - """explode list-like column of pyarrow table by creating rows for each list value""" + """Explode list-like column of pyarrow table by creating rows for each list value""" other_columns = list(table.schema.names) other_columns.remove(column) indices = pc.list_parent_indices(table[column]) @@ -205,7 +205,7 @@ def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: """ - add GTFS_RT_HASH_COL column to pyarrow table, if not already present + Add GTFS_RT_HASH_COL column to pyarrow table, if not already present """ log = ProcessLogger( "hash_gtfs_rt_table", table_rows=table.num_rows, table_mbs=round(table.nbytes / (1024 * 1024), 2) @@ -229,7 +229,7 @@ def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: def hash_gtfs_rt_parquet(path: str) -> None: """ - add GTFS_RT_HASH_COL to local parquet file, if not already present + Add GTFS_RT_HASH_COL to local parquet file, if not already present """ ds = pq.ParquetFile(path) ds_schema = ds.schema.to_arrow_schema() @@ -259,7 +259,7 @@ def hash_gtfs_rt_parquet(path: str) -> None: def gzip_file(path: str, keep_original: bool = False) -> None: """ - gzip local file + Gzip local file :param path: local file path :param keep_original: keep original non-gzip file = False diff --git a/tests/ingestion/test_daily_jobs.py b/tests/ingestion/test_daily_jobs.py new file mode 100644 index 00000000..b2f69472 --- /dev/null +++ b/tests/ingestion/test_daily_jobs.py @@ -0,0 +1,26 @@ +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest + +from lamp_py.ingestion.daily.trip_updates import within_daily_processing_window + + +@pytest.mark.parametrize( + "hour,expected", + [ + # Outside processing window + (0, False), # midnight + (23, False), # 11 PM UTC + # Within processing window (1 AM to 7 AM UTC, hour < 7) + (1, True), + (2, True), + (6, True), + ], +) +def test_get_daily_processing_window(hour: int, expected: bool) -> None: + """Test that processing window correctly identifies hours within range [1, 23).""" + mock_time = datetime(2026, 4, 7, hour, 0, 0, tzinfo=timezone.utc) + with patch("lamp_py.ingestion.daily.trip_updates.datetime") as mock_dt: + mock_dt.now.return_value = mock_time + assert within_daily_processing_window() is expected diff --git a/tests/ingestion/test_delta_reingestion.py b/tests/ingestion/test_delta_reingestion.py new file mode 100644 index 00000000..cad5dd86 --- /dev/null +++ b/tests/ingestion/test_delta_reingestion.py @@ -0,0 +1,82 @@ +from datetime import date +from queue import Queue +from unittest.mock import MagicMock, patch + +import pytest + +from lamp_py.ingestion.backfill.delta_reingestion import delta_reingestion_runner +from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter +from lamp_py.ingestion.converter import ConfigType +from lamp_py.runtime_utils.remote_files import S3_ARCHIVE, S3_INCOMING, S3Location + + +@pytest.mark.parametrize( + "bucket,start_date,end_date,object_exists_side_effect,expect_error,expected_convert_calls,expected_file_list_calls", + [ + pytest.param("bad-bucket", date(2026, 5, 1), date(2026, 5, 1), [False], True, 0, 0, id="invalid-bucket"), + pytest.param(S3_ARCHIVE, date(2026, 5, 1), date(2026, 5, 2), [True, False], False, 1, 1, id="skip-then-run"), + pytest.param(S3_INCOMING, date(2026, 5, 1), date(2026, 5, 1), [False], False, 1, 1, id="single-day-run"), + ], +) +# pylint: disable=too-many-arguments,too-many-positional-arguments +def test_delta_reingestion_runner( + bucket: str, + start_date: date, + end_date: date, + object_exists_side_effect: list[bool], + expect_error: bool, + expected_convert_calls: int, + expected_file_list_calls: int, +) -> None: + """Test delta_reingestion_runner routes files and handles validation.""" + converter = MagicMock(spec=GtfsRtConverter) + converter.config_type = ConfigType.RT_TRIP_UPDATES + + with ( + patch( + "lamp_py.ingestion.backfill.delta_reingestion.object_exists", side_effect=object_exists_side_effect + ) as mock_exists, + patch( + "lamp_py.ingestion.backfill.delta_reingestion.file_list_from_s3", return_value=["file-a", "file-b"] + ) as mock_list, + ): + if expect_error: + with pytest.raises(ValueError): + delta_reingestion_runner( + start_date=start_date, + end_date=end_date, + final_output_path=S3Location(S3_ARCHIVE, "lamp/RT_TRIP_UPDATES"), + converter=converter, + bucket=bucket, + ) + else: + delta_reingestion_runner( + start_date=start_date, + end_date=end_date, + final_output_path=S3Location(S3_ARCHIVE, "lamp/RT_TRIP_UPDATES"), + converter=converter, + bucket=bucket, + ) + + assert converter.convert.call_count == expected_convert_calls + assert converter.add_files.call_count == expected_convert_calls + assert converter.clean_local_folders.call_count == expected_convert_calls + assert converter.reset_files.call_count == expected_convert_calls + assert mock_list.call_count == expected_file_list_calls + if not expect_error: + assert mock_exists.call_count == len(object_exists_side_effect) + + +@pytest.mark.parametrize( + "initial_files,expected_files", + [ + pytest.param([], [], id="empty-list"), + pytest.param(["a.json.gz", "b.json.gz"], [], id="clears-non-empty-list"), + ], +) +def test_reset_files(initial_files: list[str], expected_files: list[str]) -> None: + """Test reset_files clears the converter's file list.""" + converter = GtfsRtConverter(config_type=ConfigType.RT_ALERTS, metadata_queue=Queue()) + converter.add_files(initial_files) + converter.reset_files() + assert converter.files == expected_files diff --git a/tests/ingestion/test_gtfs_compress.py b/tests/ingestion/test_gtfs_compress.py index f363d4fc..7e697cae 100644 --- a/tests/ingestion/test_gtfs_compress.py +++ b/tests/ingestion/test_gtfs_compress.py @@ -21,7 +21,7 @@ # pylint: disable=R0914 def test_gtfs_to_parquet_compression() -> None: """ - test gtfs -> parquet compression pipeline + Test gtfs -> parquet compression pipeline will test compression of 3 randomly selected schedules from the past year """ diff --git a/tests/ingestion/test_gtfs_converter.py b/tests/ingestion/test_gtfs_converter.py index c643178e..ee8515a2 100644 --- a/tests/ingestion/test_gtfs_converter.py +++ b/tests/ingestion/test_gtfs_converter.py @@ -307,12 +307,11 @@ def all_table_attributes() -> Dict: @pytest.fixture(name="_s3_patch") def fixture_s3_patch(monkeypatch: MonkeyPatch) -> Iterator[None]: """ - insert a monkeypatch over the s3 functions used by the gtfs converter. + Insert a monkeypatch over the s3 functions used by the gtfs converter. * write parquet file function patched to check the table rather than write it * move s3 objects asserts that all moves are to archive and not error """ - # keep track of what tables are written to check against after converting tables_written: List[str] = [] @@ -324,7 +323,7 @@ def mock_write_parquet_file( visitor_func: Optional[Callable[..., None]] = None, ) -> None: """ - instead of writing the parquet file to s3, inspect the contents of the + Instead of writing the parquet file to s3, inspect the contents of the table. call the visitor function on a dummy s3 path. """ # pull the name out of the s3 path and check that we are expecting this table @@ -352,7 +351,7 @@ def mock_write_parquet_file( ) def mock_gtfs_files_to_convert() -> List[Tuple[str, int]]: - """provide list of gtfs paths to convert""" + """Provide list of gtfs paths to convert""" return [(os.path.join(incoming_dir, "MBTA_GTFS.zip"), 1655517536)] monkeypatch.setattr( @@ -373,7 +372,7 @@ def mock_gtfs_files_to_convert() -> List[Tuple[str, int]]: def test_schedule_conversion(_s3_patch: Callable[..., None]) -> None: """ - test that a schedule zip file can be processed correctly, checking for files + Test that a schedule zip file can be processed correctly, checking for files table names, table column names, and table lengths """ # generate a schedule converter with an empty queue to inspect later diff --git a/tests/ingestion/test_hash.py b/tests/ingestion/test_hash.py index fdbae507..76c8822b 100644 --- a/tests/ingestion/test_hash.py +++ b/tests/ingestion/test_hash.py @@ -6,7 +6,7 @@ def hash_gtfs_rt_row_marked_for_removal_wip(row: Any) -> bytes: - """hash row from polars dataframe""" + """Hash row from polars dataframe""" return hashlib.md5(pickle.dumps(row), usedforsecurity=False).digest() @@ -16,7 +16,6 @@ def test_polars_hashrows_equivalent_to_hash_gtfs_rt_row() -> None: the built in polars method does. Can remove hash_gtfs_rt_row_marked_for_removal_wip once we are confident this always holds. """ - table = pl.DataFrame( [ pl.Series("month", [10, 10, 11], dtype=pl.Int64), @@ -54,7 +53,6 @@ def test_polars_hashrows_works_for_all_trip_update_col_types() -> None: Check that hash_row() and the custom hash_gtfs_rt_row() return the same sets when their individual hashes are applied """ - table = pl.read_json( io.StringIO( '[{"id":"71194552","trip_update.trip.trip_id":"71194552","trip_update.trip.route_id":"Green-E","trip_update.trip.direction_id":1,"trip_update.trip.start_time":"20:30:00","trip_update.trip.start_date":"20251023","trip_update.trip.schedule_relationship":null,"trip_update.trip.route_pattern_id":"Green-E-886-1","trip_update.trip.tm_trip_id":null,"trip_update.trip.overload_id":null,"trip_update.trip.overload_offset":null,"trip_update.trip.revenue":true,"trip_update.trip.last_trip":false,"trip_update.vehicle.id":"G-10052","trip_update.vehicle.label":null,"trip_update.vehicle.license_plate":null,"trip_update.vehicle.consist":null,"trip_update.vehicle.assignment_status":null,"trip_update.timestamp":1761263939,"trip_update.delay":null,"year":2025,"month":10,"day":24,"feed_timestamp":1761264002,"trip_update.stop_time_update.stop_sequence":1,"trip_update.stop_time_update.stop_id":"70260","trip_update.stop_time_update.arrival.delay":null,"trip_update.stop_time_update.arrival.time":null,"trip_update.stop_time_update.arrival.uncertainty":null,"trip_update.stop_time_update.departure.delay":null,"trip_update.stop_time_update.departure.time":1761265800,"trip_update.stop_time_update.departure.uncertainty":360,"trip_update.stop_time_update.schedule_relationship":null,"trip_update.stop_time_update.boarding_status":null}]' diff --git a/tests/ingestion/test_ingest.py b/tests/ingestion/test_ingest.py index da6dc4b2..6bf98075 100644 --- a/tests/ingestion/test_ingest.py +++ b/tests/ingestion/test_ingest.py @@ -8,7 +8,6 @@ from lamp_py.ingestion.converter import ConfigType from lamp_py.runtime_utils.lamp_exception import NoImplException -from lamp_py.runtime_utils.lamp_exception import IgnoreIngestion from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py new file mode 100644 index 00000000..ef531ee8 --- /dev/null +++ b/tests/ingestion/test_yield_check_periodic.py @@ -0,0 +1,163 @@ +"""Tests for time-based periodic yielding in GtfsRtFullPartitionConverter.""" + +from datetime import datetime +from queue import Queue +from typing import List, Tuple + +import pyarrow +import pytest + +from lamp_py.ingestion.convert_gtfs_rt import TableData +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter +from lamp_py.ingestion.converter import ConfigType +from lamp_py.runtime_utils.process_logger import ProcessLogger + + +def make_converter( + time_chunk_minutes: int = 15, move_source_on_completion: bool = False +) -> GtfsRtFullPartitionConverter: + """Create a converter with periodic yielding enabled.""" + return GtfsRtFullPartitionConverter( + config_type=ConfigType.RT_TRIP_UPDATES, + metadata_queue=Queue(), + move_source_on_completion=move_source_on_completion, + time_chunk_minutes=time_chunk_minutes, + ) + + +def make_dummy_table(num_rows: int = 10) -> pyarrow.Table: + """Create a minimal pyarrow table for testing.""" + return pyarrow.table({"col": list(range(num_rows))}) + + +@pytest.mark.parametrize( + "chunk_minutes, input_ts, expected_ts", + [ + # 15-minute chunks + pytest.param(15, datetime(2026, 5, 4, 1, 15, 0), datetime(2026, 5, 4, 1, 15), id="15m-on-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 14, 59), datetime(2026, 5, 4, 1, 0), id="15m-just-before-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 15, 1), datetime(2026, 5, 4, 1, 15), id="15m-just-after-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 0, 0), datetime(2026, 5, 4, 1, 0), id="15m-start-of-hour"), + pytest.param(15, datetime(2026, 5, 4, 23, 59, 59), datetime(2026, 5, 4, 23, 45), id="15m-end-of-day"), + pytest.param(15, datetime(2026, 5, 4, 0, 0, 0), datetime(2026, 5, 4, 0, 0), id="15m-midnight"), + # 30-minute chunks + pytest.param(30, datetime(2026, 5, 4, 1, 29), datetime(2026, 5, 4, 1, 0), id="30m-before-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 30), datetime(2026, 5, 4, 1, 30), id="30m-on-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 30), id="30m-end-of-hour"), + # 5-minute chunks + pytest.param(5, datetime(2026, 5, 4, 1, 7), datetime(2026, 5, 4, 1, 5), id="5m-mid-interval"), + pytest.param(5, datetime(2026, 5, 4, 1, 10), datetime(2026, 5, 4, 1, 10), id="5m-on-boundary"), + # 60-minute chunks + pytest.param(60, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 0), id="60m-end-of-hour"), + pytest.param(60, datetime(2026, 5, 4, 2, 0), datetime(2026, 5, 4, 2, 0), id="60m-on-boundary"), + ], +) +def test_interval_key(chunk_minutes: int, input_ts: datetime, expected_ts: datetime) -> None: + """_interval_key truncates timestamps to wall-clock-aligned interval starts.""" + c = make_converter(chunk_minutes) + # pylint: disable=protected-access + assert c._interval_key(input_ts) == expected_ts + + +@pytest.mark.parametrize( + "interval_minutes, current_ts, flush, expected_yield_count, expected_remaining_keys", + [ + # current_ts in same interval as data: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 20), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="same-interval-no-yield", + ), + # current_ts before interval start: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 10), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="past-interval-no-yield", + ), + # current_ts in a later interval: should yield old interval + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 35), + False, + 1, + [], + id="later-interval-yields", + ), + # flush yields everything regardless of current_ts + pytest.param( + [(1, 0), (1, 15)], + datetime(2026, 5, 4, 23, 59), + True, + 2, + [], + id="flush-yields-all", + ), + # selective yield: current at 01:00, should yield 01:15 and 01:30 but not 01:00 + pytest.param( + [(1, 0), (1, 15), (1, 30)], + datetime(2026, 5, 4, 1, 20), + False, + 1, + [datetime(2026, 5, 4, 1, 15), datetime(2026, 5, 4, 1, 30)], + id="selective-yield-older-only", + ), + ], +) +def test_yield_check_periodic( + interval_minutes: List[Tuple[int, int]], + current_ts: datetime, + flush: bool, + expected_yield_count: int, + expected_remaining_keys: List[datetime], +) -> None: + """yield_check_periodic yields completed intervals based on current_ts position.""" + c = make_converter(15) + logger = ProcessLogger("test") + logger.log_start() + + for hour, minute in interval_minutes: + key = datetime(2026, 5, 4, hour, minute) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(1) + c.data_parts[key].files = [f"file_{hour}_{minute}.json.gz"] + + tables = list(c.yield_check_periodic(logger, current_ts, flush=flush)) + + assert len(tables) == expected_yield_count + assert list(c.data_parts.keys()) == expected_remaining_keys + + +def test_yield_check_periodic_skips_none_table() -> None: + """Intervals with None table should not be yielded even when flush=True.""" + c = make_converter(15) + logger = ProcessLogger("test") + logger.log_start() + + key = datetime(2026, 5, 4, 1, 15) + c.data_parts[key] = TableData() # table is None by default + + tables = list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 0, 50), flush=True)) + assert len(tables) == 0 + + +def test_yield_check_periodic_archives_files() -> None: + """Yielded intervals should move their files to archive_files.""" + c = make_converter(15, move_source_on_completion=True) + logger = ProcessLogger("test") + logger.log_start() + + key = datetime(2026, 5, 4, 1, 15) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(5) + c.data_parts[key].files = ["a.json.gz", "b.json.gz"] + + list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 1, 40))) + + assert "a.json.gz" in c.archive_files + assert "b.json.gz" in c.archive_files diff --git a/validation/validate_data_across_environment.py b/validation/validate_data_across_environment.py new file mode 100644 index 00000000..39429aed --- /dev/null +++ b/validation/validate_data_across_environment.py @@ -0,0 +1,262 @@ +# type: ignore +import marimo + +__generated_with = "0.14.17" +app = marimo.App(width="medium") + + +@app.cell +def _(): + import marimo as mo + import polars as pl + import pyarrow.parquet as pq + + return pl, pq + + +@app.cell +def _(pl, pq): + + def get_rowgroup_statistics(parquet_path: str) -> pl.DataFrame: + """ + Extract row group statistics from a Parquet file and return as a Polars DataFrame. + Args: + parquet_path: Path to the parquet file + Returns: + Polars DataFrame with row group statistics + """ + parquet_file = pq.ParquetFile(parquet_path) + file_metadata = parquet_file.metadata + + records = [] + for i in range(file_metadata.num_row_groups): + row_group_metadata = file_metadata.row_group(i) + + for col_i in range(row_group_metadata.num_columns): + column_chunk_metadata = row_group_metadata.column(col_i) + stats = column_chunk_metadata.statistics + + records.append( + { + "row_group": i, + "num_rows": row_group_metadata.num_rows, + "total_byte_size": row_group_metadata.total_byte_size, + "column_index": col_i, + "column_path": column_chunk_metadata.path_in_schema, + "compressed_size": column_chunk_metadata.total_compressed_size, + "uncompressed_size": column_chunk_metadata.total_uncompressed_size, + "min_value": str(stats.min) if stats and stats.min is not None else None, + "max_value": str(stats.max) if stats and stats.max is not None else None, + "null_count": stats.null_count if stats else None, + "num_values": stats.num_values if stats else None, + } + ) + + return pl.DataFrame(records) + + return (get_rowgroup_statistics,) + + +@app.cell +def _(get_rowgroup_statistics): + df = get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return (df,) + + +@app.cell +def _(df): + df["total_byte_size"].sum() / (1024**3) + return + + +@app.cell +def _(df): + df["num_rows"].sum() + return + + +@app.cell +def _(): + import duckdb + + return (duckdb,) + + +@app.cell +def _(duckdb): + con = duckdb.connect() + return (con,) + + +@app.cell +def _(con): + con.execute("load aws;") + return + + +@app.cell +def _(con): + con.execute( + """ + CREATE OR REPLACE SECRET secret ( + TYPE s3, + PROVIDER credential_chain + ); + """ + ) + return + + +@app.cell +def _(): + 2000000 * 171 + return + + +@app.cell +def _(con): + route1 = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1,) + + +@app.cell +def _(route1): + route1 + return + + +@app.cell +def _(con): + route1_prod = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_prod,) + + +@app.cell +def _(con): + route1_prod_vp = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_VEHICLE_POSITIONS/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "vehicle.trip.route_id" == '1'""" + ).pl() + return (route1_prod_vp,) + + +@app.cell +def _(route1_prod_vp): + route1_prod_vp.write_parquet("prod_vp.parquet") + return + + +@app.cell +def _(con): + route1_dev = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/*.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_dev,) + + +@app.cell +def _(route1, route1_prod): + route1_prod.sort("feed_timestamp").equals(route1.sort("feed_timestamp")) + return + + +@app.cell +def _(route1): + route1.describe() + return + + +@app.cell +def _(route1, route1_prod): + route1_prod.describe().equals(route1.describe()) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_prod.describe().equals( + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + ) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + return + + +@app.cell +def _(): + return + + +@app.cell +def _(route1, route1_dev, route1_prod): + route1.write_parquet("stage.parquet") + route1_prod.write_parquet("prod.parquet") + route1_dev.write_parquet("dev.parquet") + return + + +@app.cell +def _(): + import pyarrow.dataset as ds + + dset = ds.dataset( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet", + format="parquet", + ) + + return (dset,) + + +@app.cell +def _(dset, pl): + pl.scan_pyarrow_dataset(dset).filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +@app.cell +def _(df): + routes = df.select("trip_update.trip.route_id").unique().collect() + return (routes,) + + +@app.cell +def _(df): + df.columns + return + + +@app.cell +def _(routes): + routes.sort(by="trip_update.trip.route_id") + return + + +@app.cell +def _(get_rowgroup_statistics): + get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return + + +@app.cell +def _(df, pl): + df1 = df.filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +if __name__ == "__main__": + app.run()