ingestion refactor changes #760

Open
huangh wants to merge 12 commits into main from hh-ingestion-refactor-new

@huangh huangh commented May 28, 2026

Collaborator

Asana Task: ticket
ticket2

What changes does this PR propose?

Writeup on the re-architecture here: https://app.notion.com/p/mbta-downtown-crossing/Ingestion-Redesign-Ingestion-Indegestion-Redegestion-334f5d8d11ea8020bffcdcffe9b6c1ee?source=copy_link

  • This is the first part of the re-architecture: writing ingested files out in 15-minute chunks instead of appending to a single large file that grows throughout the day.
  • This makes a 15-minute lag between data ingestion and Tableau feasible. The lag currently varies between 1 hr and 2.5 hrs.
  • A new converter, convert_gtfs_rt_fullset.py, implements this 15-minute chunking. The original converter is left mostly alone. For now, the new converter is applied only to RT_TRIP_UPDATES and RT_VEHICLE_POSITIONS, in both the PROD and DEV_GREEN variants.
  • A devops change bumped the size of ingestion to 8 vCPU, so the default worker and thread counts have been bumped accordingly.
  • Some commented-out code, to be enabled in a follow-on PR, allows the ingestion ECS to gather all of the 15-minute chunks and recombine them into the archival single-day parquet file. This process has been demonstrated to yield much better compression than the current processing.
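
For readers who have not opened the Notion writeup, here is a minimal sketch of the chunking idea described above. It is not the code in convert_gtfs_rt_fullset.py: the names chunk_start, group_by_chunk and recombine_day are hypothetical, records are plain (timestamp, payload) pairs rather than parquet rows, and the "day" is assumed to be the calendar date of the timestamp.

```python
from collections import defaultdict
from datetime import datetime, timezone

CHUNK_MINUTES = 15  # chunk width described in this PR


def chunk_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 15-minute chunk (hypothetical helper)."""
    floored_minute = ts.minute - ts.minute % CHUNK_MINUTES
    return ts.replace(minute=floored_minute, second=0, microsecond=0)


def group_by_chunk(records):
    """Group (timestamp, payload) pairs by the chunk they fall in."""
    chunks = defaultdict(list)
    for ts, payload in records:
        chunks[chunk_start(ts)].append(payload)
    return dict(chunks)


def recombine_day(chunks, day):
    """Concatenate one day's chunks, in chunk order, into a single list.

    Stands in for the follow-on step that rebuilds the single-day archive;
    assumes `day` is a datetime.date matching the chunk keys' calendar date.
    """
    combined = []
    for start in sorted(chunks):
        if start.date() == day:
            combined.extend(chunks[start])
    return combined


# Illustrative records only; not real feed data.
records = [
    (datetime(2026, 5, 28, 12, 1, tzinfo=timezone.utc), "a"),
    (datetime(2026, 5, 28, 12, 14, 59, tzinfo=timezone.utc), "b"),
    (datetime(2026, 5, 28, 12, 15, tzinfo=timezone.utc), "c"),
]
chunks = group_by_chunk(records)
```

In this sketch the first two records land in the 12:00 chunk and the third in the 12:15 chunk, so each chunk can be written out as soon as its window closes, and the day's archive is rebuilt later from the sorted chunks.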

How were these changes validated?

  1. New tests? test_yield_check_periodic checks that the files written out make sense.
  2. Used a runner? validate_data_across_environment.py compares data across dev|staging|prod (which have the same inputs via delta) and ensures that they are identical. This shows that all of the records are present and none have been lost.
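
The cross-environment check in item 2 can be pictured as an order-insensitive comparison of record multisets. The sketch below is an assumption about the general approach, not the contents of validate_data_across_environment.py: record_multiset and environments_match are hypothetical names, rows are plain dicts with hashable values, and the environment data is illustrative.

```python
from collections import Counter


def record_multiset(rows):
    """Count each row, ignoring row order and column order (hypothetical helper)."""
    return Counter(tuple(sorted(row.items())) for row in rows)


def environments_match(rows_by_env):
    """Return True when every environment holds exactly the same records."""
    multisets = [record_multiset(rows) for rows in rows_by_env.values()]
    return all(m == multisets[0] for m in multisets[1:])


# Illustrative rows only; real inputs would be read from each environment's output.
same = {
    "dev": [{"id": 1, "stop": "A"}, {"id": 2, "stop": "B"}],
    "staging": [{"id": 2, "stop": "B"}, {"id": 1, "stop": "A"}],
    "prod": [{"stop": "A", "id": 1}, {"id": 2, "stop": "B"}],
}
missing_one = dict(same, prod=[{"id": 1, "stop": "A"}])
```

Using a Counter rather than a set means a duplicated or dropped record in one environment is detected even when the set of distinct rows is unchanged.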

@github-actions

LCOV of commit 500541b during Continuous Integration (Python) #1964

Summary coverage rate:
  lines......: 63.9% (3588 of 5613 lines)
  functions..: 29.4% (272 of 926 functions)
  branches...: no data found

Files changed coverage rate:
                                                                                     |Lines       |Functions  |Branches    
  Filename                                                                           |Rate     Num|Rate    Num|Rate     Num
  =========================================================================================================================
  src/lamp_py/ingestion/config_busloc_trip.py                                        |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/config_rt_trip.py                                            |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs.py                                              |79.2%     48|37.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt.py                                           |88.6%    237|50.0%    26|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt_fullset.py                                   |45.6%    114|16.7%    18|    -      0
  src/lamp_py/ingestion/converter.py                                                 |93.9%     49|30.0%    20|    -      0
  src/lamp_py/ingestion/glides.py                                                    |95.6%    182|45.5%    44|    -      0
  src/lamp_py/ingestion/gtfs_rt_detail.py                                            |94.1%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/ingest_gtfs.py                                               | 0.0%     63| 0.0%    10|    -      0
  src/lamp_py/ingestion/pipeline.py                                                  | 0.0%     38| 0.0%     4|    -      0
  src/lamp_py/ingestion/utils.py                                                     |63.1%    111|35.0%    20|    -      0

github-actions Bot commented Jun 1, 2026

LCOV of commit 4607981 during Continuous Integration (Python) #1970

Summary coverage rate:
  lines......: 64.0% (3589 of 5610 lines)
  functions..: 29.4% (272 of 926 functions)
  branches...: no data found

Files changed coverage rate:
                                                                                     |Lines       |Functions  |Branches    
  Filename                                                                           |Rate     Num|Rate    Num|Rate     Num
  =========================================================================================================================
  src/lamp_py/ingestion/config_busloc_trip.py                                        |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/config_rt_trip.py                                            |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs.py                                              |79.2%     48|37.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt.py                                           |89.0%    237|50.0%    26|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt_fullset.py                                   |46.8%    111|16.7%    18|    -      0
  src/lamp_py/ingestion/converter.py                                                 |93.9%     49|30.0%    20|    -      0
  src/lamp_py/ingestion/glides.py                                                    |95.6%    182|45.5%    44|    -      0
  src/lamp_py/ingestion/gtfs_rt_detail.py                                            |94.1%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/ingest_gtfs.py                                               | 0.0%     63| 0.0%    10|    -      0
  src/lamp_py/ingestion/pipeline.py                                                  | 0.0%     38| 0.0%     4|    -      0
  src/lamp_py/ingestion/utils.py                                                     |63.1%    111|35.0%    20|    -      0

@huangh huangh marked this pull request as ready for review June 5, 2026 12:42
@huangh huangh requested a review from a team as a code owner June 5, 2026 12:42
@huangh huangh requested a review from runkelcorey June 5, 2026 12:42
Comment thread src/lamp_py/ingestion/pipeline.py

@runkelcorey runkelcorey left a comment

Collaborator

I like how clean this code is. Can you construct a test that covers convert? I think it would help for regression testing—and to prove to myself that it does what it should. You can take a look at test_convert in test_convert_gtfs_rt to see what this could look like.

Comment thread tests/ingestion/test_yield_check_periodic.py Outdated
Comment thread tests/ingestion/test_yield_check_periodic.py
Comment thread tests/ingestion/test_yield_check_periodic.py Outdated
Comment thread tests/ingestion/test_yield_check_periodic.py
Comment thread tests/ingestion/test_yield_check_periodic.py Outdated
Comment thread src/lamp_py/ingestion/convert_gtfs_rt_fullset.py
Comment thread src/lamp_py/ingestion/convert_gtfs_rt_fullset.py
Comment thread src/lamp_py/ingestion/convert_gtfs_rt_fullset.py
Comment thread src/lamp_py/ingestion/convert_gtfs_rt_fullset.py
Comment thread src/lamp_py/ingestion/convert_gtfs_rt_fullset.py Outdated
@huangh huangh force-pushed the hh-ingestion-refactor-new branch from 4607981 to 51f2e09 Compare June 12, 2026 03:29
@github-actions

LCOV of commit fb69a46 during Continuous Integration (Python) #1988

Summary coverage rate:
  lines......: 64.3% (3630 of 5645 lines)
  functions..: 29.6% (277 of 936 functions)
  branches...: no data found

Files changed coverage rate:
                                                                                     |Lines       |Functions  |Branches    
  Filename                                                                           |Rate     Num|Rate    Num|Rate     Num
  =========================================================================================================================
  src/lamp_py/ingestion/config_busloc_trip.py                                        |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/config_rt_trip.py                                            |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs.py                                              |79.2%     48|37.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt.py                                           |89.0%    236|50.0%    26|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt_fullset.py                                   |44.7%     94|14.3%    14|    -      0
  src/lamp_py/ingestion/converter.py                                                 |93.9%     49|30.0%    20|    -      0
  src/lamp_py/ingestion/glides.py                                                    |95.6%    182|45.5%    44|    -      0
  src/lamp_py/ingestion/gtfs_rt_detail.py                                            |90.0%     30| 8.3%    12|    -      0
  src/lamp_py/ingestion/ingest_gtfs.py                                               | 0.0%     63| 0.0%    10|    -      0
  src/lamp_py/ingestion/pipeline.py                                                  | 0.0%     38| 0.0%     4|    -      0
  src/lamp_py/ingestion/utils.py                                                     |64.3%    115|36.4%    22|    -      0

@huangh huangh requested a review from runkelcorey June 16, 2026 13:43
@github-actions

LCOV of commit 3479c0f during Continuous Integration (Python) #1990

Summary coverage rate:
  lines......: 64.3% (3630 of 5646 lines)
  functions..: 29.6% (277 of 936 functions)
  branches...: no data found

Files changed coverage rate:
                                                                                     |Lines       |Functions  |Branches    
  Filename                                                                           |Rate     Num|Rate    Num|Rate     Num
  =========================================================================================================================
  src/lamp_py/ingestion/config_busloc_trip.py                                        |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/config_rt_trip.py                                            |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs.py                                              |79.2%     48|37.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt.py                                           |89.0%    236|50.0%    26|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt_fullset.py                                   |44.2%     95|14.3%    14|    -      0
  src/lamp_py/ingestion/converter.py                                                 |93.9%     49|30.0%    20|    -      0
  src/lamp_py/ingestion/glides.py                                                    |95.6%    182|45.5%    44|    -      0
  src/lamp_py/ingestion/gtfs_rt_detail.py                                            |90.0%     30| 8.3%    12|    -      0
  src/lamp_py/ingestion/ingest_gtfs.py                                               | 0.0%     63| 0.0%    10|    -      0
  src/lamp_py/ingestion/pipeline.py                                                  | 0.0%     38| 0.0%     4|    -      0
  src/lamp_py/ingestion/utils.py                                                     |64.3%    115|36.4%    22|    -      0

…rd expected structure the same. deduped data will be written to the standard springboard location
@github-actions

LCOV of commit a440de3 during Continuous Integration (Python) #1991

Summary coverage rate:
  lines......: 64.3% (3630 of 5646 lines)
  functions..: 29.6% (277 of 936 functions)
  branches...: no data found

Files changed coverage rate:
                                                                                     |Lines       |Functions  |Branches    
  Filename                                                                           |Rate     Num|Rate    Num|Rate     Num
  =========================================================================================================================
  src/lamp_py/ingestion/config_busloc_trip.py                                        |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/config_rt_trip.py                                            |82.4%     17|12.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs.py                                              |79.2%     48|37.5%     8|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt.py                                           |89.0%    236|50.0%    26|    -      0
  src/lamp_py/ingestion/convert_gtfs_rt_fullset.py                                   |44.2%     95|14.3%    14|    -      0
  src/lamp_py/ingestion/converter.py                                                 |93.9%     49|30.0%    20|    -      0
  src/lamp_py/ingestion/glides.py                                                    |95.6%    182|45.5%    44|    -      0
  src/lamp_py/ingestion/gtfs_rt_detail.py                                            |90.0%     30| 8.3%    12|    -      0
  src/lamp_py/ingestion/ingest_gtfs.py                                               | 0.0%     63| 0.0%    10|    -      0
  src/lamp_py/ingestion/pipeline.py                                                  | 0.0%     38| 0.0%     4|    -      0
  src/lamp_py/ingestion/utils.py                                                     |64.3%    115|36.4%    22|    -      0

@runkelcorey runkelcorey left a comment

Collaborator

If we deployed this to staging:

  1. what would happen to Rail PM?
  2. would Tableau data grow stale until it's refactored to expect a single file?

if config_type not in converters:
converters[config_type] = GtfsRtConverter(config_type, metadata_queue)
if config_type in (ConfigType.RT_ALERTS, ConfigType.VEHICLE_COUNT, ConfigType.SCHEDULE):
converters[config_type] = GtfsConverter(config_type, metadata_queue)

Collaborator

@huangh bumping this because I don't see how RT_ALERTS gets ingested with a GtfsConverter
