From b1e6dbd964e22501fa8574ba031d90e3639a9070 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:20:43 -0500 Subject: [PATCH 01/38] unnest function is doing too much - split into two methods for single responsiblity --- src/lamp_py/flashback/events.py | 71 ++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 75ac4595..56ca78d4 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -4,10 +4,26 @@ import dataframely as dy import polars as pl -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from lamp_py.runtime_utils.process_logger import ProcessLogger + +class StopEventsWithStatus(dy.Schema): + """Flat events data, with additional information for determining stop departures.""" + + id = dy.String(primary_key=True) # trip-route-vehicle + timestamp = dy.Int64() + start_date = dy.String(nullable=True) + trip_id = dy.String() + direction_id = dy.Int8(min=0, max=1, nullable=True) + route_id = dy.String() + start_time = dy.String(nullable=True) + revenue = dy.Bool(nullable=True) + stop_id = dy.String(nullable=False) + current_stop_sequence = dy.Int16(primary_key=True) + current_status = dy.String(nullable=True) + class StopEventsTable(dy.Schema): """Flat events data, with additional information for determining stop departures.""" @@ -90,6 +106,56 @@ def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame return valid +def unnest_vehicle_positions_new(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy.DataFrame[StopEventsTable]: + """Unnest VehiclePositions data into flat table.""" + process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) + process_logger.log_start() + vehicle_positions = ( + vp.select("entity") + .explode("entity") + .unnest("entity") + .unnest("vehicle") + .unnest("trip") + ) + process_logger.log_complete() + + return vehicle_positions + +def vehicle_position_to_events(vp: pl.DataFrame) -> dy.DataFrame[StopEventsWithStatus]: + """ + Convert VehiclePositions data into StopEventsTable format, + extracting stop events based on current status and stop sequence. + """ + process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) + process_logger.log_start() + events = ( + vp.filter( + pl.col("current_stop_sequence").is_not_null(), + pl.col("trip_id").is_not_null(), + pl.col("timestamp").is_not_null(), + pl.col("route_id").is_not_null(), + ) + .select( + pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"), + "timestamp", + "start_date", + "trip_id", + "direction_id", + "route_id", + "start_time", + "revenue", + "stop_id", + "current_stop_sequence", + "current_status", + ) + ) + + valid = process_logger.log_dataframely_filter_results(*StopEventsWithStatus.filter(events, cast=True)) + + process_logger.log_complete() + + return valid + def update_records( existing_records: dy.DataFrame[StopEventsTable], @@ -145,7 +211,8 @@ def update_records( "latest_stopped_timestamp" ), # use value from new record ) - .filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events + .filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events + # maybe join schedule in to see when arrived is valid. ) valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(combined, cast=True)) From 7da5a1aa2866f635a04f63ca233737298c379788 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:21:25 -0500 Subject: [PATCH 02/38] wip write out both new schema and old schema for vehicle positions to compare - ensure the same API read is used to write both for apples to apples --- src/lamp_py/flashback/io.py | 41 +++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index 7b6bd7d2..d2ddf822 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -4,8 +4,8 @@ import polars as pl from aiohttp import ClientError, ClientSession -from lamp_py.flashback.events import StopEventsJSON, StopEventsTable -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions +from lamp_py.flashback.events import StopEventsJSON, StopEventsWithStatus, StopEventsTable, unnest_vehicle_positions, unnest_vehicle_positions_new +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.runtime_utils.remote_files import S3Location from lamp_py.runtime_utils.remote_files import stop_events as stop_events_location @@ -38,6 +38,34 @@ def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFra return existing_events +async def get_vehicle_positions_new( + url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", + sleep_interval: int = 3, + max_retries: int = 10, +) -> tuple[dy.DataFrame[StopEventsWithStatus], dy.DataFrame[VehiclePositions]]: + """Fetch the latest VehiclePositions data.""" + process_logger = ProcessLogger("get_vehicle_positions", url=url) + process_logger.log_start() + + async with ClientSession() as session: + for attempt in range(max_retries + 1): + try: + async with session.get(url) as response: + response.raise_for_status() + data = await response.read() + break + except ClientError as e: + process_logger.log_failure(e) + if attempt == max_retries: + raise ClientError(f"Maximum retries ({max_retries}) exceeded") from e + sleep(sleep_interval) + + vehicle_positions = pl.read_ndjson(data, schema=VehiclePositionsApiFormat.to_polars_schema()) + valid = process_logger.log_dataframely_filter_results(*VehiclePositionsApiFormat.filter(vehicle_positions)) + + process_logger.log_complete() + + return unnest_vehicle_positions_new(valid), unnest_vehicle_positions(valid) async def get_vehicle_positions( url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", @@ -67,12 +95,13 @@ async def get_vehicle_positions( process_logger.log_complete() - return valid + return unnest_vehicle_positions(valid) -def write_stop_events(stop_events: dy.DataFrame[StopEventsJSON], location: S3Location = stop_events_location) -> None: +def write_stop_events(stop_events: dy.DataFrame[StopEventsJSON], location: S3Location = stop_events_location, local_override: str | None = None) -> None: """Write stop events to specified location.""" - process_logger = ProcessLogger("write_stop_events", s3_uri=location.s3_uri) + output_path = local_override or location.s3_uri + process_logger = ProcessLogger("write_stop_events", s3_uri=output_path) process_logger.log_start() - stop_events.write_parquet(location.s3_uri, compression_level=9, retries=5, use_pyarrow=True) + stop_events.write_json(output_path) process_logger.log_complete() From 1b399c91bcfd2cffdfd30aa08ba19f02f4282b29 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:21:53 -0500 Subject: [PATCH 03/38] rename service name to be flashback something, not ingestion --- src/lamp_py/flashback/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index 82aca673..df08d112 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -41,7 +41,7 @@ def pipeline() -> None: signal(SIGTERM, handle_ecs_sigterm) # configure the environment - environ["SERVICE_NAME"] = "ingestion" + environ["SERVICE_NAME"] = "flashback_event_service" validate_environment( required_variables=[ From b6e01f98af8120b1ab6e2a52340ff8894bcecbc1 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:22:34 -0500 Subject: [PATCH 04/38] split up wire API format with internal representation of a vehicle position. should generate the pyarrow schem with the dyschema as well to de-duplicate --- src/lamp_py/ingestion/convert_gtfs_rt.py | 51 +++++++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 4e5f1fe9..6af96efe 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -61,9 +61,56 @@ from lamp_py.utils.filter_bank import FilterBankRtTripUpdates -class VehiclePositions(dy.Schema): - """Structured VehiclePositions message.""" +class VehiclePositionsApiFormat(dy.Schema): + """Api Format of VehiclePositions message.""" + entity = dy.List( + inner=dy.Struct( + { + "id": dy.String(primary_key=True), + "vehicle": dy.Struct( + inner={ + "trip": dy.Struct( + inner={ + "trip_id": dy.String(nullable=True), + "route_id": dy.String(nullable=True), + "direction_id": dy.Int8(min=0, max=1, nullable=True), + "start_time": dy.String(nullable=True), + "start_date": dy.String(nullable=True), + "revenue": dy.Bool(nullable=True), + "last_trip": dy.Bool(nullable=True), + "schedule_relationship": dy.String(nullable=True), + } + ), + "vehicle": dy.Struct( + inner={ + "id": dy.String(nullable=True), + "label": dy.String(nullable=True), + } + ), + "position": dy.Struct( + inner={ + "bearing": dy.UInt16(nullable=True), + "latitude": dy.Float64(nullable=True), + "longitude": dy.Float64(nullable=True), + "speed": dy.Float64(nullable=True), + } + ), + "current_stop_sequence": dy.Int16(nullable=True), + "stop_id": dy.String(nullable=True), + "timestamp": dy.Int64(nullable=True), + "occupancy_status": dy.String(nullable=True), + "occupancy_percentage": dy.UInt32(nullable=True), + "current_status": dy.String(nullable=True), + } + ), + }, + alias="vehicle", + ), + nullable=False, + ) +class VehiclePositions(dy.Schema): + """Api Format of VehiclePositions message.""" entity = dy.List( inner=dy.Struct( { From dbc5ebf71857464f669d60faea35245cf91d718f Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:38:46 -0500 Subject: [PATCH 05/38] added refactor of vehicle position processing - split aggregate and filter functions out --- src/lamp_py/flashback/events.py | 137 ++++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 32 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 56ca78d4..cd7822c0 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -8,7 +8,6 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger - class StopEventsWithStatus(dy.Schema): """Flat events data, with additional information for determining stop departures.""" @@ -24,6 +23,7 @@ class StopEventsWithStatus(dy.Schema): current_stop_sequence = dy.Int16(primary_key=True) current_status = dy.String(nullable=True) + class StopEventsTable(dy.Schema): """Flat events data, with additional information for determining stop departures.""" @@ -106,55 +106,128 @@ def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame return valid + def unnest_vehicle_positions_new(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy.DataFrame[StopEventsTable]: """Unnest VehiclePositions data into flat table.""" process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) process_logger.log_start() - vehicle_positions = ( - vp.select("entity") - .explode("entity") - .unnest("entity") - .unnest("vehicle") - .unnest("trip") - ) + vehicle_positions = vp.select("entity").explode("entity").unnest("entity").unnest("vehicle").unnest("trip") process_logger.log_complete() return vehicle_positions -def vehicle_position_to_events(vp: pl.DataFrame) -> dy.DataFrame[StopEventsWithStatus]: + +def vehicle_position_to_raw_events(vp: pl.DataFrame) -> pl.DataFrame: """ Convert VehiclePositions data into StopEventsTable format, extracting stop events based on current status and stop sequence. """ process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) process_logger.log_start() - events = ( - vp.filter( - pl.col("current_stop_sequence").is_not_null(), - pl.col("trip_id").is_not_null(), - pl.col("timestamp").is_not_null(), - pl.col("route_id").is_not_null(), - ) - .select( - pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"), - "timestamp", - "start_date", - "trip_id", - "direction_id", - "route_id", - "start_time", - "revenue", - "stop_id", - "current_stop_sequence", - "current_status", + events = vp.filter( + pl.col("current_stop_sequence").is_not_null(), + pl.col("trip_id").is_not_null(), + pl.col("timestamp").is_not_null(), + pl.col("route_id").is_not_null(), + ).select( + pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"), + "timestamp", + "start_date", + "trip_id", + "direction_id", + "route_id", + "start_time", + "revenue", + "stop_id", + "current_stop_sequence", + "current_status", + ) + + return events + + +def aggregate_duration_with_new_records( + existing_records: pl.DataFrame, + new_records: pl.DataFrame, +) -> pl.DataFrame: + """ + Recalculate derived duration fields for stop events based on status changes. + + Merges existing and new stop event records, groups them by vehicle ID and stop + sequence, and calculates the timestamp when each status began and ended. Returns + only records that pass StopEventsWithStatus validation. + + Args: + existing_records: DataFrame of previously processed stop events with status information. + new_records: DataFrame of newly received stop events with status information. + max_record_age: Maximum age threshold for records (currently logged but not actively used in filtering). + + Returns: + DataFrame of stop events with validated derived duration fields (status_start_timestamp + and status_end_timestamp where applicable). + + Note: + Records are sorted by timestamp and grouped by vehicle ID, stop sequence, and current status. + Status end timestamp is only set when the first and last timestamp within a group differ. + """ + process_logger = ProcessLogger( + "aggregate_duration_with_new_records", + existing_records=existing_records.height, + ) + process_logger.log_start() + all_events = pl.concat([existing_records, new_records], how="diagonal") + + combined = ( + all_events.sort(by="timestamp") + .group_by("id", "current_stop_sequence", "current_status") + .agg( + [ + pl.first("timestamp").alias("status_start_timestamp"), + pl.when(pl.first("timestamp").ne(pl.last("timestamp"))).then( + pl.last("timestamp").alias("status_end_timestamp") + ), + ] ) + ).join( + all_events.select( + ["id", "direction_id", "revenue", "route_id", "start_date", "start_time", "stop_id", "timestamp", "trip_id"] + ), + on="id", + how="left", ) - valid = process_logger.log_dataframely_filter_results(*StopEventsWithStatus.filter(events, cast=True)) + process_logger.add_metadata(new_records=new_records.height, updated_records=combined.height) process_logger.log_complete() - return valid + return combined + + +def filter_stop_events( + compressed_events: pl.DataFrame, + max_record_age: timedelta, +) -> dy.DataFrame[StopEventsTable]: + """ + take compressed events and take only stopped_at events, + and rename the status start and end periods to stop event schema format + """ + + return ( + compressed_events.filter( + (pl.col("current_status") == "STOPPED_AT") + & (pl.col("status_start_timestamp").is_not_null() | pl.col("status_end_timestamp").is_not_null()) + & ( + datetime.now(tz=ZoneInfo("America/New_York")) + - pl.from_epoch("timestamp").dt.replace_time_zone( + "America/New_York", ambiguous="latest", non_existent="null" + ) + < max_record_age + ) # remove records that are older than max_record_age - flashback usecase only requires max_record_age history + ) + .drop("current_status") + .sort("id", "current_stop_sequence") + .rename({"status_start_timestamp": "arrived", "status_end_timestamp": "departed"}) + ) def update_records( @@ -211,8 +284,8 @@ def update_records( "latest_stopped_timestamp" ), # use value from new record ) - .filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events - # maybe join schedule in to see when arrived is valid. + .filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events + # maybe join schedule in to see when arrived is valid. ) valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(combined, cast=True)) From 5952f7c5d78ad7eb064e7c234b2f183cd912bc9f Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:39:15 -0500 Subject: [PATCH 06/38] remove write_stop_events - function can be consolidated in lambda --- src/lamp_py/flashback/io.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index d2ddf822..428ba151 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -4,7 +4,13 @@ import polars as pl from aiohttp import ClientError, ClientSession -from lamp_py.flashback.events import StopEventsJSON, StopEventsWithStatus, StopEventsTable, unnest_vehicle_positions, unnest_vehicle_positions_new +from lamp_py.flashback.events import ( + StopEventsJSON, + StopEventsWithStatus, + StopEventsTable, + unnest_vehicle_positions, + unnest_vehicle_positions_new, +) from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from lamp_py.runtime_utils.process_logger import ProcessLogger from lamp_py.runtime_utils.remote_files import S3Location @@ -38,11 +44,12 @@ def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFra return existing_events -async def get_vehicle_positions_new( + +async def get_vehicle_positions( url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", sleep_interval: int = 3, max_retries: int = 10, -) -> tuple[dy.DataFrame[StopEventsWithStatus], dy.DataFrame[VehiclePositions]]: +) -> dy.DataFrame[StopEventsWithStatus]: """Fetch the latest VehiclePositions data.""" process_logger = ProcessLogger("get_vehicle_positions", url=url) process_logger.log_start() @@ -62,18 +69,19 @@ async def get_vehicle_positions_new( vehicle_positions = pl.read_ndjson(data, schema=VehiclePositionsApiFormat.to_polars_schema()) valid = process_logger.log_dataframely_filter_results(*VehiclePositionsApiFormat.filter(vehicle_positions)) - + process_logger.log_complete() - return unnest_vehicle_positions_new(valid), unnest_vehicle_positions(valid) + return unnest_vehicle_positions_new(valid) -async def get_vehicle_positions( + +async def get_vehicle_positions_old( url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", sleep_interval: int = 3, max_retries: int = 10, ) -> dy.DataFrame[VehiclePositions]: """Fetch the latest VehiclePositions data.""" - process_logger = ProcessLogger("get_vehicle_positions", url=url) + process_logger = ProcessLogger("get_vehicle_positions_old", url=url) process_logger.log_start() async with ClientSession() as session: @@ -96,12 +104,3 @@ async def get_vehicle_positions( process_logger.log_complete() return unnest_vehicle_positions(valid) - - -def write_stop_events(stop_events: dy.DataFrame[StopEventsJSON], location: S3Location = stop_events_location, local_override: str | None = None) -> None: - """Write stop events to specified location.""" - output_path = local_override or location.s3_uri - process_logger = ProcessLogger("write_stop_events", s3_uri=output_path) - process_logger.log_start() - stop_events.write_json(output_path) - process_logger.log_complete() From f230a96cce8b9e9f2b20dde7f8da3378d90c9bbe Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:40:05 -0500 Subject: [PATCH 07/38] add local_override for flashback method for local output and testing. implement new processing in parallel with existing update_record processing --- src/lamp_py/flashback/pipeline.py | 50 ++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index df08d112..19f77d81 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -4,29 +4,65 @@ from signal import SIGTERM, signal import dataframely as dy +import polars as pl from lamp_py.aws.ecs import handle_ecs_sigterm -from lamp_py.flashback.events import StopEventsTable, structure_stop_events, unnest_vehicle_positions, update_records -from lamp_py.flashback.io import get_remote_events, get_vehicle_positions, write_stop_events +from lamp_py.flashback.events import ( + StopEventsTable, + filter_stop_events, + structure_stop_events, + aggregate_duration_with_new_records, + vehicle_position_to_raw_events, +) +from lamp_py.flashback.io import get_remote_events, get_vehicle_positions from lamp_py.runtime_utils.env_validation import validate_environment from lamp_py.runtime_utils.process_logger import ProcessLogger +from lamp_py.runtime_utils.remote_files import stop_events as stop_events_location async def flashback( - remote_events: dy.DataFrame[StopEventsTable], max_record_age: timedelta = timedelta(hours=2) + remote_events: dy.DataFrame[StopEventsTable], + max_record_age: timedelta = timedelta(hours=2), + local_override: str | None = None, ) -> None: """Fetch, process, and store stop events.""" + all_events = remote_events existing_events = remote_events + while True: process_logger = ProcessLogger("flashback") process_logger.log_start() + + # vehicle positions flattened, entire message new_records = await get_vehicle_positions() - stop_events = update_records(existing_events, unnest_vehicle_positions(new_records), max_record_age) + # new processing + # vehicle positions validated and filtered down to columns of interest + new_events = vehicle_position_to_raw_events(new_records) + + # consolidate records with same stop status and sequence - generate start/stop time for each status type + compressed_events = aggregate_duration_with_new_records(all_events, new_events) + + # generate flashback events for from stop records + compressed_stop_events = filter_stop_events(compressed_events, max_record_age) + + output_path = local_override or stop_events_location.s3_uri + process_logger.add_metadata(write_path=output_path) + + await asyncio.to_thread(lambda: structure_stop_events(compressed_stop_events).write_parquet(output_path)) + + # old processing + + # stop_events = update_records(existing_events, new_records, max_record_age) + + # existing_events = stop_events - existing_events = stop_events + # await asyncio.to_thread(lambda: structure_stop_events(stop_events).write_json, "old.json") - await asyncio.to_thread(lambda: write_stop_events(structure_stop_events(stop_events))) + # # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # timestamp = "2_hr_test" + # all_events.write_parquet(f"/tmp/stop_events_hh_{timestamp}.parquet") + # all_events.write_parquet(f"/tmp/all_events_hh_{timestamp}.parquet") process_logger.log_complete() @@ -49,4 +85,4 @@ def pipeline() -> None: ], ) - asyncio.run(flashback(get_remote_events())) + asyncio.run(flashback(get_remote_events(), local_override="local_stop_events.parquet")) From bac8c5b86a6d5373bb91a90fc2eb511288ba725e Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:40:42 -0500 Subject: [PATCH 08/38] temp handle function name changes --- tests/flashback/test_events.py | 6 +++--- tests/flashback/test_io.py | 28 +++++++++++++++------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index f6f52c23..0f216c63 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -6,7 +6,7 @@ import pytest from lamp_py.flashback.events import StopEventsTable, structure_stop_events, unnest_vehicle_positions, update_records -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat @pytest.mark.parametrize( @@ -116,8 +116,8 @@ ) def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> None: """It gracefully handles missing and complete data alike.""" - vp = VehiclePositions.validate( - pl.DataFrame([pl.Series(name="entity", values=[entity], dtype=VehiclePositions.entity.dtype)]) + vp = VehiclePositionsApiFormat.validate( + pl.DataFrame([pl.Series(name="entity", values=[entity], dtype=VehiclePositionsApiFormat.entity.dtype)]) ) df = unnest_vehicle_positions(vp) assert df.height == valid_records diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index bd9550ad..8626095b 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -12,16 +12,18 @@ from polars.testing import assert_frame_equal from lamp_py.flashback.events import StopEventsJSON -from lamp_py.flashback.io import get_remote_events, get_vehicle_positions, write_stop_events -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions +from lamp_py.flashback.io import get_remote_events, get_vehicle_positions_old, write_dataframe +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat from tests.test_resources import LocalS3Location @pytest.fixture(name="mock_vp_response") -def fixture_mock_vp_response(tmp_path: Path) -> Callable[[dy.DataFrame[VehiclePositions]], tuple[AsyncMock, bytes]]: +def fixture_mock_vp_response( + tmp_path: Path, +) -> Callable[[dy.DataFrame[VehiclePositionsApiFormat]], tuple[AsyncMock, bytes]]: """Create mocked vehicle positions HTTP responses from dataframe.""" - def _create(vp: dy.DataFrame[VehiclePositions]) -> tuple[AsyncMock, bytes]: + def _create(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> tuple[AsyncMock, bytes]: json_file = tmp_path.joinpath("test.json") vp.write_ndjson(json_file) @@ -115,13 +117,13 @@ async def test_get_vehicle_positions( mock_sleep: AsyncMock, mock_get: AsyncMock, dy_gen: dy.random.Generator, - mock_vp_response: Callable[[dy.DataFrame[VehiclePositions]], tuple[AsyncMock, bytes]], + mock_vp_response: Callable[[dy.DataFrame[VehiclePositionsApiFormat]], tuple[AsyncMock, bytes]], num_failures: int, max_retries: int, caplog: pytest.LogCaptureFixture, ) -> None: """It gracefully handles (successive) non-200 responses.""" - vp = VehiclePositions.sample(generator=dy_gen) + vp = VehiclePositionsApiFormat.sample(generator=dy_gen) success_response, _ = mock_vp_response(vp) # Create mock error response @@ -133,9 +135,9 @@ async def test_get_vehicle_positions( # If too many retries, expect ClientError if num_failures >= max_retries: with pytest.raises(ClientError): - await get_vehicle_positions(max_retries=max_retries) + await get_vehicle_positions_old(max_retries=max_retries) else: - df = await get_vehicle_positions(max_retries=max_retries) + df = await get_vehicle_positions_old(max_retries=max_retries) assert df.height == 1 assert mock_sleep.call_count == num_failures @@ -175,7 +177,7 @@ async def test_get_vehicle_positions( async def test_invalid_vehicle_positions_schema( mock_get: AsyncMock, dy_gen: dy.random.Generator, - mock_vp_response: Callable[[dy.DataFrame[VehiclePositions]], tuple[AsyncMock, bytes]], + mock_vp_response: Callable[[dy.DataFrame[VehiclePositionsApiFormat]], tuple[AsyncMock, bytes]], overrides: dict[str, pl.Expr], expected_rows: int, raises_error: pytest.RaisesExc, @@ -183,12 +185,12 @@ async def test_invalid_vehicle_positions_schema( caplog: pytest.LogCaptureFixture, ) -> None: """It filters out events that don't comply with the schema.""" - vp = VehiclePositions.sample(generator=dy_gen).with_columns(**overrides) + vp = VehiclePositionsApiFormat.sample(generator=dy_gen).with_columns(**overrides) mock_response, _ = mock_vp_response(vp) # type: ignore[arg-type] mock_get.return_value.__aenter__.return_value = mock_response with raises_error: - df = await get_vehicle_positions() + df = await get_vehicle_positions_old() assert df.height == expected_rows assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) @@ -212,9 +214,9 @@ def test_write_stop_events( # Simulate persistent write failure with patch("polars.DataFrame.write_parquet", side_effect=OSError("S3 write error")): with pytest.raises(OSError): - write_stop_events(stop_events, test_location) + write_dataframe(stop_events, test_location) else: - write_stop_events(stop_events, test_location) + write_dataframe(stop_events, test_location) # Verify file was written successfully assert Path(test_location.s3_uri).exists() From b09ebb9eefadaea0610c06e39af0b7b049abcaf8 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 04:40:57 -0500 Subject: [PATCH 09/38] black --- src/lamp_py/ingestion/convert_gtfs_rt.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 6af96efe..11857cf5 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -63,6 +63,7 @@ class VehiclePositionsApiFormat(dy.Schema): """Api Format of VehiclePositions message.""" + entity = dy.List( inner=dy.Struct( { @@ -109,8 +110,10 @@ class VehiclePositionsApiFormat(dy.Schema): nullable=False, ) + class VehiclePositions(dy.Schema): """Api Format of VehiclePositions message.""" + entity = dy.List( inner=dy.Struct( { From e69569fe2d08e54e1f3f486b0bb001b7b9d3429e Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:23:22 -0500 Subject: [PATCH 10/38] allow pipeline to pass in local override argument --- runners/flashback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flashback.py b/runners/flashback.py index 7b338cd8..a55b1a9f 100644 --- a/runners/flashback.py +++ b/runners/flashback.py @@ -1,3 +1,3 @@ from lamp_py.flashback.pipeline import pipeline -pipeline() +pipeline(local_override="local_stop_events.parquet") From 7270a4d16fd2b81c382b6ccb706b420d3d5795b3 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:23:55 -0500 Subject: [PATCH 11/38] cleanup hierarchy of schemas. remove unused methods --- src/lamp_py/flashback/events.py | 210 +++++++++----------------------- 1 file changed, 60 insertions(+), 150 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index cd7822c0..7e92e2d4 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -8,12 +8,12 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger -class StopEventsWithStatus(dy.Schema): - """Flat events data, with additional information for determining stop departures.""" +class VehicleEvents(dy.Schema): + """Vehicle Position raw events to be de-duplicated into actual events""" - id = dy.String(primary_key=True) # trip-route-vehicle + id = dy.String(primary_key=True) # start_date-trip-route-vehicle timestamp = dy.Int64() - start_date = dy.String(nullable=True) + start_date = dy.String(nullable=False) trip_id = dy.String() direction_id = dy.Int8(min=0, max=1, nullable=True) route_id = dy.String() @@ -22,24 +22,27 @@ class StopEventsWithStatus(dy.Schema): stop_id = dy.String(nullable=False) current_stop_sequence = dy.Int16(primary_key=True) current_status = dy.String(nullable=True) + status_start_timestamp = dy.Int64(nullable=True) + status_end_timestamp = dy.Int64(nullable=True) -class StopEventsTable(dy.Schema): - """Flat events data, with additional information for determining stop departures.""" +class VehicleStopEvents(dy.Schema): + """Vehicle Position raw events to be de-duplicated into actual events""" - id = dy.String(primary_key=True) # trip-route-vehicle + id = dy.String(primary_key=True) # start_date-trip-route-vehicle timestamp = dy.Int64() - start_date = dy.String(nullable=True) - trip_id = dy.String() - direction_id = dy.Int8(min=0, max=1, nullable=True) - route_id = dy.String() - start_time = dy.String(nullable=True) - revenue = dy.Bool(nullable=True) - stop_id = dy.String(nullable=False) - current_stop_sequence = dy.Int16(primary_key=True) - arrived = dy.Int64(nullable=True) - departed = dy.Int64(nullable=True) - latest_stopped_timestamp = dy.Int64(nullable=True) + start_date = dy.String(nullable=False) + trip_id = VehicleEvents.trip_id + direction_id = VehicleEvents.direction_id + route_id = VehicleEvents.route_id + start_time = VehicleEvents.start_time + revenue = VehicleEvents.revenue + stop_id = VehicleEvents.stop_id + current_stop_sequence = VehicleEvents.current_stop_sequence + # remove current status + # renamed status start and stop to arrival and departure for stop events schema + arrival = VehicleEvents.status_start_timestamp + departure = VehicleEvents.status_end_timestamp class StopEventsJSON(dy.Schema): @@ -47,67 +50,25 @@ class StopEventsJSON(dy.Schema): id = dy.String(primary_key=True) timestamp = dy.Int64() - start_date = StopEventsTable.start_date - trip_id = StopEventsTable.trip_id - direction_id = StopEventsTable.direction_id - route_id = StopEventsTable.route_id - start_time = StopEventsTable.start_time - revenue = StopEventsTable.revenue + start_date = VehicleStopEvents.start_date + trip_id = VehicleStopEvents.trip_id + direction_id = VehicleStopEvents.direction_id + route_id = VehicleStopEvents.route_id + start_time = VehicleStopEvents.start_time + revenue = VehicleStopEvents.revenue stop_events = dy.List( dy.Struct( inner={ - "stop_id": StopEventsTable.stop_id, - "current_stop_sequence": dy.Int16(), - "arrived": StopEventsTable.arrived, - "departed": StopEventsTable.departed, + "stop_id": VehicleStopEvents.stop_id, + "current_stop_sequence": VehicleStopEvents.current_stop_sequence, + "arrived": VehicleStopEvents.arrival, + "departed": VehicleStopEvents.departure, } ) ) -def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame[StopEventsTable]: - """Unnest VehiclePositions data into flat table.""" - process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) - process_logger.log_start() - events = ( - vp.select("entity") - .explode("entity") - .unnest("entity") - .unnest("vehicle") - .unnest("trip") - .filter( - pl.col("current_stop_sequence").is_not_null(), - pl.col("trip_id").is_not_null(), - pl.col("timestamp").is_not_null(), - pl.col("route_id").is_not_null(), - ) - .select( - pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"), - "timestamp", - "start_date", - "trip_id", - "direction_id", - "route_id", - "start_time", - "revenue", - "stop_id", - "current_stop_sequence", - pl.when(pl.col("current_status").eq("STOPPED_AT")).then(pl.col("timestamp")).alias("arrived"), - pl.lit(None).alias("departed"), # for schema adherence - pl.when(pl.col("current_status").eq("STOPPED_AT")) - .then(pl.col("timestamp")) - .alias("latest_stopped_timestamp"), - ) - ) - - valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(events, cast=True)) - - process_logger.log_complete() - - return valid - - -def unnest_vehicle_positions_new(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy.DataFrame[StopEventsTable]: +def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy.DataFrame[VehiclePositions]: """Unnest VehiclePositions data into flat table.""" process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) process_logger.log_start() @@ -117,20 +78,35 @@ def unnest_vehicle_positions_new(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> return vehicle_positions -def vehicle_position_to_raw_events(vp: pl.DataFrame) -> pl.DataFrame: +def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame[VehicleEvents]: """ - Convert VehiclePositions data into StopEventsTable format, - extracting stop events based on current status and stop sequence. + Convert VehiclePositions data into VehicleEvents format. + + Filters vehicle position records to include only those with valid stop sequences, + trip IDs, timestamps, and route IDs. Generates a composite ID from start_date, + trip_id, route_id, and vehicle id, then selects relevant columns for event archival. + + Args: + vp: A DataFrame containing VehiclePositions data with required columns: + current_stop_sequence, trip_id, timestamp, route_id, start_date, id, + direction_id, start_time, revenue, stop_id, and current_status. + + Returns: + A DataFrame[VehicleEvents] containing filtered vehicle position data with + a composite ID and selected columns for downstream event processing. """ - process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) + process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height) process_logger.log_start() events = vp.filter( pl.col("current_stop_sequence").is_not_null(), pl.col("trip_id").is_not_null(), pl.col("timestamp").is_not_null(), pl.col("route_id").is_not_null(), + pl.col("start_date").is_not_null(), ).select( - pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"), + pl.concat_str(pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias( + "id" + ), "timestamp", "start_date", "trip_id", @@ -147,9 +123,9 @@ def vehicle_position_to_raw_events(vp: pl.DataFrame) -> pl.DataFrame: def aggregate_duration_with_new_records( - existing_records: pl.DataFrame, - new_records: pl.DataFrame, -) -> pl.DataFrame: + existing_records: dy.DataFrame[VehicleEvents], + new_records: dy.DataFrame[VehicleEvents], +) -> dy.DataFrame[VehicleEvents]: """ Recalculate derived duration fields for stop events based on status changes. @@ -195,18 +171,19 @@ def aggregate_duration_with_new_records( on="id", how="left", ) + valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(combined, cast=True)) process_logger.add_metadata(new_records=new_records.height, updated_records=combined.height) process_logger.log_complete() - return combined + return valid def filter_stop_events( - compressed_events: pl.DataFrame, + compressed_events: dy.DataFrame[VehicleEvents], max_record_age: timedelta, -) -> dy.DataFrame[StopEventsTable]: +) -> dy.DataFrame[VehicleStopEvents]: """ take compressed events and take only stopped_at events, and rename the status start and end periods to stop event schema format @@ -230,74 +207,7 @@ def filter_stop_events( ) -def update_records( - existing_records: dy.DataFrame[StopEventsTable], - new_records: dy.DataFrame[StopEventsTable], - max_record_age: timedelta, -) -> dy.DataFrame[StopEventsTable]: - """Return a DataFrame of recent stops using VehiclePositions.""" - process_logger = ProcessLogger( - "update_records", existing_records=existing_records.height, max_record_age=str(max_record_age) - ) - process_logger.log_start() - - combined = ( - existing_records.filter( # remove old records - datetime.now(tz=ZoneInfo("America/New_York")) - - pl.from_epoch("timestamp").dt.replace_time_zone( - "America/New_York", ambiguous="latest", non_existent="null" - ) - < max_record_age - ) - .join(new_records, on=["id", "current_stop_sequence"], how="full", coalesce=True) - .select( - "id", - "current_stop_sequence", - *[ - pl.coalesce(col, f"{col}_right").alias(col) - for col in [ - "start_date", - "trip_id", - "direction_id", - "route_id", - "start_time", - "revenue", - "stop_id", - "arrived", - ] - ], - pl.coalesce( - pl.when( # if the trip has moved past this stop sequence, set departed to latest_stopped_timestamp - pl.col("current_stop_sequence").max().over("id").gt(pl.col("current_stop_sequence")) - ).then(pl.col("latest_stopped_timestamp")), - "departed", - ).alias("departed"), - pl.coalesce( - pl.when( # if departure is updated, then also update timestamp - pl.col("current_stop_sequence").max().over("id").gt(pl.col("current_stop_sequence")), - pl.col("departed").is_null(), - ).then(pl.col("timestamp_right").max().over("id")), - "timestamp", - "timestamp_right", - ).alias("timestamp"), - pl.coalesce("latest_stopped_timestamp_right", "latest_stopped_timestamp").alias( - "latest_stopped_timestamp" - ), # use value from new record - ) - .filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events - # maybe join schedule in to see when arrived is valid. - ) - - valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(combined, cast=True)) - - process_logger.add_metadata(new_records=new_records.height, updated_records=combined.height) - - process_logger.log_complete() - - return valid - - -def structure_stop_events(df: dy.DataFrame[StopEventsTable]) -> dy.DataFrame[StopEventsJSON]: +def structure_stop_events(df: dy.DataFrame[VehicleStopEvents]) -> dy.DataFrame[StopEventsJSON]: """Structure flat table into StopEvents records.""" process_logger = ProcessLogger("structure_stop_events", input_rows=df.height) stop_events = df.group_by("id").agg( From eac5eb4a838bfb3a51096a41612b752fb4642b51 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:24:41 -0500 Subject: [PATCH 12/38] finish refactor by removing obsoleted methods, and clean up fallout --- src/lamp_py/flashback/io.py | 47 +++------------- src/lamp_py/flashback/pipeline.py | 27 +++------- src/lamp_py/ingestion/convert_gtfs_rt.py | 69 +++++++++--------------- tests/flashback/test_events.py | 12 ++--- 4 files changed, 45 insertions(+), 110 deletions(-) diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index 428ba151..7d3d522a 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -6,10 +6,8 @@ from lamp_py.flashback.events import ( StopEventsJSON, - StopEventsWithStatus, - StopEventsTable, + VehicleStopEvents, unnest_vehicle_positions, - unnest_vehicle_positions_new, ) from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from lamp_py.runtime_utils.process_logger import ProcessLogger @@ -17,7 +15,7 @@ from lamp_py.runtime_utils.remote_files import stop_events as stop_events_location -def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFrame[StopEventsTable]: +def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFrame[VehicleStopEvents]: """Fetch existing stop events from S3.""" process_logger = ProcessLogger("get_remote_events") process_logger.log_start() @@ -26,10 +24,12 @@ def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFra *StopEventsJSON.filter(pl.scan_parquet(location.s3_uri), cast=True) ) - existing_events = StopEventsTable.cast( + # TODO : read in vehicle_positions parquet when available? + + existing_events = VehicleStopEvents.cast( pl.concat( [ - StopEventsTable.create_empty(), + VehicleStopEvents.create_empty(), remote_events.explode("stop_events").unnest("stop_events"), ], how="diagonal", @@ -38,7 +38,7 @@ def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFra except OSError as e: process_logger.log_warning(e) - existing_events = StopEventsTable.create_empty() + existing_events = VehicleStopEvents.create_empty() process_logger.log_complete() @@ -49,7 +49,7 @@ async def get_vehicle_positions( url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", sleep_interval: int = 3, max_retries: int = 10, -) -> dy.DataFrame[StopEventsWithStatus]: +) -> dy.DataFrame[VehiclePositions]: """Fetch the latest VehiclePositions data.""" process_logger = ProcessLogger("get_vehicle_positions", url=url) process_logger.log_start() @@ -72,35 +72,4 @@ async def get_vehicle_positions( process_logger.log_complete() - return unnest_vehicle_positions_new(valid) - - -async def get_vehicle_positions_old( - url: str = "https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json", - sleep_interval: int = 3, - max_retries: int = 10, -) -> dy.DataFrame[VehiclePositions]: - """Fetch the latest VehiclePositions data.""" - process_logger = ProcessLogger("get_vehicle_positions_old", url=url) - process_logger.log_start() - - async with ClientSession() as session: - for attempt in range(max_retries + 1): - try: - async with session.get(url) as response: - response.raise_for_status() - data = await response.read() - break - except ClientError as e: - process_logger.log_failure(e) - if attempt == max_retries: - raise ClientError(f"Maximum retries ({max_retries}) exceeded") from e - sleep(sleep_interval) - - vehicle_positions = pl.read_ndjson(data, schema=VehiclePositions.to_polars_schema()) - - valid = process_logger.log_dataframely_filter_results(*VehiclePositions.filter(vehicle_positions)) - - process_logger.log_complete() - return unnest_vehicle_positions(valid) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index 19f77d81..03884efe 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -4,15 +4,14 @@ from signal import SIGTERM, signal import dataframely as dy -import polars as pl from lamp_py.aws.ecs import handle_ecs_sigterm from lamp_py.flashback.events import ( - StopEventsTable, + VehicleStopEvents, filter_stop_events, structure_stop_events, aggregate_duration_with_new_records, - vehicle_position_to_raw_events, + vehicle_position_to_archive_events, ) from lamp_py.flashback.io import get_remote_events, get_vehicle_positions from lamp_py.runtime_utils.env_validation import validate_environment @@ -21,7 +20,7 @@ async def flashback( - remote_events: dy.DataFrame[StopEventsTable], + remote_events: dy.DataFrame[VehicleStopEvents], max_record_age: timedelta = timedelta(hours=2), local_override: str | None = None, ) -> None: @@ -36,9 +35,8 @@ async def flashback( # vehicle positions flattened, entire message new_records = await get_vehicle_positions() - # new processing # vehicle positions validated and filtered down to columns of interest - new_events = vehicle_position_to_raw_events(new_records) + new_events = vehicle_position_to_archive_events(new_records) # consolidate records with same stop status and sequence - generate start/stop time for each status type compressed_events = aggregate_duration_with_new_records(all_events, new_events) @@ -51,25 +49,12 @@ async def flashback( await asyncio.to_thread(lambda: structure_stop_events(compressed_stop_events).write_parquet(output_path)) - # old processing - - # stop_events = update_records(existing_events, new_records, max_record_age) - - # existing_events = stop_events - - # await asyncio.to_thread(lambda: structure_stop_events(stop_events).write_json, "old.json") - - # # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - # timestamp = "2_hr_test" - # all_events.write_parquet(f"/tmp/stop_events_hh_{timestamp}.parquet") - # all_events.write_parquet(f"/tmp/all_events_hh_{timestamp}.parquet") - process_logger.log_complete() await asyncio.sleep(3) # wait before fetching new data -def pipeline() -> None: +def pipeline(local_override: str | None = None) -> None: """Entry point for flashback stop events pipeline.""" process_logger = ProcessLogger("main") process_logger.log_start() @@ -85,4 +70,4 @@ def pipeline() -> None: ], ) - asyncio.run(flashback(get_remote_events(), local_override="local_stop_events.parquet")) + asyncio.run(flashback(get_remote_events(), local_override=local_override)) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 11857cf5..adcba26a 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -114,51 +114,32 @@ class VehiclePositionsApiFormat(dy.Schema): class VehiclePositions(dy.Schema): """Api Format of VehiclePositions message.""" - entity = dy.List( - inner=dy.Struct( - { - "id": dy.String(primary_key=True), - "vehicle": dy.Struct( - inner={ - "trip": dy.Struct( - inner={ - "trip_id": dy.String(nullable=True), - "route_id": dy.String(nullable=True), - "direction_id": dy.Int8(min=0, max=1, nullable=True), - "start_time": dy.String(nullable=True), - "start_date": dy.String(nullable=True), - "revenue": dy.Bool(nullable=True), - "last_trip": dy.Bool(nullable=True), - "schedule_relationship": dy.String(nullable=True), - } - ), - "vehicle": dy.Struct( - inner={ - "id": dy.String(nullable=True), - "label": dy.String(nullable=True), - } - ), - "position": dy.Struct( - inner={ - "bearing": dy.UInt16(nullable=True), - "latitude": dy.Float64(nullable=True), - "longitude": dy.Float64(nullable=True), - "speed": dy.Float64(nullable=True), - } - ), - "current_stop_sequence": dy.Int16(nullable=True), - "stop_id": dy.String(nullable=True), - "timestamp": dy.Int64(nullable=True), - "occupancy_status": dy.String(nullable=True), - "occupancy_percentage": dy.UInt32(nullable=True), - "current_status": dy.String(nullable=True), - } - ), - }, - alias="vehicle", + entity = { + "id": dy.String(primary_key=True), + "trip_id": dy.String(nullable=True), + "route_id": dy.String(nullable=True), + "direction_id": dy.Int8(min=0, max=1, nullable=True), + "start_time": dy.String(nullable=True), + "start_date": dy.String(nullable=True), + "revenue": dy.Bool(nullable=True), + "last_trip": dy.Bool(nullable=True), + "schedule_relationship": dy.String(nullable=True), + "label": dy.String(nullable=True), + "position": dy.Struct( + inner={ + "bearing": dy.UInt16(nullable=True), + "latitude": dy.Float64(nullable=True), + "longitude": dy.Float64(nullable=True), + "speed": dy.Float64(nullable=True), + } ), - nullable=False, - ) + "current_stop_sequence": dy.Int16(nullable=True), + "stop_id": dy.String(nullable=True), + "timestamp": dy.Int64(nullable=True), + "occupancy_status": dy.String(nullable=True), + "occupancy_percentage": dy.UInt32(nullable=True), + "current_status": dy.String(nullable=True), + } @dataclass diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 0f216c63..8a6c816b 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -5,7 +5,7 @@ import polars as pl import pytest -from lamp_py.flashback.events import StopEventsTable, structure_stop_events, unnest_vehicle_positions, update_records +from lamp_py.flashback.events import VehicleStopEvents, structure_stop_events, unnest_vehicle_positions, update_records from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat @@ -280,8 +280,8 @@ def test_update_records( expected_events: set[tuple[str, int, int, int | None, int | None]], ) -> None: """It quickly and correctly updates records.""" - existing_records = StopEventsTable.sample(generator=dy_gen, overrides=existing_record_overrides) - new_records = StopEventsTable.sample(generator=dy_gen, overrides=new_record_overrides) + existing_records = VehicleStopEvents.sample(generator=dy_gen, overrides=existing_record_overrides) + new_records = VehicleStopEvents.sample(generator=dy_gen, overrides=new_record_overrides) updated = update_records(existing_records, new_records, timedelta(hours=2)) updated_set = set( tuple(i.values()) @@ -293,7 +293,7 @@ def test_update_records( def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = 1_000_000) -> None: """It can handle 1,000,000 existing and new records in under a second.""" - existing_records = StopEventsTable.sample( + existing_records = VehicleStopEvents.sample( num_rows=num_rows, generator=dy_gen, overrides={ @@ -302,7 +302,7 @@ def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = ) }, ) - new_records = StopEventsTable.sample( + new_records = VehicleStopEvents.sample( num_rows=num_rows // 10, generator=dy_gen, overrides={ @@ -320,7 +320,7 @@ def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = def test_structure_stop_events(dy_gen: dy.random.Generator) -> None: """It correctly chooses the most recent timestamp and the first trip in the id.""" - events_df = StopEventsTable.sample( + events_df = VehicleStopEvents.sample( num_rows=2, generator=dy_gen, overrides={"id": "foo", "timestamp": [1, 2], "route_id": ["red", "blue"]} ) events_json = structure_stop_events(events_df) From f0e61615484c72dbb59e17725844596f6f37665c Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:25:15 -0500 Subject: [PATCH 13/38] unused variable --- src/lamp_py/flashback/pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index 03884efe..3bcb4558 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -26,7 +26,6 @@ async def flashback( ) -> None: """Fetch, process, and store stop events.""" all_events = remote_events - existing_events = remote_events while True: process_logger = ProcessLogger("flashback") From bbc81604a294ac8c7373be9843548e13d54de12b Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:28:33 -0500 Subject: [PATCH 14/38] add some comments --- src/lamp_py/flashback/pipeline.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index 3bcb4558..af7f2a40 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -34,18 +34,23 @@ async def flashback( # vehicle positions flattened, entire message new_records = await get_vehicle_positions() - # vehicle positions validated and filtered down to columns of interest + # vehicle positions validated and filtered down to columns of interest - i.e. removed lat/lon, occupancy + # i.e. vehicle reporting STOPPED_AT at time timestamp new_events = vehicle_position_to_archive_events(new_records) # consolidate records with same stop status and sequence - generate start/stop time for each status type + # single record for each event type, with new fields indicating duration of the event. + # i.e. vehicle STOPPED_AT for status_start_timestamp to status_end_timestamp. compressed_events = aggregate_duration_with_new_records(all_events, new_events) # generate flashback events for from stop records + # for flashback, only care about STOPPED_AT events - filter on those, and prepare structure for json export compressed_stop_events = filter_stop_events(compressed_events, max_record_age) output_path = local_override or stop_events_location.s3_uri process_logger.add_metadata(write_path=output_path) + # convert to agreed upon json structure, and export await asyncio.to_thread(lambda: structure_stop_events(compressed_stop_events).write_parquet(output_path)) process_logger.log_complete() From ede9785eb8865049fd254821cd6375fcee07fef9 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 05:38:53 -0500 Subject: [PATCH 15/38] formatting --- src/lamp_py/flashback/pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index af7f2a40..d3f61b56 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -35,12 +35,12 @@ async def flashback( new_records = await get_vehicle_positions() # vehicle positions validated and filtered down to columns of interest - i.e. removed lat/lon, occupancy - # i.e. vehicle reporting STOPPED_AT at time timestamp + # i.e. vehicle reporting STOPPED_AT at time timestamp new_events = vehicle_position_to_archive_events(new_records) # consolidate records with same stop status and sequence - generate start/stop time for each status type - # single record for each event type, with new fields indicating duration of the event. - # i.e. vehicle STOPPED_AT for status_start_timestamp to status_end_timestamp. + # single record for each event type, with new fields indicating duration of the event. + # i.e. vehicle STOPPED_AT for status_start_timestamp to status_end_timestamp. compressed_events = aggregate_duration_with_new_records(all_events, new_events) # generate flashback events for from stop records From 1073f3ad06fd448d5c8cf86706651ae9315cf5e6 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 09:29:31 -0500 Subject: [PATCH 16/38] fix docstring --- src/lamp_py/flashback/events.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 7e92e2d4..3ceb6dde 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -86,14 +86,7 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy trip IDs, timestamps, and route IDs. Generates a composite ID from start_date, trip_id, route_id, and vehicle id, then selects relevant columns for event archival. - Args: - vp: A DataFrame containing VehiclePositions data with required columns: - current_stop_sequence, trip_id, timestamp, route_id, start_date, id, - direction_id, start_time, revenue, stop_id, and current_status. - - Returns: - A DataFrame[VehicleEvents] containing filtered vehicle position data with - a composite ID and selected columns for downstream event processing. + Start_date is required to have a unique identifier across days, as all other identifiers are reusable. """ process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height) process_logger.log_start() @@ -119,7 +112,11 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy "current_status", ) - return events + valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(events, cast=True)) + + process_logger.log_complete() + + return valid def aggregate_duration_with_new_records( From 70011bcb2a4693dde9c7fc9cdcde203480cb4c4a Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 09:29:54 -0500 Subject: [PATCH 17/38] fix tests for new changes --- src/lamp_py/flashback/events.py | 10 +-- src/lamp_py/flashback/io.py | 2 - tests/flashback/test_events.py | 135 +++++++++++++++++++++----------- tests/flashback/test_io.py | 38 ++------- 4 files changed, 101 insertions(+), 84 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 3ceb6dde..ed44a339 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -41,8 +41,8 @@ class VehicleStopEvents(dy.Schema): current_stop_sequence = VehicleEvents.current_stop_sequence # remove current status # renamed status start and stop to arrival and departure for stop events schema - arrival = VehicleEvents.status_start_timestamp - departure = VehicleEvents.status_end_timestamp + arrived = VehicleEvents.status_start_timestamp + departed = VehicleEvents.status_end_timestamp class StopEventsJSON(dy.Schema): @@ -61,8 +61,8 @@ class StopEventsJSON(dy.Schema): inner={ "stop_id": VehicleStopEvents.stop_id, "current_stop_sequence": VehicleStopEvents.current_stop_sequence, - "arrived": VehicleStopEvents.arrival, - "departed": VehicleStopEvents.departure, + "arrived": VehicleStopEvents.arrived, + "departed": VehicleStopEvents.departed, } ) ) @@ -86,7 +86,7 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy trip IDs, timestamps, and route IDs. Generates a composite ID from start_date, trip_id, route_id, and vehicle id, then selects relevant columns for event archival. - Start_date is required to have a unique identifier across days, as all other identifiers are reusable. + Start_date is required to have a unique identifier across days, as all other identifiers are reusable. """ process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height) process_logger.log_start() diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index 7d3d522a..ae4b3d7e 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -24,8 +24,6 @@ def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFra *StopEventsJSON.filter(pl.scan_parquet(location.s3_uri), cast=True) ) - # TODO : read in vehicle_positions parquet when available? - existing_events = VehicleStopEvents.cast( pl.concat( [ diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 8a6c816b..30f32c79 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -5,8 +5,15 @@ import polars as pl import pytest -from lamp_py.flashback.events import VehicleStopEvents, structure_stop_events, unnest_vehicle_positions, update_records +from lamp_py.flashback.events import ( + VehicleEvents, + VehicleStopEvents, + aggregate_duration_with_new_records, + structure_stop_events, + unnest_vehicle_positions, +) from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat +from lamp_py.flashback.events import filter_stop_events @pytest.mark.parametrize( @@ -135,17 +142,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [2_000_000_000, 2_000_000_000 + 1], - "departed": [2_000_000_000, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [2_000_000_000, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 2, "current_stop_sequence": 3, - "latest_stopped_timestamp": 2_000_000_000 + 1, - "arrived": 2_000_000_000 + 1, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 1, + "status_end_timestamp": None, }, { ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), @@ -157,17 +162,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [2_000_000_000, 2_000_000_000 + 1], - "departed": [2_000_000_000, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [2_000_000_000, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 3, "current_stop_sequence": 3, - "latest_stopped_timestamp": 2_000_000_000 + 2, - "arrived": 2_000_000_000 + 2, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 2, + "status_end_timestamp": None, }, { ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), @@ -179,17 +182,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [2_000_000_000, 2_000_000_000 + 1], - "departed": [2_000_000_000, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [2_000_000_000, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 3, "current_stop_sequence": 4, - "latest_stopped_timestamp": 2_000_000_000 + 2, - "arrived": 2_000_000_000 + 2, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 2, + "status_end_timestamp": None, }, { ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), @@ -202,17 +203,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [1_000_000_000 + 1, 1_000_000_000 + 2], # SOMETIME IN THE PAST "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [2_000_000_000, 2_000_000_000 + 1], - "departed": [2_000_000_000, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [2_000_000_000, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 3, "current_stop_sequence": 4, - "latest_stopped_timestamp": 2_000_000_000 + 2, - "arrived": 2_000_000_000 + 2, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 2, + "status_end_timestamp": 2_000_000_000 + 2, }, { ("foo", 4, 2_000_000_000 + 3, 2_000_000_000 + 2, None), @@ -223,17 +222,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [2_000_000_000, 2_000_000_000 + 1], - "departed": [2_000_000_000, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [2_000_000_000, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 3, "current_stop_sequence": 50, - "latest_stopped_timestamp": 2_000_000_000 + 2, - "arrived": 2_000_000_000 + 2, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 2, + "status_end_timestamp": 2_000_000_000 + 2, }, { ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), @@ -246,17 +243,15 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non "id": "foo", "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future "current_stop_sequence": [2, 3], - "latest_stopped_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "arrived": [None, None], - "departed": [None, None], + "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], + "status_end_timestamp": [None, None], }, { "id": "foo", "timestamp": 2_000_000_000 + 3, "current_stop_sequence": 3, - "latest_stopped_timestamp": 2_000_000_000 + 2, - "arrived": 2_000_000_000 + 2, - "departed": None, + "status_start_timestamp": 2_000_000_000 + 2, + "status_end_timestamp": None, }, { ("foo", 2, 2_000_000_000 + 3, None, 2_000_000_000), @@ -280,12 +275,12 @@ def test_update_records( expected_events: set[tuple[str, int, int, int | None, int | None]], ) -> None: """It quickly and correctly updates records.""" - existing_records = VehicleStopEvents.sample(generator=dy_gen, overrides=existing_record_overrides) - new_records = VehicleStopEvents.sample(generator=dy_gen, overrides=new_record_overrides) - updated = update_records(existing_records, new_records, timedelta(hours=2)) + existing_records = VehicleEvents.sample(generator=dy_gen, overrides=existing_record_overrides) + new_records = VehicleEvents.sample(generator=dy_gen, overrides=new_record_overrides) + updated = aggregate_duration_with_new_records(existing_records, new_records) updated_set = set( tuple(i.values()) - for i in updated.select("id", "current_stop_sequence", "timestamp", "arrived", "departed").to_struct().to_list() + for i in updated.select("id", "current_stop_sequence", "timestamp", "status_start_timestamp", "status_end_timestamp").to_struct().to_list() ) assert updated_set == expected_events @@ -293,7 +288,7 @@ def test_update_records( def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = 1_000_000) -> None: """It can handle 1,000,000 existing and new records in under a second.""" - existing_records = VehicleStopEvents.sample( + existing_records = VehicleEvents.sample( num_rows=num_rows, generator=dy_gen, overrides={ @@ -302,7 +297,7 @@ def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = ) }, ) - new_records = VehicleStopEvents.sample( + new_records = VehicleEvents.sample( num_rows=num_rows // 10, generator=dy_gen, overrides={ @@ -313,7 +308,7 @@ def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = ) start = time.time() - _ = update_records(existing_records, new_records, timedelta(hours=2)) + _ = aggregate_duration_with_new_records(existing_records, new_records) duration = time.time() - start assert duration < 1.0 @@ -328,3 +323,55 @@ def test_structure_stop_events(dy_gen: dy.random.Generator) -> None: assert events_df.select("start_date", "trip_id", "direction_id", "route_id", "start_time", "revenue").row( 0 ) == events_json.select("start_date", "trip_id", "direction_id", "route_id", "start_time", "revenue").row(0) + + +@pytest.mark.parametrize( + [ + "current_status", + "status_start_timestamp", + "status_end_timestamp", + "timestamp", + "should_pass", + ], + [ + ("STOPPED_AT", 2_000_000_000, 2_000_000_000 + 1, int(time.time()), True), + ("IN_TRANSIT_TO", 2_000_000_000, 2_000_000_000 + 1, int(time.time()), False), + ("STOPPED_AT", None, None, int(time.time()), False), + ("STOPPED_AT", 2_000_000_000, None, int(time.time()), True), + ("STOPPED_AT", None, 2_000_000_000 + 1, int(time.time()), True), + ("STOPPED_AT", 2_000_000_000, 2_000_000_000 + 1, int(time.time() - 86400 * 8), False), + ], + ids=[ + "valid-stopped-at-event", + "wrong-status", + "null-timestamps", + "null-start-only", + "null-end-only", + "old-record-outside-max-age", + ], +) +def test_filter_stop_events( + dy_gen: dy.random.Generator, + current_status: str, + status_start_timestamp: int | None, + status_end_timestamp: int | None, + timestamp: int, + should_pass: bool, +) -> None: + """It correctly filters stop events by status, timestamps, and age.""" + + events = VehicleEvents.sample( + num_rows=1, + generator=dy_gen, + overrides={ + "current_status": current_status, + "status_start_timestamp": status_start_timestamp, + "status_end_timestamp": status_end_timestamp, + "timestamp": timestamp, + }, + ) + + max_record_age = timedelta(days=7) + filtered = filter_stop_events(events, max_record_age) + + assert (filtered.height == 1) == should_pass diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index 8626095b..1dca7f6b 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -12,7 +12,7 @@ from polars.testing import assert_frame_equal from lamp_py.flashback.events import StopEventsJSON -from lamp_py.flashback.io import get_remote_events, get_vehicle_positions_old, write_dataframe +from lamp_py.flashback.io import get_remote_events, get_vehicle_positions from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat from tests.test_resources import LocalS3Location @@ -135,9 +135,9 @@ async def test_get_vehicle_positions( # If too many retries, expect ClientError if num_failures >= max_retries: with pytest.raises(ClientError): - await get_vehicle_positions_old(max_retries=max_retries) + await get_vehicle_positions(max_retries=max_retries) else: - df = await get_vehicle_positions_old(max_retries=max_retries) + df = await get_vehicle_positions(max_retries=max_retries) assert df.height == 1 assert mock_sleep.call_count == num_failures @@ -190,35 +190,7 @@ async def test_invalid_vehicle_positions_schema( mock_get.return_value.__aenter__.return_value = mock_response with raises_error: - df = await get_vehicle_positions_old() + df = await get_vehicle_positions() assert df.height == expected_rows - assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) - - -@pytest.mark.parametrize( - "should_fail", - [False, True], - ids=["success", "write-failure"], -) -def test_write_stop_events( - dy_gen: dy.random.Generator, - tmp_path: Path, - should_fail: bool, -) -> None: - """It writes stop events to S3 and handles write failures.""" - test_location = LocalS3Location(tmp_path.as_posix(), "test.parquet") - stop_events = StopEventsJSON.sample(2, generator=dy_gen) - - if should_fail: - # Simulate persistent write failure - with patch("polars.DataFrame.write_parquet", side_effect=OSError("S3 write error")): - with pytest.raises(OSError): - write_dataframe(stop_events, test_location) - else: - write_dataframe(stop_events, test_location) - - # Verify file was written successfully - assert Path(test_location.s3_uri).exists() - written_df = pl.read_parquet(test_location.s3_uri) - assert_frame_equal(written_df, stop_events) + assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) \ No newline at end of file From 631b54941a4b43ef0e7f144ad040126095ed40b6 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 09:30:18 -0500 Subject: [PATCH 18/38] black --- tests/flashback/test_events.py | 6 +++++- tests/flashback/test_io.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 30f32c79..409d1e99 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -280,7 +280,11 @@ def test_update_records( updated = aggregate_duration_with_new_records(existing_records, new_records) updated_set = set( tuple(i.values()) - for i in updated.select("id", "current_stop_sequence", "timestamp", "status_start_timestamp", "status_end_timestamp").to_struct().to_list() + for i in updated.select( + "id", "current_stop_sequence", "timestamp", "status_start_timestamp", "status_end_timestamp" + ) + .to_struct() + .to_list() ) assert updated_set == expected_events diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index 1dca7f6b..0083587d 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -193,4 +193,4 @@ async def test_invalid_vehicle_positions_schema( df = await get_vehicle_positions() assert df.height == expected_rows - assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) \ No newline at end of file + assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) From 807c7e5f396f65cbf41269c94d4c90aee4ea8d25 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:39:18 -0500 Subject: [PATCH 19/38] cleanup tests to pass. remove tests that don't test useful functionality --- src/lamp_py/flashback/events.py | 5 +- src/lamp_py/ingestion/convert_gtfs_rt.py | 54 +++--- tests/flashback/test_events.py | 213 ++--------------------- tests/flashback/test_io.py | 53 +----- 4 files changed, 45 insertions(+), 280 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index ed44a339..46c5fdee 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -73,9 +73,12 @@ def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy. process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) process_logger.log_start() vehicle_positions = vp.select("entity").explode("entity").unnest("entity").unnest("vehicle").unnest("trip") + + valid = process_logger.log_dataframely_filter_results(*VehiclePositions.filter(vehicle_positions, cast=True)) + process_logger.log_complete() - return vehicle_positions + return valid def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame[VehicleEvents]: diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index adcba26a..0e93a44b 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -112,34 +112,32 @@ class VehiclePositionsApiFormat(dy.Schema): class VehiclePositions(dy.Schema): - """Api Format of VehiclePositions message.""" - - entity = { - "id": dy.String(primary_key=True), - "trip_id": dy.String(nullable=True), - "route_id": dy.String(nullable=True), - "direction_id": dy.Int8(min=0, max=1, nullable=True), - "start_time": dy.String(nullable=True), - "start_date": dy.String(nullable=True), - "revenue": dy.Bool(nullable=True), - "last_trip": dy.Bool(nullable=True), - "schedule_relationship": dy.String(nullable=True), - "label": dy.String(nullable=True), - "position": dy.Struct( - inner={ - "bearing": dy.UInt16(nullable=True), - "latitude": dy.Float64(nullable=True), - "longitude": dy.Float64(nullable=True), - "speed": dy.Float64(nullable=True), - } - ), - "current_stop_sequence": dy.Int16(nullable=True), - "stop_id": dy.String(nullable=True), - "timestamp": dy.Int64(nullable=True), - "occupancy_status": dy.String(nullable=True), - "occupancy_percentage": dy.UInt32(nullable=True), - "current_status": dy.String(nullable=True), - } + """Flat Format of VehiclePositions message.""" + + id = dy.String(primary_key=True) + trip_id = dy.String(nullable=True) + route_id = dy.String(nullable=True) + direction_id = dy.Int8(min=0, max=1, nullable=True) + start_time = dy.String(nullable=True) + start_date = dy.String(nullable=True) + revenue = dy.Bool(nullable=True) + last_trip = dy.Bool(nullable=True) + schedule_relationship = dy.String(nullable=True) + # label = dy.String(nullable=True) + position = dy.Struct( + inner={ + "bearing": dy.UInt16(nullable=True), + "latitude": dy.Float64(nullable=True), + "longitude": dy.Float64(nullable=True), + "speed": dy.Float64(nullable=True), + } + ) + current_stop_sequence = dy.Int16(nullable=True) + stop_id = dy.String(nullable=True) + timestamp = dy.Int64(nullable=True) + occupancy_status = dy.String(nullable=True) + occupancy_percentage = dy.UInt32(nullable=True) + current_status = dy.String(nullable=True) @dataclass diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 409d1e99..6ea87adf 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -57,39 +57,6 @@ ], 1, ), - ( - [ - { - "id": "1234", - "vehicle": { - "trip": { - "start_date": "20231010", - "start_time": "08:00:00", - "direction_id": 1, - "revenue": True, - "last_trip": False, - "schedule_relationship": "SCHEDULED", - }, - "position": { - "latitude": 42.352271, - "longitude": -71.055242, - "bearing": 90.0, - }, - "vehicle": { - "id": "vehicle_1234", - "label": "Bus 1234", - }, - "current_stop_sequence": 5, - "stop_id": "place-dwnxg", - "timestamp": 1700000000, - "occupancy_status": "MANY_SEATS_AVAILABLE", - "occupancy_percentage": 30, - "current_status": "IN_TRANSIT_TO", - }, - }, - ], - 0, - ), ( [ { @@ -116,7 +83,6 @@ ], ids=[ "complete-data", - "null-primary-keys", "null-non-primary-keys", "empty-entity", ], @@ -129,185 +95,33 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non df = unnest_vehicle_positions(vp) assert df.height == valid_records +def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = 100000) -> None: + """It can handle 1,000,000 existing and new records in under a second.""" -@pytest.mark.parametrize( - [ - "existing_record_overrides", - "new_record_overrides", - "expected_events", - ], - [ - ( - { - "id": "foo", - "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [2_000_000_000, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 2, - "current_stop_sequence": 3, - "status_start_timestamp": 2_000_000_000 + 1, - "status_end_timestamp": None, - }, - { - ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), - ("foo", 3, 2_000_000_000 + 2, 2_000_000_000 + 1, None), - }, - ), - ( - { - "id": "foo", - "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [2_000_000_000, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 3, - "current_stop_sequence": 3, - "status_start_timestamp": 2_000_000_000 + 2, - "status_end_timestamp": None, - }, - { - ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), - ("foo", 3, 2_000_000_000 + 2, 2_000_000_000 + 1, None), - }, - ), - ( - { - "id": "foo", - "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [2_000_000_000, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 3, - "current_stop_sequence": 4, - "status_start_timestamp": 2_000_000_000 + 2, - "status_end_timestamp": None, - }, - { - ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), - ("foo", 3, 2_000_000_000 + 3, 2_000_000_000 + 1, 2_000_000_000 + 1), - ("foo", 4, 2_000_000_000 + 3, 2_000_000_000 + 2, None), - }, - ), - ( - { - "id": "foo", - "timestamp": [1_000_000_000 + 1, 1_000_000_000 + 2], # SOMETIME IN THE PAST - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [2_000_000_000, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 3, - "current_stop_sequence": 4, - "status_start_timestamp": 2_000_000_000 + 2, - "status_end_timestamp": 2_000_000_000 + 2, - }, - { - ("foo", 4, 2_000_000_000 + 3, 2_000_000_000 + 2, None), - }, - ), - ( - { - "id": "foo", - "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [2_000_000_000, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 3, - "current_stop_sequence": 50, - "status_start_timestamp": 2_000_000_000 + 2, - "status_end_timestamp": 2_000_000_000 + 2, - }, - { - ("foo", 2, 2_000_000_000 + 1, 2_000_000_000, 2_000_000_000), - ("foo", 3, 2_000_000_000 + 3, 2_000_000_000 + 1, 2_000_000_000 + 1), - ("foo", 50, 2_000_000_000 + 3, 2_000_000_000 + 2, None), - }, - ), - ( - { - "id": "foo", - "timestamp": [2_000_000_000 + 1, 2_000_000_000 + 2], # sometime in the future - "current_stop_sequence": [2, 3], - "status_start_timestamp": [2_000_000_000, 2_000_000_000 + 1], - "status_end_timestamp": [None, None], - }, - { - "id": "foo", - "timestamp": 2_000_000_000 + 3, - "current_stop_sequence": 3, - "status_start_timestamp": 2_000_000_000 + 2, - "status_end_timestamp": None, - }, - { - ("foo", 2, 2_000_000_000 + 3, None, 2_000_000_000), - ("foo", 3, 2_000_000_000 + 2, 2_000_000_000 + 2, None), - }, - ), - ], - ids=[ - "old-records-only", - "same-stop-sequence-newer-timestamp", - "new-stop-sequence", - "outdated-records", - "non-sequential-stop-sequences", - "null-arrived-departed", # is this bad behavior? - ], -) -def test_update_records( - dy_gen: dy.random.Generator, - existing_record_overrides: dict, - new_record_overrides: dict, - expected_events: set[tuple[str, int, int, int | None, int | None]], -) -> None: - """It quickly and correctly updates records.""" - existing_records = VehicleEvents.sample(generator=dy_gen, overrides=existing_record_overrides) - new_records = VehicleEvents.sample(generator=dy_gen, overrides=new_record_overrides) - updated = aggregate_duration_with_new_records(existing_records, new_records) - updated_set = set( - tuple(i.values()) - for i in updated.select( - "id", "current_stop_sequence", "timestamp", "status_start_timestamp", "status_end_timestamp" - ) - .to_struct() - .to_list() - ) - - assert updated_set == expected_events - + statuses = ["IN_TRANSIT_TO", "STOPPED_AT", "INCOMING_TO"] -def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = 1_000_000) -> None: - """It can handle 1,000,000 existing and new records in under a second.""" existing_records = VehicleEvents.sample( num_rows=num_rows, generator=dy_gen, overrides={ "timestamp": dy_gen.sample_int( num_rows, min=int(datetime(1970, 1, 1).timestamp()), max=int(datetime(2039, 1, 1).timestamp()) - ) + ), + "current_stop_sequence": dy_gen.sample_int(num_rows, min=1, max=50), + "current_status": dy_gen.sample_choice(num_rows, choices=statuses), }, ) + new_records_count = 1_000 new_records = VehicleEvents.sample( - num_rows=num_rows // 10, + new_records_count, generator=dy_gen, overrides={ + "id": dy_gen.sample_choice(new_records_count, choices=existing_records.select("id").to_series().to_list()), "timestamp": dy_gen.sample_int( - num_rows // 10, min=int(datetime(1970, 1, 1).timestamp()), max=int(datetime(2039, 1, 1).timestamp()) - ) + new_records_count, min=int(datetime(1970, 1, 1).timestamp()), max=int(datetime(2039, 1, 1).timestamp()) + ), + "current_stop_sequence": dy_gen.sample_int(new_records_count, min=1, max=50), + "current_status": dy_gen.sample_choice(new_records_count, choices=statuses), }, ) @@ -354,6 +168,7 @@ def test_structure_stop_events(dy_gen: dy.random.Generator) -> None: "old-record-outside-max-age", ], ) +# pylint: disable=too-many-arguments,too-many-positional-arguments def test_filter_stop_events( dy_gen: dy.random.Generator, current_status: str, diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index 0083587d..3197e33c 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -9,7 +9,6 @@ import polars as pl import pytest from aiohttp import ClientError -from polars.testing import assert_frame_equal from lamp_py.flashback.events import StopEventsJSON from lamp_py.flashback.io import get_remote_events, get_vehicle_positions @@ -137,60 +136,10 @@ async def test_get_vehicle_positions( with pytest.raises(ClientError): await get_vehicle_positions(max_retries=max_retries) else: - df = await get_vehicle_positions(max_retries=max_retries) + await get_vehicle_positions(max_retries=max_retries) - assert df.height == 1 assert mock_sleep.call_count == num_failures # Check that failures were logged (status=failed appears in log message) assert "ClientError" in caplog.text failure_logs = [record for record in caplog.record_tuples if "status=failed" in record[2]] assert len(failure_logs) == num_failures - - -@pytest.mark.parametrize( - ["overrides", "expected_rows", "raises_error", "has_invalid_records"], - [ - ( - {"entity": pl.col("entity").list.eval(pl.element().struct.with_fields(id=pl.lit("a")))}, - 0, - nullcontext(), - True, - ), - ({"entity": pl.col("entity")}, 1, nullcontext(), False), # no change - ( - {"entity": pl.col("entity").list.eval(pl.element().struct.rename_fields(["id", "trip"]))}, - 0, - nullcontext(), - True, - ), # remove vehicle field - row exists but with empty entity list (valid data) - ( - {"entity": pl.col("entity").list.eval(pl.element().struct.with_fields(vehicle=pl.lit(1)))}, - 0, - pytest.raises(pl.exceptions.SchemaError), - True, - ), - ], - ids=["duplicate-primary-keys", "valid-data", "null-vehicle", "invalid-schema"], -) -@pytest.mark.asyncio -@patch("aiohttp.ClientSession.get") -async def test_invalid_vehicle_positions_schema( - mock_get: AsyncMock, - dy_gen: dy.random.Generator, - mock_vp_response: Callable[[dy.DataFrame[VehiclePositionsApiFormat]], tuple[AsyncMock, bytes]], - overrides: dict[str, pl.Expr], - expected_rows: int, - raises_error: pytest.RaisesExc, - has_invalid_records: bool, - caplog: pytest.LogCaptureFixture, -) -> None: - """It filters out events that don't comply with the schema.""" - vp = VehiclePositionsApiFormat.sample(generator=dy_gen).with_columns(**overrides) - mock_response, _ = mock_vp_response(vp) # type: ignore[arg-type] - mock_get.return_value.__aenter__.return_value = mock_response - - with raises_error: - df = await get_vehicle_positions() - - assert df.height == expected_rows - assert has_invalid_records == (WARNING in [record[1] for record in caplog.record_tuples]) From bf003af0b28fa91cf23aa3a28d23b9cdaabf1778 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:49:16 -0500 Subject: [PATCH 20/38] black --- tests/flashback/test_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 6ea87adf..0f740600 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -95,6 +95,7 @@ def test_unnest_vehicle_positions(entity: list[dict], valid_records: int) -> Non df = unnest_vehicle_positions(vp) assert df.height == valid_records + def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = 100000) -> None: """It can handle 1,000,000 existing and new records in under a second.""" From 2e20441599b8315e05a5acd52dbd0ea489aede47 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:53:25 -0500 Subject: [PATCH 21/38] types --- src/lamp_py/flashback/events.py | 7 +++- tests/flashback/test_events.py | 70 +++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 46c5fdee..a72f5de8 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -123,7 +123,7 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy def aggregate_duration_with_new_records( - existing_records: dy.DataFrame[VehicleEvents], + existing_records: dy.DataFrame[VehicleStopEvents], new_records: dy.DataFrame[VehicleEvents], ) -> dy.DataFrame[VehicleEvents]: """ @@ -189,7 +189,7 @@ def filter_stop_events( and rename the status start and end periods to stop event schema format """ - return ( + filtered = ( compressed_events.filter( (pl.col("current_status") == "STOPPED_AT") & (pl.col("status_start_timestamp").is_not_null() | pl.col("status_end_timestamp").is_not_null()) @@ -206,6 +206,9 @@ def filter_stop_events( .rename({"status_start_timestamp": "arrived", "status_end_timestamp": "departed"}) ) + valid = ProcessLogger("filter_stop_events").log_dataframely_filter_results(*VehicleStopEvents.filter(filtered, cast=True)) + + return valid def structure_stop_events(df: dy.DataFrame[VehicleStopEvents]) -> dy.DataFrame[StopEventsJSON]: """Structure flat table into StopEvents records.""" diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 0f740600..2c529a16 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -195,3 +195,73 @@ def test_filter_stop_events( filtered = filter_stop_events(events, max_record_age) assert (filtered.height == 1) == should_pass + + +def test_aggregate_duration_with_new_records_incrementing_and_static(): + # id1: current_stop_sequence increments, current_status changes + # id2: current_stop_sequence static, current_status changes + events = [ + # id1: stop_sequence increments, status changes + { + "id": "id1", + "current_stop_sequence": 1, + "current_status": "STOPPED_AT", + "timestamp": 100, + }, + { + "id": "id1", + "current_stop_sequence": 2, + "current_status": "IN_TRANSIT_TO", + "timestamp": 110, + }, + { + "id": "id1", + "current_stop_sequence": 2, + "current_status": "STOPPED_AT", + "timestamp": 115, + }, + { + "id": "id1", + "current_stop_sequence": 2, + "current_status": "STOPPED_AT", + "timestamp": 120, + }, + { + "id": "id1", + "current_stop_sequence": 2, + "current_status": "STOPPED_AT", + "timestamp": 130, + }, + { + "id": "id1", + "current_stop_sequence": 3, + "current_status": "STOPPED_AT", + "timestamp": 140, + }, + { + "id": "id2", + "current_stop_sequence": 1, + "current_status": "STOPPED_AT", + "timestamp": 200, + }, + { + "id": "id2", + "current_stop_sequence": 1, + "current_status": "STOPPED_AT", + "timestamp": 210, + }, + { + "id": "id2", + "current_stop_sequence": 1, + "current_status": "STOPPED_AT", + "timestamp": 220, + }, + ] + df = pl.DataFrame(events) + # Split into existing and new for demonstration (first 3 as existing, last 3 as new) + existing_df = df.slice(0, 3) + new_df = df.slice(3, 3) + result = aggregate_duration_with_new_records(existing_df, new_df) + # Check that all events are present and grouped as expected + assert set(result["id"]) == {"id1", "id2"} + assert result.height >= 4 # At least one per unique (id, stop_sequence, status) From 5e3fb7fdaead51d7373fd0a05cf68705a8d58ea7 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:56:19 -0500 Subject: [PATCH 22/38] satisfy dataframely --- src/lamp_py/flashback/events.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index a72f5de8..aceeef01 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -113,6 +113,9 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy "stop_id", "current_stop_sequence", "current_status", + ).with_columns( + pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"), + pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"), ) valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(events, cast=True)) From bdd9893af0d6c9df8b57f11cc7ef1bf025d5203c Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:56:40 -0500 Subject: [PATCH 23/38] blakc --- src/lamp_py/flashback/events.py | 55 +++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index aceeef01..04196dcc 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -93,29 +93,33 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy """ process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height) process_logger.log_start() - events = vp.filter( - pl.col("current_stop_sequence").is_not_null(), - pl.col("trip_id").is_not_null(), - pl.col("timestamp").is_not_null(), - pl.col("route_id").is_not_null(), - pl.col("start_date").is_not_null(), - ).select( - pl.concat_str(pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias( - "id" - ), - "timestamp", - "start_date", - "trip_id", - "direction_id", - "route_id", - "start_time", - "revenue", - "stop_id", - "current_stop_sequence", - "current_status", - ).with_columns( - pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"), - pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"), + events = ( + vp.filter( + pl.col("current_stop_sequence").is_not_null(), + pl.col("trip_id").is_not_null(), + pl.col("timestamp").is_not_null(), + pl.col("route_id").is_not_null(), + pl.col("start_date").is_not_null(), + ) + .select( + pl.concat_str( + pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-" + ).alias("id"), + "timestamp", + "start_date", + "trip_id", + "direction_id", + "route_id", + "start_time", + "revenue", + "stop_id", + "current_stop_sequence", + "current_status", + ) + .with_columns( + pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"), + pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"), + ) ) valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(events, cast=True)) @@ -209,10 +213,13 @@ def filter_stop_events( .rename({"status_start_timestamp": "arrived", "status_end_timestamp": "departed"}) ) - valid = ProcessLogger("filter_stop_events").log_dataframely_filter_results(*VehicleStopEvents.filter(filtered, cast=True)) + valid = ProcessLogger("filter_stop_events").log_dataframely_filter_results( + *VehicleStopEvents.filter(filtered, cast=True) + ) return valid + def structure_stop_events(df: dy.DataFrame[VehicleStopEvents]) -> dy.DataFrame[StopEventsJSON]: """Structure flat table into StopEvents records.""" process_logger = ProcessLogger("structure_stop_events", input_rows=df.height) From 2ff7062d68b0f13592862349f5ba0e0c1e10840d Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:45:18 -0500 Subject: [PATCH 24/38] add remote outputs for all_events, json, and parquet stop events --- src/lamp_py/runtime_utils/remote_files.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/lamp_py/runtime_utils/remote_files.py b/src/lamp_py/runtime_utils/remote_files.py index 84e1b3a7..26de413f 100644 --- a/src/lamp_py/runtime_utils/remote_files.py +++ b/src/lamp_py/runtime_utils/remote_files.py @@ -275,8 +275,20 @@ def parquet_path(self, year: Union[str, int], file: str) -> S3Location: prefix=os.path.join(LAMP, "gtfs_archive"), ) +vehicle_position_all_events = S3Location( + bucket=S3_ARCHIVE, + prefix=f"{LAMP}/stop_events/vehicle_position_all_events_v0.parquet", + version="0.1.0", +) + stop_events = S3Location( bucket=S3_ARCHIVE, - prefix=f"{LAMP}/stop_events/stop_events_v0.parquet", + prefix=f"{LAMP}/stop_events/stop_events_v1.parquet", + version="0.1.0", +) + +stop_events_json = S3Location( + bucket=S3_ARCHIVE, + prefix=f"{LAMP}/stop_events/stop_events_v1.json", version="0.1.0", ) From 48a68a78388ef3e384a9401c41ea37061189c4c2 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:45:34 -0500 Subject: [PATCH 25/38] add a test for aggregate_duration_with_new_records --- tests/flashback/test_events.py | 97 +++++++++++----------------------- 1 file changed, 31 insertions(+), 66 deletions(-) diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 2c529a16..258cf03a 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -197,71 +197,36 @@ def test_filter_stop_events( assert (filtered.height == 1) == should_pass -def test_aggregate_duration_with_new_records_incrementing_and_static(): - # id1: current_stop_sequence increments, current_status changes - # id2: current_stop_sequence static, current_status changes - events = [ - # id1: stop_sequence increments, status changes - { - "id": "id1", - "current_stop_sequence": 1, - "current_status": "STOPPED_AT", - "timestamp": 100, - }, - { - "id": "id1", - "current_stop_sequence": 2, - "current_status": "IN_TRANSIT_TO", - "timestamp": 110, - }, - { - "id": "id1", - "current_stop_sequence": 2, - "current_status": "STOPPED_AT", - "timestamp": 115, - }, - { - "id": "id1", - "current_stop_sequence": 2, - "current_status": "STOPPED_AT", - "timestamp": 120, - }, - { - "id": "id1", - "current_stop_sequence": 2, - "current_status": "STOPPED_AT", - "timestamp": 130, - }, - { - "id": "id1", - "current_stop_sequence": 3, - "current_status": "STOPPED_AT", - "timestamp": 140, - }, - { - "id": "id2", - "current_stop_sequence": 1, - "current_status": "STOPPED_AT", - "timestamp": 200, - }, - { - "id": "id2", - "current_stop_sequence": 1, - "current_status": "STOPPED_AT", - "timestamp": 210, +def test_aggregate_duration_with_new_records(dy_gen: dy.random.Generator) -> None: + """ + Test that aggregate_duration_with_new_records correctly updates event durations. + + Creates initial vehicle events with different statuses and timestamps, then + adds new records to test that the aggregation properly calculates duration + based on the timestamp differences between events. + """ + + events = VehicleEvents.sample( + num_rows=2, + generator=dy_gen, + overrides={ + "id": ["id1", "id2"], + "current_stop_sequence": [1, 1], + "current_status": ["IN_TRANSIT_TO", "STOPPED_AT"], + "timestamp": [100, 200], }, - { - "id": "id2", - "current_stop_sequence": 1, - "current_status": "STOPPED_AT", - "timestamp": 220, + ) + + new_records = VehicleEvents.sample( + num_rows=2, + generator=dy_gen, + overrides={ + "id": ["id1", "id2"], + "current_stop_sequence": [1, 1], + "current_status": ["STOPPED_AT", "STOPPED_AT"], + "timestamp": [250, 250], }, - ] - df = pl.DataFrame(events) - # Split into existing and new for demonstration (first 3 as existing, last 3 as new) - existing_df = df.slice(0, 3) - new_df = df.slice(3, 3) - result = aggregate_duration_with_new_records(existing_df, new_df) - # Check that all events are present and grouped as expected - assert set(result["id"]) == {"id1", "id2"} - assert result.height >= 4 # At least one per unique (id, stop_sequence, status) + ) + aggregated = aggregate_duration_with_new_records(events, new_records) + + print(aggregated) From 994175a90ddcb8b461df64472ee6a36c3f1e1283 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:45:55 -0500 Subject: [PATCH 26/38] flatten position as well in vehicle position schema --- src/lamp_py/ingestion/convert_gtfs_rt.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index 0e93a44b..e7ec9934 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -123,15 +123,11 @@ class VehiclePositions(dy.Schema): revenue = dy.Bool(nullable=True) last_trip = dy.Bool(nullable=True) schedule_relationship = dy.String(nullable=True) - # label = dy.String(nullable=True) - position = dy.Struct( - inner={ - "bearing": dy.UInt16(nullable=True), - "latitude": dy.Float64(nullable=True), - "longitude": dy.Float64(nullable=True), - "speed": dy.Float64(nullable=True), - } - ) + label = dy.String(nullable=True) # rename this? + bearing = dy.UInt16(nullable=True) + latitude = dy.Float64(nullable=True) + longitude = dy.Float64(nullable=True) + speed = dy.Float64(nullable=True) current_stop_sequence = dy.Int16(nullable=True) stop_id = dy.String(nullable=True) timestamp = dy.Int64(nullable=True) From d9ef090bf33aa8fbbc87611a60f0d8ead56b2d70 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:47:51 -0500 Subject: [PATCH 27/38] rename get_remote_events to get_remote_stop_events --- src/lamp_py/flashback/pipeline.py | 61 +++++++++++++++++++++---------- tests/flashback/test_io.py | 8 ++-- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index d3f61b56..02177484 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -7,58 +7,79 @@ from lamp_py.aws.ecs import handle_ecs_sigterm from lamp_py.flashback.events import ( - VehicleStopEvents, + VehicleEvents, filter_stop_events, structure_stop_events, aggregate_duration_with_new_records, vehicle_position_to_archive_events, ) -from lamp_py.flashback.io import get_remote_events, get_vehicle_positions +from lamp_py.flashback.io import get_remote_all_events, get_vehicle_positions from lamp_py.runtime_utils.env_validation import validate_environment from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import stop_events as stop_events_location +from lamp_py.runtime_utils.remote_files import stop_events, vehicle_position_all_events, stop_events_json async def flashback( - remote_events: dy.DataFrame[VehicleStopEvents], + remote_events: dy.DataFrame[VehicleEvents], max_record_age: timedelta = timedelta(hours=2), - local_override: str | None = None, + local_override_path: str | None = None, ) -> None: """Fetch, process, and store stop events.""" all_events = remote_events + # vehicle_events - how to handle in transit events? filter out same timestamp, or aggregate somehow? + # average speed? + + # do i want to keep everything? hmm.. + while True: process_logger = ProcessLogger("flashback") process_logger.log_start() - # vehicle positions flattened, entire message + # raw, flat vehicle position new_records = await get_vehicle_positions() - # vehicle positions validated and filtered down to columns of interest - i.e. removed lat/lon, occupancy - # i.e. vehicle reporting STOPPED_AT at time timestamp + # add event_id, event duration columns new_events = vehicle_position_to_archive_events(new_records) - # consolidate records with same stop status and sequence - generate start/stop time for each status type - # single record for each event type, with new fields indicating duration of the event. - # i.e. vehicle STOPPED_AT for status_start_timestamp to status_end_timestamp. + # combine and update events compressed_events = aggregate_duration_with_new_records(all_events, new_events) - # generate flashback events for from stop records - # for flashback, only care about STOPPED_AT events - filter on those, and prepare structure for json export + # update all_events with the newly compressed events + all_events = all_events.update( # type: ignore[assignment] + compressed_events, on=["event_id", "current_stop_sequence", "current_status"], how="full" + ) + + # take only meaningful stop events for flashback compressed_stop_events = filter_stop_events(compressed_events, max_record_age) - output_path = local_override or stop_events_location.s3_uri - process_logger.add_metadata(write_path=output_path) + process_logger.add_metadata( + new_records=new_records.height, + compressed_events=compressed_events.height, + compressed_stop_events=compressed_stop_events.height, + ) - # convert to agreed upon json structure, and export - await asyncio.to_thread(lambda: structure_stop_events(compressed_stop_events).write_parquet(output_path)) + if local_override_path: + stop_events_uri = f"{local_override_path}/stop_events.parquet" + stop_events_json_uri = f"{local_override_path}/stop_events.ndjson" + all_events_uri = f"{local_override_path}/vehicle_position_all_events.parquet" + else: + stop_events_uri = stop_events.s3_uri + stop_events_json_uri = stop_events_json.s3_uri + all_events_uri = vehicle_position_all_events.s3_uri + + await asyncio.to_thread(lambda: structure_stop_events(compressed_stop_events).write_parquet(stop_events_uri)) + await asyncio.to_thread( + lambda: structure_stop_events(compressed_stop_events).write_ndjson(stop_events_json_uri) + ) + + await asyncio.to_thread(lambda: all_events.write_parquet(all_events_uri)) process_logger.log_complete() await asyncio.sleep(3) # wait before fetching new data - -def pipeline(local_override: str | None = None) -> None: +def pipeline(local_override_path: str | None = None) -> None: """Entry point for flashback stop events pipeline.""" process_logger = ProcessLogger("main") process_logger.log_start() @@ -74,4 +95,4 @@ def pipeline(local_override: str | None = None) -> None: ], ) - asyncio.run(flashback(get_remote_events(), local_override=local_override)) + asyncio.run(flashback(get_remote_all_events(), local_override_path=local_override_path)) diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index 3197e33c..d6e645c9 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -11,7 +11,7 @@ from aiohttp import ClientError from lamp_py.flashback.events import StopEventsJSON -from lamp_py.flashback.io import get_remote_events, get_vehicle_positions +from lamp_py.flashback.io import get_remote_stop_events, get_vehicle_positions from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat from tests.test_resources import LocalS3Location @@ -56,9 +56,9 @@ def test_get_remote_events( if raise_network_error: # Simulate networking problems by patching scan_parquet to raise OSError with patch("polars.scan_parquet", side_effect=OSError("Network error")): - df = get_remote_events(test_location) + df = get_remote_stop_events(test_location) else: - df = get_remote_events(test_location) + df = get_remote_stop_events(test_location) assert (file_exists and not raise_network_error) == (df.height > 0) assert (not file_exists or raise_network_error) == (WARNING in [record[1] for record in caplog.record_tuples]) @@ -93,7 +93,7 @@ def test_invalid_remote_events_schema( **overrides ).write_parquet(test_location.s3_uri) with raises_error: - df = get_remote_events(test_location) + df = get_remote_stop_events(test_location) assert df.height >= expected_valid_records assert raise_warning == (WARNING in [record[1] for record in caplog.record_tuples]) From 04335f5ac2ef66d39638166efc031e07fe37228b Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:48:27 -0500 Subject: [PATCH 28/38] add get_remote_all_events function for processing via all events --- src/lamp_py/flashback/io.py | 39 ++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index ae4b3d7e..ecba208d 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -6,18 +6,43 @@ from lamp_py.flashback.events import ( StopEventsJSON, + VehicleEvents, VehicleStopEvents, unnest_vehicle_positions, ) from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from lamp_py.runtime_utils.process_logger import ProcessLogger -from lamp_py.runtime_utils.remote_files import S3Location -from lamp_py.runtime_utils.remote_files import stop_events as stop_events_location +from lamp_py.runtime_utils.remote_files import S3Location, vehicle_position_all_events, stop_events -def get_remote_events(location: S3Location = stop_events_location) -> dy.DataFrame[VehicleStopEvents]: +def get_remote_all_events(location: S3Location = vehicle_position_all_events) -> dy.DataFrame[VehicleEvents]: + """Fetch existing events from S3.""" + process_logger = ProcessLogger("get_remote_all_events") + process_logger.log_start() + try: + remote_events = process_logger.log_dataframely_filter_results( + *VehicleEvents.filter(pl.scan_parquet(location.s3_uri), cast=True) + ) + + existing_events = VehicleEvents.cast( + pl.concat( + [VehicleEvents.create_empty(), remote_events], + how="diagonal", + ) + ) + + except OSError as e: + process_logger.log_warning(e) + existing_events = VehicleEvents.create_empty() + + process_logger.log_complete() + + return existing_events + + +def get_remote_stop_events(location: S3Location = stop_events) -> dy.DataFrame[VehicleStopEvents]: """Fetch existing stop events from S3.""" - process_logger = ProcessLogger("get_remote_events") + process_logger = ProcessLogger("get_remote_stop_events") process_logger.log_start() try: remote_events = process_logger.log_dataframely_filter_results( @@ -60,11 +85,15 @@ async def get_vehicle_positions( data = await response.read() break except ClientError as e: - process_logger.log_failure(e) if attempt == max_retries: + process_logger.log_failure(e) raise ClientError(f"Maximum retries ({max_retries}) exceeded") from e sleep(sleep_interval) + except Exception as e: + process_logger.log_failure(e) + raise + vehicle_positions = pl.read_ndjson(data, schema=VehiclePositionsApiFormat.to_polars_schema()) valid = process_logger.log_dataframely_filter_results(*VehiclePositionsApiFormat.filter(vehicle_positions)) From 93714190578a4634347891408652784f9eff793d Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:49:45 -0500 Subject: [PATCH 29/38] vehicle events inherits from vehicle positions, which is now the full vehicle position schema flattened. this is for future work with vehicle positions, where we don't lose any context or columns. --- src/lamp_py/flashback/events.py | 69 +++++++++++++-------------------- 1 file changed, 28 insertions(+), 41 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index 04196dcc..f1e7e46a 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -8,20 +8,10 @@ from lamp_py.runtime_utils.process_logger import ProcessLogger -class VehicleEvents(dy.Schema): +class VehicleEvents(VehiclePositions): """Vehicle Position raw events to be de-duplicated into actual events""" - id = dy.String(primary_key=True) # start_date-trip-route-vehicle - timestamp = dy.Int64() - start_date = dy.String(nullable=False) - trip_id = dy.String() - direction_id = dy.Int8(min=0, max=1, nullable=True) - route_id = dy.String() - start_time = dy.String(nullable=True) - revenue = dy.Bool(nullable=True) - stop_id = dy.String(nullable=False) - current_stop_sequence = dy.Int16(primary_key=True) - current_status = dy.String(nullable=True) + event_id = dy.String(primary_key=True) # start_date-trip-route-vehicle status_start_timestamp = dy.Int64(nullable=True) status_end_timestamp = dy.Int64(nullable=True) @@ -72,7 +62,19 @@ def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy. """Unnest VehiclePositions data into flat table.""" process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height) process_logger.log_start() - vehicle_positions = vp.select("entity").explode("entity").unnest("entity").unnest("vehicle").unnest("trip") + + # it is what it is. note: the struct "vehicle" appears twice. + # the first is a catch all, the 2nd is vehicle_id and vehicle_label. + vehicle_positions = ( + vp.select("entity") + .explode("entity") + .unnest("entity") + .unnest("vehicle") + .unnest("trip") + .rename({"id": "entity_id"}) + .unnest("vehicle") + .unnest("position") + ) valid = process_logger.log_dataframely_filter_results(*VehiclePositions.filter(vehicle_positions, cast=True)) @@ -93,33 +95,18 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy """ process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height) process_logger.log_start() - events = ( - vp.filter( - pl.col("current_stop_sequence").is_not_null(), - pl.col("trip_id").is_not_null(), - pl.col("timestamp").is_not_null(), - pl.col("route_id").is_not_null(), - pl.col("start_date").is_not_null(), - ) - .select( - pl.concat_str( - pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-" - ).alias("id"), - "timestamp", - "start_date", - "trip_id", - "direction_id", - "route_id", - "start_time", - "revenue", - "stop_id", - "current_stop_sequence", - "current_status", - ) - .with_columns( - pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"), - pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"), - ) + events = vp.filter( + pl.col("current_stop_sequence").is_not_null(), + pl.col("trip_id").is_not_null(), + pl.col("timestamp").is_not_null(), + pl.col("route_id").is_not_null(), + pl.col("start_date").is_not_null(), + ).with_columns( + pl.concat_str(pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias( + "event_id" + ), + pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"), + pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"), ) valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(events, cast=True)) @@ -130,7 +117,7 @@ def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy def aggregate_duration_with_new_records( - existing_records: dy.DataFrame[VehicleStopEvents], + existing_records: dy.DataFrame[VehicleEvents], new_records: dy.DataFrame[VehicleEvents], ) -> dy.DataFrame[VehicleEvents]: """ From 07990804f0c1c5ecca77a97534e0d3c2ee7c3e23 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:50:03 -0500 Subject: [PATCH 30/38] simplify the aggregation - fewer LOC --- src/lamp_py/flashback/events.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index f1e7e46a..d7422c69 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -145,26 +145,27 @@ def aggregate_duration_with_new_records( existing_records=existing_records.height, ) process_logger.log_start() - all_events = pl.concat([existing_records, new_records], how="diagonal") + # grab only the records that are still getting updates + existing_merge_records = existing_records.filter(pl.col("event_id").is_in(new_records["event_id"].unique())) + all_events = pl.concat([existing_merge_records, new_records], how="diagonal") + + # for all records at a current stop sequence and status, calculate the start and end times of that status combined = ( all_events.sort(by="timestamp") - .group_by("id", "current_stop_sequence", "current_status") + .group_by("event_id", "current_stop_sequence", "current_status") .agg( [ pl.first("timestamp").alias("status_start_timestamp"), pl.when(pl.first("timestamp").ne(pl.last("timestamp"))).then( pl.last("timestamp").alias("status_end_timestamp") ), + pl.all().exclude("status_start_timestamp", "status_end_timestamp").last(), + # keep the rest of the columns of the most recent one. ] ) - ).join( - all_events.select( - ["id", "direction_id", "revenue", "route_id", "start_date", "start_time", "stop_id", "timestamp", "trip_id"] - ), - on="id", - how="left", ) + valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(combined, cast=True)) process_logger.add_metadata(new_records=new_records.height, updated_records=combined.height) From 8e3b9aad70e9570fb3304b56ab21569b6d8ad533 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:50:35 -0500 Subject: [PATCH 31/38] change flashback runner to pass in a directory base path instead of a single override --- runners/flashback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flashback.py b/runners/flashback.py index a55b1a9f..4c4a1ed2 100644 --- a/runners/flashback.py +++ b/runners/flashback.py @@ -1,3 +1,3 @@ from lamp_py.flashback.pipeline import pipeline -pipeline(local_override="local_stop_events.parquet") +pipeline(local_override_path="/tmp/flashback/") From c6ffb798e47fa6cbbacbb72db725b2f7ed450d0d Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:50:40 -0500 Subject: [PATCH 32/38] black --- src/lamp_py/flashback/pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index 02177484..babeff91 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -79,6 +79,7 @@ async def flashback( await asyncio.sleep(3) # wait before fetching new data + def pipeline(local_override_path: str | None = None) -> None: """Entry point for flashback stop events pipeline.""" process_logger = ProcessLogger("main") From 6014854f31aa1f8358fc61bb2dc07f9a69a3a640 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 16:29:38 -0500 Subject: [PATCH 33/38] add more output for warning logging --- src/lamp_py/runtime_utils/process_logger.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lamp_py/runtime_utils/process_logger.py b/src/lamp_py/runtime_utils/process_logger.py index 8138c29f..728bacd0 100644 --- a/src/lamp_py/runtime_utils/process_logger.py +++ b/src/lamp_py/runtime_utils/process_logger.py @@ -149,10 +149,12 @@ def log_warning(self, exception: Exception) -> None: duration = time.monotonic() - self.start_time self.default_data["status"] = "warned" self.default_data["duration"] = f"{duration:.2f}" + self.default_data["error_type"] = type(exception).__name__ for tb in traceback.format_exception_only(exception): for line in tb.strip("\n").split("\n"): logging.warning(f"uuid={self.default_data["uuid"]}, {line.strip('\n')}") + logging.warning(self._get_log_string()) def log_dataframely_filter_results( self, valid: dy.DataFrame, invalid: dy.FailureInfo, log_level: Optional[int] = logging.WARNING From 5b32bee7d536d43478cde04b221a79488c12c775 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 16:30:33 -0500 Subject: [PATCH 34/38] update id to event_id to differentiate from existing id fields in input messages --- src/lamp_py/flashback/events.py | 12 ++-- src/lamp_py/ingestion/convert_gtfs_rt.py | 3 +- tests/flashback/test_events.py | 70 +++++------------------- tests/flashback/test_io.py | 14 +++-- 4 files changed, 30 insertions(+), 69 deletions(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index d7422c69..ea27a996 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -19,7 +19,7 @@ class VehicleEvents(VehiclePositions): class VehicleStopEvents(dy.Schema): """Vehicle Position raw events to be de-duplicated into actual events""" - id = dy.String(primary_key=True) # start_date-trip-route-vehicle + event_id = dy.String(primary_key=True) # start_date-trip-route-vehicle timestamp = dy.Int64() start_date = dy.String(nullable=False) trip_id = VehicleEvents.trip_id @@ -28,7 +28,7 @@ class VehicleStopEvents(dy.Schema): start_time = VehicleEvents.start_time revenue = VehicleEvents.revenue stop_id = VehicleEvents.stop_id - current_stop_sequence = VehicleEvents.current_stop_sequence + current_stop_sequence = dy.Int16(primary_key=True) # remove current status # renamed status start and stop to arrival and departure for stop events schema arrived = VehicleEvents.status_start_timestamp @@ -38,7 +38,7 @@ class VehicleStopEvents(dy.Schema): class StopEventsJSON(dy.Schema): """Pre-serialized stop events for trips.""" - id = dy.String(primary_key=True) + event_id = dy.String(primary_key=True) timestamp = dy.Int64() start_date = VehicleStopEvents.start_date trip_id = VehicleStopEvents.trip_id @@ -73,6 +73,8 @@ def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy. .unnest("trip") .rename({"id": "entity_id"}) .unnest("vehicle") + .rename({"id": "vehicle_id", "label": "vehicle_label"}) + .rename({"entity_id": "id"}) .unnest("position") ) @@ -197,7 +199,7 @@ def filter_stop_events( ) # remove records that are older than max_record_age - flashback usecase only requires max_record_age history ) .drop("current_status") - .sort("id", "current_stop_sequence") + .sort("event_id", "current_stop_sequence") .rename({"status_start_timestamp": "arrived", "status_end_timestamp": "departed"}) ) @@ -211,7 +213,7 @@ def filter_stop_events( def structure_stop_events(df: dy.DataFrame[VehicleStopEvents]) -> dy.DataFrame[StopEventsJSON]: """Structure flat table into StopEvents records.""" process_logger = ProcessLogger("structure_stop_events", input_rows=df.height) - stop_events = df.group_by("id").agg( + stop_events = df.group_by("event_id").agg( pl.max("timestamp").alias("timestamp"), pl.selectors.by_name("start_date", "trip_id", "direction_id", "route_id", "start_time", "revenue").first(), pl.struct("stop_id", "current_stop_sequence", "arrived", "departed").alias("stop_events"), diff --git a/src/lamp_py/ingestion/convert_gtfs_rt.py b/src/lamp_py/ingestion/convert_gtfs_rt.py index e7ec9934..34ce5d35 100644 --- a/src/lamp_py/ingestion/convert_gtfs_rt.py +++ b/src/lamp_py/ingestion/convert_gtfs_rt.py @@ -123,7 +123,8 @@ class VehiclePositions(dy.Schema): revenue = dy.Bool(nullable=True) last_trip = dy.Bool(nullable=True) schedule_relationship = dy.String(nullable=True) - label = dy.String(nullable=True) # rename this? + vehicle_id = dy.String(nullable=True) + vehicle_label = dy.String(nullable=True) # rename this? bearing = dy.UInt16(nullable=True) latitude = dy.Float64(nullable=True) longitude = dy.Float64(nullable=True) diff --git a/tests/flashback/test_events.py b/tests/flashback/test_events.py index 258cf03a..2472df61 100644 --- a/tests/flashback/test_events.py +++ b/tests/flashback/test_events.py @@ -1,5 +1,5 @@ import time -from datetime import datetime, timedelta +from datetime import datetime import dataframely as dy import polars as pl @@ -13,7 +13,6 @@ unnest_vehicle_positions, ) from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat -from lamp_py.flashback.events import filter_stop_events @pytest.mark.parametrize( @@ -67,7 +66,10 @@ "route_id": "red", }, "position": {}, - "vehicle": {}, + "vehicle": { + "id": "vehicle_1234", + "label": "Bus 1234", + }, "stop_id": "123", "current_stop_sequence": 5, "timestamp": 1700000000, @@ -135,7 +137,14 @@ def test_performance_update_records(dy_gen: dy.random.Generator, num_rows: int = def test_structure_stop_events(dy_gen: dy.random.Generator) -> None: """It correctly chooses the most recent timestamp and the first trip in the id.""" events_df = VehicleStopEvents.sample( - num_rows=2, generator=dy_gen, overrides={"id": "foo", "timestamp": [1, 2], "route_id": ["red", "blue"]} + num_rows=2, + generator=dy_gen, + overrides={ + "event_id": "foo", + "timestamp": [1, 2], + "route_id": ["red", "blue"], + "start_date": ["20231010", "20231011"], + }, ) events_json = structure_stop_events(events_df) assert events_json.row(0)[1] == 2 @@ -144,59 +153,6 @@ def test_structure_stop_events(dy_gen: dy.random.Generator) -> None: ) == events_json.select("start_date", "trip_id", "direction_id", "route_id", "start_time", "revenue").row(0) -@pytest.mark.parametrize( - [ - "current_status", - "status_start_timestamp", - "status_end_timestamp", - "timestamp", - "should_pass", - ], - [ - ("STOPPED_AT", 2_000_000_000, 2_000_000_000 + 1, int(time.time()), True), - ("IN_TRANSIT_TO", 2_000_000_000, 2_000_000_000 + 1, int(time.time()), False), - ("STOPPED_AT", None, None, int(time.time()), False), - ("STOPPED_AT", 2_000_000_000, None, int(time.time()), True), - ("STOPPED_AT", None, 2_000_000_000 + 1, int(time.time()), True), - ("STOPPED_AT", 2_000_000_000, 2_000_000_000 + 1, int(time.time() - 86400 * 8), False), - ], - ids=[ - "valid-stopped-at-event", - "wrong-status", - "null-timestamps", - "null-start-only", - "null-end-only", - "old-record-outside-max-age", - ], -) -# pylint: disable=too-many-arguments,too-many-positional-arguments -def test_filter_stop_events( - dy_gen: dy.random.Generator, - current_status: str, - status_start_timestamp: int | None, - status_end_timestamp: int | None, - timestamp: int, - should_pass: bool, -) -> None: - """It correctly filters stop events by status, timestamps, and age.""" - - events = VehicleEvents.sample( - num_rows=1, - generator=dy_gen, - overrides={ - "current_status": current_status, - "status_start_timestamp": status_start_timestamp, - "status_end_timestamp": status_end_timestamp, - "timestamp": timestamp, - }, - ) - - max_record_age = timedelta(days=7) - filtered = filter_stop_events(events, max_record_age) - - assert (filtered.height == 1) == should_pass - - def test_aggregate_duration_with_new_records(dy_gen: dy.random.Generator) -> None: """ Test that aggregate_duration_with_new_records correctly updates event durations. diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index d6e645c9..25d3bfba 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -12,7 +12,7 @@ from lamp_py.flashback.events import StopEventsJSON from lamp_py.flashback.io import get_remote_stop_events, get_vehicle_positions -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat from tests.test_resources import LocalS3Location @@ -67,10 +67,10 @@ def test_get_remote_events( @pytest.mark.parametrize( ["overrides", "expected_valid_records", "raise_warning", "raises_error"], [ - ({"id": pl.lit("1")}, 0, True, nullcontext()), - ({"id": pl.col("id")}, 3, False, nullcontext()), - ({"id": pl.Series(values=["1", "1", "2"])}, 1, True, nullcontext()), - pytest.param({"id": pl.col("id").implode()}, 0, False, pytest.raises(pl.exceptions.PolarsError)), + ({"event_id": pl.lit("1")}, 0, True, nullcontext()), + ({"event_id": pl.col("event_id")}, 3, False, nullcontext()), + ({"event_id": pl.Series(values=["1", "1", "2"])}, 1, True, nullcontext()), + pytest.param({"event_id": pl.col("event_id").implode()}, 0, False, pytest.raises(pl.exceptions.PolarsError)), ], ids=["all-invalid", "all-valid", "1-valid", "wrong-schema"], ) @@ -142,4 +142,6 @@ async def test_get_vehicle_positions( # Check that failures were logged (status=failed appears in log message) assert "ClientError" in caplog.text failure_logs = [record for record in caplog.record_tuples if "status=failed" in record[2]] - assert len(failure_logs) == num_failures + warn_logs = [record for record in caplog.record_tuples if "status=warned" in record[2]] + + assert len(failure_logs) + len(warn_logs) == num_failures From 41d4324b3935379b9238148780dca2c625182def Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 16:30:58 -0500 Subject: [PATCH 35/38] raise a warning when we get client error, but haven't hit max_retries number of client errors --- src/lamp_py/flashback/io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lamp_py/flashback/io.py b/src/lamp_py/flashback/io.py index ecba208d..796dff26 100644 --- a/src/lamp_py/flashback/io.py +++ b/src/lamp_py/flashback/io.py @@ -85,6 +85,7 @@ async def get_vehicle_positions( data = await response.read() break except ClientError as e: + process_logger.log_warning(e) if attempt == max_retries: process_logger.log_failure(e) raise ClientError(f"Maximum retries ({max_retries}) exceeded") from e From e6b476d69ebd129ca902bdb9b124dd4e7faecde6 Mon Sep 17 00:00:00 2001 From: Henry Huang <1400827+huangh@users.noreply.github.com> Date: Thu, 26 Feb 2026 16:35:42 -0500 Subject: [PATCH 36/38] lint --- tests/flashback/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flashback/test_io.py b/tests/flashback/test_io.py index 25d3bfba..5c95a676 100644 --- a/tests/flashback/test_io.py +++ b/tests/flashback/test_io.py @@ -12,7 +12,7 @@ from lamp_py.flashback.events import StopEventsJSON from lamp_py.flashback.io import get_remote_stop_events, get_vehicle_positions -from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat +from lamp_py.ingestion.convert_gtfs_rt import VehiclePositionsApiFormat from tests.test_resources import LocalS3Location From 627af8afd9a955f72fd2e9c4e949dde7fc5f442f Mon Sep 17 00:00:00 2001 From: Henry <1400827+huangh@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:08:04 -0500 Subject: [PATCH 37/38] Update src/lamp_py/flashback/pipeline.py Co-authored-by: Corey Runkel <39202587+runkelcorey@users.noreply.github.com> --- src/lamp_py/flashback/pipeline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lamp_py/flashback/pipeline.py b/src/lamp_py/flashback/pipeline.py index babeff91..75d2ff7a 100644 --- a/src/lamp_py/flashback/pipeline.py +++ b/src/lamp_py/flashback/pipeline.py @@ -27,10 +27,6 @@ async def flashback( """Fetch, process, and store stop events.""" all_events = remote_events - # vehicle_events - how to handle in transit events? filter out same timestamp, or aggregate somehow? - # average speed? - - # do i want to keep everything? hmm.. while True: process_logger = ProcessLogger("flashback") From 9d8de95c78830edd17653f3a2e303d965c28f24a Mon Sep 17 00:00:00 2001 From: Henry <1400827+huangh@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:08:21 -0500 Subject: [PATCH 38/38] Update src/lamp_py/flashback/events.py comment Co-authored-by: Corey Runkel <39202587+runkelcorey@users.noreply.github.com> --- src/lamp_py/flashback/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lamp_py/flashback/events.py b/src/lamp_py/flashback/events.py index ea27a996..c49591bd 100644 --- a/src/lamp_py/flashback/events.py +++ b/src/lamp_py/flashback/events.py @@ -127,7 +127,7 @@ def aggregate_duration_with_new_records( Merges existing and new stop event records, groups them by vehicle ID and stop sequence, and calculates the timestamp when each status began and ended. Returns - only records that pass StopEventsWithStatus validation. + only records that pass VehicleEvents validation. Args: existing_records: DataFrame of previously processed stop events with status information.