From 60dc4381b59beac9e7d8027db800a6b36c96d177 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 28 May 2026 15:40:28 -0400 Subject: [PATCH 01/12] ingestion refactor changes --- poetry.lock | 43 +-- pyproject.toml | 3 +- runners/run_ingest_s3_files.py | 7 + src/lamp_py/ingestion/backfill/__init__.py | 1 + src/lamp_py/ingestion/config_busloc_trip.py | 2 +- src/lamp_py/ingestion/config_rt_trip.py | 2 +- src/lamp_py/ingestion/convert_gtfs.py | 6 +- src/lamp_py/ingestion/convert_gtfs_rt.py | 28 +- .../ingestion/convert_gtfs_rt_fullset.py | 297 ++++++++++++++++++ src/lamp_py/ingestion/converter.py | 10 +- src/lamp_py/ingestion/glides.py | 4 +- src/lamp_py/ingestion/gtfs_rt_detail.py | 2 +- src/lamp_py/ingestion/ingest_gtfs.py | 23 +- src/lamp_py/ingestion/pipeline.py | 23 +- src/lamp_py/ingestion/utils.py | 16 +- tests/ingestion/test_yield_check_periodic.py | 163 ++++++++++ .../validate_data_across_environment.py | 262 +++++++++++++++ 17 files changed, 829 insertions(+), 63 deletions(-) create mode 100644 runners/run_ingest_s3_files.py create mode 100644 src/lamp_py/ingestion/backfill/__init__.py create mode 100644 src/lamp_py/ingestion/convert_gtfs_rt_fullset.py create mode 100644 tests/ingestion/test_yield_check_periodic.py create mode 100644 validation/validate_data_across_environment.py diff --git a/poetry.lock b/poetry.lock index e663b0ed..7c5f00fa 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1992,41 +1992,42 @@ testing = ["pytest"] [[package]] name = "marimo" -version = "0.16.5" +version = "0.23.6" description = "A library for making reactive notebooks and apps" optional = false -python-versions = ">=3.9" -groups = ["investigation"] +python-versions = ">=3.10" files = [ - {file = "marimo-0.16.5-py3-none-any.whl", hash = "sha256:1f98c0ee0fed9337e26c895c662f92cc578cdd03502c194eac9ceeb434bf479b"}, - {file = "marimo-0.16.5.tar.gz", hash = "sha256:8f5939d3c4e67ff25f6cfeefe731971ed7f3346c20098034b923a24a0d7770d6"}, + {file = "marimo-0.23.6-py3-none-any.whl", hash = "sha256:e8d19d875b0212600faa80eaaed0fd3f34dfb7b5241e630cf2b1cedd8dd14509"}, + {file = "marimo-0.23.6.tar.gz", hash = "sha256:d63aeeee1e9ea7cac79bf2530daba915199153dce4d156fade7546474679d3ca"}, ] [package.dependencies] click = ">=8.0,<9" docutils = ">=0.16.0" itsdangerous = ">=2.0.0" -jedi = ">=0.18.0" -loro = {version = ">=1.5.0", markers = "python_full_version >= \"3.11.0\""} +jedi = ">=0.18.0,<0.20.0" +loro = ">=1.10.0" markdown = ">=3.6,<4" -msgspec = ">=0.19.0" +msgspec = ">=0.20.0" narwhals = ">=2.0.0" packaging = "*" psutil = ">=5.0" -pygments = ">=2.13,<3" -pymdown-extensions = ">=10.15,<11" -pyyaml = ">=6.0" -starlette = ">=0.35.0,<0.36.0 || >0.36.0" +pygments = ">=2.19,<3" +pymdown-extensions = ">=10.21.2,<11" +pyyaml = ">=6.0.1" +pyzmq = {version = ">=27.1.0", markers = "python_full_version < \"3.15\""} +starlette = ">=0.37.2" tomlkit = ">=0.12.0" -uvicorn = ">=0.22.0,<0.36.0" +uvicorn = ">=0.22.0" websockets = ">=14.2.0" [package.extras] -dev = ["black (>=23.12.1,<23.13.0)", "click (>=8.0,<9)", "duckdb (>=1.0.0)", "openai (>=1.55.3)", "opentelemetry-api (>=1.26.0,<1.27.0)", "opentelemetry-sdk (>=1.26.0,<1.27.0)", "ruff (>=0.13.2)", "sqlglot[rs] (>=26.2.0)"] lsp = ["python-lsp-ruff (>=2.0.0)", "python-lsp-server (>=1.13.0)"] -mcp = ["mcp (>=1.0.0) ; python_full_version >= \"3.10.0\"", "pydantic (>2) ; python_full_version >= \"3.10.0\""] -recommended = ["altair (>=5.4.0)", "duckdb (>=1.0.0)", "nbformat (>=5.7.0)", "openai (>=1.55.3)", "polars[pyarrow] (>=1.9.0)", "ruff", "sqlglot[rs] (>=26.2.0)"] -sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[rs] (>=26.2.0)"] +mcp = ["mcp (>=1.0.0)", "pydantic (>2)"] +otel = ["opentelemetry-api (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-grpc (>=1.28.0,<1.29.0)", "opentelemetry-exporter-otlp-proto-http (>=1.28.0,<1.29.0)", "opentelemetry-sdk (>=1.28.0,<1.29.0)"] +recommended = ["altair (>=5.4.0)", "marimo[sandbox]", "marimo[sql]", "nbformat (>=5.7.0)", "pydantic-ai-slim[openai] (>=1.52.0)", "ruff"] +sandbox = ["pyzmq (>=27.1.0)", "uv (>=0.9.21)"] +sql = ["duckdb (>=1.0.0)", "polars[pyarrow] (>=1.9.0)", "sqlglot[c] (>=26.8.0)"] [[package]] name = "markdown" @@ -3325,14 +3326,14 @@ testutils = ["gitpython (>3)"] [[package]] name = "pymdown-extensions" -version = "10.21" +version = "10.21.3" description = "Extension pack for Python Markdown." optional = false python-versions = ">=3.9" groups = ["investigation"] files = [ - {file = "pymdown_extensions-10.21-py3-none-any.whl", hash = "sha256:91b879f9f864d49794c2d9534372b10150e6141096c3908a455e45ca72ad9d3f"}, - {file = "pymdown_extensions-10.21.tar.gz", hash = "sha256:39f4a020f40773f6b2ff31d2cd2546c2c04d0a6498c31d9c688d2be07e1767d5"}, + {file = "pymdown_extensions-10.21.3-py3-none-any.whl", hash = "sha256:d7a5d08014fc571e80ca21dd6f854e31f94c489800350564d55d15b3c41e76b6"}, + {file = "pymdown_extensions-10.21.3.tar.gz", hash = "sha256:72cfcf55f07aea0d4af2c4f11dd4e52466ddfb1bb819673146398e0bd3a77354"}, ] [package.dependencies] @@ -4502,4 +4503,4 @@ propcache = ">=0.2.1" [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "92a0fd856642cb354f15e8c22a84bc23972365b7f4cac91c2301e5cf4d7feb45" +content-hash = "453e3d5378b2505a1183687b8b1d9cfd14d7122e097fc7921b6a7fed611ebf0f" diff --git a/pyproject.toml b/pyproject.toml index 47a05f96..2c5fcd2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ ipykernel = "^7.1.0" matplotlib = "^3.9.0" seaborn = "^0.13.2" tabulate = "^0.9.0" -marimo = "^0.16" +marimo = "^0.23" +plotly = "^6.5" [tool.poetry.group.dev.dependencies] black = "^24.3.0" diff --git a/runners/run_ingest_s3_files.py b/runners/run_ingest_s3_files.py new file mode 100644 index 00000000..74cf9ec2 --- /dev/null +++ b/runners/run_ingest_s3_files.py @@ -0,0 +1,7 @@ +from lamp_py.ingestion.ingest_gtfs import ingest_s3_files + +# from lamp_py.postgres.postgres_utils import start_rds_writer_process + +# metadata_queue, rds_process = start_rds_writer_process() + +ingest_s3_files(None, bucket_filter="lamp") diff --git a/src/lamp_py/ingestion/backfill/__init__.py b/src/lamp_py/ingestion/backfill/__init__.py new file mode 100644 index 00000000..a3507758 --- /dev/null +++ b/src/lamp_py/ingestion/backfill/__init__.py @@ -0,0 +1 @@ +"""Tools for backfilling and reprocessing data""" diff --git a/src/lamp_py/ingestion/config_busloc_trip.py b/src/lamp_py/ingestion/config_busloc_trip.py index ee376ce5..897be141 100644 --- a/src/lamp_py/ingestion/config_busloc_trip.py +++ b/src/lamp_py/ingestion/config_busloc_trip.py @@ -17,7 +17,7 @@ class RtBusTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/config_rt_trip.py b/src/lamp_py/ingestion/config_rt_trip.py index 2cf53f07..07af4190 100644 --- a/src/lamp_py/ingestion/config_rt_trip.py +++ b/src/lamp_py/ingestion/config_rt_trip.py @@ -17,7 +17,7 @@ class RtTripDetail(GTFSRTDetail): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update")) @property diff --git a/src/lamp_py/ingestion/convert_gtfs.py b/src/lamp_py/ingestion/convert_gtfs.py index 85464cf6..ed5c5f69 100644 --- a/src/lamp_py/ingestion/convert_gtfs.py +++ b/src/lamp_py/ingestion/convert_gtfs.py @@ -26,7 +26,7 @@ def gtfs_files_to_convert() -> List[Tuple[str, int]]: """ - create list of Tuple[GTFS url, version_key] for GtfsConverter + Create list of Tuple[GTFS url, version_key] for GtfsConverter version_key is based on published_dt """ @@ -75,7 +75,7 @@ def convert(self) -> None: def process_schedule(self, url: str, version_key: int) -> None: """ - convert a schedule gtfs zip file into tables. the zip file is + Convert a schedule gtfs zip file into tables. the zip file is essentially a small database with each contained file (outside of feed info) acting as its own table. info on the gtfs scheduling standard can be found at http://gtfs.org/schedule/ @@ -99,7 +99,7 @@ def process_schedule(self, url: str, version_key: int) -> None: def create_table(self, gtfs_zip: zipfile.ZipFile, table_filename: str, version_key: int) -> None: """ - read a csv table out of a gtfs static schedule file, add a timestamp + Read a csv table out of a gtfs static schedule file, add a timestamp column to each row, and write it as a parquet file on s3, partitioned by the timestamp diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index d1d6cb18..634100dd 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -136,7 +136,12 @@ class GtfsRtConverter(Converter): https_mbta_integration.mybluemix.net_vehicleCount.gz """ - def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]], max_workers: int = 4) -> None: + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + max_workers: int = 8, + ) -> None: Converter.__init__(self, config_type, metadata_queue) # Depending on filename, assign self.details to correct implementation @@ -204,7 +209,7 @@ def convert(self) -> None: def thread_init(self) -> None: """ - initialize the filesystem in each convert thread + Initialize the filesystem in each convert thread update the active fs to use the s3 filesystem for all loading if the first file starts with s3 @@ -217,11 +222,10 @@ def thread_init(self) -> None: def process_files(self) -> Iterable[pyarrow.table]: """ - iterate through all of the files to be converted + Iterate through all of the files to be converted only yield a new table when table size crosses over min_rows of yield_check """ - process_logger = ProcessLogger( "create_pyarrow_tables", config_type=str(self.config_type), @@ -272,7 +276,7 @@ def process_files(self) -> Iterable[pyarrow.table]: def yield_check(self, process_logger: ProcessLogger, min_rows: int = 2_000_000) -> Iterable[pyarrow.table]: """ - yield all tables in the data_parts map that have been sufficiently + Yield all tables in the data_parts map that have been sufficiently processed. @min_rows - how many rows the table must have to be yielded @@ -367,7 +371,7 @@ def gz_to_pyarrow(self, filename: str) -> Tuple[Optional[datetime], str, Optiona def partition_dt(self, table: pyarrow.Table) -> datetime: """ - verify partition structure of pyarrow Table + Verify partition structure of pyarrow Table :param table: pyarrow Table to verify @@ -419,7 +423,7 @@ def sync_with_s3(self, local_path: str) -> bool: def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset: """ - create dataset, with hash column, that will be written to parquet file + Create dataset, with hash column, that will be written to parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -451,9 +455,9 @@ def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset # pylint: disable=R0914 # pylint too many local variables (more than 15) - def write_local_pq(self, table: pyarrow.Table, local_path: str) -> None: + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: """ - merge pyarrow Table with existing local_path parquet file + Merge pyarrow Table with existing local_path parquet file :param table: pyarrow Table :param local_path: path to local parquet file @@ -586,7 +590,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: log.add_metadata(local_path=local_path) - self.write_local_pq(table, local_path) + self.write_local_pq_partition(table, local_path) self.send_metadata(local_path.replace(self.tmp_folder, S3_SPRINGBOARD)) # record the number of rows in the final parquet file for logging @@ -610,7 +614,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None: def clean_local_folders(self) -> None: """ - clean local temp folders + Clean local temp folders """ days_to_keep = 2 root_folder = os.path.join( @@ -630,7 +634,7 @@ def clean_local_folders(self) -> None: def move_s3_files(self) -> None: """ - move archive and error files to their respective s3 buckets. + Move archive and error files to their respective s3 buckets. """ if len(self.error_files) > 0: self.error_files = move_s3_objects( diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py new file mode 100644 index 00000000..474e866d --- /dev/null +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -0,0 +1,297 @@ +# pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 + +from concurrent.futures import Future, ThreadPoolExecutor +import logging +import os +from datetime import datetime +from pathlib import Path +from queue import Queue +from typing import Dict, Iterable, List, Optional, Tuple +import pyarrow +import polars as pl + +from lamp_py.aws.s3 import move_s3_objects, upload_file +from lamp_py.ingestion.config_rt_trip import RtTripDetail +from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData +from lamp_py.ingestion.converter import ConfigType + +from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3_ERROR, S3Location + + +# pylint: disable=too-many-arguments,too-many-instance-attributes +class GtfsRtFullPartitionConverter(GtfsRtConverter): + """ + Converter that handles GTFS Real Time JSON data + + https_cdn.mbta.com_realtime_TripUpdates_enhanced.json.gz + """ + + def __init__( + self, + config_type: ConfigType, + metadata_queue: Queue[Optional[str]], + local_output_location: str = "/tmp/gtfs-rt-fullset/", + remote_output_location: S3Location | None = None, + polars_filter: pl.Expr = pl.lit(True), # default to true - which will essentially not filter + max_workers: int = 8, + time_chunk_minutes: int = 15, + move_source_on_completion: bool = False, + ) -> None: + """ + Initialize GTFS-RT fullset converter with time-chunked partitioning. + + Args: + config_type: Type of GTFS-RT configuration (trip updates, vehicle positions, alerts). + metadata_queue: Queue for metadata communication. + local_output_location: Local directory for temporary parquet files. + remote_output_location: S3 location for final output; if None, files stay local. + polars_filter: Polars expression to filter data at conversion time; defaults to no filtering. + max_workers: Number of worker threads for parallel processing. + time_chunk_minutes: Minutes for time-based partitioning (e.g., 15 min intervals). + move_source_on_completion: If True, move source files to archive after completion. + """ + GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) + + if time_chunk_minutes < 5: + raise ValueError( + "time_chunk_minutes must be at least 5 to ensure proper partitioning and avoid too many small files" + ) + self.detail = RtTripDetail() + self.tmp_folder = os.path.join(local_output_location, LAMP, str(self.config_type)) + self.remote_output_location = remote_output_location + self.data_parts: Dict[datetime, TableData] = {} + self.filter = polars_filter + self.move_source_on_completion = move_source_on_completion + self.time_chunk_minutes = time_chunk_minutes + + def convert(self) -> None: + """ + Convert all files in self.files to time-chunked parquet partitions. + + Specialization over GtfsRtConverter::convert() - this converter does not + apply unique() on the incoming GTFS-RT data, and will partition files by + day or by time chunk (e.g. 15 min intervals) depending on the + time_chunk_minutes parameter. It will also apply a polars filter to the + data at the point of conversion from json.gz to pyarrow table, which + should help reduce the amount of data being written out and speed up the + conversion process. This converter is applicable for live ingestion and + backfill tasks. + """ + process_logger = ProcessLogger( + "fullset_gtfs_parquet_table_creator", + table_type="gtfs-rt", + config_type=str(self.config_type), + file_count=len(self.files), + ) + process_logger.log_start() + + table_count = 0 + move_futures: List[Future] = [] + try: + for table, partition_dt in self.process_files(): + if table.num_rows == 0: + continue + + path_suffix = os.path.join( + f"year={partition_dt.year}", + f"month={partition_dt.month}", + f"day={partition_dt.day}", + f"{partition_dt.isoformat()}.parquet", + ) + + local_path = os.path.join(self.tmp_folder, path_suffix) + + os.makedirs(Path(local_path).parent, exist_ok=True) + + self.write_local_pq_partition(table, local_path) + + # in backfill mode, we don't want to move files around in s3 - + # we just want to write the converted files to the output location + # (which could be a temp location or the final archive location). + # in non-backfill mode, we want to move the original gtfs-rt files from + # incoming to archive after successful conversion. + if self.move_source_on_completion: + self.move_s3_files() + + # mirror on s3 if remote output location is provided + if self.remote_output_location is not None: + s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) + upload_file(local_path, s3_path) + + pool = pyarrow.default_memory_pool() + pool.release_unused() + table_count += 1 + process_logger.add_metadata(table_count=table_count) + + # wait for all background s3 moves to finish + for future in move_futures: + future.result() + + except Exception as exception: + process_logger.log_failure(exception) + else: + process_logger.log_complete() + finally: + self.data_parts = {} + if self.move_source_on_completion: + self.move_s3_files() + self.clean_local_folders() + + @staticmethod + def _move_s3_files_async(archive_files: List[str], error_files: List[str]) -> None: + """Move archive and error files to S3 in a background thread.""" + if error_files: + move_s3_objects(error_files, os.path.join(S3_ERROR, LAMP)) + if archive_files: + move_s3_objects(archive_files, os.path.join(S3_ARCHIVE, LAMP)) + + def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: + """ + Just write the file out.. + + this should be already sorted by timestamp based on how self.files is yielded. + """ + df: pl.DataFrame = pl.from_arrow(table).filter(self.filter) # type: ignore[arg-type, assignment] + + # read local_path if exists and concat with table + # this handles the case where a prior iteration yielded an incomplete time chunk, + # and we are now filling in the remaining record that is part of that chunk + if os.path.exists(local_path): + existing_table: pl.DataFrame = pl.read_parquet(local_path) + df = pl.concat([existing_table, df]) + + if not self.move_source_on_completion: + df = ( + df.unique() + ) # unique is appropriate here to ensure we don't write duplicates when files are NOT moved for backfill usecase + df.write_parquet(local_path, compression="zstd", compression_level=3) + + def process_files(self) -> Iterable[pyarrow.table]: + """ + Yield time-chunked parquet tables from all input files. + + Applies a polars filter to narrow results at the source to reduce write + churn. Converts json.gz input files to pyarrow tables, groups into time + chunk intervals, and yields tables for completed intervals as we go. + """ + process_logger = ProcessLogger( + "fullset_create_pyarrow_tables", + config_type=str(self.config_type), + ) + process_logger.log_start() + + with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.thread_init) as pool: + # for file in self.files: + # result_dt, result_filename, rt_data = self.gz_to_pyarrow(file) + + for result_dt, result_filename, rt_data in pool.map(self.gz_to_pyarrow, self.files): + # errors in gtfs_rt conversions are handled in the gz_to_pyarrow + # function. if one is encountered, the datetime will be none. log + # the error and move on to the next file. + + if result_dt is None: + logging.error( + "skipping processing: %s", + result_filename, + ) + continue + + # create key for self.data_parts dictionary that bins based on the chunk interval + dt_part = self._interval_key(result_dt) + + # create new self.table_groups entry for key if it doesn't exist + if dt_part not in self.data_parts: + self.data_parts[dt_part] = TableData() + self.data_parts[dt_part].table = self.detail.transform_for_write(rt_data) + else: + self.data_parts[dt_part].table = pyarrow.concat_tables( + [self.data_parts[dt_part].table, self.detail.transform_for_write(rt_data)] + ) + + self.data_parts[dt_part].files.append(result_filename) + + # we're mapping each gz to a range dt_part. when we have > 1 dt_parts in the map, + # that means we've processed files from at least 2 different time intervals and + # can start yielding tables for the intervals that are complete. + # this relies on self.files being sorted - enforced by add_files(), + # and ThreadPoolExecutor/map yields __next__ iterator, i.e. returns in the right order + if len(self.data_parts) > 1: + yield from self.yield_check_periodic(process_logger, result_dt) + + yield from self.yield_check_periodic(process_logger, flush=True) + + process_logger.add_metadata(file_count=0, number_of_rows=0) + process_logger.log_complete() + + def partition_column(self) -> str: + """Return the column name for partitioning output parquet files.""" + return "trip_update.trip.route_id" + + def table_sort_order(self) -> List[Tuple[str, str]]: + """Return sort order specification for output parquet tables.""" + return [ + ("feed_timestamp", "ascending"), + ("trip_update.trip.route_pattern_id", "ascending"), + ("trip_update.trip.direction_id", "ascending"), + ("trip_update.vehicle.id", "ascending"), + ] + + def _interval_key(self, ts: datetime) -> datetime: + """ + Truncate a UTC datetime to its wall-clock-aligned interval start. + + For time_chunk_minutes=15: + 01:07 -> 01:00, 01:15 -> 01:15, 01:29 -> 01:15, 23:59 -> 23:45 + + Returns a naive datetime (no timezone) matching the existing data_parts key convention. + """ + total_minutes = ts.hour * 60 + ts.minute + aligned_minutes = (total_minutes // self.time_chunk_minutes) * self.time_chunk_minutes + return datetime( + year=ts.year, + month=ts.month, + day=ts.day, + hour=aligned_minutes // 60, + minute=aligned_minutes % 60, + ) + + def yield_check_periodic( + self, + process_logger: ProcessLogger, + current_ts: datetime = datetime.now(), + flush: bool = False, + ) -> Iterable[Tuple[pyarrow.table, datetime]]: + """ + Yield completed time-chunk intervals from data_parts. + + When flush=True, yield all remaining intervals regardless of current_ts. + + @current_ts - the feed timestamp of the file just processed + @flush - if True, yield everything remaining + """ + current_interval = self._interval_key(current_ts) + for iter_ts in list(self.data_parts.keys()): + table = self.data_parts[iter_ts].table + if table is None: + continue + + # yield if we've moved past this interval + # or if flushing all remaining data + if flush or current_interval > iter_ts: + # only populate archive_files if we're in live ingestion mode + if self.move_source_on_completion: + self.archive_files += self.data_parts[iter_ts].files + + process_logger.add_metadata( + file_count=len(self.data_parts[iter_ts].files), + number_of_rows=table.num_rows, + interval_start=iter_ts.isoformat(), + ) + process_logger.log_complete() + process_logger.add_metadata(file_count=0, number_of_rows=0, print_log=False) + process_logger.log_start() + + yield (table, iter_ts) + + del self.data_parts[iter_ts] diff --git a/src/lamp_py/ingestion/converter.py b/src/lamp_py/ingestion/converter.py index 9388cf1c..fb6d0362 100644 --- a/src/lamp_py/ingestion/converter.py +++ b/src/lamp_py/ingestion/converter.py @@ -113,16 +113,20 @@ def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]] self.metadata_queue: Queue[Optional[str]] = metadata_queue def add_files(self, files: List[str]) -> None: - """add files to this converter""" + """Add files to this converter""" self.files += files + def reset_files(self) -> None: + """Remove files from this converter""" + self.files = [] + def send_metadata(self, written_file: str) -> None: - """send metadata path to rds writer process""" + """Send metadata path to rds writer process""" self.metadata_queue.put(written_file) @abstractmethod def convert(self) -> None: """ - convert files to pyarrow tables, write them to s3 as parquete, and move + Convert files to pyarrow tables, write them to s3 as parquete, and move files from incoming to archive (or error) """ diff --git a/src/lamp_py/ingestion/glides.py b/src/lamp_py/ingestion/glides.py index 6eea7939..1df91ea1 100644 --- a/src/lamp_py/ingestion/glides.py +++ b/src/lamp_py/ingestion/glides.py @@ -213,7 +213,7 @@ def unique_key(self) -> str: """Key in record['data'] that is unique to this event type""" def download_remote(self) -> None: - """download the remote parquet path for appending""" + """Download the remote parquet path for appending""" if os.path.exists(self.local_path): os.remove(self.local_path) @@ -405,7 +405,7 @@ def ingest_glides_events( kinesis_reader: KinesisReader, metadata_queue: Queue[Optional[str]], upload: bool = False ) -> None: """ - ingest glides records from the kinesis stream and add them to parquet files + Ingest glides records from the kinesis stream and add them to parquet files """ process_logger = ProcessLogger(process_name="ingest_glides_events") process_logger.log_start() diff --git a/src/lamp_py/ingestion/gtfs_rt_detail.py b/src/lamp_py/ingestion/gtfs_rt_detail.py index 1ca80a27..8b0d8590 100644 --- a/src/lamp_py/ingestion/gtfs_rt_detail.py +++ b/src/lamp_py/ingestion/gtfs_rt_detail.py @@ -37,7 +37,7 @@ class GTFSRTDetail[T: FeedEntityTable, M: FeedMessage](ABC): """ def transform_for_write(self, table: pyarrow.table) -> pyarrow.table: - """modify table schema before write to parquet""" + """Modify table schema before write to parquet""" return flatten_table_schema(table) @property diff --git a/src/lamp_py/ingestion/ingest_gtfs.py b/src/lamp_py/ingestion/ingest_gtfs.py index 81aa1728..2f95f79b 100644 --- a/src/lamp_py/ingestion/ingest_gtfs.py +++ b/src/lamp_py/ingestion/ingest_gtfs.py @@ -11,10 +11,10 @@ move_s3_objects, file_list_from_s3, ) +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.ingestion.convert_gtfs import GtfsConverter -from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter from lamp_py.ingestion.converter import ( ConfigType, Converter, @@ -24,7 +24,7 @@ NoImplException, IgnoreIngestion, ) -from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING +from lamp_py.runtime_utils.remote_files import LAMP, S3_ERROR, S3_INCOMING, S3_SPRINGBOARD, S3Location from lamp_py.ingestion.utils import group_sort_file_list from lamp_py.ingestion.compress_gtfs.gtfs_to_parquet import gtfs_to_parquet @@ -52,7 +52,7 @@ def run_converter(converter: Converter) -> None: def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: """ - ingest gtfs schedules from MBTA GTFS schedule archive + Ingest gtfs schedules from MBTA GTFS schedule archive """ logger = ProcessLogger(process_name="ingest_gtfs") logger.log_start() @@ -65,7 +65,7 @@ def ingest_gtfs_archive(metadata_queue: Queue[Optional[str]]) -> None: def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - get all of the filepaths currently in the incoming bucket, sort them into + Get all of the filepaths currently in the incoming bucket, sort them into batches of similar gtfs-rt files, convert each batch into tables, write the tables to parquet files in the springboard bucket, add the parquet filepaths to the metadata table as unprocessed, and move gtfs files to the @@ -75,7 +75,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L logger.log_start() try: - files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=10000) + files = file_list_from_s3(bucket_name=S3_INCOMING, file_prefix=bucket_filter, max_list_size=50000) grouped_files = group_sort_file_list(files) @@ -94,7 +94,15 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L try: config_type = ConfigType.from_filename(file_group[0]) if config_type not in converters: - converters[config_type] = GtfsRtConverter(config_type, metadata_queue) + if config_type in (ConfigType.RT_ALERTS, ConfigType.VEHICLE_COUNT, ConfigType.SCHEDULE): + converters[config_type] = GtfsConverter(config_type, metadata_queue) + else: # all TripUpdates, VehiclePositions + converters[config_type] = GtfsRtFullPartitionConverter( + config_type, + metadata_queue, + remote_output_location=S3Location(S3_SPRINGBOARD, os.path.join(LAMP, str(config_type))), + move_source_on_completion=True, + ) converters[config_type].add_files(file_group) except IgnoreIngestion: continue @@ -123,6 +131,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L process_count = os.cpu_count() if process_count is None: process_count = 4 + if len(converters) > 0: with get_context("spawn").Pool(processes=process_count, maxtasksperchild=1) as pool: pool.map_async(run_converter, converters.values()) @@ -134,7 +143,7 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L def ingest_gtfs(metadata_queue: Queue[Optional[str]], bucket_filter: str = LAMP) -> None: """ - ingest all gtfs file types + Ingest all gtfs file types static schedule files should be ingested first """ diff --git a/src/lamp_py/ingestion/pipeline.py b/src/lamp_py/ingestion/pipeline.py index 6ed15ec9..bcd645d3 100755 --- a/src/lamp_py/ingestion/pipeline.py +++ b/src/lamp_py/ingestion/pipeline.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - import os import time import logging @@ -24,7 +23,7 @@ def main() -> None: """ - run the ingestion pipeline + Run the ingestion pipeline * setup metadata queue metadata writer process * setup a glides kinesis reader @@ -40,23 +39,41 @@ def main() -> None: # connect to the glides kinesis stream glides_reader = KinesisReader(stream_name="ctd-glides-prod") + # today = datetime.now(timezone.utc).date() + # run the event loop every 30 seconds while True: process_logger = ProcessLogger(process_name="main") process_logger.log_start() bucket_filter = LAMP check_for_sigterm(metadata_queue, rds_process) + ingest_gtfs(metadata_queue, bucket_filter=bucket_filter) ingest_glides_events(glides_reader, metadata_queue, upload=True) check_for_sigterm(metadata_queue, rds_process) + # # if datetime has moved on (i.e. it's the next day) + # if today != datetime.now(timezone.utc).date(): + + # in_processing_window=within_daily_processing_window() + # process_logger.add_metadata(in_processing_window=in_processing_window) + + # # this doesn't make use of the processing window fully. we need to have other parts of ingestion + # # populate a queue. this queue can be written to disk occasionally to pick back up...todo + # if in_processing_window: + # # it is next day, do repack and reset + # yesterday = today + # today = datetime.now(timezone.utc).date() + # status = consolidate_partitions_for_archive(yesterday, config=[ConfigType.TRIP_UPDATES]) + # process_logger.add_metadata(consolidate_day=yesterday,consolidation_status=status) + process_logger.log_complete() time.sleep(30) def start() -> None: - """configure and start the ingestion process""" + """Configure and start the ingestion process""" clear_folder("/tmp") # setup handling shutdown commands signal.signal(signal.SIGTERM, handle_ecs_sigterm) diff --git a/src/lamp_py/ingestion/utils.py b/src/lamp_py/ingestion/utils.py index 4f261d20..060e88ff 100644 --- a/src/lamp_py/ingestion/utils.py +++ b/src/lamp_py/ingestion/utils.py @@ -22,7 +22,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: """ - group and sort list of filepaths by filename + Group and sort list of filepaths by filename expects s3 file paths that can be split on timestamp: @@ -41,7 +41,7 @@ def group_sort_file_list(filepaths: List[str]) -> Dict[str, List[str]]: def strip_timestamp(fileobject: str) -> str: """ - utility for sorting pulling timestamp string out of file path. + Utility for sorting pulling timestamp string out of file path. assumption is that the objects will have a bunch of "directories" that pathlib can parse out, and the filename will start with a timestamp "YYY-MM-DDTHH:MM:SSZ" (20 char) format. @@ -108,7 +108,7 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: def ordered_schedule_frame() -> pl.DataFrame: """ - create de-duplicated and ordered frame of all MBTA gtfs schedules from + Create de-duplicated and ordered frame of all MBTA gtfs schedules from https://cdn.mbta.com/archive/archived_feeds.txt de-duplicated on: published_date @@ -177,7 +177,7 @@ def file_as_bytes_buf(file: str) -> BytesIO: def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: - """flatten pyarrow table if struct column type exists""" + """Flatten pyarrow table if struct column type exists""" for field in table.schema: if str(field.type).startswith("struct"): return flatten_table_schema(table.flatten()) @@ -185,7 +185,7 @@ def flatten_table_schema(table: pyarrow.table) -> pyarrow.table: def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: - """explode list-like column of pyarrow table by creating rows for each list value""" + """Explode list-like column of pyarrow table by creating rows for each list value""" other_columns = list(table.schema.names) other_columns.remove(column) indices = pc.list_parent_indices(table[column]) @@ -205,7 +205,7 @@ def explode_table_column(table: pyarrow.table, column: str) -> pyarrow.table: def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: """ - add GTFS_RT_HASH_COL column to pyarrow table, if not already present + Add GTFS_RT_HASH_COL column to pyarrow table, if not already present """ log = ProcessLogger( "hash_gtfs_rt_table", table_rows=table.num_rows, table_mbs=round(table.nbytes / (1024 * 1024), 2) @@ -229,7 +229,7 @@ def hash_gtfs_rt_table(table: pyarrow.Table) -> pyarrow.Table: def hash_gtfs_rt_parquet(path: str) -> None: """ - add GTFS_RT_HASH_COL to local parquet file, if not already present + Add GTFS_RT_HASH_COL to local parquet file, if not already present """ ds = pq.ParquetFile(path) ds_schema = ds.schema.to_arrow_schema() @@ -259,7 +259,7 @@ def hash_gtfs_rt_parquet(path: str) -> None: def gzip_file(path: str, keep_original: bool = False) -> None: """ - gzip local file + Gzip local file :param path: local file path :param keep_original: keep original non-gzip file = False diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py new file mode 100644 index 00000000..ef531ee8 --- /dev/null +++ b/tests/ingestion/test_yield_check_periodic.py @@ -0,0 +1,163 @@ +"""Tests for time-based periodic yielding in GtfsRtFullPartitionConverter.""" + +from datetime import datetime +from queue import Queue +from typing import List, Tuple + +import pyarrow +import pytest + +from lamp_py.ingestion.convert_gtfs_rt import TableData +from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter +from lamp_py.ingestion.converter import ConfigType +from lamp_py.runtime_utils.process_logger import ProcessLogger + + +def make_converter( + time_chunk_minutes: int = 15, move_source_on_completion: bool = False +) -> GtfsRtFullPartitionConverter: + """Create a converter with periodic yielding enabled.""" + return GtfsRtFullPartitionConverter( + config_type=ConfigType.RT_TRIP_UPDATES, + metadata_queue=Queue(), + move_source_on_completion=move_source_on_completion, + time_chunk_minutes=time_chunk_minutes, + ) + + +def make_dummy_table(num_rows: int = 10) -> pyarrow.Table: + """Create a minimal pyarrow table for testing.""" + return pyarrow.table({"col": list(range(num_rows))}) + + +@pytest.mark.parametrize( + "chunk_minutes, input_ts, expected_ts", + [ + # 15-minute chunks + pytest.param(15, datetime(2026, 5, 4, 1, 15, 0), datetime(2026, 5, 4, 1, 15), id="15m-on-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 14, 59), datetime(2026, 5, 4, 1, 0), id="15m-just-before-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 15, 1), datetime(2026, 5, 4, 1, 15), id="15m-just-after-boundary"), + pytest.param(15, datetime(2026, 5, 4, 1, 0, 0), datetime(2026, 5, 4, 1, 0), id="15m-start-of-hour"), + pytest.param(15, datetime(2026, 5, 4, 23, 59, 59), datetime(2026, 5, 4, 23, 45), id="15m-end-of-day"), + pytest.param(15, datetime(2026, 5, 4, 0, 0, 0), datetime(2026, 5, 4, 0, 0), id="15m-midnight"), + # 30-minute chunks + pytest.param(30, datetime(2026, 5, 4, 1, 29), datetime(2026, 5, 4, 1, 0), id="30m-before-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 30), datetime(2026, 5, 4, 1, 30), id="30m-on-boundary"), + pytest.param(30, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 30), id="30m-end-of-hour"), + # 5-minute chunks + pytest.param(5, datetime(2026, 5, 4, 1, 7), datetime(2026, 5, 4, 1, 5), id="5m-mid-interval"), + pytest.param(5, datetime(2026, 5, 4, 1, 10), datetime(2026, 5, 4, 1, 10), id="5m-on-boundary"), + # 60-minute chunks + pytest.param(60, datetime(2026, 5, 4, 1, 59), datetime(2026, 5, 4, 1, 0), id="60m-end-of-hour"), + pytest.param(60, datetime(2026, 5, 4, 2, 0), datetime(2026, 5, 4, 2, 0), id="60m-on-boundary"), + ], +) +def test_interval_key(chunk_minutes: int, input_ts: datetime, expected_ts: datetime) -> None: + """_interval_key truncates timestamps to wall-clock-aligned interval starts.""" + c = make_converter(chunk_minutes) + # pylint: disable=protected-access + assert c._interval_key(input_ts) == expected_ts + + +@pytest.mark.parametrize( + "interval_minutes, current_ts, flush, expected_yield_count, expected_remaining_keys", + [ + # current_ts in same interval as data: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 20), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="same-interval-no-yield", + ), + # current_ts before interval start: should not yield + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 10), + False, + 0, + [datetime(2026, 5, 4, 1, 15)], + id="past-interval-no-yield", + ), + # current_ts in a later interval: should yield old interval + pytest.param( + [(1, 15)], + datetime(2026, 5, 4, 1, 35), + False, + 1, + [], + id="later-interval-yields", + ), + # flush yields everything regardless of current_ts + pytest.param( + [(1, 0), (1, 15)], + datetime(2026, 5, 4, 23, 59), + True, + 2, + [], + id="flush-yields-all", + ), + # selective yield: current at 01:00, should yield 01:15 and 01:30 but not 01:00 + pytest.param( + [(1, 0), (1, 15), (1, 30)], + datetime(2026, 5, 4, 1, 20), + False, + 1, + [datetime(2026, 5, 4, 1, 15), datetime(2026, 5, 4, 1, 30)], + id="selective-yield-older-only", + ), + ], +) +def test_yield_check_periodic( + interval_minutes: List[Tuple[int, int]], + current_ts: datetime, + flush: bool, + expected_yield_count: int, + expected_remaining_keys: List[datetime], +) -> None: + """yield_check_periodic yields completed intervals based on current_ts position.""" + c = make_converter(15) + logger = ProcessLogger("test") + logger.log_start() + + for hour, minute in interval_minutes: + key = datetime(2026, 5, 4, hour, minute) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(1) + c.data_parts[key].files = [f"file_{hour}_{minute}.json.gz"] + + tables = list(c.yield_check_periodic(logger, current_ts, flush=flush)) + + assert len(tables) == expected_yield_count + assert list(c.data_parts.keys()) == expected_remaining_keys + + +def test_yield_check_periodic_skips_none_table() -> None: + """Intervals with None table should not be yielded even when flush=True.""" + c = make_converter(15) + logger = ProcessLogger("test") + logger.log_start() + + key = datetime(2026, 5, 4, 1, 15) + c.data_parts[key] = TableData() # table is None by default + + tables = list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 0, 50), flush=True)) + assert len(tables) == 0 + + +def test_yield_check_periodic_archives_files() -> None: + """Yielded intervals should move their files to archive_files.""" + c = make_converter(15, move_source_on_completion=True) + logger = ProcessLogger("test") + logger.log_start() + + key = datetime(2026, 5, 4, 1, 15) + c.data_parts[key] = TableData() + c.data_parts[key].table = make_dummy_table(5) + c.data_parts[key].files = ["a.json.gz", "b.json.gz"] + + list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 1, 40))) + + assert "a.json.gz" in c.archive_files + assert "b.json.gz" in c.archive_files diff --git a/validation/validate_data_across_environment.py b/validation/validate_data_across_environment.py new file mode 100644 index 00000000..39429aed --- /dev/null +++ b/validation/validate_data_across_environment.py @@ -0,0 +1,262 @@ +# type: ignore +import marimo + +__generated_with = "0.14.17" +app = marimo.App(width="medium") + + +@app.cell +def _(): + import marimo as mo + import polars as pl + import pyarrow.parquet as pq + + return pl, pq + + +@app.cell +def _(pl, pq): + + def get_rowgroup_statistics(parquet_path: str) -> pl.DataFrame: + """ + Extract row group statistics from a Parquet file and return as a Polars DataFrame. + Args: + parquet_path: Path to the parquet file + Returns: + Polars DataFrame with row group statistics + """ + parquet_file = pq.ParquetFile(parquet_path) + file_metadata = parquet_file.metadata + + records = [] + for i in range(file_metadata.num_row_groups): + row_group_metadata = file_metadata.row_group(i) + + for col_i in range(row_group_metadata.num_columns): + column_chunk_metadata = row_group_metadata.column(col_i) + stats = column_chunk_metadata.statistics + + records.append( + { + "row_group": i, + "num_rows": row_group_metadata.num_rows, + "total_byte_size": row_group_metadata.total_byte_size, + "column_index": col_i, + "column_path": column_chunk_metadata.path_in_schema, + "compressed_size": column_chunk_metadata.total_compressed_size, + "uncompressed_size": column_chunk_metadata.total_uncompressed_size, + "min_value": str(stats.min) if stats and stats.min is not None else None, + "max_value": str(stats.max) if stats and stats.max is not None else None, + "null_count": stats.null_count if stats else None, + "num_values": stats.num_values if stats else None, + } + ) + + return pl.DataFrame(records) + + return (get_rowgroup_statistics,) + + +@app.cell +def _(get_rowgroup_statistics): + df = get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return (df,) + + +@app.cell +def _(df): + df["total_byte_size"].sum() / (1024**3) + return + + +@app.cell +def _(df): + df["num_rows"].sum() + return + + +@app.cell +def _(): + import duckdb + + return (duckdb,) + + +@app.cell +def _(duckdb): + con = duckdb.connect() + return (con,) + + +@app.cell +def _(con): + con.execute("load aws;") + return + + +@app.cell +def _(con): + con.execute( + """ + CREATE OR REPLACE SECRET secret ( + TYPE s3, + PROVIDER credential_chain + ); + """ + ) + return + + +@app.cell +def _(): + 2000000 * 171 + return + + +@app.cell +def _(con): + route1 = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1,) + + +@app.cell +def _(route1): + route1 + return + + +@app.cell +def _(con): + route1_prod = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_prod,) + + +@app.cell +def _(con): + route1_prod_vp = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-springboard/lamp/RT_VEHICLE_POSITIONS/year=2026/month=5/day=16/2026-05-16T00:00:00.parquet') + where "vehicle.trip.route_id" == '1'""" + ).pl() + return (route1_prod_vp,) + + +@app.cell +def _(route1_prod_vp): + route1_prod_vp.write_parquet("prod_vp.parquet") + return + + +@app.cell +def _(con): + route1_dev = con.sql( + """SELECT * from read_parquet('s3://mbta-ctd-dataplatform-dev-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=16/*.parquet') + where "trip_update.trip.route_id" == '1'""" + ).pl() + return (route1_dev,) + + +@app.cell +def _(route1, route1_prod): + route1_prod.sort("feed_timestamp").equals(route1.sort("feed_timestamp")) + return + + +@app.cell +def _(route1): + route1.describe() + return + + +@app.cell +def _(route1, route1_prod): + route1_prod.describe().equals(route1.describe()) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_prod.describe().equals( + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + ) + return + + +@app.cell +def _(pl, route1_dev, route1_prod): + route1_dev.unique(subset=pl.exclude("feed_timestamp")).select(route1_prod.columns).describe() + return + + +@app.cell +def _(): + return + + +@app.cell +def _(route1, route1_dev, route1_prod): + route1.write_parquet("stage.parquet") + route1_prod.write_parquet("prod.parquet") + route1_dev.write_parquet("dev.parquet") + return + + +@app.cell +def _(): + import pyarrow.dataset as ds + + dset = ds.dataset( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet", + format="parquet", + ) + + return (dset,) + + +@app.cell +def _(dset, pl): + pl.scan_pyarrow_dataset(dset).filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +@app.cell +def _(df): + routes = df.select("trip_update.trip.route_id").unique().collect() + return (routes,) + + +@app.cell +def _(df): + df.columns + return + + +@app.cell +def _(routes): + routes.sort(by="trip_update.trip.route_id") + return + + +@app.cell +def _(get_rowgroup_statistics): + get_rowgroup_statistics( + "s3://mbta-ctd-dataplatform-staging-springboard/lamp/RT_TRIP_UPDATES/year=2026/month=5/day=1/2026-05-01T00:00:00.parquet" + ) + return + + +@app.cell +def _(df, pl): + df1 = df.filter(pl.col("trip_update.trip.route_id") == "1").collect() + return + + +if __name__ == "__main__": + app.run() From e9b85b5553c88f9daa42f77e89e9ef471a6a6d51 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Sun, 31 May 2026 22:06:11 -0400 Subject: [PATCH 02/12] ai: found and corrected bug in local tmp directory concatenating. agree this is the issue - seen in local running --- .../ingestion/convert_gtfs_rt_fullset.py | 14 +++++----- tests/ingestion/test_yield_check_periodic.py | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index 474e866d..feb17841 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -1,6 +1,6 @@ # pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor import logging import os from datetime import datetime @@ -58,7 +58,10 @@ def __init__( "time_chunk_minutes must be at least 5 to ensure proper partitioning and avoid too many small files" ) self.detail = RtTripDetail() - self.tmp_folder = os.path.join(local_output_location, LAMP, str(self.config_type)) + # Keep tmp_folder as the base temp root to match GtfsRtConverter helpers + # (clean_local_folders, continuous_pq_update, etc.), which append + # lamp/ internally. + self.tmp_folder = local_output_location self.remote_output_location = remote_output_location self.data_parts: Dict[datetime, TableData] = {} self.filter = polars_filter @@ -87,7 +90,6 @@ def convert(self) -> None: process_logger.log_start() table_count = 0 - move_futures: List[Future] = [] try: for table, partition_dt in self.process_files(): if table.num_rows == 0: @@ -100,7 +102,7 @@ def convert(self) -> None: f"{partition_dt.isoformat()}.parquet", ) - local_path = os.path.join(self.tmp_folder, path_suffix) + local_path = os.path.join(self.tmp_folder, LAMP, str(self.config_type), path_suffix) os.makedirs(Path(local_path).parent, exist_ok=True) @@ -124,10 +126,6 @@ def convert(self) -> None: table_count += 1 process_logger.add_metadata(table_count=table_count) - # wait for all background s3 moves to finish - for future in move_futures: - future.result() - except Exception as exception: process_logger.log_failure(exception) else: diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index ef531ee8..8425a773 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -1,6 +1,7 @@ """Tests for time-based periodic yielding in GtfsRtFullPartitionConverter.""" from datetime import datetime +from pathlib import Path from queue import Queue from typing import List, Tuple @@ -11,6 +12,7 @@ from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter from lamp_py.ingestion.converter import ConfigType from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import LAMP def make_converter( @@ -161,3 +163,28 @@ def test_yield_check_periodic_archives_files() -> None: assert "a.json.gz" in c.archive_files assert "b.json.gz" in c.archive_files + + +def test_clean_local_folders_removes_oldest_day(tmp_path: Path) -> None: + """clean_local_folders should keep only the two newest day partitions.""" + c = GtfsRtFullPartitionConverter( + config_type=ConfigType.RT_TRIP_UPDATES, + metadata_queue=Queue(), + local_output_location=tmp_path.as_posix(), + ) + + root = tmp_path / LAMP / str(ConfigType.RT_TRIP_UPDATES) + day_folders = [ + root / "year=2026" / "month=5" / "day=1", + root / "year=2026" / "month=5" / "day=2", + root / "year=2026" / "month=5" / "day=3", + ] + for day_folder in day_folders: + day_folder.mkdir(parents=True, exist_ok=True) + (day_folder / "part.parquet").write_text("x", encoding="utf-8") + + c.clean_local_folders() + + assert not day_folders[0].exists() + assert day_folders[1].exists() + assert day_folders[2].exists() From 51f2e09480e7c5e3e7d988fa44262f953223f193 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 11 Jun 2026 23:27:02 -0400 Subject: [PATCH 03/12] address pr comment nits/commenting. remove stray unused move_s3_files_async, and remove hardcoded self.detail (bug) --- src/lamp_py/ingestion/convert_gtfs_rt.py | 6 ++- .../ingestion/convert_gtfs_rt_fullset.py | 41 ++++++++----------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 634100dd..61cee232 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -190,8 +190,10 @@ def convert(self) -> None: continue self.continuous_pq_update(table) - pool = pyarrow.default_memory_pool() - pool.release_unused() + + # try to get pyarrow to limit memory usage after each loop. + # this is a "ask nicely and pray" move...we can't manage memory directly in python. + pyarrow.default_memory_pool().release_unused() table_count += 1 process_logger.add_metadata(table_count=table_count) # limit number of tables produced on each event loop diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index feb17841..59c13b12 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -42,14 +42,19 @@ def __init__( Initialize GTFS-RT fullset converter with time-chunked partitioning. Args: - config_type: Type of GTFS-RT configuration (trip updates, vehicle positions, alerts). + config_type: Type of GTFS-RT configuration (trip updates, vehicle positions). metadata_queue: Queue for metadata communication. local_output_location: Local directory for temporary parquet files. remote_output_location: S3 location for final output; if None, files stay local. polars_filter: Polars expression to filter data at conversion time; defaults to no filtering. max_workers: Number of worker threads for parallel processing. time_chunk_minutes: Minutes for time-based partitioning (e.g., 15 min intervals). - move_source_on_completion: If True, move source files to archive after completion. + move_source_on_completion: If True, move source files to archive after completion. + For the LAMP usecase, `source` in this context refers to the `delta` or `archive` + bucket, for ingestion or backfill respectively. The output is uploaded regardless, + but the `source` is only conditionally moved. + `delta` -> `archive` True, do this, + `archive` -> `archive` False, don't do this """ GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) @@ -57,7 +62,7 @@ def __init__( raise ValueError( "time_chunk_minutes must be at least 5 to ensure proper partitioning and avoid too many small files" ) - self.detail = RtTripDetail() + # Keep tmp_folder as the base temp root to match GtfsRtConverter helpers # (clean_local_folders, continuous_pq_update, etc.), which append # lamp/ internally. @@ -78,7 +83,7 @@ def convert(self) -> None: time_chunk_minutes parameter. It will also apply a polars filter to the data at the point of conversion from json.gz to pyarrow table, which should help reduce the amount of data being written out and speed up the - conversion process. This converter is applicable for live ingestion and + conversion process. This converter is suitable for live ingestion and backfill tasks. """ process_logger = ProcessLogger( @@ -121,8 +126,10 @@ def convert(self) -> None: s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) upload_file(local_path, s3_path) - pool = pyarrow.default_memory_pool() - pool.release_unused() + # try to get pyarrow to limit memory usage after each loop. + # this is a "ask nicely and pray" move...we can't manage memory directly in python. + pyarrow.default_memory_pool().release_unused() + table_count += 1 process_logger.add_metadata(table_count=table_count) @@ -136,14 +143,6 @@ def convert(self) -> None: self.move_s3_files() self.clean_local_folders() - @staticmethod - def _move_s3_files_async(archive_files: List[str], error_files: List[str]) -> None: - """Move archive and error files to S3 in a background thread.""" - if error_files: - move_s3_objects(error_files, os.path.join(S3_ERROR, LAMP)) - if archive_files: - move_s3_objects(archive_files, os.path.join(S3_ARCHIVE, LAMP)) - def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None: """ Just write the file out.. @@ -157,12 +156,8 @@ def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> Non # and we are now filling in the remaining record that is part of that chunk if os.path.exists(local_path): existing_table: pl.DataFrame = pl.read_parquet(local_path) - df = pl.concat([existing_table, df]) + df = pl.concat([existing_table, df], how="diagonal") - if not self.move_source_on_completion: - df = ( - df.unique() - ) # unique is appropriate here to ensure we don't write duplicates when files are NOT moved for backfill usecase df.write_parquet(local_path, compression="zstd", compression_level=3) def process_files(self) -> Iterable[pyarrow.table]: @@ -180,16 +175,13 @@ def process_files(self) -> Iterable[pyarrow.table]: process_logger.log_start() with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.thread_init) as pool: - # for file in self.files: - # result_dt, result_filename, rt_data = self.gz_to_pyarrow(file) - for result_dt, result_filename, rt_data in pool.map(self.gz_to_pyarrow, self.files): # errors in gtfs_rt conversions are handled in the gz_to_pyarrow # function. if one is encountered, the datetime will be none. log # the error and move on to the next file. if result_dt is None: - logging.error( + process_logger.log_failure( "skipping processing: %s", result_filename, ) @@ -271,8 +263,7 @@ def yield_check_periodic( current_interval = self._interval_key(current_ts) for iter_ts in list(self.data_parts.keys()): table = self.data_parts[iter_ts].table - if table is None: - continue + # yield if we've moved past this interval # or if flushing all remaining data From 81385d22112b16376e6719dc5e3faf781a95cdca Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 11 Jun 2026 23:33:40 -0400 Subject: [PATCH 04/12] black, lint, etc --- src/lamp_py/ingestion/convert_gtfs_rt.py | 6 ++-- .../ingestion/convert_gtfs_rt_fullset.py | 28 ++++++++----------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 61cee232..bcc98149 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -190,10 +190,10 @@ def convert(self) -> None: continue self.continuous_pq_update(table) - - # try to get pyarrow to limit memory usage after each loop. + + # try to get pyarrow to limit memory usage after each loop. # this is a "ask nicely and pray" move...we can't manage memory directly in python. - pyarrow.default_memory_pool().release_unused() + pyarrow.default_memory_pool().release_unused() table_count += 1 process_logger.add_metadata(table_count=table_count) # limit number of tables produced on each event loop diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index 59c13b12..47da57af 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -1,7 +1,6 @@ # pylint: disable=too-many-positional-arguments,too-many-arguments, too-many-locals, redefined-outer-name, R0801 from concurrent.futures import ThreadPoolExecutor -import logging import os from datetime import datetime from pathlib import Path @@ -10,13 +9,12 @@ import pyarrow import polars as pl -from lamp_py.aws.s3 import move_s3_objects, upload_file -from lamp_py.ingestion.config_rt_trip import RtTripDetail +from lamp_py.aws.s3 import upload_file from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData from lamp_py.ingestion.converter import ConfigType from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import LAMP, S3_ARCHIVE, S3_ERROR, S3Location +from lamp_py.runtime_utils.remote_files import LAMP, S3Location # pylint: disable=too-many-arguments,too-many-instance-attributes @@ -49,11 +47,11 @@ def __init__( polars_filter: Polars expression to filter data at conversion time; defaults to no filtering. max_workers: Number of worker threads for parallel processing. time_chunk_minutes: Minutes for time-based partitioning (e.g., 15 min intervals). - move_source_on_completion: If True, move source files to archive after completion. - For the LAMP usecase, `source` in this context refers to the `delta` or `archive` - bucket, for ingestion or backfill respectively. The output is uploaded regardless, - but the `source` is only conditionally moved. - `delta` -> `archive` True, do this, + move_source_on_completion: If True, move source files to archive after completion. + For the LAMP usecase, `source` in this context refers to the `delta` or `archive` + bucket, for ingestion or backfill respectively. The output is uploaded regardless, + but the `source` is only conditionally moved. + `delta` -> `archive` True, do this, `archive` -> `archive` False, don't do this """ GtfsRtConverter.__init__(self, config_type, metadata_queue, max_workers=max_workers) @@ -126,10 +124,10 @@ def convert(self) -> None: s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) upload_file(local_path, s3_path) - # try to get pyarrow to limit memory usage after each loop. + # try to get pyarrow to limit memory usage after each loop. # this is a "ask nicely and pray" move...we can't manage memory directly in python. - pyarrow.default_memory_pool().release_unused() - + pyarrow.default_memory_pool().release_unused() + table_count += 1 process_logger.add_metadata(table_count=table_count) @@ -181,10 +179,7 @@ def process_files(self) -> Iterable[pyarrow.table]: # the error and move on to the next file. if result_dt is None: - process_logger.log_failure( - "skipping processing: %s", - result_filename, - ) + process_logger.add_metadata(result=result_dt, file=result_filename) continue # create key for self.data_parts dictionary that bins based on the chunk interval @@ -264,7 +259,6 @@ def yield_check_periodic( for iter_ts in list(self.data_parts.keys()): table = self.data_parts[iter_ts].table - # yield if we've moved past this interval # or if flushing all remaining data if flush or current_interval > iter_ts: From 5cdd410a47ff0d17305adddbcb3596a638254678 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 11 Jun 2026 23:50:23 -0400 Subject: [PATCH 05/12] promote partitioning function to full method - fix fallout --- .../ingestion/convert_gtfs_rt_fullset.py | 24 ++------------- src/lamp_py/ingestion/utils.py | 30 +++++++++++++++---- tests/ingestion/test_yield_check_periodic.py | 11 +++---- 3 files changed, 32 insertions(+), 33 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index 47da57af..5c28be76 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -13,6 +13,7 @@ from lamp_py.ingestion.convert_gtfs_rt import GtfsRtConverter, TableData from lamp_py.ingestion.converter import ConfigType +from lamp_py.ingestion.utils import assign_datetime_to_binned_interval from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.runtime_utils.remote_files import LAMP, S3Location @@ -183,7 +184,7 @@ def process_files(self) -> Iterable[pyarrow.table]: continue # create key for self.data_parts dictionary that bins based on the chunk interval - dt_part = self._interval_key(result_dt) + dt_part = assign_datetime_to_binned_interval(result_dt, self.time_chunk_minutes) # create new self.table_groups entry for key if it doesn't exist if dt_part not in self.data_parts: @@ -222,25 +223,6 @@ def table_sort_order(self) -> List[Tuple[str, str]]: ("trip_update.vehicle.id", "ascending"), ] - def _interval_key(self, ts: datetime) -> datetime: - """ - Truncate a UTC datetime to its wall-clock-aligned interval start. - - For time_chunk_minutes=15: - 01:07 -> 01:00, 01:15 -> 01:15, 01:29 -> 01:15, 23:59 -> 23:45 - - Returns a naive datetime (no timezone) matching the existing data_parts key convention. - """ - total_minutes = ts.hour * 60 + ts.minute - aligned_minutes = (total_minutes // self.time_chunk_minutes) * self.time_chunk_minutes - return datetime( - year=ts.year, - month=ts.month, - day=ts.day, - hour=aligned_minutes // 60, - minute=aligned_minutes % 60, - ) - def yield_check_periodic( self, process_logger: ProcessLogger, @@ -255,7 +237,7 @@ def yield_check_periodic( @current_ts - the feed timestamp of the file just processed @flush - if True, yield everything remaining """ - current_interval = self._interval_key(current_ts) + current_interval = assign_datetime_to_binned_interval(current_ts, self.time_chunk_minutes) for iter_ts in list(self.data_parts.keys()): table = self.data_parts[iter_ts].table diff --git a/src/lamp_py/ingestion/utils.py b/src/lamp_py/ingestion/utils.py index 060e88ff..a1d26f97 100644 --- a/src/lamp_py/ingestion/utils.py +++ b/src/lamp_py/ingestion/utils.py @@ -3,7 +3,7 @@ import gzip import shutil import pathlib -import datetime +from datetime import datetime, timezone import zoneinfo import tempfile from typing import Dict, List @@ -71,7 +71,7 @@ def strip_timestamp(fileobject: str) -> str: return grouped_files -def date_from_feed_version(feed_version: str) -> datetime.datetime: +def date_from_feed_version(feed_version: str) -> datetime: """ Extract date from feed_version text. Raise LookupError if no date found. @@ -84,7 +84,7 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: :return: datetime extracted from feed_version text """ - utc_tz = datetime.timezone.utc + utc_tz = timezone.utc local_tz = zoneinfo.ZoneInfo("US/Eastern") pattern_1 = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})" @@ -95,11 +95,11 @@ def date_from_feed_version(feed_version: str) -> datetime.datetime: if pattern_1_result is not None: date_str = pattern_1_result.group(0) - date_dt = datetime.datetime.fromisoformat(date_str).replace(tzinfo=utc_tz) + date_dt = datetime.fromisoformat(date_str).replace(tzinfo=utc_tz) date_dt = date_dt.astimezone(local_tz).replace(tzinfo=None) elif pattern_2_result is not None: date_str = pattern_2_result.group(0) - date_dt = datetime.datetime.strptime(date_str, "%m/%d/%y") + date_dt = datetime.strptime(date_str, "%m/%d/%y") else: raise LookupError(f"No date found in feed_version: '{feed_version}'") @@ -274,3 +274,23 @@ def gzip_file(path: str, keep_original: bool = False) -> None: os.remove(path) logger.log_complete() + + +def assign_datetime_to_binned_interval(ts: datetime, interval_mins: int) -> datetime: + """ + Truncate a UTC datetime to its wall-clock-aligned interval start. + + For time_chunk_minutes=15: + 01:07 -> 01:00, 01:15 -> 01:15, 01:29 -> 01:15, 23:59 -> 23:45 + + Returns a naive datetime (no timezone) matching the existing data_parts key convention. + """ + total_minutes = ts.hour * 60 + ts.minute + aligned_minutes = (total_minutes // interval_mins) * interval_mins + return datetime( + year=ts.year, + month=ts.month, + day=ts.day, + hour=aligned_minutes // 60, + minute=aligned_minutes % 60, + ) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index 8425a773..8ff17c46 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -1,5 +1,3 @@ -"""Tests for time-based periodic yielding in GtfsRtFullPartitionConverter.""" - from datetime import datetime from pathlib import Path from queue import Queue @@ -11,6 +9,7 @@ from lamp_py.ingestion.convert_gtfs_rt import TableData from lamp_py.ingestion.convert_gtfs_rt_fullset import GtfsRtFullPartitionConverter from lamp_py.ingestion.converter import ConfigType +from lamp_py.ingestion.utils import assign_datetime_to_binned_interval from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.runtime_utils.remote_files import LAMP @@ -54,11 +53,9 @@ def make_dummy_table(num_rows: int = 10) -> pyarrow.Table: pytest.param(60, datetime(2026, 5, 4, 2, 0), datetime(2026, 5, 4, 2, 0), id="60m-on-boundary"), ], ) -def test_interval_key(chunk_minutes: int, input_ts: datetime, expected_ts: datetime) -> None: - """_interval_key truncates timestamps to wall-clock-aligned interval starts.""" - c = make_converter(chunk_minutes) - # pylint: disable=protected-access - assert c._interval_key(input_ts) == expected_ts +def test_assign_datetime_to_binned_interval(chunk_minutes: int, input_ts: datetime, expected_ts: datetime) -> None: + """assign_datetime_to_binned_interval truncates timestamps to wall-clock-aligned interval starts.""" + assert assign_datetime_to_binned_interval(input_ts, chunk_minutes) == expected_ts @pytest.mark.parametrize( From a7b0a596bf16ae39330c830caf6782297f8d1b51 Mon Sep 17 00:00:00 2001 From: Henry <1400827+huangh@users.noreply.github.com> Date: Thu, 11 Jun 2026 23:51:03 -0400 Subject: [PATCH 06/12] test flush/non-flush cases of yield_check Co-authored-by: Corey Runkel <39202587+runkelcorey@users.noreply.github.com> --- tests/ingestion/test_yield_check_periodic.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index 8ff17c46..53a02588 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -132,7 +132,8 @@ def test_yield_check_periodic( assert list(c.data_parts.keys()) == expected_remaining_keys -def test_yield_check_periodic_skips_none_table() -> None: +@pytest.parametrize("flush", [True, False]) +def test_yield_check_periodic_skips_none_table(flush: bool) -> None: """Intervals with None table should not be yielded even when flush=True.""" c = make_converter(15) logger = ProcessLogger("test") @@ -141,8 +142,8 @@ def test_yield_check_periodic_skips_none_table() -> None: key = datetime(2026, 5, 4, 1, 15) c.data_parts[key] = TableData() # table is None by default - tables = list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 0, 50), flush=True)) - assert len(tables) == 0 + tables = list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 0, 50), flush=flush)) + assert len(tables) == 0 if flush else 1 def test_yield_check_periodic_archives_files() -> None: From 9e78c08e4e097c4018c0cd1f09fe9fd8fd8d8c3a Mon Sep 17 00:00:00 2001 From: Henry <1400827+huangh@users.noreply.github.com> Date: Thu, 11 Jun 2026 23:51:18 -0400 Subject: [PATCH 07/12] Update tests/ingestion/test_yield_check_periodic.py Co-authored-by: Corey Runkel <39202587+runkelcorey@users.noreply.github.com> --- tests/ingestion/test_yield_check_periodic.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index 53a02588..4fa383f9 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -146,9 +146,10 @@ def test_yield_check_periodic_skips_none_table(flush: bool) -> None: assert len(tables) == 0 if flush else 1 -def test_yield_check_periodic_archives_files() -> None: +@pytest.parametrize("move", [True, False]) +def test_yield_check_periodic_archives_files(move: bool) -> None: """Yielded intervals should move their files to archive_files.""" - c = make_converter(15, move_source_on_completion=True) + c = make_converter(15, move_source_on_completion=move) logger = ProcessLogger("test") logger.log_start() @@ -159,8 +160,8 @@ def test_yield_check_periodic_archives_files() -> None: list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 1, 40))) - assert "a.json.gz" in c.archive_files - assert "b.json.gz" in c.archive_files + assert ("a.json.gz" in c.archive_files) == move + assert ("b.json.gz" in c.archive_files) == move def test_clean_local_folders_removes_oldest_day(tmp_path: Path) -> None: From da3f1bc4f7a5dcf6fb3aab464b611aa011bd22a6 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:24:50 -0400 Subject: [PATCH 08/12] fix pytest paramaterize invocation from suggestions --- tests/ingestion/test_yield_check_periodic.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index 4fa383f9..bb17fa9f 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -132,7 +132,7 @@ def test_yield_check_periodic( assert list(c.data_parts.keys()) == expected_remaining_keys -@pytest.parametrize("flush", [True, False]) +@pytest.mark.parametrize("flush", [True, False]) def test_yield_check_periodic_skips_none_table(flush: bool) -> None: """Intervals with None table should not be yielded even when flush=True.""" c = make_converter(15) @@ -146,7 +146,7 @@ def test_yield_check_periodic_skips_none_table(flush: bool) -> None: assert len(tables) == 0 if flush else 1 -@pytest.parametrize("move", [True, False]) +@pytest.mark.parametrize("move", [True, False]) def test_yield_check_periodic_archives_files(move: bool) -> None: """Yielded intervals should move their files to archive_files.""" c = make_converter(15, move_source_on_completion=move) @@ -165,7 +165,10 @@ def test_yield_check_periodic_archives_files(move: bool) -> None: def test_clean_local_folders_removes_oldest_day(tmp_path: Path) -> None: - """clean_local_folders should keep only the two newest day partitions.""" + """ + clean_local_folders should keep only the two newest day partitions. + Handles case when ingestion is down for a duration exceeding a couple days + """ c = GtfsRtFullPartitionConverter( config_type=ConfigType.RT_TRIP_UPDATES, metadata_queue=Queue(), From 7bc4a283d4ea8648bf2ac381139f385c9357ecb1 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Mon, 15 Jun 2026 21:31:22 -0400 Subject: [PATCH 09/12] drop useless test --- tests/ingestion/test_yield_check_periodic.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index bb17fa9f..02fdf46f 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -131,21 +131,6 @@ def test_yield_check_periodic( assert len(tables) == expected_yield_count assert list(c.data_parts.keys()) == expected_remaining_keys - -@pytest.mark.parametrize("flush", [True, False]) -def test_yield_check_periodic_skips_none_table(flush: bool) -> None: - """Intervals with None table should not be yielded even when flush=True.""" - c = make_converter(15) - logger = ProcessLogger("test") - logger.log_start() - - key = datetime(2026, 5, 4, 1, 15) - c.data_parts[key] = TableData() # table is None by default - - tables = list(c.yield_check_periodic(logger, datetime(2026, 5, 4, 0, 50), flush=flush)) - assert len(tables) == 0 if flush else 1 - - @pytest.mark.parametrize("move", [True, False]) def test_yield_check_periodic_archives_files(move: bool) -> None: """Yielded intervals should move their files to archive_files.""" From fb69a46b6269142b22ea3996d03b3fc13dc39961 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Mon, 15 Jun 2026 21:31:46 -0400 Subject: [PATCH 10/12] black --- tests/ingestion/test_yield_check_periodic.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/ingestion/test_yield_check_periodic.py b/tests/ingestion/test_yield_check_periodic.py index 02fdf46f..8ad1b89b 100644 --- a/tests/ingestion/test_yield_check_periodic.py +++ b/tests/ingestion/test_yield_check_periodic.py @@ -131,6 +131,7 @@ def test_yield_check_periodic( assert len(tables) == expected_yield_count assert list(c.data_parts.keys()) == expected_remaining_keys + @pytest.mark.parametrize("move", [True, False]) def test_yield_check_periodic_archives_files(move: bool) -> None: """Yielded intervals should move their files to archive_files.""" From 3479c0fa4413cc301af888ed421bf29c64a89a7e Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Tue, 16 Jun 2026 10:46:55 -0400 Subject: [PATCH 11/12] bug: need to write the file partition to metadata so perf manager picks it up --- src/lamp_py/ingestion/convert_gtfs_rt_fullset.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index 5c28be76..94673692 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -15,7 +15,7 @@ from lamp_py.ingestion.utils import assign_datetime_to_binned_interval from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import LAMP, S3Location +from lamp_py.runtime_utils.remote_files import LAMP, S3_SPRINGBOARD, S3Location # pylint: disable=too-many-arguments,too-many-instance-attributes @@ -125,6 +125,10 @@ def convert(self) -> None: s3_path = os.path.join(self.remote_output_location.s3_uri, path_suffix) upload_file(local_path, s3_path) + # update the metadata table with the new s3 path for the converted file, + # so it can be picked up by the next stage of the pipeline + self.send_metadata(s3_path) + # try to get pyarrow to limit memory usage after each loop. # this is a "ask nicely and pray" move...we can't manage memory directly in python. pyarrow.default_memory_pool().release_unused() From a440de34af65283cb6edc76ed0f354e8c09116c4 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Tue, 16 Jun 2026 13:34:08 -0400 Subject: [PATCH 12/12] write fullset data to a new partition, to keep the existing springboard expected structure the same. deduped data will be written to the standard springboard location --- src/lamp_py/ingestion/convert_gtfs_rt_fullset.py | 2 +- src/lamp_py/ingestion/ingest_gtfs.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py index 94673692..24c63660 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt_fullset.py @@ -15,7 +15,7 @@ from lamp_py.ingestion.utils import assign_datetime_to_binned_interval from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import LAMP, S3_SPRINGBOARD, S3Location +from lamp_py.runtime_utils.remote_files import LAMP, S3Location # pylint: disable=too-many-arguments,too-many-instance-attributes diff --git a/src/lamp_py/ingestion/ingest_gtfs.py b/src/lamp_py/ingestion/ingest_gtfs.py index 2f95f79b..de17223c 100644 --- a/src/lamp_py/ingestion/ingest_gtfs.py +++ b/src/lamp_py/ingestion/ingest_gtfs.py @@ -100,7 +100,9 @@ def ingest_s3_files(metadata_queue: Queue[Optional[str]], bucket_filter: str = L converters[config_type] = GtfsRtFullPartitionConverter( config_type, metadata_queue, - remote_output_location=S3Location(S3_SPRINGBOARD, os.path.join(LAMP, str(config_type))), + remote_output_location=S3Location( + S3_SPRINGBOARD, os.path.join(LAMP, "FULLSET", str(config_type)) + ), move_source_on_completion=True, ) converters[config_type].add_files(file_group)