Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b1e6dbd
unnest function is doing too much - split into two methods for single…
huangh Feb 19, 2026
7da5a1a
wip write out both new schema and old schema for vehicle positions to…
huangh Feb 19, 2026
1b399c9
rename service name to be flashback something, not ingestion
huangh Feb 19, 2026
b6e01f9
split up wire API format with internal representation of a vehicle po…
huangh Feb 19, 2026
dbc5ebf
added refactor of vehicle position processing - split aggregate and f…
huangh Feb 25, 2026
5952f7c
remove write_stop_events - function can be consolidated in lambda
huangh Feb 25, 2026
f230a96
add local_override for flashback method for local output and testing.…
huangh Feb 25, 2026
bac8c5b
temp handle function name changes
huangh Feb 25, 2026
b09ebb9
black
huangh Feb 25, 2026
e69569f
allow pipeline to pass in local override argument
huangh Feb 25, 2026
7270a4d
cleanup hierarchy of schemas. remove unused methods
huangh Feb 25, 2026
eac5eb4
finish refactor by removing obsoleted methods, and clean up fallout
huangh Feb 25, 2026
f0e6161
unused variable
huangh Feb 25, 2026
bbc8160
add some comments
huangh Feb 25, 2026
ede9785
formatting
huangh Feb 25, 2026
1073f3a
fix docstring
huangh Feb 25, 2026
70011bc
fix tests for new changes
huangh Feb 25, 2026
631b549
black
huangh Feb 25, 2026
807c7e5
cleanup tests to pass. remove tests that don't test useful functionality
huangh Feb 25, 2026
bf003af
black
huangh Feb 25, 2026
2e20441
types
huangh Feb 25, 2026
5e3fb7f
satisfy dataframely
huangh Feb 25, 2026
bdd9893
blakc
huangh Feb 25, 2026
2ff7062
add remote outputs for all_events, json, and parquet stop events
huangh Feb 26, 2026
48a68a7
add a test for aggregate_duration_with_new_records
huangh Feb 26, 2026
994175a
flatten position as well in vehicle position schema
huangh Feb 26, 2026
d9ef090
rename get_remote_events to get_remote_stop_events
huangh Feb 26, 2026
04335f5
add get_remote_all_events function for processing via all events
huangh Feb 26, 2026
9371419
vehicle events inherits from vehicle positions, which is now the full…
huangh Feb 26, 2026
0799080
simplify the aggregation - fewer LOC
huangh Feb 26, 2026
8e3b9aa
change flashback runner to pass in a directory base path instead of a…
huangh Feb 26, 2026
c6ffb79
black
huangh Feb 26, 2026
6014854
add more output for warning logging
huangh Feb 26, 2026
5b32bee
update id to event_id to differentiate from existing id fields in inp…
huangh Feb 26, 2026
41d4324
raise a warning when we get client error, but haven't hit max_retries…
huangh Feb 26, 2026
e6b476d
lint
huangh Feb 26, 2026
627af8a
Update src/lamp_py/flashback/pipeline.py
huangh Feb 27, 2026
9d8de95
Update src/lamp_py/flashback/events.py
huangh Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runners/flashback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from lamp_py.flashback.pipeline import pipeline

pipeline()
pipeline(local_override_path="/tmp/flashback/")
259 changes: 156 additions & 103 deletions src/lamp_py/flashback/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,151 +4,171 @@
import dataframely as dy
import polars as pl

from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions
from lamp_py.ingestion.convert_gtfs_rt import VehiclePositions, VehiclePositionsApiFormat
from lamp_py.runtime_utils.process_logger import ProcessLogger


class StopEventsTable(dy.Schema):
"""Flat events data, with additional information for determining stop departures."""
class VehicleEvents(VehiclePositions):
"""Vehicle Position raw events to be de-duplicated into actual events"""

id = dy.String(primary_key=True) # trip-route-vehicle
event_id = dy.String(primary_key=True) # start_date-trip-route-vehicle
status_start_timestamp = dy.Int64(nullable=True)
status_end_timestamp = dy.Int64(nullable=True)
Comment thread
runkelcorey marked this conversation as resolved.


class VehicleStopEvents(dy.Schema):
"""Vehicle Position raw events to be de-duplicated into actual events"""

