Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ipykernel = "^7.1.0"
matplotlib = "^3.9.0"
seaborn = "^0.13.2"
tabulate = "^0.9.0"
marimo = "^0.16"
marimo = "^0.23"
plotly = "^6.5"

[tool.poetry.group.dev.dependencies]
black = "^24.3.0"
Expand Down
7 changes: 7 additions & 0 deletions runners/run_ingest_s3_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from lamp_py.ingestion.ingest_gtfs import ingest_s3_files

# from lamp_py.postgres.postgres_utils import start_rds_writer_process

# metadata_queue, rds_process = start_rds_writer_process()

ingest_s3_files(None, bucket_filter="lamp")
1 change: 1 addition & 0 deletions src/lamp_py/ingestion/backfill/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tools for backfilling and reprocessing data"""
2 changes: 1 addition & 1 deletion src/lamp_py/ingestion/config_busloc_trip.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class RtBusTripDetail(GTFSRTDetail):
"""

def transform_for_write(self, table: pyarrow.table) -> pyarrow.table:
"""modify table schema before write to parquet"""
"""Modify table schema before write to parquet"""
return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update"))

@property
Expand Down
2 changes: 1 addition & 1 deletion src/lamp_py/ingestion/config_rt_trip.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class RtTripDetail(GTFSRTDetail):
"""

def transform_for_write(self, table: pyarrow.table) -> pyarrow.table:
"""modify table schema before write to parquet"""
"""Modify table schema before write to parquet"""
return flatten_table_schema(explode_table_column(flatten_table_schema(table), "trip_update.stop_time_update"))

@property
Expand Down
6 changes: 3 additions & 3 deletions src/lamp_py/ingestion/convert_gtfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

def gtfs_files_to_convert() -> List[Tuple[str, int]]:
"""
create list of Tuple[GTFS url, version_key] for GtfsConverter
Create list of Tuple[GTFS url, version_key] for GtfsConverter

version_key is based on published_dt
"""
Expand Down Expand Up @@ -75,7 +75,7 @@ def convert(self) -> None:

def process_schedule(self, url: str, version_key: int) -> None:
"""
convert a schedule gtfs zip file into tables. the zip file is
Convert a schedule gtfs zip file into tables. the zip file is
essentially a small database with each contained file (outside of feed
info) acting as its own table. info on the gtfs scheduling standard can
be found at http://gtfs.org/schedule/
Expand All @@ -99,7 +99,7 @@ def process_schedule(self, url: str, version_key: int) -> None:

def create_table(self, gtfs_zip: zipfile.ZipFile, table_filename: str, version_key: int) -> None:
"""
read a csv table out of a gtfs static schedule file, add a timestamp
Read a csv table out of a gtfs static schedule file, add a timestamp
column to each row, and write it as a parquet file on s3, partitioned
by the timestamp

Expand Down
34 changes: 20 additions & 14 deletions src/lamp_py/ingestion/convert_gtfs_rt.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ class GtfsRtConverter(Converter):
https_mbta_integration.mybluemix.net_vehicleCount.gz
"""

def __init__(self, config_type: ConfigType, metadata_queue: Queue[Optional[str]], max_workers: int = 4) -> None:
def __init__(
self,
config_type: ConfigType,
metadata_queue: Queue[Optional[str]],
max_workers: int = 8,
) -> None:
Converter.__init__(self, config_type, metadata_queue)

# Depending on filename, assign self.details to correct implementation
Expand Down Expand Up @@ -185,8 +190,10 @@ def convert(self) -> None:
continue

self.continuous_pq_update(table)
pool = pyarrow.default_memory_pool()
pool.release_unused()

# try to get pyarrow to limit memory usage after each loop.
# this is a "ask nicely and pray" move...we can't manage memory directly in python.
pyarrow.default_memory_pool().release_unused()
table_count += 1
process_logger.add_metadata(table_count=table_count)
# limit number of tables produced on each event loop
Expand All @@ -204,7 +211,7 @@ def convert(self) -> None:

def thread_init(self) -> None:
"""
initialize the filesystem in each convert thread
Initialize the filesystem in each convert thread

update the active fs to use the s3 filesystem for all loading if the
first file starts with s3
Expand All @@ -217,11 +224,10 @@ def thread_init(self) -> None:

def process_files(self) -> Iterable[pyarrow.table]:
"""
iterate through all of the files to be converted
Iterate through all of the files to be converted

only yield a new table when table size crosses over min_rows of yield_check
"""

process_logger = ProcessLogger(
"create_pyarrow_tables",
config_type=str(self.config_type),
Expand Down Expand Up @@ -272,7 +278,7 @@ def process_files(self) -> Iterable[pyarrow.table]:

def yield_check(self, process_logger: ProcessLogger, min_rows: int = 2_000_000) -> Iterable[pyarrow.table]:
"""
yield all tables in the data_parts map that have been sufficiently
Yield all tables in the data_parts map that have been sufficiently
processed.

@min_rows - how many rows the table must have to be yielded
Expand Down Expand Up @@ -367,7 +373,7 @@ def gz_to_pyarrow(self, filename: str) -> Tuple[Optional[datetime], str, Optiona

def partition_dt(self, table: pyarrow.Table) -> datetime:
"""
verify partition structure of pyarrow Table
Verify partition structure of pyarrow Table

:param table: pyarrow Table to verify

Expand Down Expand Up @@ -419,7 +425,7 @@ def sync_with_s3(self, local_path: str) -> bool:

def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset:
"""
create dataset, with hash column, that will be written to parquet file
Create dataset, with hash column, that will be written to parquet file

:param table: pyarrow Table
:param local_path: path to local parquet file
Expand Down Expand Up @@ -451,9 +457,9 @@ def make_hash_dataset(self, table: pyarrow.Table, local_path: str) -> pd.Dataset

# pylint: disable=R0914
# pylint too many local variables (more than 15)
def write_local_pq(self, table: pyarrow.Table, local_path: str) -> None:
def write_local_pq_partition(self, table: pyarrow.Table, local_path: str) -> None:
"""
merge pyarrow Table with existing local_path parquet file
Merge pyarrow Table with existing local_path parquet file

:param table: pyarrow Table
:param local_path: path to local parquet file
Expand Down Expand Up @@ -586,7 +592,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None:

log.add_metadata(local_path=local_path)

self.write_local_pq(table, local_path)
self.write_local_pq_partition(table, local_path)
self.send_metadata(local_path.replace(self.tmp_folder, S3_SPRINGBOARD))

# record the number of rows in the final parquet file for logging
Expand All @@ -610,7 +616,7 @@ def continuous_pq_update(self, table: pyarrow.Table) -> None:

def clean_local_folders(self) -> None:
"""
clean local temp folders
Clean local temp folders
"""
days_to_keep = 2
root_folder = os.path.join(
Expand All @@ -630,7 +636,7 @@ def clean_local_folders(self) -> None:

def move_s3_files(self) -> None:
"""
move archive and error files to their respective s3 buckets.
Move archive and error files to their respective s3 buckets.
"""
if len(self.error_files) > 0:
self.error_files = move_s3_objects(
Expand Down
Loading
Loading