Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 59 additions & 53 deletions src/lamp_py/ingestion/config_busloc_trip.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,77 @@
from typing import List, Tuple
import dataframely as dy
import pyarrow

from lamp_py.ingestion.gtfs_rt_detail import GTFSRTDetail
from lamp_py.ingestion.gtfs_rt_detail import GTFSRTDetail, FeedMessage, TripUpdateTable
from lamp_py.ingestion.gtfs_rt_structs import (
trip_descriptor,
vehicle_descriptor,
stop_time_event,
)
from lamp_py.ingestion.utils import explode_table_column, flatten_table_schema
from lamp_py.utils.dataframely import with_nullable


class RtBusTripDetail(GTFSRTDetail):
class BusLocTripUpdateRecord(FeedMessage):
    """
    Each TripUpdate message generated by BusLoc.

    Declares the nested shape of the ``entity`` list: every entity holds an
    ``id`` and a ``trip_update`` struct, and each ``trip_update`` holds a
    ``stop_time_update`` list of per-stop structs.
    """

    # Outer list: min_length=1 means a message must contain at least one entity.
    entity = dy.List(
        inner=dy.Struct(
            inner={
                "id": dy.String(min_length=1),  # must contain at least one character
                "trip_update": dy.Struct(
                    inner={
                        "timestamp": dy.UInt64(nullable=True),  # Not currently provided by Busloc
                        "delay": dy.Int32(nullable=True),  # Not currently provided by Busloc
                        "trip": trip_descriptor,  # Busloc currently only provides trip_id, route_id and schedule_relationship
                        # NOTE(review): with_nullable presumably returns a nullable variant of
                        # vehicle_descriptor - confirm in lamp_py.utils.dataframely
                        "vehicle": with_nullable(
                            vehicle_descriptor, nullable=True
                        ),  # Busloc currently only provides id and label
                        # Inner list: min_length=1 means every trip_update needs at least one stop_time_update.
                        "stop_time_update": dy.List(
                            dy.Struct(
                                inner={
                                    "stop_sequence": dy.UInt32(),
                                    "stop_id": dy.String(nullable=True),
                                    "arrival": stop_time_event,
                                    "departure": stop_time_event,
                                    "schedule_relationship": dy.String(nullable=True),
                                    # The three fields below are the ones BusLocTripUpdateTable
                                    # exposes as cause_id / cause_description / remark columns.
                                    "cause_id": dy.UInt16(nullable=True),
                                    "cause_description": dy.String(nullable=True),
                                    "remark": dy.String(nullable=True),
                                },
                            ),
                            min_length=1,
                        ),
                    }
                ),
            }
        ),
        min_length=1,
    )


class BusLocTripUpdateTable(TripUpdateTable):
    """
    Flattened BusLoc TripUpdates data with exploded stop_time_update.

    Extends TripUpdateTable with three nullable columns. Each alias is the
    dotted path of the matching field under ``trip_update.stop_time_update``.
    """

    # Column dtypes match the corresponding fields declared in BusLocTripUpdateRecord.
    cause_id = dy.UInt16(nullable=True, alias="trip_update.stop_time_update.cause_id")
    cause_description = dy.String(nullable=True, alias="trip_update.stop_time_update.cause_description")
    remark = dy.String(nullable=True, alias="trip_update.stop_time_update.remark")


class RtBusTripDetail(GTFSRTDetail[BusLocTripUpdateTable, BusLocTripUpdateRecord]):
"""
Detail for how to convert RT GTFS Trip Updates from json entries into
parquet tables.
"""

@property
def table_schema(self) -> type[BusLocTripUpdateTable]:
    """Return the BusLocTripUpdateTable schema class (the class itself, not an instance)."""
    return BusLocTripUpdateTable

@property
def record_schema(self) -> type[BusLocTripUpdateRecord]:
    """Return the BusLocTripUpdateRecord schema class (the class itself, not an instance)."""
    return BusLocTripUpdateRecord

