Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions analysis/rowgroup_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os

import pyarrow.parquet as pq
import polars as pl


def get_rowgroup_statistics(parquet_path: str) -> pl.DataFrame:
    """
    Extract row group statistics from a Parquet file and return as a Polars DataFrame.

    One output row is produced per (row group, column chunk) pair, built from the
    file's metadata.

    Args:
        parquet_path: Path to the parquet file

    Returns:
        Polars DataFrame with row group statistics. min_value and max_value hold the
        string form of the column chunk statistics. min_value, max_value, null_count
        and num_values are null when a chunk carries no statistics. The columns and
        dtypes are fixed, so a file without row groups yields an empty frame that
        still has every column.
    """
    # read_metadata opens the file only long enough to load the metadata, so no
    # file handle stays open when this is called in a loop over many files
    file_metadata = pq.read_metadata(parquet_path)

    records = []
    for i in range(file_metadata.num_row_groups):
        row_group_metadata = file_metadata.row_group(i)

        for col_i in range(row_group_metadata.num_columns):
            column_chunk_metadata = row_group_metadata.column(col_i)
            stats = column_chunk_metadata.statistics
            has_stats = stats is not None

            records.append(
                {
                    "row_group": i,
                    "num_rows": row_group_metadata.num_rows,
                    "total_byte_size": row_group_metadata.total_byte_size,
                    "column_index": col_i,
                    "column_path": column_chunk_metadata.path_in_schema,
                    "compressed_size": column_chunk_metadata.total_compressed_size,
                    "uncompressed_size": column_chunk_metadata.total_uncompressed_size,
                    # min/max are stringified so columns of different types share one output column
                    "min_value": str(stats.min) if has_stats and stats.min is not None else None,
                    "max_value": str(stats.max) if has_stats and stats.max is not None else None,
                    "null_count": stats.null_count if has_stats else None,
                    "num_values": stats.num_values if has_stats else None,
                }
            )

    # An explicit schema keeps the result independent of dtype inference: inference
    # has nothing to work with for an empty record list or for leading rows whose
    # statistics are all null.
    schema = {
        "row_group": pl.Int64,
        "num_rows": pl.Int64,
        "total_byte_size": pl.Int64,
        "column_index": pl.Int64,
        "column_path": pl.Utf8,
        "compressed_size": pl.Int64,
        "uncompressed_size": pl.Int64,
        "min_value": pl.Utf8,
        "max_value": pl.Utf8,
        "null_count": pl.Int64,
        "num_values": pl.Int64,
    }
    return pl.DataFrame(records, schema=schema)


if __name__ == "__main__":
    # Ad-hoc inspection script: for every file in one hard-coded local directory,
    # print the per-row-group min/max statistics of the feed_timestamp column.
    data_dir = "/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/"

    for file in os.listdir(data_dir):
        print(file)
        file_path = os.path.join(data_dir, file)
        df = get_rowgroup_statistics(file_path)

        # keep only the statistics rows that describe the feed_timestamp column
        ts = df.filter(pl.col("column_path") == "feed_timestamp")

        # tag each row with its source file so output from different files can be told apart
        print(ts.select("min_value", "max_value").with_columns(pl.lit(file_path).alias("filename")))
134 changes: 134 additions & 0 deletions analysis/write_local_pq_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# pylint: disable=R0914
# pylint too many local variables (more than 15)
from importlib.resources import files

import os
import tempfile
from typing import (
List,
Tuple,
)

from pyarrow import fs

import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.dataset as pd

from lamp_py.aws.s3 import (
upload_file,
)

from lamp_py.runtime_utils.remote_files import (
S3_SPRINGBOARD,
)
import time


# with pq.ParquetWriter(self.local_parquet_path, schema=self.output_processed_schema) as writer:
# for batch in ds.to_batches(
# batch_size=500_000,
# columns=[col for col in ds.schema.names if col != "lamp_record_hash"],
# filter=self.parquet_filter,
# batch_readahead=1,
# fragment_readahead=0,
# ):
# # don't check empty batch if no rows
# if batch.num_rows == 0:
# continue


def partition_column() -> str:
    """Return the name of the column whose values define the partitions."""
    column_name = "trip_update.trip.route_id"
    return column_name


def table_sort_order() -> List[Tuple[str, str]]:
    """
    Return the sort keys as (column name, direction) pairs.

    Every key is sorted "ascending"; the list order is the key priority.
    """
    sort_columns = (
        "feed_timestamp",
        "trip_update.trip.route_pattern_id",
        "trip_update.trip.direction_id",
        "trip_update.vehicle.id",
    )
    return [(column_name, "ascending") for column_name in sort_columns]


