diff --git a/poetry.lock b/poetry.lock index e663b0ed..7c5f00fa 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1992,41 +1992,42 @@ testing = ["pytest"] [[package]] name = "marimo" -version = "0.16.5" +version = "0.23.6" description = "A library for making reactive notebooks and apps" optional = false -python-versions = ">=3.9" -groups = ["investigation"] +python-versions = ">=3.10" files = [ - {file = "marimo-0.16.5-py3-none-any.whl", hash = "sha256:1f98c0ee0fed9337e26c895c662f92cc578cdd03502c194eac9ceeb434bf479b"}, - {file = "marimo-0.16.5.tar.gz", hash = "sha256:8f5939d3c4e67ff25f6cfeefe731971ed7f3346c20098034b923a24a0d7770d6"}, + {file = "marimo-0.23.6-py3-none-any.whl", hash = "sha256:e8d19d875b0212600faa80eaaed0fd3f34dfb7b5241e630cf2b1cedd8dd14509"}, + {file = "marimo-0.23.6.tar.gz", hash = "sha256:d63aeeee1e9ea7cac79bf2530daba915199153dce4d156fade7546474679d3ca"}, ] [package.dependencies] click = ">=8.0,<9" docutils = ">=0.16.0" itsdangerous = ">=2.0.0" -jedi = ">=0.18.0" -loro = {version = ">=1.5.0", markers = "python_full_version >= \"3.11.0\""} +jedi = ">=0.18.0,<0.20.0" +loro = ">=1.10.0" markdown = ">=3.6,<4" -msgspec = ">=0.19.0" +msgspec = ">=0.20.0" narwhals = ">=2.0.0" packaging = "*" psutil = ">=5.0" -pygments = ">=2.13,<3" -pymdown-extensions = ">=10.15,<11" -pyyaml = ">=6.0" -starlette = ">=0.35.0,<0.36.0 || >0.36.0" +pygments = ">=2.19,<3" +pymdown-extensions = ">=10.21.2,<11" +pyyaml = ">=6.0.1" +pyzmq = {version = ">=27.1.0", markers = "python_full_version < \"3.15\""} +starlette = ">=0.37.2" tomlkit = ">=0.12.0" -uvicorn = ">=0.22.0,<0.36.0" +uvicorn = ">=0.22.0" websockets = ">=14.2.0" [package.extras] -dev = ["black (>=23.12.1,<23.13.0)", "click (>=8.0,<9)", "duckdb (>=1.0.0)", "openai (>=1.55.3)", "opentelemetry-api (>=1.26.0,<1.27.0)", "opentelemetry-sdk (>=1.26.0,<1.27.0)", "ruff (>=0.13.2)", "sqlglot[rs] (>=26.2.0)"] lsp = ["python-lsp-ruff (>=2.0.0)", "python-lsp-server (>=1.13.0)"] -mcp = ["mcp (>=1.0.0) ; python_full_version >= \"3.10.0\"", "pydantic (>2) ; python_full_version >= \"3.10.0\""] -recommended = ["altair (>=5.4.0)", "duckdb (>=1.0.0)", "nbformat (>=5.7.0)", "openai (>=1.55.3)", "polars[pyarrow] (>=1.9.0)", "ruff", "sqlglot[rs] (>=26.2.0)"] -sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[rs] (>=26.2.0)"] +mcp = ["mcp (>=1.0.0)", "pydantic (>2)"] +otel = ["opentelemetry-api (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-grpc (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-http (>=1.28.0,<1.29.0)", "opentelemetry-sdk (>=1.28.0,<1.29.0)"] +recommended = ["altair (>=5.4.0)", "marimo[sandbox]", "marimo[sql]", "nbformat (>=5.7.0)", "pydantic-ai-slim[openai] (>=1.52.0)", "ruff"] +sandbox = ["pyzmq (>=27.1.0)", "uv (>=0.9.21)"] +sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[c] (>=26.8.0)"] [[package]] name = "markdown" @@ -3325,14 +3326,14 @@ testutils = ["gitpython (>3)"] [[package]] name = "pymdown-extensions" -version = "10.21" +version = "10.21.3" description = "Extension pack for Python Markdown." optional = false python-versions = ">=3.9" groups = ["investigation"] files = [ - {file = "pymdown_extensions-10.21-py3-none-any.whl", hash = "sha256:91b879f9f864d49794c2d9534372b10150e6141096c3908a455e45ca72ad9d3f"}, - {file = "pymdown_extensions-10.21.tar.gz", hash = "sha256:39f4a020f40773f6b2ff31d2cd2546c2c04d0a6498c31d9c688d2be07e1767d5"}, + {file = "pymdown_extensions-10.21.3-py3-none-any.whl", hash = "sha256:d7a5d08014fc571e80ca21dd6f854e31f94c489800350564d55d15b3c41e76b6"}, + {file = "pymdown_extensions-10.21.3.tar.gz", hash = "sha256:72cfcf55f07aea0d4af2c4f11dd4e52466ddfb1bb819673146398e0bd3a77354"}, ] [package.dependencies] @@ -4502,4 +4503,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "92a0fd856642cb354f15e8c22a84bc23972365b7f4cac91c2301e5cf4d7feb45" +content-hash = "453e3d5378b2505a1183687b8b1d9cfd14d7122e097fc7921b6a7fed611ebf0f" diff --git a/pyproject.toml b/pyproject.toml index 47a05f96..2c5fcd2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ ipykernel = "^7.1.0" matplotlib = "^3.9.0" seaborn = "^0.13.2" tabulate = "^0.9.0" -marimo = "^0.16" +marimo = "^0.23" +plotly = "^6.5" [tool.poetry.group.dev.dependencies] black = "^24.3.0" diff --git a/runners/run_ingest_s3_files.py b/runners/run_ingest_s3_files.py new file mode 100644 index 00000000..74cf9ec2 --- /dev/null +++ b/runners/run_ingest_s3_files.py @@ -0,0 +1,7 @@ +from lamp_py.ingestion.ingest_gtfs import ingest_s3_files + +# from lamp_py.postgres.postgres_utils import start_rds_writer_process + +# metadata_queue, rds_process = start_rds_writer_process() + +ingest_s3_files(None, bucket_filter="lamp") diff --git a/src/lamp_py/ingestion/backfill/__init__.py b/src/lamp_py/ingestion/backfill/__init__.py new file mode 100644 index 00000000..a3507758 --- /dev/null +++ b/src/lamp_py/ingestion/backfill/__init__.py @@ -0,0 +1 @@ +"""Tools for backfilling and reprocessing data""" diff --git a/src/lamp_py/ingestion/config_busloc_trip.py b/src/lamp_py/ingestion/config_busloc_trip.py index ee376ce5..897be141 100644 --- a/src/lamp_py/ingestion/config_busloc_trip.py +++ b/src/lamp_py/ingestion/config_busloc_trip.py @@ -17,7 +17,7 @@ class RtBusTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/config_rt_trip.py b/src/lamp_py/ingestion/config_rt_trip.py index 2cf53f07..07af4190 100644 --- a/src/lamp_py/ingestion/config_rt_trip.py +++ b/src/lamp_py/ingestion/config_rt_trip.py @@ -17,7 +17,7 @@ class RtTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/convert_gtfs.py b/src/lamp_py/ingestion/convert_gtfs.py index 85464cf6..ed5c5f69 100644 --- a/src/lamp_py/ingestion/convert_gtfs.py +++ b/src/lamp_py/ingestion/convert_gtfs.py @@ -26,7 +26,7 @@ def gtfs_files_to_convert() -> List[Tuple[str, int]]: """ - create list of Tuple[GTFS url, version_key] for GtfsConverter + Create list of Tuple[GTFS url, version_key] for GtfsConverter version_key is based on published_dt """ @@ -75,7 +75,7 @@ def convert(self) -> None: def process_schedule(self, url: str, version_key: int) -> None: """ - convert a schedule gtfs zip file into tables. the zip file is + Convert a schedule gtfs zip file into tables. the zip file is essentially a small database with each contained file (outside of feed info) acting as its own table. info on the gtfs scheduling standard can be found at http://gtfs.org/schedule/ @@ -99,7 +99,7 @@ def process_schedule(self, url: str, version_key: int) -> None: def create_table(self, gtfs_zip: zipfile.ZipFile, table_filename: str, version_key: int) -> None: """ - read a csv table out of a gtfs static schedule file, add a timestamp + Read a csv table out of a gtfs static schedule file, add a timestamp column to each row, and write it as a parquet file on s3, partitioned by the timestamp diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index d1d6cb18..bcc98149 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -136,7 +136,12 @@ class GtfsRtConverter(Converter): https_mbta_integration.mybluemix.net_vehicleCount.gz """ - def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]], max_workers: int = 4) -> None: + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + max_workers: int = 8, + ) -> None: Converter.__init__(self, config_type, metadata_queue) # Depending on filename, assign self.details to correct implementation @@ -185,8 +190,10 @@ def convert(self) -> None: continue self.continuous_pq_update(table) - pool = pyarrow.default_memory_pool() - pool.release_unused() + + # try to get pyarrow to limit memory usage after each loop. + # this is a "ask nicely and pray" move...we can't manage memory directly in python. + pyarrow.default_memory_pool().release_unused() table_count += 1 process_logger.add_metadata(table_count=table_count) # limit number of tables produced on each event loop @@ -204,7 +211,7 @@ def convert(self) -> None: def thread_init(self) -> None: """ - initialize the filesystem in each convert thread + Initialize the filesystem in each convert thread update the active fs to use the s3 filesystem for all loading if the first file starts with s3 @@ -217,11 +224,10 @@ def thread_init(self) -> None: def process_files(self) -> Iterable[pyarrow.table]: """ - iterate through all of the files to be converted + Iterate through all of the files to be converted only yield a new table when table size crosses over min_rows of yield_check """ - process_logger = ProcessLogger( "create_pyarrow_tables", config_type=str(self.config_type), @@ -272,7 +278,7 @@ def process_files(self) -> Iterable[pyarrow.table]: def yield_check(self, process_logger: ProcessLogger, min_rows: int = 2_000_000) -> Iterable[pyarrow.table]: """ - yield all tables in the data_parts map that have been sufficiently + Yield all tables in the data_parts map that have been sufficiently processed. @min_rows - how many rows the table must have to be yielded @@ -367,7 +373,7 @@ def gz_to_pyarrow(self, filename: str) -> Tuple[Optional[datetime], str, Optiona def partition_dt(self, table: pyarrow.Table) -> datetime: """ - verify partition structure of pyarrow Table + Verify partition structure of pyarrow Table :param table: pyarrow Table to verify @@ -419,7 +425,7 @@ def sync_with_s3(self, local_path: str) -> bool: def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset: """ - create dataset, with hash column, that will be written to parquet file + Create dataset, with hash column, that will be written to parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -451,9 +457,9 @@ def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset # pylint: disable=R0914 # pylint too many local variables (more than 15) - def write_local_pq(self, table: pyarrow.Table, local_path: str) -> None: + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: """ - merge pyarrow Table with existing local_path parquet file + Merge pyarrow Table with existing local_path parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -586,7 +592,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: log.add_metadata(local_path=local_path) - self.write_local_pq(table, local_path) + self.write_local_pq_partition(table, local_path) self.send_metadata(local_path.replace(self.tmp_folder, S3_SPRINGBOARD)) # record the number of rows in the final parquet file for logging @@ -610,7 +616,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: def clean_local_folders(self) -> None: """ - clean local temp folders + Clean local temp folders """ days_to_keep = 2 root_folder = os.path.join( @@ -630,7 +636,7 @@ def clean_local_folders(self) -> None: def move_s3_files(self) -> None: """ - move archive and error files to their respective s3 buckets. + Move archive and error files to their respective s3 buckets. """ if len(self.error_files) > 0: self.error_files = move_s3_objects( diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py new file mode 100644 index 00000000..24c63660 --- /dev/null +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -0,0 +1,266 @@ +# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +from concurrent.futures import ThreadPoolExecutor +import os +from datetime import datetime +from pathlib import Path +from queue import Queue +from typing import Dict, Iterable, List, Optional, Tuple +import pyarrow +import polars as pl + +from lamp_py.aws.s3 import upload_file +from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData +from lamp_py.ingestion.converter import ConfigType + +from lamp_py.ingestion.utils import assign_datetime_to_binned_interval +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP, S3Location + + +# pylint: disable=too-many-arguments,too-many-instance-attributes +class GtfsRtFullPartitionConverter(GtfsRtConverter): + """ + Converter that handles GTFS Real Time JSON data + + https_cdn.mbta.com_realtime_TripUpdates_enhanced.json.gz + """ + + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + local_output_location: str = "/tmp/gtfs-rt-fullset/", + remote_output_location: S3Location | None = None, + polars_filter: pl.Expr = pl.lit(True), # default to true - which will essentially not filter + max_workers: int = 8, + time_chunk_minutes: int = 15, + move_source_on_completion: bool = False, + ) -> None: + """ + Initialize GTFS-RT fullset converter with time-chunked partitioning. + + Args: + config_type: Type of GTFS-RT configuration (trip updates, vehicle positions). + metadata_queue: Queue for metadata communication. + local_output_location: Local directory for temporary parquet files. + remote_output_location: S3 location for final output; if None, files stay local. + polars_filter: Polars expression to filter data at conversion time; defaults to no filtering. + max_workers: Number of worker threads for parallel processing. + time_chunk_minutes: Minutes for time-based partitioning (e.g., 15 min intervals). + move_source_on_completion: If True, move source files to archive after completion. + For the LAMP usecase, `source` in this context refers to the `delta` or `archive` + bucket, for ingestion or backfill respectively. The output is uploaded regardless, + but the `source` is only conditionally moved. + `delta` -> `archive` True, do this, + `archive` -> `archive` False, don't do this + """ + GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) + + if time_chunk_minutes < 5: + raise ValueError( + "time_chunk_minutes must be at least 5 to ensure proper partitioning and avoid too many small files" + ) + + # Keep tmp_folder as the base temp root to match GtfsRtConverter helpers + # (clean_local_folders, continuous_pq_update, etc.), which append + # lamp/ internally. + self.tmp_folder = local_output_location + self.remote_output_location = remote_output_location + self.data_parts: Dict[datetime, TableData] = {} + self.filter = polars_filter + self.move_source_on_completion = move_source_on_completion + self.time_chunk_minutes = time_chunk_minutes + + def convert(self) -> None: + """ + Convert all files in self.files to time-chunked parquet partitions. + + Specialization over GtfsRtConverter::convert() - this converter does not + apply unique() on the incoming GTFS-RT data, and will partition files by + day or by time chunk (e.g. 15 min intervals) depending on the + time_chunk_minutes parameter. It will also apply a polars filter to the + data at the point of conversion from json.gz to pyarrow table, which + should help reduce the amount of data being written out and speed up the + conversion process. This converter is suitable for live ingestion and + backfill tasks. + """ + process_logger = ProcessLogger( + "fullset_gtfs_parquet_table_creator", + table_type="gtfs-rt", + config_type=str(self.config_type), + file_count=len(self.files), + ) + process_logger.log_start() + + table_count = 0 + try: + for table, partition_dt in self.process_files(): + if table.num_rows == 0: + continue + + path_suffix = os.path.join( + f"year={partition_dt.year}", + f"month={partition_dt.month}", + f"day={partition_dt.day}", + f"{partition_dt.isoformat()}.parquet", + ) + + local_path = os.path.join(self.tmp_folder, LAMP, str(self.config_type), path_suffix) + + os.makedirs(Path(local_path).parent, exist_ok=True) + + self.write_local_pq_partition(table, local_path) + + # in backfill mode, we don't want to move files around in s3 - + # we just want to write the converted files to the output location + # (which could be a temp location or the final archive location). + # in non-backfill mode, we want to move the original gtfs-rt files from + # incoming to archive after successful conversion. + if self.move_source_on_completion: + self.move_s3_files() + + # mirror on s3 if remote output location is provided + if self.remote_output_location is not None: + s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) + upload_file(local_path, s3_path) + + # update the metadata table with the new s3 path for the converted file, + # so it can be picked up by the next stage of the pipeline + self.send_metadata(s3_path) + + # try to get pyarrow to limit memory usage after each loop. + # this is a "ask nicely and pray" move...we can't manage memory directly in python. + pyarrow.default_memory_pool().release_unused() + + table_count += 1 + process_logger.add_metadata(table_count=table_count) + + except Exception as exception: + process_logger.log_failure(exception) + else: + process_logger.log_complete() + finally: + self.data_parts = {} + if self.move_source_on_completion: + self.move_s3_files() + self.clean_local_folders() + + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: + """ + Just write the file out.. + + this should be already sorted by timestamp based on how self.files is yielded. + """ + df: pl.DataFrame = pl.from_arrow(table).filter(self.filter) # type: ignore[arg-type, assignment] + + # read local_path if exists and concat with table + # this handles the case where a prior iteration yielded an incomplete time chunk, + # and we are now filling in the remaining record that is part of that chunk + if os.path.exists(local_path): + existing_table: pl.DataFrame = pl.read_parquet(local_path) + df = pl.concat([existing_table, df], how="diagonal") + + df.write_parquet(local_path, compression="zstd", compression_level=3) + + def process_files(self) -> Iterable[pyarrow.table]: + """ + Yield time-chunked parquet tables from all input files. + + Applies a polars filter to narrow results at the source to reduce write + churn. Converts json.gz input files to pyarrow tables, groups into time + chunk intervals, and yields tables for completed intervals as we go. + """ + process_logger = ProcessLogger( + "fullset_create_pyarrow_tables", + config_type=str(self.config_type), + ) + process_logger.log_start() + + with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.thread_init) as pool: + for result_dt, result_filename, rt_data in pool.map(self.gz_to_pyarrow, self.files): + # errors in gtfs_rt conversions are handled in the gz_to_pyarrow + # function. if one is encountered, the datetime will be none. log + # the error and move on to the next file. + + if result_dt is None: + process_logger.add_metadata(result=result_dt, file=result_filename) + continue + + # create key for self.data_parts dictionary that bins based on the chunk interval + dt_part = assign_datetime_to_binned_interval(result_dt, self.time_chunk_minutes) + + # create new self.table_groups entry for key if it doesn't exist + if dt_part not in self.data_parts: + self.data_parts[dt_part] = TableData() + self.data_parts[dt_part].table = self.detail.transform_for_write(rt_data) + else: + self.data_parts[dt_part].table = pyarrow.concat_tables( + [self.data_parts[dt_part].table, self.detail.transform_for_write(rt_data)] + ) + + self.data_parts[dt_part].files.append(result_filename) + + # we're mapping each gz to a range dt_part. when we have > 1 dt_parts in the map, + # that means we've processed files from at least 2 different time intervals and + # can start yielding tables for the intervals that are complete. + # this relies on self.files being sorted - enforced by add_files(), + # and ThreadPoolExecutor/map yields __next__ iterator, i.e. returns in the right order + if len(self.data_parts) > 1: + yield from self.yield_check_periodic(process_logger, result_dt) + + yield from self.yield_check_periodic(process_logger, flush=True) + + process_logger.add_metadata(file_count=0, number_of_rows=0) + process_logger.log_complete() + + def partition_column(self) -> str: + """Return the column name for partitioning output parquet files.""" + return "trip_update.trip.route_id" + + def table_sort_order(self) -> List[Tuple[str, str]]: + """Return sort order specification for output parquet tables.""" + return [ + ("feed_timestamp", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), + ] + + def yield_check_periodic( + self, + process_logger: ProcessLogger, + current_ts: datetime = datetime.now(), + flush: bool = False, + ) -> Iterable[Tuple[pyarrow.table, datetime]]: + """ + Yield completed time-chunk intervals from data_parts. + + When flush=True, yield all remaining intervals regardless of current_ts. + + @current_ts - the feed timestamp of the file just processed + @flush - if True, yield everything remaining + """ + current_interval = assign_datetime_to_binned_interval(current_ts, self.time_chunk_minutes) + for iter_ts in list(self.data_parts.keys()): + table = self.data_parts[iter_ts].table + + # yield if we've moved past this interval + # or if flushing all remaining data + if flush or current_interval > iter_ts: + # only populate archive_files if we're in live ingestion mode + if self.move_source_on_completion: + self.archive_files += self.data_parts[iter_ts].files + + process_logger.add_metadata( + file_count=len(self.data_parts[iter_ts].files), + number_of_rows=table.num_rows, + interval_start=iter_ts.isoformat(), + ) + process_logger.log_complete() + process_logger.add_metadata(file_count=0, number_of_rows=0, print_log=False) + process_logger.log_start() + + yield (table, iter_ts) + + del self.data_parts[iter_ts] diff --git a/src/lamp_py/ingestion/converter.py b/src/lamp_py/ingestion/converter.py index 9388cf1c..fb6d0362 100644 --- a/src/lamp_py/ingestion/converter.py +++ b/src/lamp_py/ingestion/converter.py @@ -113,16 +113,20 @@ def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]] self.metadata_queue: Queue[Optional[str]] = metadata_queue def add_files(self, files: List[str]) -> None: - """add files to this converter""" + """Add files to this converter""" self.files += files + def reset_files(self) -> None: + """Remove files from this converter""" + self.files = [] + def send_metadata(self, written_file: str) -> None: - """send metadata path to rds writer process""" + """Send metadata path to rds writer process""" self.metadata_queue.put(written_file) @abstractmethod def convert(self) -> None: """ - convert files to pyarrow tables, write them to s3 as parquete, and move + Convert files to pyarrow tables, write them to s3 as parquete, and move files from incoming to archive (or error) """ diff --git a/src/lamp_py/ingestion/glides.py b/src/lamp_py/ingestion/glides.py index 6eea7939..1df91ea1 100644 --- a/src/lamp_py/ingestion/glides.py +++ b/src/lamp_py/ingestion/glides.py @@ -213,7 +213,7 @@ def unique_key(self) -> str: """Key in record['data'] that is unique to this event type""" def download_remote(self) -> None: - """download the remote parquet path for appending""" + """Download the remote parquet path for appending""" if os.path.exists(self.local_path): os.remove(self.local_path) @@ -405,7 +405,7 @@ def ingest_glides_events( kinesis_reader: KinesisReader, metadata_queue: Queue[Optional[str]], upload: bool = False ) -> None: """ - ingest glides records from the kinesis stream and add them to parquet files + Ingest glides records from the kinesis stream and add them to parquet files """ process_logger = ProcessLogger(process_name="ingest_glides_events") process_logger.log_start() diff --git a/src/lamp_py/ingestion/gtfs_rt_detail.py b/src/lamp_py/ingestion/gtfs_rt_detail.py index 1ca80a27..8b0d8590 100644 --- a/src/lamp_py/ingestion/gtfs_rt_detail.py +++ b/src/lamp_py/ingestion/gtfs_rt_detail.py @@ -37,7 +37,7 @@ class GTFSRTDetail[T: FeedEntityTable, M: FeedMessage](ABC): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(table) @property diff --git a/src/lamp_py/ingestion/ingest_gtfs.py b/src/lamp_py/ingestion/ingest_gtfs.py index 81aa1728..de17223c 100644 --- a/src/lamp_py/ingestion/ingest_gtfs.py +++ b/src/lamp_py/ingestion/ingest_gtfs.py @@ -11,10 +11,10 @@ move_s3_objects, file_list_from_s3, ) +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.ingestion.convert_gtfs import GtfsConverter -from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter from lamp_py.ingestion.converter import ( ConfigType, Converter, @@ -24,7 +24,7 @@ NoImplException, IgnoreIngestion, ) -from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING +from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING, S3_SPRINGBOARD, S3Location from lamp_py.ingestion.utils import group_sort_file_list from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet @@ -52,7 +52,7 @@ def run_converter(converter: Converter) -> None: def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: """ - ingest gtfs schedules from MBTA GTFS schedule archive + Ingest gtfs schedules from MBTA GTFS schedule archive """ logger = ProcessLogger(process_name="ingest_gtfs") logger.log_start() @@ -65,7 +65,7 @@ def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - get all of the filepaths currently in the incoming bucket, sort them into + Get all of the filepaths currently in the incoming bucket, sort them into batches of similar gtfs-rt files, convert each batch into tables, write the tables to parquet files in the springboard bucket, add the parquet filepaths to the metadata table as unprocessed, and move gtfs files to the @@ -75,7 +75,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L logger.log_start() try: - files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=10000) + files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=50000) grouped_files = group_sort_file_list(files) @@ -94,7 +94,17 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L try: config_type = ConfigType.from_filename(file_group[0]) if config_type not in converters: - converters[config_type] = GtfsRtConverter(config_type, metadata_queue) + if config_type in (ConfigType.RT_ALERTS, ConfigType.VEHICLE_COUNT, ConfigType.SCHEDULE): + converters[config_type] = GtfsConverter(config_type, metadata_queue) + else: # all TripUpdates, VehiclePositions + converters[config_type] = GtfsRtFullPartitionConverter( + config_type, + metadata_queue, + remote_output_location=S3Location( + S3_SPRINGBOARD, os.path.join(LAMP, "FULLSET", str(config_type)) + ), + move_source_on_completion=True, + ) converters[config_type].add_files(file_group) except IgnoreIngestion: continue @@ -123,6 +133,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L process_count = os.cpu_count() if process_count is None: process_count = 4 + if len(converters) > 0: with get_context("spawn").Pool(processes=process_count, maxtasksperchild=1) as pool: pool.map_async(run_converter, converters.values()) @@ -134,7 +145,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L def ingest_gtfs(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - ingest all gtfs file types + Ingest all gtfs file types static schedule files should be ingested first """ diff --git a/src/lamp_py/ingestion/pipeline.py b/src/lamp_py/ingestion/pipeline.py index 6ed15ec9..bcd645d3 100755 --- a/src/lamp_py/ingestion/pipeline.py +++ b/src/lamp_py/ingestion/pipeline.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - import os import time import logging @@ -24,7 +23,7 @@ def main() -> None: """ - run the ingestion pipeline + Run the ingestion pipeline * setup metadata queue metadata writer process * setup a glides kinesis reader @@ -40,23 +39,41 @@ def main() -> None: # connect to the glides kinesis stream glides_reader = KinesisReader(stream_name="ctd-glides-prod") + # today = datetime.now(timezone.utc).date() + # run the event loop every 30 seconds while True: process_logger = ProcessLogger(process_name="main") process_logger.log_start() bucket_filter = LAMP check_for_sigterm(metadata_queue, rds_process) + ingest_gtfs(metadata_queue, bucket_filter=bucket_filter) ingest_glides_events(glides_reader, metadata_queue, upload=True) check_for_sigterm(metadata_queue, rds_process) + # # if datetime has moved on (i.e. it's the next day) + # if today != datetime.now(timezone.utc).date(): + + # in_processing_window=within_daily_processing_window() + # process_logger.add_metadata(in_processing_window=in_processing_window) + + # # this doesn't make use of the processing window fully. we need to have other parts of ingestion + # # populate a queue. this queue can be written to disk occasionally to pick back up...todo + # if in_processing_window: + # # it is next day, do repack and reset + # yesterday = today + # today = datetime.now(timezone.utc).date() + # status = consolidate_partitions_for_archive(yesterday, config=[ConfigType.TRIP_UPDATES]) + # process_logger.add_metadata(consolidate_day=yesterday,consolidation_status=status) + process_logger.log_complete() time.sleep(30) def start() -> None: - """configure and start the ingestion process""" + """Configure and start the ingestion process""" clear_folder("/tmp") # setup handling shutdown commands signal.signal(signal.SIGTERM, handle_ecs_sigterm) diff --git a/src/lamp_py/ingestion/utils.py b/src/lamp_py/ingestion/utils.py index 4f261d20..a1d26f97 100644 --- a/src/lamp_py/ingestion/utils.py +++ b/src/lamp_py/ingestion/utils.py @@ -3,7 +3,7 @@ import gzip import shutil import pathlib -import datetime +from datetime import datetime, timezone import zoneinfo import tempfile from typing import Dict, List @@ -22,7 +22,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: """ - group and sort list of filepaths by filename + Group and sort list of filepaths by filename expects s3 file paths that can be split on timestamp: @@ -41,7 +41,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: def strip_timestamp(fileobject: str) -> str: """ - utility for sorting pulling timestamp string out of file path. + Utility for sorting pulling timestamp string out of file path. assumption is that the objects will have a bunch of "directories" that pathlib can parse out, and the filename will start with a timestamp "YYY-MM-DDTHH:MM:SSZ" (20 char) format. @@ -71,7 +71,7 @@ def strip_timestamp(fileobject: str) -> str: return grouped_files -def date_from_feed_version(feed_version: str) -> datetime.datetime: +def date_from_feed_version(feed_version: str) -> datetime: """ Extract date from feed_version text. Raise LookupError if no date found. @@ -84,7 +84,7 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: :return: datetime extracted from feed_version text """ - utc_tz = datetime.timezone.utc + utc_tz = timezone.utc local_tz = zoneinfo.ZoneInfo("US/Eastern") pattern_1 = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})" @@ -95,11 +95,11 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: if pattern_1_result is not None: date_str = pattern_1_result.group(0) - date_dt = datetime.datetime.fromisoformat(date_str).replace(tzinfo=utc_tz) + date_dt = datetime.fromisoformat(date_str).replace(tzinfo=utc_tz) date_dt = date_dt.astimezone(local_tz).replace(tzinfo=None) elif pattern_2_result is not None: date_str = pattern_2_result.group(0) - date_dt = datetime.datetime.strptime(date_str, "%m/%d/%y") + date_dt = datetime.strptime(date_str, "%m/%d/%y") else: raise LookupError(f"No date found in feed_version: '{feed_version}'") @@ -108,7 +108,7 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: def ordered_schedule_frame() -> pl.DataFrame: """ - create de-duplicated and ordered frame of all MBTA gtfs schedules from + Create de-duplicated and ordered frame of all MBTA gtfs schedules from https://cdn.mbta.com/archive/archived_feeds.txt de-duplicated on: published_date @@ -177,7 +177,7 @@ def file_as_bytes_buf(file: str) -> BytesIO: def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: - """flatten pyarrow table if struct column type exists""" + """Flatten pyarrow table if struct column type exists""" for field in table.schema: if str(field.type).startswith("struct"): return flatten_table_schema(table.flatten()) @@ -185,7 +185,7 @@ def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: - """explode list-like column of pyarrow table by creating rows for each list value""" + """Explode list-like column of pyarrow table by creating rows for each list value""" other_columns = list(table.schema.names) other_columns.remove(column) indices = pc.list_parent_indices(table[column]) @@ -205,7 +205,7 @@ def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: """ - add GTFS_RT_HASH_COL column to pyarrow table, if not already present + Add GTFS_RT_HASH_COL column to pyarrow table, if not already present """ log = ProcessLogger( "hash_gtfs_rt_table", table_rows=table.num_rows, table_mbs=round(table.nbytes / (1024 * 1024), 2) @@ -229,7 +229,7 @@ def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: def hash_gtfs_rt_parquet(path: str) -> None: """ - add GTFS_RT_HASH_COL to local parquet file, if not already present + Add GTFS_RT_HASH_COL to local parquet file, if not already present """ ds = pq.ParquetFile(path) ds_schema = ds.schema.to_arrow_schema() @@ -259,7 +259,7 @@ def hash_gtfs_rt_parquet(path: str) -> None: def gzip_file(path: str, keep_original: bool = False) -> None: """ - gzip local file + Gzip local file :param path: local file path :param keep_original: keep original non-gzip file = False @@ -274,3 +274,23 @@ def gzip_file(path: str, keep_original: bool = False) -> None: os.remove(path) logger.log_complete() + + +def assign_datetime_to_binned_interval(ts: datetime, interval_mins: int) -> datetime: + """ + Truncate a UTC datetime to its wall-clock-aligned interval start. + + For time_chunk_minutes=15: + 01:07 -> 01:00, 01:15 -> 01:15, 01:29 -> 01:15, 23:59 -> 23:45 + + Returns a naive datetime (no timezone) matching the existing data_parts key convention. + """ + total_minutes = ts.hour * 60 + ts.minute + aligned_minutes = (total_minutes // interval_mins) * interval_mins + return datetime( + year=ts.year, + month=ts.month, + day=ts.day, + hour=aligned_minutes // 60, + minute=aligned_minutes % 60, + ) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py new file mode 100644 index 00000000..8ad1b89b --- /dev/null +++ b/tests/ingestion/test_yield_check_periodic.py @@ -0,0 +1,178 @@ +from datetime import datetime +from pathlib import Path +from queue import Queue +from typing import List, Tuple + +import pyarrow +import pytest + +from lamp_py.ingestion.convert_gtfs_rt import TableData +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter +from lamp_py.ingestion.converter import ConfigType +from lamp_py.ingestion.utils import assign_datetime_to_binned_interval +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP + + +def make_converter( + time_chunk_minutes: int = 15, move_source_on_completion: bool = False +) -> GtfsRtFullPartitionConverter: + """Create a converter with periodic yielding enabled.""" + return GtfsRtFullPartitionConverter( + config_type=ConfigType.RT_TRIP_UPDATES, + metadata_queue=Queue(), + move_source_on_completion=move_source_on_completion, + time_chunk_minutes=time_chunk_minutes, + ) + + +def make_dummy_table(num_rows: int = 10) -> pyarrow.Table: + """Create a minimal pyarrow table for testing.""" + return pyarrow.table({"col": list(range(num_rows))}) + + +@pytest.mark.parametrize( + "chunk_minutes, input_ts, expected_ts", + [ + # 15-minute chunks + pytest.param(15, datetime(2026, 5, 4, 1, 15, 0), datetime(2026, 5, 4, 1, 15), id="15m-on-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 14, 59), datetime(2026, 5, 4, 1, 0), id="15m-just-before-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 15, 1), datetime(2026, 5, 4, 1, 15), id="15m-just-after-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 0, 0), datetime(2026, 5, 4, 1, 0), id="15m-start-of-hour"), + pytest.param(15, datetime(2026, 5, 4, 23, 59, 59), datetime(2026, 5, 4, 23, 45), id="15m-end-of-day"), + pytest.param(15, datetime(2026, 5, 4, 0, 0, 0), datetime(2026, 5, 4, 0, 0), id="15m-midnight"), + # 30-minute chunks + pytest.param(30, datetime(2026, 5, 4, 1, 29), datetime(2026, 5, 4, 1, 0), id="30m-before-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 30), datetime(2026, 5, 4, 1, 30), id="30m-on-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 30), id="30m-end-of-hour"), + # 5-minute chunks + pytest.param(5, datetime(2026, 5, 4, 1, 7), datetime(2026, 5, 4, 1, 5), id="5m-mid-interval"), + pytest.param(5, datetime(2026, 5, 4, 1, 10), datetime(2026, 5, 4, 1, 10), id="5m-on-boundary"), + # 60-minute chunks + pytest.param(60, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 0), id="60m-end-of-hour"), + pytest.param(60, datetime(2026, 5, 4, 2, 0), datetime(2026, 5, 4, 2, 0), id="60m-on-boundary"), + ], +) +def test_assign_datetime_to_binned_interval(chunk_minutes: int, input_ts: datetime, expected_ts: datetime) -> None: + """assign_datetime_to_binned_interval truncates timestamps to wall-clock-aligned interval starts.""" + assert assign_datetime_to_binned_interval(input_ts, chunk_minutes) == expected_ts + + +@pytest.mark.parametrize( + "interval_minutes, current_ts, flush, expected_yield_count, expected_remaining_keys", + [ + # current_ts in same interval as data: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 20), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="same-interval-no-yield", + ), + # current_ts before interval start: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 10), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="past-interval-no-yield", + ), + # current_ts in a later interval: should yield old interval + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 35), + False, + 1, + [], + id="later-interval-yields", + ), + # flush yields everything regardless of current_ts + pytest.param( + [(1, 0), (1, 15)], + datetime(2026, 5, 4, 23, 59), + True, + 2, + [], + id="flush-yields-all", + ), + # selective yield: current at 01:00, should yield 01:15 and 01:30 but not 01:00 + pytest.param( + [(1, 0), (1, 15), (1, 30)], + datetime(2026, 5, 4, 1, 20), + False, + 1, + [datetime(2026, 5, 4, 1, 15), datetime(2026, 5, 4, 1, 30)], + id="selective-yield-older-only", + ), + ], +) +def test_yield_check_periodic( + interval_minutes: List[Tuple[int, int]], + current_ts: datetime, + flush: bool, + expected_yield_count: int, + expected_remaining_keys: List[datetime], +) -> None: + """yield_check_periodic yields completed intervals based on current_ts position.""" + c = make_converter(15) + logger = ProcessLogger("test") + logger.log_start() + + for hour, minute in interval_minutes: + key = datetime(2026, 5, 4, hour, minute) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(1) + c.data_parts[key].files = [f"file_{hour}_{minute}.json.gz"] + + tables = list(c.yield_check_periodic(logger, current_ts, flush=flush)) + + assert len(tables) == expected_yield_count + assert list(c.data_parts.keys()) == expected_remaining_keys + + +@pytest.mark.parametrize("move", [True, False]) +def test_yield_check_periodic_archives_files(move: bool) -> None: + """Yielded intervals should move their files to archive_files.""" + c = make_converter(15, move_source_on_completion=move) + logger = ProcessLogger("test") + logger.log_start() + + key = datetime(2026, 5, 4, 1, 15) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(5) + c.data_parts[key].files = ["a.json.gz", "b.json.gz"] + + list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 1, 40))) + + assert ("a.json.gz" in c.archive_files) == move + assert ("b.json.gz" in c.archive_files) == move + + +def test_clean_local_folders_removes_oldest_day(tmp_path: Path) -> None: + """ + clean_local_folders should keep only the two newest day partitions. + Handles case when ingestion is down for a duration exceeding a couple days + """ + c = GtfsRtFullPartitionConverter( + config_type=ConfigType.RT_TRIP_UPDATES, + metadata_queue=Queue(), + local_output_location=tmp_path.as_posix(), + ) + + root = tmp_path / LAMP / str(ConfigType.RT_TRIP_UPDATES) + day_folders = [ + root / "year=2026" / "month=5" / "day=1", + root / "year=2026" / "month=5" / "day=2", + root / "year=2026" / "month=5" / "day=3", + ] + for day_folder in day_folders: + day_folder.mkdir(parents=True, exist_ok=True) + (day_folder / "part.parquet").write_text("x", encoding="utf-8") + + c.clean_local_folders() + + assert not day_folders[0].exists() + assert day_folders[1].exists() + assert day_folders[2].exists() diff --git a/validation/validate_data_across_environment.py b/validation/validate_data_across_environment.py new file mode 100644 index 00000000..39429aed --- /dev/null +++ b/validation/validate_data_across_environment.py @@ -0,0 +1,262 @@ +# type: ignore +import marimo + +__generated_with = "0.14.17" +app = marimo.App(width="medium") + + +@app.cell +def _(): + import marimo as mo + import polars as pl + import pyarrow.parquet as pq + + return pl, pq + + +@app.cell +def _(pl, pq): + + def get_rowgroup_statistics(parquet_path: str) -> pl.DataFrame: + """ + Extract row group statistics from a Parquet file and return as a Polars DataFrame. + Args: + parquet_path: Path to the parquet file + Returns: + Polars DataFrame with row group statistics + """ + parquet_file = pq.ParquetFile(parquet_path) + file_metadata = parquet_file.metadata + + records = [] + for i in range(file_metadata.num_row_groups): + row_group_metadata = file_metadata.row_group(i) + + for col_i in range(row_group_metadata.num_columns): + column_chunk_metadata = row_group_metadata.column(col_i) + stats = column_chunk_metadata.statistics + + records.append( + { + "row_group": i, + "num_rows": row_group_metadata.num_rows, + "total_byte_size": row_group_metadata.total_byte_size, + "column_index": col_i, + "column_path": column_chunk_metadata.path_in_schema, + "compressed_size": column_chunk_metadata.total_compressed_size, + "uncompressed_size": column_chunk_metadata.total_uncompressed_size, + "min_value": str(stats.min) if stats and stats.min is not None else None, + "max_value": str(stats.max) if stats and stats.max is not None else None, + "null_count": stats.null_count if stats else None, + "num_values": stats.num_values if stats else None, + } + ) + + return pl.DataFrame(records) + + return (get_rowgroup_statistics,) + + +@app.cell +def _(get_rowgroup_statistics): + df = get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return (df,) + + +@app.cell +def _(df): + df["total_byte_size"].sum() / (1024**3) + return + + +@app.cell +def _(df): + df["num_rows"].sum() + return + + +@app.cell +def _(): + import duckdb + + return (duckdb,) + + +@app.cell +def _(duckdb): + con = duckdb.connect() + return (con,) + + +@app.cell +def _(con): + con.execute("load aws;") + return + + +@app.cell +def _(con): + con.execute( + """ + CREATE OR REPLACE SECRET secret ( + TYPE s3, + PROVIDER credential_chain + ); + """ + ) + return + + +@app.cell +def _(): + 2000000 * 171 + return + + +@app.cell +def _(con): + route1 = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1,) + + +@app.cell +def _(route1): + route1 + return + + +@app.cell +def _(con): + route1_prod = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_prod,) + + +@app.cell +def _(con): + route1_prod_vp = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_VEHICLE_POSITIONS/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "vehicle.trip.route_id" == '1'""" + ).pl() + return (route1_prod_vp,) + + +@app.cell +def _(route1_prod_vp): + route1_prod_vp.write_parquet("prod_vp.parquet") + return + + +@app.cell +def _(con): + route1_dev = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/*.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_dev,) + + +@app.cell +def _(route1, route1_prod): + route1_prod.sort("feed_timestamp").equals(route1.sort("feed_timestamp")) + return + + +@app.cell +def _(route1): + route1.describe() + return + + +@app.cell +def _(route1, route1_prod): + route1_prod.describe().equals(route1.describe()) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_prod.describe().equals( + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + ) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + return + + +@app.cell +def _(): + return + + +@app.cell +def _(route1, route1_dev, route1_prod): + route1.write_parquet("stage.parquet") + route1_prod.write_parquet("prod.parquet") + route1_dev.write_parquet("dev.parquet") + return + + +@app.cell +def _(): + import pyarrow.dataset as ds + + dset = ds.dataset( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet", + format="parquet", + ) + + return (dset,) + + +@app.cell +def _(dset, pl): + pl.scan_pyarrow_dataset(dset).filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +@app.cell +def _(df): + routes = df.select("trip_update.trip.route_id").unique().collect() + return (routes,) + + +@app.cell +def _(df): + df.columns + return + + +@app.cell +def _(routes): + routes.sort(by="trip_update.trip.route_id") + return + + +@app.cell +def _(get_rowgroup_statistics): + get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return + + +@app.cell +def _(df, pl): + df1 = df.filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +if __name__ == "__main__": + app.run()