def transform_for_write(self, table: pyarrow.Table) -> pyarrow.Table:
    """
    Modify table schema before write to parquet.

    The table is flattened, its "trip_update.stop_time_update" column is
    exploded, and the result is flattened a second time.

    Annotated with ``pyarrow.Table`` (the class); ``pyarrow.table`` is the
    factory function and is not a type.
    """
    flattened = flatten_table_schema(table)
    exploded = explode_table_column(flattened, "trip_update.stop_time_update")
    # NOTE(review): the second flatten presumably expands the struct fields
    # exposed by the explode step - confirm against lamp_py.ingestion.utils
    return flatten_table_schema(exploded)
Expand All @@ -26,57 +82,7 @@ def partition_column(self) -> str:

@property
def import_schema(self) -> pyarrow.schema:
return pyarrow.schema(
[
("id", pyarrow.string()),
(
"trip_update",
pyarrow.struct(
[
(
"timestamp",
pyarrow.uint64(),
), # Not currently provided by Busloc
(
"delay",
pyarrow.int32(),
), # Not currently provided by Busloc
(
"trip",
trip_descriptor.pyarrow_dtype,
), # Busloc currently only provides trip_id, route_id and schedule_relationship
(
"vehicle",
vehicle_descriptor.pyarrow_dtype,
), # Busloc currently only provides id and label
(
"stop_time_update",
pyarrow.list_(
pyarrow.struct(
[
("stop_sequence", pyarrow.uint32()),
("stop_id", pyarrow.string()),
("arrival", stop_time_event.pyarrow_dtype),
("departure", stop_time_event.pyarrow_dtype),
(
"schedule_relationship",
pyarrow.string(),
),
("cause_id", pyarrow.uint16()),
(
"cause_description",
pyarrow.string(),
),
("remark", pyarrow.string()),
]
)
),
),
]
),
),
]
)
return pyarrow.schema([v.pyarrow_field(k) for k, v in self.record_schema.entity.inner.inner.items()]) # type: ignore[attr-defined]

@property
def table_sort_order(self) -> List[Tuple[str, str]]:
Expand Down
37 changes: 26 additions & 11 deletions src/lamp_py/ingestion/config_busloc_vehicle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
import dataframely as dy
import pyarrow

from .gtfs_rt_detail import GTFSRTDetail, FeedMessage, FeedEntityTable
from .gtfs_rt_structs import (
position,
vehicle_descriptor,
trip_descriptor,
)
from lamp_py.ingestion.gtfs_rt_detail import GTFSRTDetail, FeedMessage, FeedEntityTable
from lamp_py.ingestion.gtfs_rt_structs import position