def table_sort_order_pl() -> Tuple[List[str], List[bool]]:
    """
    Split table_sort_order() into two parallel lists.

    Returns:
        (column names, flags) where each flag is True when the matching
        column's direction is "ascending".
    """
    column_names: List[str] = []
    ascending_flags: List[bool] = []
    for column_name, direction in table_sort_order():
        column_names.append(column_name)
        ascending_flags.append(direction == "ascending")
    return (column_names, ascending_flags)


def write_local_pq(local_path: str, partition_column: str, in_partition_sort: List[Tuple[str, str]]) -> None:
    """
    Rewrite a local directory of parquet files as one partition-ordered parquet file.

    Every entry of local_path is opened as part of a single pyarrow dataset. The
    distinct values of partition_column are visited in sorted order (rows whose
    partition value is null come last); for each value the matching rows are sorted
    by in_partition_sort and appended to "rail_full_set.parquet" in a newly created
    temporary directory. The output path and per-partition timings are printed.

    Args:
        local_path: Local directory; every entry in it is read as a parquet file
        partition_column: Column whose distinct values determine the write order
        in_partition_sort: (column name, direction) pairs applied within each partition
    """
    ds_paths = [os.path.join(local_path, entry) for entry in os.listdir(local_path)]

    ds = pd.dataset(
        ds_paths,
        format="parquet",
        filesystem=fs.LocalFileSystem(),
    )

    partition_field = pc.field(partition_column)
    partition_values = pc.unique(ds.to_table(columns=[partition_column]).column(partition_column)).to_pylist()

    # None cannot be ordered against real values and an equality filter never matches
    # null, so null rows get a dedicated is_null() filter and are written last.
    partition_filters = [
        (value, partition_field == value) for value in sorted(v for v in partition_values if v is not None)
    ]
    if None in partition_values:
        partition_filters.append((None, partition_field.is_null()))

    # delete=False leaves the directory on disk after the with-block so the output can be inspected
    with tempfile.TemporaryDirectory(delete=False) as temp_dir:
        rail_full_set_path = os.path.join(temp_dir, "rail_full_set.parquet")

        print(rail_full_set_path)
        print(f"Found {len(partition_filters)} unique partitions based on column {partition_column}")

        start_time = time.time()

        # The writer uses the dataset schema unchanged, so every input column is kept.
        # The context manager closes the file even when a partition fails part-way.
        with pq.ParquetWriter(
            rail_full_set_path, schema=ds.schema, compression="zstd", compression_level=3
        ) as rail_full_set_writer:
            for part, part_filter in partition_filters:
                write_table = ds.to_table(filter=part_filter).sort_by(in_partition_sort)
                rail_full_set_writer.write_table(write_table)

                # elapsed is cumulative since the first partition, not per-partition
                elapsed = time.time() - start_time
                print(f"Processed partition {part}: {elapsed:.2f}s elapsed, total rows: {write_table.num_rows}")


if __name__ == "__main__":
    # write_local_pq lists its input with os.listdir and reads through LocalFileSystem,
    # so it must be given a local directory; an s3:// URI cannot be used here.
    write_local_pq(
        "/Users/hhuang/ingestion/lamp/RT_TRIP_UPDATES/year=2026/month=2/day=1/",
        partition_column(),
        table_sort_order(),
    )
43 changes: 22 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ipykernel = "^7.1.0"
matplotlib = "^3.9.0"
seaborn = "^0.13.2"
tabulate = "^0.9.0"
marimo = "^0.16"
marimo = "^0.23"
plotly = "^6.5"

[tool.poetry.group.dev.dependencies]
black = "^24.3.0"
Expand Down
7 changes: 7 additions & 0 deletions runners/run_ingest_s3_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from lamp_py.ingestion.ingest_gtfs import ingest_s3_files

# from lamp_py.postgres.postgres_utils import start_rds_writer_process

# metadata_queue, rds_process = start_rds_writer_process()

# Runs ingestion as soon as this file is executed, passing None as the first
# argument and restricting the run with bucket_filter="lamp".
# NOTE(review): the first argument is presumably the metadata queue that the
# commented-out start_rds_writer_process() call above would have produced;
# confirm that ingest_s3_files accepts None for it.
ingest_s3_files(None, bucket_filter="lamp")
Loading
Loading