event_id = dy.String(primary_key=True) # start_date-trip-route-vehicle
timestamp = dy.Int64()
start_date = dy.String(nullable=True)
trip_id = dy.String()
direction_id = dy.Int8(min=0, max=1, nullable=True)
route_id = dy.String()
start_time = dy.String(nullable=True)
revenue = dy.Bool(nullable=True)
stop_id = dy.String(nullable=False)
start_date = dy.String(nullable=False)
trip_id = VehicleEvents.trip_id
direction_id = VehicleEvents.direction_id
route_id = VehicleEvents.route_id
start_time = VehicleEvents.start_time
revenue = VehicleEvents.revenue
stop_id = VehicleEvents.stop_id
current_stop_sequence = dy.Int16(primary_key=True)
arrived = dy.Int64(nullable=True)
departed = dy.Int64(nullable=True)
latest_stopped_timestamp = dy.Int64(nullable=True)
# remove current status
# renamed status start and stop to arrival and departure for stop events schema
Comment thread
runkelcorey marked this conversation as resolved.
arrived = VehicleEvents.status_start_timestamp
departed = VehicleEvents.status_end_timestamp


class StopEventsJSON(dy.Schema):
"""Pre-serialized stop events for trips."""

id = dy.String(primary_key=True)
event_id = dy.String(primary_key=True)
timestamp = dy.Int64()
start_date = StopEventsTable.start_date
trip_id = StopEventsTable.trip_id
direction_id = StopEventsTable.direction_id
route_id = StopEventsTable.route_id
start_time = StopEventsTable.start_time
revenue = StopEventsTable.revenue
start_date = VehicleStopEvents.start_date
trip_id = VehicleStopEvents.trip_id
direction_id = VehicleStopEvents.direction_id
route_id = VehicleStopEvents.route_id
start_time = VehicleStopEvents.start_time
revenue = VehicleStopEvents.revenue
stop_events = dy.List(
dy.Struct(
inner={
"stop_id": StopEventsTable.stop_id,
"current_stop_sequence": dy.Int16(),
"arrived": StopEventsTable.arrived,
"departed": StopEventsTable.departed,
"stop_id": VehicleStopEvents.stop_id,
"current_stop_sequence": VehicleStopEvents.current_stop_sequence,
"arrived": VehicleStopEvents.arrived,
"departed": VehicleStopEvents.departed,
}
)
)


def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame[StopEventsTable]:
def unnest_vehicle_positions(vp: dy.DataFrame[VehiclePositionsApiFormat]) -> dy.DataFrame[VehiclePositions]:
"""Unnest VehiclePositions data into flat table."""
process_logger = ProcessLogger("unnest_vehicle_positions", input_rows=vp.height)
process_logger.log_start()
events = (

# it is what it is. note: the struct "vehicle" appears twice.
# the first is a catch all, the 2nd is vehicle_id and vehicle_label.
vehicle_positions = (
vp.select("entity")
.explode("entity")
.unnest("entity")
.unnest("vehicle")
.unnest("trip")
.filter(
pl.col("current_stop_sequence").is_not_null(),
pl.col("trip_id").is_not_null(),
pl.col("timestamp").is_not_null(),
pl.col("route_id").is_not_null(),
)
.select(
pl.concat_str(pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias("id"),
"timestamp",
"start_date",
"trip_id",
"direction_id",
"route_id",
"start_time",
"revenue",
"stop_id",
"current_stop_sequence",
pl.when(pl.col("current_status").eq("STOPPED_AT")).then(pl.col("timestamp")).alias("arrived"),
pl.lit(None).alias("departed"), # for schema adherence
pl.when(pl.col("current_status").eq("STOPPED_AT"))
.then(pl.col("timestamp"))
.alias("latest_stopped_timestamp"),
)
.rename({"id": "entity_id"})
.unnest("vehicle")
.rename({"id": "vehicle_id", "label": "vehicle_label"})
.rename({"entity_id": "id"})
.unnest("position")
)

valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(events, cast=True))
valid = process_logger.log_dataframely_filter_results(*VehiclePositions.filter(vehicle_positions, cast=True))

process_logger.log_complete()

return valid


def update_records(
existing_records: dy.DataFrame[StopEventsTable],
new_records: dy.DataFrame[StopEventsTable],
max_record_age: timedelta,
) -> dy.DataFrame[StopEventsTable]:
"""Return a DataFrame of recent stops using VehiclePositions."""
def vehicle_position_to_archive_events(vp: dy.DataFrame[VehiclePositions]) -> dy.DataFrame[VehicleEvents]:
"""
Convert VehiclePositions data into VehicleEvents format.

Filters vehicle position records to include only those with valid stop sequences,
trip IDs, timestamps, and route IDs. Generates a composite ID from start_date,
trip_id, route_id, and vehicle id, then selects relevant columns for event archival.

Start_date is required to have a unique identifier across days, as all other identifiers are reusable.
"""
process_logger = ProcessLogger("vehicle_position_to_archive_events", input_rows=vp.height)
process_logger.log_start()
events = vp.filter(
pl.col("current_stop_sequence").is_not_null(),
pl.col("trip_id").is_not_null(),
pl.col("timestamp").is_not_null(),
pl.col("route_id").is_not_null(),
pl.col("start_date").is_not_null(),
).with_columns(
pl.concat_str(pl.col("start_date"), pl.col("trip_id"), pl.col("route_id"), pl.col("id"), separator="-").alias(
"event_id"
),
pl.lit(None).cast(pl.Int64).alias("status_start_timestamp"),
pl.lit(None).cast(pl.Int64).alias("status_end_timestamp"),
)

valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(events, cast=True))

