diff --git a/pipelines/order_items/transformations/transformation.py b/pipelines/order_items/transformations/transformation.py
index 28908a5..8a14a18 100644
--- a/pipelines/order_items/transformations/transformation.py
+++ b/pipelines/order_items/transformations/transformation.py
@@ -5,12 +5,23 @@ import pyspark.sql.functions as F
 
 from pyspark.sql.types import (
     StructType, StructField, StringType, IntegerType,
-    DoubleType, ArrayType
+    DoubleType, ArrayType, TimestampType
 )
 
 # ──────────────────────────────────────────────────────────────
 # 0. Bronze – raw event stream
 # ──────────────────────────────────────────────────────────────
+all_events_schema = StructType([
+    StructField("event_id", StringType()),
+    StructField("event_type", StringType()),
+    StructField("ts", TimestampType()),
+    StructField("gk_id", StringType()),
+    StructField("location_id", IntegerType()),
+    StructField("order_id", StringType()),
+    StructField("sequence", IntegerType()),
+    StructField("body", StringType()),
+])
+
 @dlt.table(
     comment = "Raw JSON events as ingested (one file per event)."
 )
@@ -19,8 +30,9 @@ def all_events():
     SCHEMA = spark.conf.get("RAW_DATA_SCHEMA")
     VOLUME = spark.conf.get("RAW_DATA_VOLUME")
     return (
-        spark.readStream.format("cloudFiles")
+        spark.readStream.format("cloudFiles")
             .option("cloudFiles.format", "json")
+            .schema(all_events_schema)
             .load(f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}")
     )
 
@@ -54,7 +66,7 @@ def silver_order_items():
     df = (
         dlt.read_stream("all_events")
             .filter(F.col("event_type") == "order_created")
-            .withColumn("event_ts", F.to_timestamp("ts")) # enforce TIMESTAMP
+            .withColumnRenamed("ts", "event_ts")
             .withColumn("body_obj", F.from_json("body", body_schema))
             .withColumn("item", F.explode("body_obj.items"))
             .withColumn("extended_price", F.col("item.price") * F.col("item.qty"))