class BusLocVehicleRecord(FeedMessage):
Expand All @@ -23,8 +19,30 @@ class BusLocVehicleRecord(FeedMessage):
"position": position,
"location_source": dy.String(nullable=True),
"timestamp": dy.UInt64(nullable=True),
"trip": trip_descriptor,
"vehicle": vehicle_descriptor,
"trip": dy.Struct(
inner={
"trip_id": dy.String(nullable=True),
"route_id": dy.String(nullable=True),
"direction_id": dy.UInt8(nullable=True),
"start_time": dy.String(nullable=True),
"start_date": dy.String(nullable=True),
"schedule_relationship": dy.String(nullable=True),
"route_pattern_id": dy.String(nullable=True), # MBTA Enhanced Field
"tm_trip_id": dy.String(nullable=True), # Only used by Busloc
"overload_id": dy.Int64(nullable=True), # Only used by Busloc
"overload_offset": dy.Int64(nullable=True), # Only used by Busloc
"revenue": dy.Bool(nullable=True), # MBTA Enhanced Field
"last_trip": dy.Bool(nullable=True), # MBTA Enhanced Field
}
),
"vehicle": dy.Struct(
inner={
"id": dy.String(nullable=False),
"label": dy.String(nullable=True),
"license_plate": dy.String(nullable=True),
"assignment_status": dy.String(nullable=True), # Only used by Busloc
}
),
"operator": dy.Struct(
inner={
"id": dy.String(nullable=True),
Expand Down Expand Up @@ -81,9 +99,6 @@ class BusLocVehicleTable(FeedEntityTable):
vehicle_id = dy.String(nullable=True, alias="vehicle.vehicle.id")
vehicle_label = dy.String(nullable=True, alias="vehicle.vehicle.label")
vehicle_license_plate = dy.String(nullable=True, alias="vehicle.vehicle.license_plate")
vehicle_consist = dy.List(
dy.Struct(inner={"label": dy.String(nullable=True)}), nullable=True, alias="vehicle.vehicle.consist"
)
vehicle_assignment_status = dy.String(nullable=True, alias="vehicle.vehicle.assignment_status")
operator_id = dy.String(nullable=True, alias="vehicle.operator.id")
operator_first_name = dy.String(
Expand Down
14 changes: 8 additions & 6 deletions src/lamp_py/ingestion/config_rt_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ class AlertsRecord(FeedMessage):
nullable=True,
),
"cause": dy.String(nullable=False),
"cause_detail": dy.String(nullable=True), # type does not match spec type of <TranslatedString>
"cause_detail": translated_string, # NOTE(review): uses translated_string, so the earlier "type does not match spec type of <TranslatedString>" caveat looks stale - confirm
"effect": dy.String(nullable=True),
"effect_detail": dy.String(
nullable=True
), # type does not match spec type of <TranslatedString>
"effect_detail": translated_string, # NOTE(review): uses translated_string, so the earlier "type does not match spec type of <TranslatedString>" caveat looks stale - confirm
"url": translated_string,
"header_text": translated_string,
"description_text": translated_string,
Expand Down Expand Up @@ -80,9 +78,13 @@ class AlertsTable(FeedEntityTable):
"""Flattened Alerts data."""

alert_cause = dy.String(alias="alert.cause")
alert_cause_detail = dy.String(nullable=True, alias="alert.cause_detail")
alert_cause_detail_translation = with_alias(
translated_string.inner["translation"], new_alias="alert.cause_detail.translation"
)
alert_effect = dy.String(nullable=True, alias="alert.effect")
alert_effect_detail = dy.String(nullable=True, alias="alert.effect_detail")
alert_effect_detail_translation = with_alias(
translated_string.inner["translation"], new_alias="alert.effect_detail.translation"
)
alert_severity_level = dy.String(nullable=True, alias="alert.severity_level")
alert_severity = dy.UInt16(nullable=True, alias="alert.severity")
alert_created_timestamp = dy.UInt64(nullable=True, alias="alert.created_timestamp")
Expand Down
91 changes: 52 additions & 39 deletions src/lamp_py/ingestion/config_rt_trip.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import List, Tuple
import dataframely as dy
import pyarrow

from lamp_py.ingestion.gtfs_rt_detail import GTFSRTDetail
from lamp_py.ingestion.gtfs_rt_detail import GTFSRTDetail, FeedMessage, TripUpdateTable
from lamp_py.ingestion.gtfs_rt_structs import (
trip_descriptor,
vehicle_descriptor,
Expand All @@ -10,12 +11,60 @@
from lamp_py.ingestion.utils import explode_table_column, flatten_table_schema


class RtTripDetail(GTFSRTDetail):
class TripUpdateRecord(FeedMessage):
    """
    Each TripUpdate message generated by GTFS-RT.

    Declares the nested shape of the ``entity`` list: every entity holds an
    ``id`` and a ``trip_update`` struct, and each ``trip_update`` holds a
    ``stop_time_update`` list of per-stop structs.
    """

    # Outer list: min_length=1 means a message must contain at least one entity.
    entity = dy.List(
        inner=dy.Struct(
            inner={
                "id": dy.String(min_length=1),  # must contain at least one character
                "trip_update": dy.Struct(
                    inner={
                        "trip": trip_descriptor,
                        "vehicle": vehicle_descriptor,
                        # Inner list: min_length=1 means every trip_update needs at least one stop_time_update.
                        "stop_time_update": dy.List(
                            dy.Struct(
                                inner={
                                    "stop_sequence": dy.UInt32(),
                                    "stop_id": dy.String(nullable=True),
                                    "arrival": stop_time_event,
                                    "departure": stop_time_event,
                                    "schedule_relationship": dy.String(nullable=True),
                                    "boarding_status": dy.String(nullable=True),  # MBTA Enhanced Field
                                },
                            ),
                            min_length=1,
                        ),
                        "timestamp": dy.UInt64(nullable=True),
                        "delay": dy.Int32(nullable=True),
                    }
                ),
            }
        ),
        min_length=1,
    )


class ConcentrateTripUpdateTable(TripUpdateTable):
    """
    Flattened GTFS-RT TripUpdates data with exploded stop_time_update.

    Extends TripUpdateTable with one nullable column whose alias is the dotted
    path of the ``boarding_status`` field under ``trip_update.stop_time_update``.
    """

    # Column dtype matches the boarding_status field declared in TripUpdateRecord.
    boarding_status = dy.String(nullable=True, alias="trip_update.stop_time_update.boarding_status")


class RtTripDetail(GTFSRTDetail[ConcentrateTripUpdateTable, TripUpdateRecord]):
"""
Detail for how to convert RT GTFS Trip Updates from json entries into
parquet tables.
"""

@property
def table_schema(self) -> type[ConcentrateTripUpdateTable]:
    """Return the ConcentrateTripUpdateTable schema class (the class itself, not an instance)."""
    return ConcentrateTripUpdateTable

@property
def record_schema(self) -> type[TripUpdateRecord]:
    """Return the TripUpdateRecord schema class (the class itself, not an instance)."""
    return TripUpdateRecord

def transform_for_write(self, table: pyarrow.Table) -> pyarrow.Table:
    """
    Modify table schema before write to parquet.

    The table is flattened, its "trip_update.stop_time_update" column is
    exploded, and the result is flattened a second time.

    Annotated with ``pyarrow.Table`` (the class); ``pyarrow.table`` is the
    factory function and is not a type.
    """
    flattened = flatten_table_schema(table)
    exploded = explode_table_column(flattened, "trip_update.stop_time_update")
    # NOTE(review): the second flatten presumably expands the struct fields
    # exposed by the explode step - confirm against lamp_py.ingestion.utils
    return flatten_table_schema(exploded)
Expand All @@ -26,43 +75,7 @@ def partition_column(self) -> str:

@property
def import_schema(self) -> pyarrow.schema:
return pyarrow.schema(
[
("id", pyarrow.string()),
(
"trip_update",
pyarrow.struct(
[
("trip", trip_descriptor.pyarrow_dtype),
("vehicle", vehicle_descriptor.pyarrow_dtype),
(
"stop_time_update",
pyarrow.list_(
pyarrow.struct(
[
("stop_sequence", pyarrow.uint32()),
("stop_id", pyarrow.string()),
("arrival", stop_time_event.pyarrow_dtype),
("departure", stop_time_event.pyarrow_dtype),
(
"schedule_relationship",
pyarrow.string(),
),
(
"boarding_status",
pyarrow.string(),
), # MBTA Enhanced Field
]
)
),
),
("timestamp", pyarrow.uint64()),
("delay", pyarrow.int32()),
]
),
),
]
)
return pyarrow.schema([v.pyarrow_field(k) for k, v in self.record_schema.entity.inner.inner.items()]) # type: ignore[attr-defined]

# pylint: disable=R0801
# Similar lines in 2 files
Expand Down
Loading
Loading