process_logger.log_complete()

return valid


def aggregate_duration_with_new_records(
existing_records: dy.DataFrame[VehicleEvents],
new_records: dy.DataFrame[VehicleEvents],
) -> dy.DataFrame[VehicleEvents]:
"""
Recalculate derived duration fields for stop events based on status changes.

Merges existing and new stop event records, groups them by vehicle ID and stop
sequence, and calculates the timestamp when each status began and ended. Returns
only records that pass VehicleEvents validation.

Args:
existing_records: DataFrame of previously processed stop events with status information.
new_records: DataFrame of newly received stop events with status information.
max_record_age: Maximum age threshold for records (currently logged but not actively used in filtering).

Returns:
DataFrame of stop events with validated derived duration fields (status_start_timestamp
and status_end_timestamp where applicable).

Note:
Records are sorted by timestamp and grouped by vehicle ID, stop sequence, and current status.
Status end timestamp is only set when the first and last timestamp within a group differ.
"""
process_logger = ProcessLogger(
"update_records", existing_records=existing_records.height, max_record_age=str(max_record_age)
"aggregate_duration_with_new_records",
existing_records=existing_records.height,
)
process_logger.log_start()

# grab only the records that are still getting updates
existing_merge_records = existing_records.filter(pl.col("event_id").is_in(new_records["event_id"].unique()))
all_events = pl.concat([existing_merge_records, new_records], how="diagonal")

# for all records at a current stop sequence and status, calculate the start and end times of that status
combined = (
existing_records.filter( # remove old records
datetime.now(tz=ZoneInfo("America/New_York"))
- pl.from_epoch("timestamp").dt.replace_time_zone(
"America/New_York", ambiguous="latest", non_existent="null"
)
< max_record_age
all_events.sort(by="timestamp")
.group_by("event_id", "current_stop_sequence", "current_status")
.agg(
[
pl.first("timestamp").alias("status_start_timestamp"),
pl.when(pl.first("timestamp").ne(pl.last("timestamp"))).then(
pl.last("timestamp").alias("status_end_timestamp")
),
pl.all().exclude("status_start_timestamp", "status_end_timestamp").last(),
Comment on lines +161 to +165

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first and last are not guaranteed to be the earliest and latest timestamps of the input dataframes

Suggested change
pl.first("timestamp").alias("status_start_timestamp"),
pl.when(pl.first("timestamp").ne(pl.last("timestamp"))).then(
pl.last("timestamp").alias("status_end_timestamp")
),
pl.all().exclude("status_start_timestamp", "status_end_timestamp").last(),
pl.min("timestamp").alias("status_start_timestamp"),
pl.when(pl.min("timestamp").ne(pl.max("timestamp"))).then(
pl.max("timestamp").alias("status_end_timestamp")
),
pl.all().exclude("status_start_timestamp", "status_end_timestamp").max_by(pl.max_horizontal("status_start_timestamp", "status_end_timestamp")),

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they are because I sorted by timestamp above.

https://docs.pola.rs/user-guide/expressions/aggregation/#sorting

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if you only receive 1 STOPPED_AT record for a stop? would

id current_stop_sequence current_status timestamp
foo 20 STOPPED_AT 1

become

id current_stop_sequence arrived departed
foo 20 1 null

even if the trip has already advanced to stop sequence 2?

# keep the rest of the columns of the most recent one.
]
)
.join(new_records, on=["id", "current_stop_sequence"], how="full", coalesce=True)
.select(
"id",
"current_stop_sequence",
*[
pl.coalesce(col, f"{col}_right").alias(col)
for col in [
"start_date",
"trip_id",
"direction_id",
"route_id",
"start_time",
"revenue",
"stop_id",
"arrived",
]
],
pl.coalesce(
pl.when( # if the trip has moved past this stop sequence, set departed to latest_stopped_timestamp
pl.col("current_stop_sequence").max().over("id").gt(pl.col("current_stop_sequence"))
).then(pl.col("latest_stopped_timestamp")),
"departed",
).alias("departed"),
pl.coalesce(
pl.when( # if departure is updated, then also update timestamp
pl.col("current_stop_sequence").max().over("id").gt(pl.col("current_stop_sequence")),
pl.col("departed").is_null(),
).then(pl.col("timestamp_right").max().over("id")),
"timestamp",
"timestamp_right",
).alias("timestamp"),
pl.coalesce("latest_stopped_timestamp_right", "latest_stopped_timestamp").alias(
"latest_stopped_timestamp"
), # use value from new record
)
.filter(pl.col("arrived").is_not_null() | pl.col("departed").is_not_null()) # keep only stops with events
)

valid = process_logger.log_dataframely_filter_results(*StopEventsTable.filter(combined, cast=True))
valid = process_logger.log_dataframely_filter_results(*VehicleEvents.filter(combined, cast=True))

process_logger.add_metadata(new_records=new_records.height, updated_records=combined.height)

Expand All @@ -157,10 +177,43 @@ def update_records(
return valid


def structure_stop_events(df: dy.DataFrame[StopEventsTable]) -> dy.DataFrame[StopEventsJSON]:
def filter_stop_events(
compressed_events: dy.DataFrame[VehicleEvents],
max_record_age: timedelta,
) -> dy.DataFrame[VehicleStopEvents]:
"""
take compressed events and take only stopped_at events,
and rename the status start and end periods to stop event schema format
"""

filtered = (
compressed_events.filter(
(pl.col("current_status") == "STOPPED_AT")
& (pl.col("status_start_timestamp").is_not_null() | pl.col("status_end_timestamp").is_not_null())
& (
datetime.now(tz=ZoneInfo("America/New_York"))
- pl.from_epoch("timestamp").dt.replace_time_zone(
"America/New_York", ambiguous="latest", non_existent="null"
)
< max_record_age
) # remove records that are older than max_record_age - flashback usecase only requires max_record_age history
)
.drop("current_status")
.sort("event_id", "current_stop_sequence")
.rename({"status_start_timestamp": "arrived", "status_end_timestamp": "departed"})

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you know that a vehicle has actually departed? suppose

id current_stop_sequence current_status timestamp
foo 1 STOPPED_AT 1
foo 1 STOPPED_AT 2

as I read it, these become

id current_stop_sequence arrived departed
foo 1 1 2

instead of

id current_stop_sequence arrived departed
foo 1 1 null

)

valid = ProcessLogger("filter_stop_events").log_dataframely_filter_results(
*VehicleStopEvents.filter(filtered, cast=True)
)

return valid


def structure_stop_events(df: dy.DataFrame[VehicleStopEvents]) -> dy.DataFrame[StopEventsJSON]:
"""Structure flat table into StopEvents records."""
process_logger = ProcessLogger("structure_stop_events", input_rows=df.height)
stop_events = df.group_by("id").agg(
stop_events = df.group_by("event_id").agg(
pl.max("timestamp").alias("timestamp"),
pl.selectors.by_name("start_date", "trip_id", "direction_id", "route_id", "start_time", "revenue").first(),
pl.struct("stop_id", "current_stop_sequence", "arrived", "departed").alias("stop_events"),
Expand Down
Loading
Loading