From 492ea595c4ac6b5ff80faec83382a975a8a0aa5f Mon Sep 17 00:00:00 2001 From: menishmueli Date: Sat, 11 Apr 2026 23:15:55 +0300 Subject: [PATCH 1/6] Add Apache Gluten/Velox support to DataFlint UI - Add Gluten/Velox node type classification, display names, and accelerator badges (Velox, Photon, RAPIDS, DataFusion) in the SQL plan flow view - Fix stage identification for Gluten's WholeStageCodegenTransformer nodes by inferring codegen-to-node mapping and handling AQE codegen renumbering - Split ColumnarExchange into write/read visual nodes across stage boundaries - Propagate stages through Gluten-specific boundary nodes (VeloxResizeBatches, RowToVeloxColumnar, TakeOrderedAndProjectExecTransformer, etc.) - Show Velox native timing metrics (aggregation/filter/sort/window time, peak memory, spill) on plan nodes - Strip Gluten class name prefixes from plan descriptions in parsers - Add Docker environment and example app for running Gluten/Velox on Spark 3.5 - Add unit test for Gluten stage assignment with real fixture data --- docker/gluten/.gitignore | 3 + docker/gluten/Dockerfile | 61 ++ docker/gluten/docker-compose.yml | 16 + docker/gluten/run-gluten-example.sh | 143 ++++ .../example/GlutenVeloxExample.scala | 116 +++ .../components/SqlFlow/SqlLayoutService.ts | 2 +- spark-ui/src/components/SqlFlow/StageNode.tsx | 36 + spark-ui/src/reducers/PlanGraphUtils.ts | 2 +- .../reducers/PlanParsers/ExchangeParser.ts | 2 +- .../src/reducers/PlanParsers/FilterParser.ts | 1 + .../src/reducers/PlanParsers/ProjectParser.ts | 8 +- .../src/reducers/PlanParsers/WindowParser.ts | 2 +- spark-ui/src/reducers/SQLNodeStageReducer.ts | 62 +- spark-ui/src/reducers/SqlReducer.ts | 55 +- spark-ui/src/reducers/SqlReducerUtils.ts | 164 +++- .../__tests__/GlutenStageAssignment.spec.ts | 157 ++++ .../__tests__/gluten-sql4-fixture.json | 744 ++++++++++++++++++ 17 files changed, 1551 insertions(+), 23 deletions(-) create mode 100644 docker/gluten/.gitignore create mode 100644 docker/gluten/Dockerfile create mode 100644 docker/gluten/docker-compose.yml create mode 100755 docker/gluten/run-gluten-example.sh create mode 100644 spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala create mode 100644 spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts create mode 100644 spark-ui/src/reducers/__tests__/gluten-sql4-fixture.json diff --git a/docker/gluten/.gitignore b/docker/gluten/.gitignore new file mode 100644 index 00000000..e486dd31 --- /dev/null +++ b/docker/gluten/.gitignore @@ -0,0 +1,3 @@ +jars/ +test_data/ +spark-events/ diff --git a/docker/gluten/Dockerfile b/docker/gluten/Dockerfile new file mode 100644 index 00000000..f78dc570 --- /dev/null +++ b/docker/gluten/Dockerfile @@ -0,0 +1,61 @@ +# Spark + Gluten/Velox + DataFlint example runner +# +# Build arguments: +# SPARK_VERSION - Spark version (default: 3.5.7) +# GLUTEN_JAR - Filename of the Gluten bundle jar in jars/ directory +# +# Usage: +# ./run-gluten-example.sh (recommended — builds everything and runs) +# docker compose up --build (if jars are already in jars/) + +ARG SPARK_VERSION=3.5.7 + +FROM apache/spark:${SPARK_VERSION} + +ARG SPARK_VERSION=3.5.7 + +USER root + +# Create directories for event logs and test data +RUN mkdir -p /tmp/spark-events && \ + chown -R spark:spark /tmp/spark-events && \ + mkdir -p /opt/spark/work-dir/test_data && \ + chown -R spark:spark /opt/spark/work-dir/test_data + +# Copy all jars (Gluten bundle + DataFlint plugin + example) into Spark's jars dir +COPY jars/*.jar /opt/spark/jars/ + +# Copy test data +COPY test_data/ /opt/spark/work-dir/test_data/ + +# Configure Spark defaults for Gluten +# The --add-opens flags are required because the Gluten nightly (JDK8 target) uses +# sun.misc.Unsafe / DirectByteBuffer internals that are module-restricted on Java 11+. +RUN mkdir -p /opt/spark/conf && \ + echo "spark.plugins=io.dataflint.spark.SparkDataflintPlugin,org.apache.gluten.GlutenPlugin" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.memory.offHeap.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.memory.offHeap.size=4g" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.eventLog.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.eventLog.dir=/tmp/spark-events" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.ui.port=10000" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.dataflint.telemetry.enabled=false" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.sql.maxMetadataStringLength=10000" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf + +USER spark + +EXPOSE 10000 + +WORKDIR /opt/spark/work-dir + +ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" + +# Run the Gluten example via spark-submit +CMD ["/opt/spark/bin/spark-submit", \ + "--master", "local[*]", \ + "--class", "io.dataflint.example.GlutenVeloxExample", \ + "--driver-memory", "2g", \ + "/opt/spark/jars/example.jar"] diff --git a/docker/gluten/docker-compose.yml b/docker/gluten/docker-compose.yml new file mode 100644 index 00000000..4ac470e8 --- /dev/null +++ b/docker/gluten/docker-compose.yml @@ -0,0 +1,16 @@ +services: + spark-gluten-example: + build: + context: . + dockerfile: Dockerfile + args: + SPARK_VERSION: ${SPARK_VERSION:-3.5.7} + image: dataflint-gluten-example:${SPARK_VERSION:-3.5.7} + container_name: dataflint-gluten-example + ports: + - "${SPARK_UI_PORT:-10000}:10000" + volumes: + - ${SPARK_EVENTS_DIR:-./spark-events}:/tmp/spark-events + environment: + - SPARK_NO_DAEMONIZE=true + restart: "no" diff --git a/docker/gluten/run-gluten-example.sh b/docker/gluten/run-gluten-example.sh new file mode 100755 index 00000000..a34e5cf4 --- /dev/null +++ b/docker/gluten/run-gluten-example.sh @@ -0,0 +1,143 @@ +#!/bin/bash +set -e + +# Run DataFlint Gluten/Velox Example +# +# This script: +# 1. Builds the DataFlint UI and plugin jar +# 2. Packages the Gluten example app +# 3. Downloads the Gluten nightly bundle jar (cached) +# 4. Builds and runs the Docker container +# +# Prerequisites: Node.js 20+, Java 8+, sbt, Docker +# +# Usage: +# ./run-gluten-example.sh # full build + run +# ./run-gluten-example.sh --skip-build # skip sbt/npm, just rebuild Docker +# ./run-gluten-example.sh --amd64 # force x86_64 (Rosetta 2 emulation) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +JARS_DIR="$SCRIPT_DIR/jars" +TEST_DATA_DIR="$SCRIPT_DIR/test_data" +SPARK_EVENTS_DIR="$SCRIPT_DIR/spark-events" + +SPARK_VERSION="${SPARK_VERSION:-3.5.7}" +SCALA_VERSION="${SCALA_VERSION:-2.12}" + +SKIP_BUILD=false +FORCE_AMD64=false + +for arg in "$@"; do + case $arg in + --skip-build) SKIP_BUILD=true ;; + --amd64) FORCE_AMD64=true ;; + esac +done + +# Detect architecture for Gluten jar download +ARCH=$(uname -m) +if [ "$FORCE_AMD64" = true ]; then + GLUTEN_ARCH="linux_amd64" + DOCKER_PLATFORM="--platform linux/amd64" +elif [ "$ARCH" = "arm64" ] || [ "$ARCH" = "aarch64" ]; then + GLUTEN_ARCH="linux_aarch64" + DOCKER_PLATFORM="" +else + GLUTEN_ARCH="linux_amd64" + DOCKER_PLATFORM="" +fi + +GLUTEN_JAR_NAME="gluten-velox-bundle-spark3.5_2.12-${GLUTEN_ARCH}-1.7.0-SNAPSHOT.jar" +GLUTEN_JAR_URL="https://nightlies.apache.org/gluten/nightly-release-jdk8/${GLUTEN_JAR_NAME}" + +echo "=== DataFlint Gluten/Velox Example ===" +echo "Project root: $PROJECT_ROOT" +echo "Spark version: $SPARK_VERSION" +echo "Architecture: $GLUTEN_ARCH" +echo "Gluten jar: $GLUTEN_JAR_NAME" +echo "" + +mkdir -p "$JARS_DIR" +mkdir -p "$SPARK_EVENTS_DIR" + +# --- Step 1: Download Gluten nightly jar (cached) --- +echo "=== Step 1: Downloading Gluten nightly jar ===" +if [ -f "$JARS_DIR/$GLUTEN_JAR_NAME" ]; then + echo "Gluten jar already cached: $JARS_DIR/$GLUTEN_JAR_NAME" +else + echo "Downloading: $GLUTEN_JAR_URL" + curl -fSL -o "$JARS_DIR/$GLUTEN_JAR_NAME" "$GLUTEN_JAR_URL" + echo "Downloaded successfully." +fi + +if [ "$SKIP_BUILD" = false ]; then + # --- Step 2: Build DataFlint UI --- + echo "" + echo "=== Step 2: Building DataFlint UI ===" + cd "$PROJECT_ROOT/spark-ui" + if [ ! -d "node_modules" ]; then + echo "Installing npm dependencies..." + npm ci + fi + echo "Building and deploying UI into plugin resources..." + npm run deploy + + # --- Step 3: Build DataFlint plugin jar --- + echo "" + echo "=== Step 3: Building DataFlint plugin jar ===" + cd "$PROJECT_ROOT/spark-plugin" + export SBT_OPTS="-Xmx4G -Xss2M -XX:+UseG1GC" + sbt "pluginspark3/assembly" + + # --- Step 4: Package example jar --- + echo "" + echo "=== Step 4: Packaging example jar ===" + sbt "example_3_5_1/package" +fi + +# --- Step 5: Copy jars to docker context --- +echo "" +echo "=== Step 5: Copying jars to Docker context ===" + +# DataFlint plugin jar +PLUGIN_JAR=$(find "$PROJECT_ROOT/spark-plugin/pluginspark3/target/scala-${SCALA_VERSION}" -name "spark_${SCALA_VERSION}-*.jar" -type f | head -1) +if [ -z "$PLUGIN_JAR" ]; then + echo "ERROR: DataFlint plugin jar not found. Run without --skip-build first." + exit 1 +fi +cp "$PLUGIN_JAR" "$JARS_DIR/dataflint-plugin.jar" +echo "Copied DataFlint plugin: $(basename "$PLUGIN_JAR")" + +# Example jar +EXAMPLE_JAR=$(find "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}" -name "DataflintSparkExample351_${SCALA_VERSION}-*.jar" -type f | head -1) +if [ -z "$EXAMPLE_JAR" ]; then + echo "ERROR: Example jar not found. Run without --skip-build first." + exit 1 +fi +cp "$EXAMPLE_JAR" "$JARS_DIR/example.jar" +echo "Copied example jar: $(basename "$EXAMPLE_JAR")" + +echo "Gluten jar: $GLUTEN_JAR_NAME" + +# --- Step 6: Copy test data --- +echo "" +echo "=== Step 6: Copying test data ===" +rm -rf "$TEST_DATA_DIR" +cp -r "$PROJECT_ROOT/spark-plugin/test_data" "$TEST_DATA_DIR" +echo "Copied test_data/" + +# --- Step 7: Build and run Docker --- +echo "" +echo "=== Step 7: Building and running Docker container ===" +cd "$SCRIPT_DIR" + +# Stop any previous container +docker compose down 2>/dev/null || true + +# Build with platform flag if needed +if [ -n "$DOCKER_PLATFORM" ]; then + DOCKER_DEFAULT_PLATFORM=linux/amd64 docker compose up --build +else + docker compose up --build +fi diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala new file mode 100644 index 00000000..f435e8a1 --- /dev/null +++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala @@ -0,0 +1,116 @@ +package io.dataflint.example + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ + +object GlutenVeloxExample extends App { + val spark = SparkSession + .builder() + .appName("GlutenVeloxExample") + .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin,org.apache.gluten.GlutenPlugin") + .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .config("spark.memory.offHeap.enabled", "true") + .config("spark.memory.offHeap.size", "4g") + .config("spark.ui.port", "10000") + .config("spark.eventLog.enabled", "true") + .config("spark.eventLog.dir", "/tmp/spark-events") + .config("spark.dataflint.telemetry.enabled", value = false) + .config("spark.sql.maxMetadataStringLength", "10000") + .config("spark.sql.adaptive.enabled", "true") + .master("local[*]") + .getOrCreate() + + import spark.implicits._ + + def shakespeareDF: DataFrame = spark.read + .format("csv") + .option("sep", ";") + .option("inferSchema", true) + .load("./test_data/will_play_text.csv") + .toDF("line_id", "play_name", "speech_number", "line_number", "speaker", "text_entry") + + // --- Filter + Project --- + spark.sparkContext.setJobDescription("Filter and Select") + val filtered = shakespeareDF + .filter($"speaker".isNotNull && $"line_id" > 100) + .select($"play_name", $"speaker", $"text_entry", $"speech_number") + filtered.show(10, truncate = false) + + // --- Aggregation (GroupBy + Count/Sum) --- + spark.sparkContext.setJobDescription("GroupBy Aggregation") + val linesPerSpeaker = shakespeareDF + .filter($"speaker".isNotNull) + .groupBy("play_name", "speaker") + .agg( + count("*").alias("line_count"), + sum("speech_number").alias("total_speech_numbers"), + avg("speech_number").alias("avg_speech_number") + ) + linesPerSpeaker.show(20, truncate = false) + + // --- Sort --- + spark.sparkContext.setJobDescription("Sort by line count") + val sortedSpeakers = linesPerSpeaker + .orderBy(col("line_count").desc) + sortedSpeakers.show(20, truncate = false) + + // --- Broadcast Hash Join --- + spark.sparkContext.setJobDescription("Broadcast Hash Join") + val topSpeakers = linesPerSpeaker + .filter($"line_count" > 50) + .select($"speaker".alias("top_speaker"), $"play_name".alias("top_play")) + val broadcastJoined = shakespeareDF + .join(broadcast(topSpeakers), $"speaker" === $"top_speaker" && $"play_name" === $"top_play") + println(s"Broadcast join result count: ${broadcastJoined.count()}") + + // --- Sort Merge Join (disable broadcast to force SMJ) --- + spark.sparkContext.setJobDescription("Sort Merge Join") + spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + val plays1 = shakespeareDF + .groupBy("play_name") + .agg(count("*").alias("total_lines")) + .repartition(10) + val plays2 = shakespeareDF + .groupBy("play_name") + .agg(countDistinct("speaker").alias("unique_speakers")) + .repartition(10) + val smjResult = plays1.join(plays2, Seq("play_name")) + smjResult.show(20, truncate = false) + spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760) + + // --- Window Functions --- + spark.sparkContext.setJobDescription("Window Functions") + val speakerWindow = Window.partitionBy("play_name").orderBy(col("line_count").desc) + val rankedSpeakers = linesPerSpeaker + .withColumn("rank", rank().over(speakerWindow)) + .withColumn("dense_rank", dense_rank().over(speakerWindow)) + .withColumn("total_in_play", sum("line_count").over(Window.partitionBy("play_name"))) + .withColumn("pct", round(col("line_count") / col("total_in_play") * 100, 2)) + .filter(col("rank") <= 3) + .orderBy("play_name", "rank") + rankedSpeakers.show(30, truncate = false) + + // --- Explode / Generate --- + spark.sparkContext.setJobDescription("Explode words from text") + val words = shakespeareDF + .filter($"text_entry".isNotNull) + .select($"play_name", $"speaker", explode(split($"text_entry", "\\s+")).alias("word")) + .filter(length($"word") > 0) + val wordCounts = words + .groupBy("word") + .agg(count("*").alias("word_count")) + .orderBy(col("word_count").desc) + wordCounts.show(20, truncate = false) + + // --- Union + distinct --- + spark.sparkContext.setJobDescription("Union and Distinct") + val hamlet = shakespeareDF.filter($"play_name" === "Hamlet").select("speaker") + val macbeth = shakespeareDF.filter($"play_name" === "macbeth").select("speaker") + val allSpeakers = hamlet.union(macbeth).distinct() + println(s"Distinct speakers in Hamlet + Macbeth: ${allSpeakers.count()}") + + println("GlutenVeloxExample completed. Spark UI available at http://localhost:10000") + println("Press Ctrl+C to stop.") + Thread.sleep(Long.MaxValue) +} diff --git a/spark-ui/src/components/SqlFlow/SqlLayoutService.ts b/spark-ui/src/components/SqlFlow/SqlLayoutService.ts index b6ce8175..b1276a01 100644 --- a/spark-ui/src/components/SqlFlow/SqlLayoutService.ts +++ b/spark-ui/src/components/SqlFlow/SqlLayoutService.ts @@ -593,7 +593,7 @@ class SqlLayoutService { const splitExchangeNodeIds = new Set(); for (const nodeId of nodesIds) { const node = nodeMap.get(nodeId); - if (node?.nodeName === "Exchange") { + if (node?.nodeName === "Exchange" || node?.nodeName === "ColumnarExchange") { splitExchangeNodeIds.add(nodeId.toString()); } } diff --git a/spark-ui/src/components/SqlFlow/StageNode.tsx b/spark-ui/src/components/SqlFlow/StageNode.tsx index 6abd5859..1bdb8c55 100644 --- a/spark-ui/src/components/SqlFlow/StageNode.tsx +++ b/spark-ui/src/components/SqlFlow/StageNode.tsx @@ -1,5 +1,6 @@ import ErrorIcon from "@mui/icons-material/Error"; import FlagIcon from "@mui/icons-material/Flag"; +import RocketLaunchIcon from "@mui/icons-material/RocketLaunch"; import WarningIcon from "@mui/icons-material/Warning"; import { Alert, AlertTitle, Box, Tooltip, Typography } from "@mui/material"; import React, { FC, memo, useMemo } from "react"; @@ -7,6 +8,7 @@ import { useSearchParams } from "react-router-dom"; import { Handle, Position } from "reactflow"; import { Alert as AppAlert, EnrichedSqlNode, SQLNodeExchangeStageData, SQLNodeStageData } from "../../interfaces/AppStore"; import { humanFileSize, parseBytesString } from "../../utils/FormatUtils"; +import { getNodeAccelerator } from "../../reducers/SqlReducerUtils"; import { TransperantTooltip } from "../AlertBadge/AlertBadge"; import MetricDisplay, { MetricWithTooltip } from "./MetricDisplay"; import { @@ -49,6 +51,7 @@ const StageNodeComponent: FC = ({ data }) => { + // Memoized computations for better performance const { isHighlighted, allMetrics, hasDeltaOptimizeWrite, displayName, variantStage, variantDuration, variantDurationPercentage } = useMemo(() => { // Parse nodeIds from URL parameters @@ -328,6 +331,39 @@ const StageNodeComponent: FC = ({ data }) => { )} + {/* Accelerator badge - bottom left corner */} + {(() => { + const accel = getNodeAccelerator(data.node.nodeName); + if (!accel) return null; + return ( + + + + + {accel.label} + + + + ); + })()} + {/* Alert badge */} {sqlNodeAlert && ( { if ( - node.nodeName == "CollectLimit" || - node.nodeName === "BroadcastExchange" + node.nodeName === "CollectLimit" || + node.nodeName === "ColumnarCollectLimit" || + node.nodeName === "BroadcastExchange" || + node.nodeName === "ColumnarBroadcastExchange" || + node.nodeName === "VeloxResizeBatches" || + node.nodeName === "RowToVeloxColumnar" ) { + if (node.stage !== undefined) return node; + const previousNode = findPreviousNode(node.nodeId); + if (previousNode !== undefined && previousNode.stage !== undefined) { + return { ...node, stage: previousNode.stage }; + } + } + return node; + }); + rebuildNodeMap(); + // TakeOrderedAndProjectExecTransformer and VeloxColumnarToRow: inherit from next node + nodes = nodes.map((node) => { + if ( + node.nodeName === "TakeOrderedAndProjectExecTransformer" || + node.nodeName === "VeloxColumnarToRow" + ) { + if (node.stage !== undefined) return node; const previousNode = findPreviousNode(node.nodeId); if (previousNode !== undefined && previousNode.stage !== undefined) { return { ...node, stage: previousNode.stage }; @@ -108,7 +128,7 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta return node; }); nodes = nodes.map((node) => { - if (node.nodeName === "AQEShuffleRead" || node.nodeName === "Coalesce" || + if (node.nodeName === "AQEShuffleRead" || node.nodeName === "Coalesce" || node.nodeName === "CoalesceExecTransformer" || node.nodeName === "BatchEvalPython" || node.nodeName === "DataFlintBatchEvalPython" || node.nodeName === "MapInPandas" || node.nodeName === "DataFlintMapInPandas" || node.nodeName === "MapInArrow" || node.nodeName === "PythonMapInArrow" || node.nodeName === "DataFlintMapInArrow" || @@ -116,7 +136,7 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta node.nodeName === "FlatMapGroupsInPandas" || node.nodeName === "DataFlintFlatMapGroupsInPandas" || node.nodeName === "FlatMapCoGroupsInPandas" || node.nodeName === "DataFlintFlatMapCoGroupsInPandas" || node.nodeName === "WindowInPandas" || node.nodeName === "DataFlintWindowInPandas" || node.nodeName === "DataFlintArrowWindowPython" || - node.nodeName === "Window" || node.nodeName === "DataFlintWindow") { + node.nodeName === "Window" || node.nodeName === "DataFlintWindow" || node.nodeName === "WindowExecTransformer") { const nextNode = findNextNode(node.nodeId); if (nextNode !== undefined && nextNode.stage !== undefined) { return { ...node, stage: nextNode.stage }; @@ -128,7 +148,7 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta nodes = nodes.map((node) => { // Convert Exchange nodes to exchange stage type if they have adjacent nodes with stage info // This handles both nodes without stage data and nodes with onestage type that should be exchange type - if (node.nodeName === "Exchange" && (node.stage === undefined || node.stage.type === "onestage")) { + if ((node.nodeName === "Exchange" || node.nodeName === "ColumnarExchange") && (node.stage === undefined || node.stage.type === "onestage")) { const nextNode = findNextNode(node.nodeId); const previousNode = findPreviousNode(node.nodeId); const metricsExchangeStageIds = findExchangeStageIds(node.metrics); @@ -202,7 +222,7 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta return node; }); nodes = nodes.map((node) => { - if (node.nodeName === "Window" && node.stage === undefined) { + if ((node.nodeName === "Window" || node.nodeName === "WindowExecTransformer") && node.stage === undefined) { // For Window nodes, try to find stage from next node first, then previous node const nextNode = findNextNode(node.nodeId); if (nextNode !== undefined && nextNode.stage !== undefined) { @@ -216,7 +236,7 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta return node; }); nodes = nodes.map((node) => { - if (node.nodeName === "Union" && node.stage === undefined) { + if ((node.nodeName === "Union" || node.nodeName === "ColumnarUnion") && node.stage === undefined) { const nextNode = findNextNode(node.nodeId); if (nextNode !== undefined && nextNode.stage !== undefined) { return { ...node, stage: nextNode.stage }; @@ -402,6 +422,18 @@ export function calculateSqlStage( } } + // Collect stage IDs that have WholeStageCodegenTransformer in their RDD data + const stageCodegenNames = new Map(); + for (const stage of sqlStages) { + if (stage.stagesRdd !== undefined) { + for (const value of Object.values(stage.stagesRdd)) { + if (typeof value === "string" && value.startsWith("WholeStageCodegenTransformer")) { + stageCodegenNames.set(stage.stageId, value); + } + } + } + } + const codegenNodes = sql.codegenNodes.map((node) => { const stageIdByName = rddValueToStageId.get(node.nodeName); const stageIdByRddScope = node.rddScopeId !== undefined ? rddKeyToStageId.get(node.rddScopeId) : undefined; @@ -411,6 +443,20 @@ export function calculateSqlStage( }; }); + // Fallback: AQE may renumber codegen IDs at runtime (e.g., plan has codegen (2) but + // the actual stage has codegen (3)). Match unmatched codegen nodes to unmatched stages + // by ordering. + const matchedStageIds = new Set(codegenNodes.filter(cg => cg.stage !== undefined).map(cg => cg.stage!.type === "onestage" ? cg.stage!.stageId : -1)); + const unmatchedCodegens = codegenNodes.filter(cg => cg.stage === undefined); + const unmatchedStages = Array.from(stageCodegenNames.keys()).filter(sid => !matchedStageIds.has(sid)).sort((a, b) => a - b); + + if (unmatchedCodegens.length > 0 && unmatchedStages.length > 0) { + const sortedUnmatched = [...unmatchedCodegens].sort((a, b) => (a.wholeStageCodegenId ?? 0) - (b.wholeStageCodegenId ?? 0)); + for (let i = 0; i < Math.min(sortedUnmatched.length, unmatchedStages.length); i++) { + sortedUnmatched[i].stage = stageDataFromStage(unmatchedStages[i], stages); + } + } + // Build codegen lookup map, excluding duplicate codegen IDs // If the same codegen ID appears multiple times, we can't reliably determine which stage it belongs to const codegenByWholeStageId = new Map(); @@ -485,7 +531,7 @@ export function calculateSqlStage( readArr.push(node); exchangeReadByStageId.set(node.stage.readStage, readArr); } - if (node.nodeName === "BroadcastExchange" && node?.stage?.type === "onestage") { + if ((node.nodeName === "BroadcastExchange" || node.nodeName === "ColumnarBroadcastExchange") && node?.stage?.type === "onestage") { const arr = broadcastByStageId.get(node.stage.stageId) ?? []; arr.push(node); broadcastByStageId.set(node.stage.stageId, arr); diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index 9ee57a2a..27f95d28 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -103,22 +103,27 @@ export function parseNodePlan( case "HashAggregate": case "SortAggregate": case "ObjectHashAggregate": + case "FlushableHashAggregateExecTransformer": + case "RegularHashAggregateExecTransformer": return { type: "HashAggregate", plan: parseHashAggregate(plan.planDescription), }; case "TakeOrderedAndProject": + case "TakeOrderedAndProjectExecTransformer": return { type: "TakeOrderedAndProject", plan: parseTakeOrderedAndProject(plan.planDescription), }; case "CollectLimit": + case "ColumnarCollectLimit": return { type: "CollectLimit", plan: parseCollectLimit(plan.planDescription), }; case "Coalesce": + case "CoalesceExecTransformer": return { type: "Coalesce", plan: parseCoalesce(plan.planDescription), @@ -150,6 +155,7 @@ export function parseNodePlan( case "GpuFilter": case "CometFilter": case "Filter": + case "FilterExecTransformer": return { type: "Filter", plan: parseFilter(plan.planDescription), @@ -158,6 +164,8 @@ export function parseNodePlan( case "CometExchange": case "CometColumnarExchange": case "GpuColumnarExchange": + case "ColumnarExchange": + case "ColumnarBroadcastExchange": return { type: "Exchange", plan: parseExchange(plan.planDescription), @@ -166,6 +174,7 @@ export function parseNodePlan( case "GpuProject": case "CometFilter": case "Project": + case "ProjectExecTransformer": return { type: "Project", plan: parseProject(plan.planDescription), @@ -173,6 +182,7 @@ export function parseNodePlan( case "GpuSort": case "CometSort": case "Sort": + case "SortExecTransformer": return { type: "Sort", plan: parseSort(plan.planDescription), @@ -182,6 +192,7 @@ export function parseNodePlan( case "WindowInPandas": case "DataFlintWindowInPandas": case "DataFlintArrowWindowPython": + case "WindowExecTransformer": return { type: "Window", plan: parseWindow(plan.planDescription), @@ -204,11 +215,13 @@ export function parseNodePlan( plan: parseBatchEvalPython(plan.planDescription), }; case "Generate": + case "GenerateExecTransformer": return { type: "Generate", plan: parseGenerate(plan.planDescription), }; case "Expand": + case "ExpandExecTransformer": return { type: "Expand", plan: parseExpand(plan.planDescription), @@ -318,11 +331,43 @@ function calculateSql( function extractCodegenId(): number | undefined { return parseInt( - node.nodeName.replace("WholeStageCodegen (", "").replace(")", ""), + node.nodeName + .replace("WholeStageCodegenTransformer (", "") + .replace("WholeStageCodegen (", "") + .replace(")", ""), ); } }); + // For Gluten/Velox: WholeStageCodegenTransformer nodes are disconnected orphans in the graph + // and Spark doesn't set wholeStageCodegenId on their child nodes. Infer it from node ID ordering: + // a codegen node at ID X contains the pipeline nodes at IDs X+1, X+2, ... until hitting + // a stage boundary (exchange, AQE, scan) or another codegen node. + const hasGlutenCodegen = typeEnrichedNodes.some( + (n) => n.isCodegenNode && n.nodeName.includes("Transformer"), + ); + if (hasGlutenCodegen) { + const stageBoundaryNames = new Set([ + "ColumnarExchange", "ColumnarBroadcastExchange", "Exchange", "BroadcastExchange", + "AQEShuffleRead", "VeloxResizeBatches", "RowToVeloxColumnar", "VeloxColumnarToRow", + "ColumnarCollectLimit", "AdaptiveSparkPlan", "ColumnarUnion", + ]); + const sorted = [...typeEnrichedNodes].sort((a, b) => a.nodeId - b.nodeId); + let currentCodegenId: number | undefined = undefined; + for (const node of sorted) { + if (node.isCodegenNode) { + currentCodegenId = node.wholeStageCodegenId; + } else if (stageBoundaryNames.has(node.nodeName) || node.nodeName.includes("Scan")) { + currentCodegenId = undefined; + } else if ( + currentCodegenId !== undefined && + node.wholeStageCodegenId === undefined + ) { + node.wholeStageCodegenId = currentCodegenId; + } + } + } + const onlyCodeGenNodes = typeEnrichedNodes .filter((node) => node.isCodegenNode) .map((node) => { @@ -626,8 +671,10 @@ function calcCodegenDuration(metrics: EnrichedSqlMetric[]): number | undefined { function calcExchangeMetrics(nodeName: string, metrics: EnrichedSqlMetric[]) { var exchangeMetrics: ExchangeMetrics | undefined = undefined; - if (nodeName == "Exchange") { - const writeDuration = getMetricDuration("shuffle write time", metrics) ?? 0; + if (nodeName === "Exchange" || nodeName === "ColumnarExchange") { + const writeDuration = + (getMetricDuration("shuffle write time", metrics) ?? 0) + + (getMetricDuration("shuffle wall time", metrics) ?? 0); const readDuration = (getMetricDuration("fetch wait time", metrics) ?? 0) + (getMetricDuration("remote reqs duration", metrics) ?? 0) + @@ -645,7 +692,7 @@ function calcBroadcastExchangeDuration( nodeName: string, metrics: EnrichedSqlMetric[], ): number | undefined { - if (nodeName == "BroadcastExchange") { + if (nodeName === "BroadcastExchange" || nodeName === "ColumnarBroadcastExchange") { const duration = getMetricDuration("time to broadcast", metrics) ?? 0; +(getMetricDuration("time to build", metrics) ?? 0) + (getMetricDuration("time to collect", metrics) ?? 0); diff --git a/spark-ui/src/reducers/SqlReducerUtils.ts b/spark-ui/src/reducers/SqlReducerUtils.ts index 4e3b14f9..e0a80cf7 100644 --- a/spark-ui/src/reducers/SqlReducerUtils.ts +++ b/spark-ui/src/reducers/SqlReducerUtils.ts @@ -34,7 +34,14 @@ const metricAllowlist: Record> = { "total number of files merged by ZOrderBy", "total bytes in files merged by ZOrderBy", ], - join: ["number of output rows", "output columnar batches"], + join: [ + "number of output rows", + "output columnar batches", + "number of hash build input rows", + "number of hash probe input rows", + "time of hash build", + "time of hash probe", + ], transformation: [ "number of output rows", "output columnar batches", @@ -42,6 +49,13 @@ const metricAllowlist: Record> = { "data sent to Python workers", "data returned from Python workers", "duration", + "number of input rows", + "time of aggregation", + "time of filter", + "time of window", + "time of generate", + "number of spilled bytes", + "peak memory bytes", ], shuffle: [ "number of partitions", @@ -57,10 +71,19 @@ const metricAllowlist: Record> = { "remote bytes read", "fetch wait time", "data size", + "number of input rows", + "number of input batches", + "number of output batches", ], broadcast: ["number of output rows", "data size", "output columnar batches"], - sort: ["spill size", "output columnar batches"], + sort: [ + "spill size", + "output columnar batches", + "time of sort", + "number of spilled bytes", + "peak memory bytes", + ], other: [], }; @@ -86,6 +109,19 @@ const metricsValueTransformer: Record< "remote bytes read": extractTotalFromStatisticsMetric, "fetch wait time": extractTotalFromStatisticsMetric, "data size": extractTotalFromStatisticsMetric, + "time of aggregation": extractTotalFromStatisticsMetric, + "time of filter": extractTotalFromStatisticsMetric, + "time of sort": extractTotalFromStatisticsMetric, + "time of window": extractTotalFromStatisticsMetric, + "time of generate": extractTotalFromStatisticsMetric, + "time of hash build": extractTotalFromStatisticsMetric, + "time of hash probe": extractTotalFromStatisticsMetric, + "number of spilled bytes": (value: string) => { + const total = extractTotalFromStatisticsMetric(value); + if (total === undefined || total === "0.0 B" || total === "0 B") return undefined; + return total; + }, + "peak memory bytes": extractTotalFromStatisticsMetric, "number of dynamic part": (value: string) => { // if dynamic part is 0 we want to remove it from metrics if (value === "0") { @@ -134,6 +170,20 @@ const metricsRenamer: Record = { "number of read streams": "number of read streams", "parsing time for BQ": "parsing time", "number of BQ bytes read": "bytes read", + "number of input rows": "input rows", + "number of input batches": "input batches", + "number of output batches": "output batches", + "number of hash build input rows": "build input rows", + "number of hash probe input rows": "probe input rows", + "time of aggregation": "aggregation time", + "time of filter": "filter time", + "time of sort": "sort time", + "time of window": "window time", + "time of generate": "generate time", + "time of hash build": "hash build time", + "time of hash probe": "hash probe time", + "number of spilled bytes": "spill", + "peak memory bytes": "peak memory", }; const nodeTypeDict: Record = { @@ -220,6 +270,31 @@ const nodeTypeDict: Record = { DataFlintWindow: "transformation", Generate: "transformation", Expand: "transformation", + FilterExecTransformer: "transformation", + ProjectExecTransformer: "transformation", + FlushableHashAggregateExecTransformer: "transformation", + RegularHashAggregateExecTransformer: "transformation", + SortExecTransformer: "sort", + BroadcastHashJoinExecTransformer: "join", + ShuffledHashJoinExecTransformer: "join", + SortMergeJoinExecTransformer: "join", + ColumnarExchange: "shuffle", + ColumnarBroadcastExchange: "broadcast", + WindowExecTransformer: "transformation", + GenerateExecTransformer: "transformation", + TakeOrderedAndProjectExecTransformer: "output", + ColumnarCollectLimit: "output", + ColumnarUnion: "join", + VeloxColumnarToRow: "other", + RowToVeloxColumnar: "other", + VeloxResizeBatches: "other", + InputIteratorTransformer: "other", + BatchScanExecTransformer: "input", + FileSourceScanExecTransformer: "input", + ExpandExecTransformer: "transformation", + CoalesceExecTransformer: "shuffle", + LimitTransformer: "output", + CartesianProductExecTransformer: "join", }; const nodeRenamerDict: Record = { @@ -311,6 +386,31 @@ const nodeRenamerDict: Record = { DataFlintWindowInPandas: "Window (with Pandas UDF)", DataFlintArrowWindowPython: "Window (with Arrow UDF)", Expand: "Expand", + FilterExecTransformer: "Filter (Velox)", + ProjectExecTransformer: "Select (Velox)", + FlushableHashAggregateExecTransformer: "Aggregate Within Partition (Velox)", + RegularHashAggregateExecTransformer: "Aggregate By Merge (Velox)", + SortExecTransformer: "Sort (Velox)", + BroadcastHashJoinExecTransformer: "Join (Broadcast Hash) (Velox)", + ShuffledHashJoinExecTransformer: "Join (Shuffled Hash) (Velox)", + SortMergeJoinExecTransformer: "Join (Sort Merge) (Velox)", + ColumnarExchange: "Repartition (Velox)", + ColumnarBroadcastExchange: "Broadcast (Velox)", + WindowExecTransformer: "Window (Velox)", + GenerateExecTransformer: "Generate (Velox)", + TakeOrderedAndProjectExecTransformer: "Take Ordered (Velox)", + ColumnarCollectLimit: "Collect (Velox)", + ColumnarUnion: "Union (Velox)", + VeloxColumnarToRow: "Columnar To Row", + RowToVeloxColumnar: "Row To Columnar", + VeloxResizeBatches: "Resize Batches", + InputIteratorTransformer: "Input Iterator", + BatchScanExecTransformer: "Read (Velox)", + FileSourceScanExecTransformer: "Read (Velox)", + ExpandExecTransformer: "Expand (Velox)", + CoalesceExecTransformer: "Coalesce (Velox)", + LimitTransformer: "Limit (Velox)", + CartesianProductExecTransformer: "Join (Cartesian Product) (Velox)", }; export function extractTotalFromStatisticsMetric( @@ -579,7 +679,9 @@ export const EXCHANGE_NODE_TYPES = [ "CometColumnarExchange", "PhotonBroadcastExchange", "PhotonShuffleExchangeSink", - "PhotonShuffleExchangeSource" + "PhotonShuffleExchangeSource", + "ColumnarExchange", + "ColumnarBroadcastExchange", ]; /** @@ -663,6 +765,8 @@ export const AGGREGATE_NODE_NAMES = [ "HashAggregate", "SortAggregate", "ObjectHashAggregate", + "FlushableHashAggregateExecTransformer", + "RegularHashAggregateExecTransformer", ]; /** @@ -673,3 +777,57 @@ export const AGGREGATE_NODE_NAMES = [ export function isAggregateNode(nodeName: string): boolean { return AGGREGATE_NODE_NAMES.includes(nodeName); } + +export type AcceleratorType = "velox" | "photon" | "rapids" | "comet" | undefined; + +export interface AcceleratorInfo { + type: AcceleratorType; + label: string; + tooltip: string; + gradientFrom: string; + gradientTo: string; +} + +const ACCELERATOR_MAP: Record = {}; + +const VELOX_INFO: AcceleratorInfo = { type: "velox", label: "Velox", tooltip: "Accelerated by Apache Gluten (Velox native engine)", gradientFrom: "#e65100", gradientTo: "#ff6d00" }; +const PHOTON_INFO: AcceleratorInfo = { type: "photon", label: "Photon", tooltip: "Accelerated by Databricks Photon engine", gradientFrom: "#6a1b9a", gradientTo: "#ab47bc" }; +const RAPIDS_INFO: AcceleratorInfo = { type: "rapids", label: "RAPIDS", tooltip: "Accelerated by NVIDIA RAPIDS GPU engine", gradientFrom: "#1b5e20", gradientTo: "#43a047" }; +const COMET_INFO: AcceleratorInfo = { type: "comet", label: "DataFusion", tooltip: "Accelerated by Apache DataFusion Comet engine", gradientFrom: "#01579b", gradientTo: "#0288d1" }; + +[ + "FilterExecTransformer", "ProjectExecTransformer", + "FlushableHashAggregateExecTransformer", "RegularHashAggregateExecTransformer", + "SortExecTransformer", "BroadcastHashJoinExecTransformer", + "ShuffledHashJoinExecTransformer", "SortMergeJoinExecTransformer", + "WindowExecTransformer", "GenerateExecTransformer", + "TakeOrderedAndProjectExecTransformer", "ColumnarCollectLimit", + "ColumnarExchange", "ColumnarBroadcastExchange", "ColumnarUnion", + "ExpandExecTransformer", "CoalesceExecTransformer", "LimitTransformer", + "CartesianProductExecTransformer", "BatchScanExecTransformer", "FileSourceScanExecTransformer", +].forEach(n => ACCELERATOR_MAP[n] = VELOX_INFO); + +[ + "PhotonProject", "PhotonGroupingAgg", "PhotonShuffleExchangeSink", + "PhotonShuffleExchangeSource", "PhotonTopK", "PhotonFilter", + "PhotonBroadcastExchange", "PhotonBroadcastHashJoin", +].forEach(n => ACCELERATOR_MAP[n] = PHOTON_INFO); + +[ + "GpuFilter", "GpuBroadcastHashJoin", "GpuCoalesceBatches", + "GpuBroadcastExchange", "GpuProject", "GpuHashAggregate", + "GpuColumnarExchange", "GpuCustomShuffleReader", "GpuTopN", + "GpuShuffleCoalesce", "GpuSort", "GpuShuffledSymmetricHashJoin", + "GpuBroadcastNestedLoopJoin", +].forEach(n => ACCELERATOR_MAP[n] = RAPIDS_INFO); + +[ + "CometColumnarExchange", "CometHashAggregate", "CometExchange", + "CometProject", "CometFilter", "CometSort", + "CometHashJoin", "CometBroadcastHashJoin", "CometSortMergeJoin", +].forEach(n => ACCELERATOR_MAP[n] = COMET_INFO); + +export function getNodeAccelerator(nodeName: string): AcceleratorInfo | undefined { + return ACCELERATOR_MAP[nodeName]; +} + diff --git a/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts new file mode 100644 index 00000000..0929fe20 --- /dev/null +++ b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts @@ -0,0 +1,157 @@ +import fixture from "./gluten-sql4-fixture.json"; +import { EnrichedSparkSQL, EnrichedSqlEdge, EnrichedSqlNode, SparkStagesStore } from "../../interfaces/AppStore"; +import { calcNodeType } from "../SqlReducerUtils"; +import { calculateSqlStage } from "../SQLNodeStageReducer"; + +function buildEnrichedSql(): { sql: EnrichedSparkSQL; stages: SparkStagesStore; jobs: typeof fixture.jobs } { + const stageBoundaryNames = new Set([ + "ColumnarExchange", "ColumnarBroadcastExchange", "Exchange", "BroadcastExchange", + "AQEShuffleRead", "VeloxResizeBatches", "RowToVeloxColumnar", "VeloxColumnarToRow", + "ColumnarCollectLimit", "AdaptiveSparkPlan", "ColumnarUnion", + ]); + + // Step 1: Enrich nodes with type and wholeStageCodegenId (mimics calculateSql) + const rawNodes = fixture.sql.nodes.map((node) => { + const type = calcNodeType(node.nodeName); + const isCodegenNode = node.nodeName.includes("WholeStageCodegen"); + let wholeStageCodegenId: number | undefined = undefined; + if (isCodegenNode) { + wholeStageCodegenId = parseInt( + node.nodeName + .replace("WholeStageCodegenTransformer (", "") + .replace("WholeStageCodegen (", "") + .replace(")", ""), + ); + } + return { + ...node, + type, + isCodegenNode, + wholeStageCodegenId, + enrichedName: node.nodeName, + metrics: node.metrics.map(m => ({ ...m, stageId: undefined as number | undefined })), + } as unknown as EnrichedSqlNode; + }); + + // Step 2: Gluten codegen ID inference (mimics the logic in SqlReducer.ts) + const hasGlutenCodegen = rawNodes.some(n => n.isCodegenNode && n.nodeName.includes("Transformer")); + if (hasGlutenCodegen) { + const sorted = [...rawNodes].sort((a, b) => a.nodeId - b.nodeId); + let currentCodegenId: number | undefined = undefined; + for (const node of sorted) { + if (node.isCodegenNode) { + currentCodegenId = node.wholeStageCodegenId; + } else if (stageBoundaryNames.has(node.nodeName) || node.nodeName.includes("Scan")) { + currentCodegenId = undefined; + } else if (currentCodegenId !== undefined && node.wholeStageCodegenId === undefined) { + (node as any).wholeStageCodegenId = currentCodegenId; + } + } + } + + // Step 3: Separate codegen vs graph nodes + const codegenNodes = rawNodes + .filter(n => n.isCodegenNode) + .map(n => ({ ...n, codegenDuration: undefined as number | undefined, nodeIdFromMetrics: undefined as number | undefined })); + const graphNodes = rawNodes.filter(n => !n.isCodegenNode); + + // Mark last visible node as output if none exists + const hasOutput = graphNodes.some(n => n.type === "output"); + if (!hasOutput) { + const filtered = graphNodes.filter(n => n.nodeName !== "AdaptiveSparkPlan" && n.nodeName !== "ResultQueryStage"); + if (filtered.length > 0) { + filtered[filtered.length - 1].type = "output"; + } + } + + const edges: EnrichedSqlEdge[] = fixture.sql.edges.map(e => ({ fromId: e.fromId, toId: e.toId })); + + // Build stages store + const stages: SparkStagesStore = fixture.stages.map(s => ({ + stageId: s.stageId, + attemptId: s.attemptId, + name: "", + status: s.status, + numTasks: s.numTasks, + completedTasks: s.numCompleteTasks, + failedTasks: s.numFailedTasks, + activeTasks: s.numActiveTasks, + pendingTasks: s.numTasks - s.numCompleteTasks - s.numFailedTasks - s.numActiveTasks, + stageRealTimeDurationMs: undefined, + stagesRdd: fixture.stagesRdd[String(s.stageId) as keyof typeof fixture.stagesRdd], + durationDistribution: [0, 0, 0, 0, 0], + outputDistribution: [0, 0, 0, 0, 0], + outputRowsDistribution: [0, 0, 0, 0, 0], + inputDistribution: [0, 0, 0, 0, 0], + inputRowsDistribution: [0, 0, 0, 0, 0], + spillDiskDistriution: [0, 0, 0, 0, 0], + shuffleReadDistribution: [0, 0, 0, 0, 0], + shuffleWriteDistribution: [0, 0, 0, 0, 0], + stageProgress: 100, + metrics: { executorRunTime: s.executorRunTime }, + } as any)); + + const sql: EnrichedSparkSQL = { + id: fixture.sql.id, + description: fixture.sql.description, + successJobIds: fixture.sql.successJobIds, + runningJobIds: fixture.sql.runningJobIds, + failedJobIds: fixture.sql.failedJobIds, + nodes: graphNodes, + edges, + codegenNodes, + metricUpdateId: "test", + } as any; + + return { sql, stages, jobs: fixture.jobs }; +} + +describe("Gluten/Velox Stage Assignment - SQL 4 (Sort by line count)", () => { + it("should assign correct stages to all nodes", () => { + const { sql, stages, jobs } = buildEnrichedSql(); + + const result = calculateSqlStage(sql, stages, jobs as any); + + // Debug: print all node stages + for (const node of result.nodes) { + const stageInfo = node.stage + ? node.stage.type === "onestage" ? `onestage:${(node.stage as any).stageId}` + : node.stage.type === "exchange" ? `exchange:w=${(node.stage as any).writeStage},r=${(node.stage as any).readStage}` + : `${node.stage.type}` + : "NONE"; + console.log(` Node ${node.nodeId}: ${node.nodeName.padEnd(45)} stage=${stageInfo} wcid=${node.wholeStageCodegenId}`); + } + + // Pre-shuffle nodes should be in stage 8 + const scanNode = result.nodes.find(n => n.nodeName === "Scan csv"); + expect(scanNode?.stage?.type).toBe("onestage"); + expect((scanNode?.stage as any)?.stageId).toBe(8); + + const filterNode = result.nodes.find(n => n.nodeName === "FilterExecTransformer"); + expect(filterNode?.stage?.type).toBe("onestage"); + expect((filterNode?.stage as any)?.stageId).toBe(8); + + const flushableAgg = result.nodes.find(n => n.nodeName === "FlushableHashAggregateExecTransformer"); + expect(flushableAgg?.stage?.type).toBe("onestage"); + expect((flushableAgg?.stage as any)?.stageId).toBe(8); + + // ColumnarExchange should be split: write=8, read=10 + const exchange = result.nodes.find(n => n.nodeName === "ColumnarExchange"); + expect(exchange?.stage?.type).toBe("exchange"); + expect((exchange?.stage as any)?.writeStage).toBe(8); + expect((exchange?.stage as any)?.readStage).toBe(10); + + // Post-shuffle nodes should be in stage 10 + const aqeRead = result.nodes.find(n => n.nodeName === "AQEShuffleRead"); + expect(aqeRead?.stage?.type).toBe("onestage"); + expect((aqeRead?.stage as any)?.stageId).toBe(10); + + const regularAgg = result.nodes.find(n => n.nodeName === "RegularHashAggregateExecTransformer"); + expect(regularAgg?.stage?.type).toBe("onestage"); + expect((regularAgg?.stage as any)?.stageId).toBe(10); + + const takeOrdered = result.nodes.find(n => n.nodeName === "TakeOrderedAndProjectExecTransformer"); + expect(takeOrdered?.stage?.type).toBe("onestage"); + expect((takeOrdered?.stage as any)?.stageId).toBe(10); + }); +}); diff --git a/spark-ui/src/reducers/__tests__/gluten-sql4-fixture.json b/spark-ui/src/reducers/__tests__/gluten-sql4-fixture.json new file mode 100644 index 00000000..b8506ce0 --- /dev/null +++ b/spark-ui/src/reducers/__tests__/gluten-sql4-fixture.json @@ -0,0 +1,744 @@ +{ + "sql": { + "id": "4", + "description": "Sort by line count", + "successJobIds": [ + 7, + 8 + ], + "runningJobIds": [], + "failedJobIds": [], + "nodes": [ + { + "nodeId": 16, + "nodeName": "Scan csv", + "metrics": [ + { + "name": "number of output rows", + "value": "111,389" + }, + { + "name": "number of files read", + "value": "1" + }, + { + "name": "metadata time", + "value": "0 ms" + }, + { + "name": "size of files read", + "value": "9.7 MiB" + } + ] + }, + { + "nodeId": 15, + "nodeName": "RowToVeloxColumnar", + "metrics": [ + { + "name": "number of input rows", + "value": "111,389" + }, + { + "name": "number of output batches", + "value": "29" + }, + { + "name": "time to convert", + "value": "total (min, med, max (stageId: taskId))\n43 ms (7 ms, 17 ms, 19 ms (stage 8.0: task 14))" + } + ] + }, + { + "nodeId": 14, + "nodeName": "InputIteratorTransformer", + "metrics": [ + { + "name": "cpu wall time count", + "value": "70" + }, + { + "name": "time of operator input", + "value": "total (min, med, max (stageId: taskId))\n153 ms (34 ms, 59 ms, 60 ms (stage 8.0: task 13))" + }, + { + "name": "number of output rows", + "value": "111,389" + }, + { + "name": "number of output vectors", + "value": "29" + } + ] + }, + { + "nodeId": 13, + "nodeName": "FilterExecTransformer", + "metrics": [ + { + "name": "time of filter", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + }, + { + "name": "number of output bytes", + "value": "total (min, med, max (stageId: taskId))\n7.3 MiB (1388.7 KiB, 3.0 MiB, 3.0 MiB (stage 8.0: task 14))" + }, + { + "name": "cpu wall time count", + "value": "0" + }, + { + "name": "number of output vectors", + "value": "29" + }, + { + "name": "peak memory bytes", + "value": "total (min, med, max (stageId: taskId))\n0.0 B (0.0 B, 0.0 B, 0.0 B (stage 8.0: task 15))" + }, + { + "name": "number of output rows", + "value": "111,389" + }, + { + "name": "number of memory allocations", + "value": "0" + }, + { + "name": "time of loading lazy vectors", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + } + ] + }, + { + "nodeId": 12, + "nodeName": "ProjectExecTransformer", + "metrics": [ + { + "name": "time of project", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 13))" + }, + { + "name": "number of output bytes", + "value": "total (min, med, max (stageId: taskId))\n7.3 MiB (1388.7 KiB, 3.0 MiB, 3.0 MiB (stage 8.0: task 14))" + }, + { + "name": "cpu wall time count", + "value": "262" + }, + { + "name": "number of output vectors", + "value": "29" + }, + { + "name": "peak memory bytes", + "value": "total (min, med, max (stageId: taskId))\n0.0 B (0.0 B, 0.0 B, 0.0 B (stage 8.0: task 15))" + }, + { + "name": "number of output rows", + "value": "111,389" + }, + { + "name": "number of memory allocations", + "value": "0" + }, + { + "name": "time of loading lazy vectors", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + } + ] + }, + { + "nodeId": 11, + "nodeName": "FlushableHashAggregateExecTransformer", + "metrics": [ + { + "name": "number of final output vectors", + "value": "0" + }, + { + "name": "time of extraction", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 13))" + }, + { + "name": "rowConstruction cpu wall time count", + "value": "0" + }, + { + "name": "number of memory allocations", + "value": "97" + }, + { + "name": "number of output vectors", + "value": "3" + }, + { + "name": "number of spilled bytes", + "value": "total (min, med, max (stageId: taskId))\n0.0 B (0.0 B, 0.0 B, 0.0 B (stage 8.0: task 15))" + }, + { + "name": "number of final output rows", + "value": "0" + }, + { + "name": "bloom filter blocks byte size", + "value": "0.0 B" + }, + { + "name": "number of output rows", + "value": "1,338" + }, + { + "name": "number of pushdown aggregations", + "value": "0" + }, + { + "name": "number of output bytes", + "value": "total (min, med, max (stageId: taskId))\n1437.8 KiB (479.3 KiB, 479.3 KiB, 479.3 KiB (stage 8.0: task 15))" + }, + { + "name": "number of spilled files", + "value": "0" + }, + { + "name": "time of aggregation", + "value": "total (min, med, max (stageId: taskId))\n4 ms (0 ms, 1 ms, 1 ms (stage 8.0: task 13))" + }, + { + "name": "peak memory bytes", + "value": "total (min, med, max (stageId: taskId))\n3.6 MiB (1176.1 KiB, 1216.1 KiB, 1260.1 KiB (stage 8.0: task 14))" + }, + { + "name": "number of spilled rows", + "value": "0" + }, + { + "name": "cpu wall time count", + "value": "254" + }, + { + "name": "number of spilled partitions", + "value": "0" + }, + { + "name": "time of loading lazy vectors", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + }, + { + "name": "time of rowConstruction", + "value": "0 ms" + }, + { + "name": "number of flushed rows", + "value": "0" + }, + { + "name": "extraction cpu wall time count", + "value": "113" + } + ] + }, + { + "nodeId": 10, + "nodeName": "ProjectExecTransformer", + "metrics": [ + { + "name": "time of project", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 13))" + }, + { + "name": "number of output bytes", + "value": "total (min, med, max (stageId: taskId))\n1444.5 KiB (480.2 KiB, 482.2 KiB, 482.2 KiB (stage 8.0: task 14))" + }, + { + "name": "cpu wall time count", + "value": "84" + }, + { + "name": "number of output vectors", + "value": "3" + }, + { + "name": "peak memory bytes", + "value": "total (min, med, max (stageId: taskId))\n7.0 KiB (1024.0 B, 3.0 KiB, 3.0 KiB (stage 8.0: task 14))" + }, + { + "name": "number of output rows", + "value": "1,338" + }, + { + "name": "number of memory allocations", + "value": "3" + }, + { + "name": "time of loading lazy vectors", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + } + ] + }, + { + "nodeId": 9, + "nodeName": "WholeStageCodegenTransformer (1)", + "metrics": [ + { + "name": "duration", + "value": "total (min, med, max (stageId: taskId))\n184 ms (43 ms, 70 ms, 71 ms (stage 8.0: task 13))" + } + ] + }, + { + "nodeId": 8, + "nodeName": "VeloxResizeBatches", + "metrics": [ + { + "name": "number of output batches", + "value": "3" + }, + { + "name": "number of input rows", + "value": "1,338" + }, + { + "name": "time to convert batches", + "value": "total (min, med, max (stageId: taskId))\n1 ms (0 ms, 0 ms, 1 ms (stage 8.0: task 15))" + }, + { + "name": "number of input batches", + "value": "3" + }, + { + "name": "number of output rows", + "value": "1,338" + } + ] + }, + { + "nodeId": 7, + "nodeName": "ColumnarExchange", + "metrics": [ + { + "name": "shuffle records written", + "value": "1,338" + }, + { + "name": "local merged chunks fetched", + "value": "0" + }, + { + "name": "shuffle write time", + "value": "total (min, med, max (stageId: taskId))\n1 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 14))" + }, + { + "name": "remote merged bytes read", + "value": "0.0 B" + }, + { + "name": "time to compress", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + }, + { + "name": "local merged blocks fetched", + "value": "0" + }, + { + "name": "time to split", + "value": "total (min, med, max (stageId: taskId))\n8 ms (2 ms, 2 ms, 3 ms (stage 8.0: task 13))" + }, + { + "name": "corrupt merged block chunks", + "value": "0" + }, + { + "name": "shuffle wall time", + "value": "total (min, med, max (stageId: taskId))\n9 ms (2 ms, 2 ms, 4 ms (stage 8.0: task 13))" + }, + { + "name": "number of input rows", + "value": "1,338" + }, + { + "name": "time to decompress", + "value": "0 ms" + }, + { + "name": "remote merged reqs duration", + "value": "0 ms" + }, + { + "name": "remote merged blocks fetched", + "value": "0" + }, + { + "name": "time to spill", + "value": "total (min, med, max (stageId: taskId))\n0 ms (0 ms, 0 ms, 0 ms (stage 8.0: task 15))" + }, + { + "name": "records read", + "value": "1,338" + }, + { + "name": "local bytes read", + "value": "142.9 KiB" + }, + { + "name": "dictionary size", + "value": "total (min, med, max (stageId: taskId))\n0.0 B (0.0 B, 0.0 B, 0.0 B (stage 8.0: task 15))" + }, + { + "name": "fetch wait time", + "value": "0 ms" + }, + { + "name": "remote bytes read", + "value": "0.0 B" + }, + { + "name": "time to deserialize", + "value": "0 ms" + }, + { + "name": "merged fetch fallback count", + "value": "0" + }, + { + "name": "avg read batch num rows", + "value": "2.5" + }, + { + "name": "batches read", + "value": "515" + }, + { + "name": "shuffle bytes spilled", + "value": "total (min, med, max (stageId: taskId))\n0.0 B (0.0 B, 0.0 B, 0.0 B (stage 8.0: task 15))" + }, + { + "name": "number of input batches", + "value": "3" + }, + { + "name": "avg dictionary fields", + "value": "0" + }, + { + "name": "number of output rows", + "value": "1,338" + }, + { + "name": "local blocks read", + "value": "3" + }, + { + "name": "remote merged chunks fetched", + "value": "0" + }, + { + "name": "remote blocks read", + "value": "0" + }, + { + "name": "data size", + "value": "total (min, med, max (stageId: taskId))\n81.6 KiB (12.6 KiB, 33.0 KiB, 35.9 KiB (stage 8.0: task 13))" + }, + { + "name": "local merged bytes read", + "value": "0.0 B" + }, + { + "name": "peak bytes allocated", + "value": "total (min, med, max (stageId: taskId))\n128.8 MiB (35.3 MiB, 46.1 MiB, 47.4 MiB (stage 8.0: task 13))" + }, + { + "name": "number of partitions", + "value": "200" + }, + { + "name": "remote reqs duration", + "value": "0 ms" + }, + { + "name": "remote bytes read to disk", + "value": "0.0 B" + }, + { + "name": "shuffle bytes written", + "value": "total (min, med, max (stageId: taskId))\n142.9 KiB (29.1 KiB, 55.2 KiB, 58.6 KiB (stage 8.0: task 13))" + } + ] + }, + { + "nodeId": 6, + "nodeName": "AQEShuffleRead", + "metrics": [ + { + "name": "number of partitions", + "value": "1" + }, + { + "name": "partition data size", + "value": "149.6 KiB" + }, + { + "name": "number of coalesced partitions", + "value": "1" + } + ] + }, + { + "nodeId": 5, + "nodeName": "InputIteratorTransformer", + "metrics": [ + { + "name": "cpu wall time count", + "value": "1,034" + }, + { + "name": "time of operator input", + "value": "3 ms" + }, + { + "name": "number of output rows", + "value": "1,338" + }, + { + "name": "number of output vectors", + "value": "515" + } + ] + }, + { + "nodeId": 4, + "nodeName": "RegularHashAggregateExecTransformer", + "metrics": [ + { + "name": "number of final output vectors", + "value": "0" + }, + { + "name": "time of extraction", + "value": "0 ms" + }, + { + "name": "rowConstruction cpu wall time count", + "value": "4,130" + }, + { + "name": "number of memory allocations", + "value": "41" + }, + { + "name": "number of output vectors", + "value": "1" + }, + { + "name": "number of spilled bytes", + "value": "0.0 B" + }, + { + "name": "number of final output rows", + "value": "0" + }, + { + "name": "bloom filter blocks byte size", + "value": "0.0 B" + }, + { + "name": "number of output rows", + "value": "1,326" + }, + { + "name": "number of pushdown aggregations", + "value": "0" + }, + { + "name": "number of output bytes", + "value": "431.3 KiB" + }, + { + "name": "number of spilled files", + "value": "0" + }, + { + "name": "time of aggregation", + "value": "2 ms" + }, + { + "name": "peak memory bytes", + "value": "1004.6 KiB" + }, + { + "name": "number of spilled rows", + "value": "0" + }, + { + "name": "cpu wall time count", + "value": "3,622" + }, + { + "name": "number of spilled partitions", + "value": "0" + }, + { + "name": "time of loading lazy vectors", + "value": "0 ms" + }, + { + "name": "time of rowConstruction", + "value": "2 ms" + }, + { + "name": "number of flushed rows", + "value": "0" + }, + { + "name": "extraction cpu wall time count", + "value": "0" + } + ] + }, + { + "nodeId": 3, + "nodeName": "WholeStageCodegenTransformer (2)", + "metrics": [] + }, + { + "nodeId": 2, + "nodeName": "TakeOrderedAndProjectExecTransformer", + "metrics": [] + }, + { + "nodeId": 1, + "nodeName": "VeloxColumnarToRow", + "metrics": [ + { + "name": "number of output rows", + "value": "21" + }, + { + "name": "number of input batches", + "value": "1" + }, + { + "name": "time to convert", + "value": "0 ms" + } + ] + }, + { + "nodeId": 0, + "nodeName": "AdaptiveSparkPlan", + "metrics": [] + } + ], + "edges": [ + { + "fromId": 1, + "toId": 0 + }, + { + "fromId": 2, + "toId": 1 + }, + { + "fromId": 4, + "toId": 2 + }, + { + "fromId": 5, + "toId": 4 + }, + { + "fromId": 6, + "toId": 5 + }, + { + "fromId": 7, + "toId": 6 + }, + { + "fromId": 8, + "toId": 7 + }, + { + "fromId": 10, + "toId": 8 + }, + { + "fromId": 11, + "toId": 10 + }, + { + "fromId": 12, + "toId": 11 + }, + { + "fromId": 13, + "toId": 12 + }, + { + "fromId": 14, + "toId": 13 + }, + { + "fromId": 15, + "toId": 14 + }, + { + "fromId": 16, + "toId": 15 + } + ] + }, + "jobs": [ + { + "jobId": 8, + "stageIds": [ + 9, + 10 + ] + }, + { + "jobId": 7, + "stageIds": [ + 8 + ] + } + ], + "stages": [ + { + "stageId": 10, + "status": "COMPLETE", + "numTasks": 1, + "numCompleteTasks": 1, + "numFailedTasks": 0, + "numActiveTasks": 0, + "executorRunTime": 14, + "attemptId": 0 + }, + { + "stageId": 8, + "status": "COMPLETE", + "numTasks": 3, + "numCompleteTasks": 3, + "numFailedTasks": 0, + "numActiveTasks": 0, + "executorRunTime": 203, + "attemptId": 0 + } + ], + "stagesRdd": { + "8": { + "100": "Scan csv ", + "90": "ColumnarExchange", + "99": "RowToVeloxColumnar", + "92": "WholeStageCodegenTransformer (1)", + "91": "VeloxResizeBatches" + }, + "9": {}, + "10": { + "107": "AQEShuffleRead", + "108": "WholeStageCodegenTransformer (3)", + "101": "VeloxColumnarToRow", + "115": "mapPartitionsInternal" + } + } +} \ No newline at end of file From 1ef4c56c98f0a5b55c816f756ae8b77659ae519a Mon Sep 17 00:00:00 2001 From: menishmueli Date: Mon, 13 Apr 2026 21:33:10 -0400 Subject: [PATCH 2/6] Fix Comet/DataFusion exchange split and aggregate enrichment - Add CometExchange, CometColumnarExchange, GpuColumnarExchange to exchange visual split, stage assignment, and shuffle metrics calculation - Add CometHashAggregate to aggregate node parsing and naming - Support Comet plan description format (Keys:/Functions:) in parser - Re-add fallback plan description parsing from SQL-level planDescription for native engines where DataFlint custom endpoint returns empty --- .../example/DataFusionCometExample.scala | 2 +- .../components/SqlFlow/SqlLayoutService.ts | 4 +- .../PlanParsers/hashAggregateParser.ts | 4 +- spark-ui/src/reducers/SQLNodeStageReducer.ts | 4 +- spark-ui/src/reducers/SqlReducer.ts | 72 ++++++++++++++++++- spark-ui/src/reducers/SqlReducerUtils.ts | 1 + 6 files changed, 80 insertions(+), 7 deletions(-) diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala index 0823dbd6..38e3d072 100644 --- a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala +++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala @@ -43,7 +43,7 @@ object DataFusionCometExample extends App { println(s"number of unique words : $uniqueWords") - spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count() +// spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count() scala.io.StdIn.readLine() spark.stop() diff --git a/spark-ui/src/components/SqlFlow/SqlLayoutService.ts b/spark-ui/src/components/SqlFlow/SqlLayoutService.ts index b1276a01..beaa3c8d 100644 --- a/spark-ui/src/components/SqlFlow/SqlLayoutService.ts +++ b/spark-ui/src/components/SqlFlow/SqlLayoutService.ts @@ -593,7 +593,9 @@ class SqlLayoutService { const splitExchangeNodeIds = new Set(); for (const nodeId of nodesIds) { const node = nodeMap.get(nodeId); - if (node?.nodeName === "Exchange" || node?.nodeName === "ColumnarExchange") { + if (node?.nodeName === "Exchange" || node?.nodeName === "ColumnarExchange" || + node?.nodeName === "CometExchange" || node?.nodeName === "CometColumnarExchange" || + node?.nodeName === "GpuColumnarExchange") { splitExchangeNodeIds.add(nodeId.toString()); } } diff --git a/spark-ui/src/reducers/PlanParsers/hashAggregateParser.ts b/spark-ui/src/reducers/PlanParsers/hashAggregateParser.ts index 2b90403a..cb899e84 100644 --- a/spark-ui/src/reducers/PlanParsers/hashAggregateParser.ts +++ b/spark-ui/src/reducers/PlanParsers/hashAggregateParser.ts @@ -3,8 +3,8 @@ import { bracedSplit, hashNumbersRemover, onlyUnique } from "./PlanParserUtils"; export function parseHashAggregate(input: string): ParsedHashAggregatePlan { const cleanInput = hashNumbersRemover(input); - const keysMatch = cleanInput.match(/keys=\[([^\]]+)\]/); - const functionsMatch = cleanInput.match(/functions=\[([^\]]+)\]/); + const keysMatch = cleanInput.match(/keys=\[([^\]]+)\]/) ?? cleanInput.match(/Keys:\s*\[([^\]]+)\]/); + const functionsMatch = cleanInput.match(/functions=\[([^\]]+)\]/) ?? cleanInput.match(/Functions\s*\[\d+\]:\s*\[([^\]]+)\]/); let keys: string[] = []; let functions: string[] = []; diff --git a/spark-ui/src/reducers/SQLNodeStageReducer.ts b/spark-ui/src/reducers/SQLNodeStageReducer.ts index da3e1f73..d12fadee 100644 --- a/spark-ui/src/reducers/SQLNodeStageReducer.ts +++ b/spark-ui/src/reducers/SQLNodeStageReducer.ts @@ -148,7 +148,9 @@ export function calculateSQLNodeStage(sql: EnrichedSparkSQL, sqlStages: SparkSta nodes = nodes.map((node) => { // Convert Exchange nodes to exchange stage type if they have adjacent nodes with stage info // This handles both nodes without stage data and nodes with onestage type that should be exchange type - if ((node.nodeName === "Exchange" || node.nodeName === "ColumnarExchange") && (node.stage === undefined || node.stage.type === "onestage")) { + if ((node.nodeName === "Exchange" || node.nodeName === "ColumnarExchange" || + node.nodeName === "CometExchange" || node.nodeName === "CometColumnarExchange" || + node.nodeName === "GpuColumnarExchange") && (node.stage === undefined || node.stage.type === "onestage")) { const nextNode = findNextNode(node.nodeId); const previousNode = findPreviousNode(node.nodeId); const metricsExchangeStageIds = findExchangeStageIds(node.metrics); diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index 27f95d28..3ae75268 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -100,6 +100,7 @@ export function parseNodePlan( case "PhotonGroupingAgg": case "GpuHashAggregate": case "!CometGpuHashAggregate": + case "CometHashAggregate": case "HashAggregate": case "SortAggregate": case "ObjectHashAggregate": @@ -268,6 +269,57 @@ export function getMetricDuration( return duration; } +/** + * When the DataFlint custom plan endpoint returns empty (e.g., for Gluten/Velox or Comet), + * fall back to parsing per-node descriptions from the SQL-level planDescription text. + * Matches plan sections like "(26) WindowExecTransformer\nArguments: [...]" to SQL nodes by name. + */ +function buildFallbackPlanDescriptions( + sqlPlanDescription: string, + nodes: { nodeId: number; nodeName: string }[], +): Map { + const result = new Map(); + if (!sqlPlanDescription) return result; + + const lines = sqlPlanDescription.split("\n"); + const sections: { name: string; body: string }[] = []; + let currentName: string | undefined; + let currentBody: string[] = []; + + for (const line of lines) { + const headerMatch = line.match(/^\((\d+)\)\s+(\S+)/); + if (headerMatch) { + if (currentName !== undefined && currentBody.length > 0) { + sections.push({ name: currentName, body: currentBody.join(" ") }); + } + currentName = headerMatch[2]; + currentBody = []; + } else if (currentName !== undefined) { + const trimmed = line.trim(); + if (trimmed.startsWith("Arguments:") || trimmed.startsWith("Keys:") || trimmed.startsWith("Functions")) { + currentBody.push(trimmed); + } + } + } + if (currentName !== undefined && currentBody.length > 0) { + sections.push({ name: currentName, body: currentBody.join(" ") }); + } + + const usedSections = new Set(); + for (const node of nodes) { + for (let i = 0; i < sections.length; i++) { + if (usedSections.has(i)) continue; + if (sections[i].name === node.nodeName && sections[i].body) { + result.set(node.nodeId, `${node.nodeName} ${sections[i].body}`); + usedSections.add(i); + break; + } + } + } + + return result; +} + function calculateSql( sql: SparkSQL, plan: SQLPlan | undefined, @@ -277,13 +329,27 @@ function calculateSql( ): EnrichedSparkSQL { const enrichedSql = sql as EnrichedSparkSQL; const originalNumOfNodes = enrichedSql.nodes.length; + + const hasPlanData = plan !== undefined && plan.nodesPlan.length > 0; + const fallbackDescs = hasPlanData + ? new Map() + : buildFallbackPlanDescriptions(sql.planDescription, enrichedSql.nodes); + const typeEnrichedNodes = enrichedSql.nodes.map((node) => { const type = calcNodeType(node.nodeName); const nodePlan = plan?.nodesPlan.find( (planNode) => planNode.id === node.nodeId, ); - const parsedPlan = + let parsedPlan = nodePlan !== undefined ? parseNodePlan(node, nodePlan) : undefined; + + if (parsedPlan === undefined) { + const fallbackDesc = fallbackDescs.get(node.nodeId); + if (fallbackDesc) { + parsedPlan = parseNodePlan(node, { id: node.nodeId, planDescription: fallbackDesc, rddScopeId: undefined }); + } + } + const isCodegenNode = node.nodeName.includes("WholeStageCodegen"); // Find the Delta Lake scan that matches this node's table location @@ -671,7 +737,9 @@ function calcCodegenDuration(metrics: EnrichedSqlMetric[]): number | undefined { function calcExchangeMetrics(nodeName: string, metrics: EnrichedSqlMetric[]) { var exchangeMetrics: ExchangeMetrics | undefined = undefined; - if (nodeName === "Exchange" || nodeName === "ColumnarExchange") { + if (nodeName === "Exchange" || nodeName === "ColumnarExchange" || + nodeName === "CometExchange" || nodeName === "CometColumnarExchange" || + nodeName === "GpuColumnarExchange") { const writeDuration = (getMetricDuration("shuffle write time", metrics) ?? 0) + (getMetricDuration("shuffle wall time", metrics) ?? 0); diff --git a/spark-ui/src/reducers/SqlReducerUtils.ts b/spark-ui/src/reducers/SqlReducerUtils.ts index e0a80cf7..8dce0fa7 100644 --- a/spark-ui/src/reducers/SqlReducerUtils.ts +++ b/spark-ui/src/reducers/SqlReducerUtils.ts @@ -762,6 +762,7 @@ export const AGGREGATE_NODE_NAMES = [ "PhotonGroupingAgg", "GpuHashAggregate", "!CometGpuHashAggregate", + "CometHashAggregate", "HashAggregate", "SortAggregate", "ObjectHashAggregate", From 0aabf2f8512a9111a0ced7d447d85f01437f829e Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 18 May 2026 09:56:28 +0300 Subject: [PATCH 3/6] Make DataFlint instrumentation compatible with native accelerators When a native query accelerator (Gluten/Velox, Comet/DataFusion, RAPIDS GPU, Photon) is on the classpath, DataFlint's TimedExec wrapper hides Spark operators from the accelerator's plan-substitution rule because TimedExec uses transparent children on Spark 3.2+. The accelerator silently falls back to the JVM path. Detect each accelerator via a classpath probe and skip wrapping any operator it would transform, so both DataFlint instrumentation and native acceleration coexist. Other fixes pulled in along the way: - Spark UI: GraphDurationAttribution recognizes columnar exchanges and native blocking nodes (ColumnarExchange/CometExchange/Gpu*/Photon*) as stage boundaries and blocking operators. - GlutenStageAssignment.spec.ts: drop the unreachable third branch on the discriminated stage union that tripped tsc. - docker/gluten/run-gluten-example.sh: case-insensitive find for the example jar (sbt lowercases normalizedName, BSD find is case-sensitive). - GlutenVeloxExample now enables spark.dataflint.instrument.spark.enabled to exercise the new coexistence path. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/gluten/run-gluten-example.sh | 2 +- .../example/GlutenVeloxExample.scala | 1 + .../apache/spark/dataflint/Accelerator.scala | 99 +++++++++++++++++++ .../DataFlintInstrumentationExtension.scala | 22 ++++- .../src/reducers/GraphDurationAttribution.ts | 21 ++++ .../__tests__/GlutenStageAssignment.spec.ts | 6 +- 6 files changed, 146 insertions(+), 5 deletions(-) create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/Accelerator.scala diff --git a/docker/gluten/run-gluten-example.sh b/docker/gluten/run-gluten-example.sh index a34e5cf4..8afaf1c8 100755 --- a/docker/gluten/run-gluten-example.sh +++ b/docker/gluten/run-gluten-example.sh @@ -110,7 +110,7 @@ cp "$PLUGIN_JAR" "$JARS_DIR/dataflint-plugin.jar" echo "Copied DataFlint plugin: $(basename "$PLUGIN_JAR")" # Example jar -EXAMPLE_JAR=$(find "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}" -name "DataflintSparkExample351_${SCALA_VERSION}-*.jar" -type f | head -1) +EXAMPLE_JAR=$(find "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}" -iname "dataflintsparkexample351_${SCALA_VERSION}-*.jar" -type f | head -1) if [ -z "$EXAMPLE_JAR" ]; then echo "ERROR: Example jar not found. Run without --skip-build first." exit 1 diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala index f435e8a1..1990bd48 100644 --- a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala +++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala @@ -16,6 +16,7 @@ object GlutenVeloxExample extends App { .config("spark.eventLog.enabled", "true") .config("spark.eventLog.dir", "/tmp/spark-events") .config("spark.dataflint.telemetry.enabled", value = false) + .config("spark.dataflint.instrument.spark.enabled", value = true) .config("spark.sql.maxMetadataStringLength", "10000") .config("spark.sql.adaptive.enabled", "true") .master("local[*]") diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/Accelerator.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/Accelerator.scala new file mode 100644 index 00000000..418c0c19 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/Accelerator.scala @@ -0,0 +1,99 @@ +package org.apache.spark.dataflint + +/** + * A native query accelerator that replaces Spark physical operators with its own variants. + * + * Used by [[DataFlintInstrumentationColumnarRule]] to decide which operators to skip when + * wrapping with [[TimedExec]]. `TimedExec` is transparent on Spark 3.2+ (`children = + * child.children`), so wrapping a Spark operator hides it from any other plan-transform + * rule — including the accelerator's own substitution rule, which then silently falls back + * to the Spark JVM path. + * + * @param name human-readable label for log messages + * @param detectionClasses class names checked via reflection; if ANY is on the classpath + * the accelerator is considered active + * @param transformableExecs simple class names of Spark `*Exec` operators this accelerator + * will replace during physical planning. DataFlint must not wrap + * these. + */ +case class Accelerator( + name: String, + detectionClasses: Seq[String], + transformableExecs: Set[String]) + +object Accelerator { + // Gluten/Velox — Apache Gluten project (Velox native engine). + val GlutenVelox: Accelerator = Accelerator( + name = "Gluten/Velox", + detectionClasses = Seq("org.apache.gluten.GlutenPlugin"), + transformableExecs = Set( + "FilterExec", "ProjectExec", "SortExec", "ExpandExec", "GenerateExec", + "HashAggregateExec", "SortAggregateExec", "ObjectHashAggregateExec", + "SortMergeJoinExec", "BroadcastHashJoinExec", "ShuffledHashJoinExec", + "BroadcastNestedLoopJoinExec", "CartesianProductExec", + "WindowExec", "WindowGroupLimitExec", + "FileSourceScanExec", "BatchScanExec", + )) + + // Comet — Apache DataFusion Comet engine. + val CometDataFusion: Accelerator = Accelerator( + name = "Comet/DataFusion", + detectionClasses = Seq( + "org.apache.spark.CometPlugin", + "org.apache.comet.CometSparkSessionExtensions"), + transformableExecs = Set( + "FilterExec", "ProjectExec", "SortExec", "ExpandExec", + "HashAggregateExec", "ObjectHashAggregateExec", + "SortMergeJoinExec", "BroadcastHashJoinExec", "ShuffledHashJoinExec", + "FileSourceScanExec", "BatchScanExec", + )) + + // RAPIDS — NVIDIA RAPIDS Accelerator for Apache Spark. + val RapidsGpu: Accelerator = Accelerator( + name = "RAPIDS GPU", + detectionClasses = Seq( + "com.nvidia.spark.SQLPlugin", + "com.nvidia.spark.rapids.RapidsPlugin"), + transformableExecs = Set( + "FilterExec", "ProjectExec", "SortExec", "ExpandExec", "GenerateExec", + "HashAggregateExec", "SortAggregateExec", "ObjectHashAggregateExec", + "BroadcastHashJoinExec", "ShuffledHashJoinExec", "SortMergeJoinExec", + "BroadcastNestedLoopJoinExec", + "WindowExec", "WindowGroupLimitExec", + "FileSourceScanExec", "BatchScanExec", + )) + + // Photon — Databricks proprietary engine. Photon is only on the DBR classpath, so + // probing for Photon-internal classes is safe on a normal Spark distribution. + val Photon: Accelerator = Accelerator( + name = "Photon", + detectionClasses = Seq( + "com.databricks.photon.PhotonAdaptiveSparkPlanExec", + "com.databricks.sql.execution.PhotonResultStage"), + transformableExecs = Set( + "FilterExec", "ProjectExec", + "HashAggregateExec", "SortAggregateExec", "ObjectHashAggregateExec", + "BroadcastHashJoinExec", "ShuffledHashJoinExec", "SortMergeJoinExec", + "BroadcastNestedLoopJoinExec", + "WindowExec", + "FileSourceScanExec", + )) + + val known: Seq[Accelerator] = Seq(GlutenVelox, CometDataFusion, RapidsGpu, Photon) + + private def isClassPresent(className: String): Boolean = { + try { + Class.forName(className, false, Thread.currentThread().getContextClassLoader) + true + } catch { + case _: ClassNotFoundException | _: NoClassDefFoundError => false + } + } + + /** + * Accelerators detected on the current classpath. Probed lazily; the result is cached + * for the JVM lifetime since classpath composition cannot change after startup. + */ + lazy val active: Seq[Accelerator] = + known.filter(_.detectionClasses.exists(isClassPresent)) +} \ No newline at end of file diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 1d6e2ea5..320ecce5 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -56,7 +56,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C "WindowExec", "WindowInPandasExec" ) ++ sqlNodes - if (globalEnabled) all + val base = if (globalEnabled) all else { (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SQL_NODES_ENABLED, false)) sqlNodes else Set.empty[String]) ++ (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_BATCH_EVAL_PYTHON_ENABLED, false)) Set("BatchEvalPythonExec") else Set.empty[String]) ++ @@ -67,6 +67,25 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_FLAT_MAP_COGROUPS_PANDAS_ENABLED, false)) Set("FlatMapCoGroupsInPandasExec") else Set.empty[String]) ++ (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, false)) Set("WindowExec", "WindowInPandasExec") else Set.empty[String]) } + // Native-acceleration compatibility: TimedExec uses transparent children on Spark 3.2+ + // (children = child.children) so the wrapped operator is hidden from any other rule's + // plan-transform traversal. Each native accelerator (Gluten/Velox, Comet/DataFusion, + // RAPIDS GPU, Photon) replaces Spark physical operators with its own variants; wrapping + // those operators with TimedExec hides them and the accelerator silently falls back to + // the JVM path. For every active accelerator, drop its transformable operators from + // the wrap set. Python UDFs and DataWritingCommandExec are never claimed by these + // accelerators, so they stay wrappable. + val active = Accelerator.active + if (active.nonEmpty) { + val transformable: Set[String] = active.flatMap(_.transformableExecs).toSet + val dropped = base intersect transformable + if (dropped.nonEmpty) { + logInfo( + s"DataFlint: ${active.map(_.name).mkString(", ")} detected — skipping instrumentation " + + s"for nodes the accelerator will transform: ${dropped.toSeq.sorted.mkString(", ")}") + } + base -- transformable + } else base } override def preColumnarTransitions: Rule[SparkPlan] = { plan => @@ -79,3 +98,4 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C } } } + diff --git a/spark-ui/src/reducers/GraphDurationAttribution.ts b/spark-ui/src/reducers/GraphDurationAttribution.ts index 07cc9825..f5ad176f 100644 --- a/spark-ui/src/reducers/GraphDurationAttribution.ts +++ b/spark-ui/src/reducers/GraphDurationAttribution.ts @@ -50,11 +50,32 @@ const BLOCKING_NODES = new Set([ "MapInPandas", "MapInArrow", "PythonMapInArrow", "FlatMapGroupsInPandas", "FlatMapCoGroupsInPandas", "WindowInPandas", "ArrowWindowPython", + // Gluten/Velox native operators + "SortExecTransformer", + "FlushableHashAggregateExecTransformer", "RegularHashAggregateExecTransformer", + "SortMergeJoinExecTransformer", "ShuffledHashJoinExecTransformer", + "BroadcastHashJoinExecTransformer", "CartesianProductExecTransformer", + // Comet (DataFusion) + "CometSort", "CometHashAggregate", + "CometSortMergeJoin", "CometHashJoin", "CometBroadcastHashJoin", + // RAPIDS (GPU) + "GpuSort", "GpuHashAggregate", + "GpuShuffledSymmetricHashJoin", "GpuBroadcastHashJoin", "GpuBroadcastNestedLoopJoin", + // Photon (Databricks) + "PhotonGroupingAgg", ]); const EXCHANGE_NAMES = new Set([ "Exchange", "BroadcastExchange", "ShuffleQueryStage", "BroadcastQueryStage", "ResultQueryStage", + // Gluten/Velox + "ColumnarExchange", "ColumnarBroadcastExchange", + // Comet (DataFusion) + "CometExchange", "CometColumnarExchange", + // RAPIDS (GPU) + "GpuColumnarExchange", "GpuBroadcastExchange", + // Photon (Databricks) + "PhotonShuffleExchangeSink", "PhotonShuffleExchangeSource", "PhotonBroadcastExchange", ]); const NATIVE_EXCLUSIVE_METRICS: Record = { diff --git a/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts index 0929fe20..3c3dc03d 100644 --- a/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts +++ b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts @@ -115,9 +115,9 @@ describe("Gluten/Velox Stage Assignment - SQL 4 (Sort by line count)", () => { // Debug: print all node stages for (const node of result.nodes) { const stageInfo = node.stage - ? node.stage.type === "onestage" ? `onestage:${(node.stage as any).stageId}` - : node.stage.type === "exchange" ? `exchange:w=${(node.stage as any).writeStage},r=${(node.stage as any).readStage}` - : `${node.stage.type}` + ? node.stage.type === "onestage" + ? `onestage:${node.stage.stageId}` + : `exchange:w=${node.stage.writeStage},r=${node.stage.readStage}` : "NONE"; console.log(` Node ${node.nodeId}: ${node.nodeName.padEnd(45)} stage=${stageInfo} wcid=${node.wholeStageCodegenId}`); } From 44a23141c9a9c453f53775d5732d980ee8322b30 Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 18 May 2026 11:30:31 +0300 Subject: [PATCH 4/6] Run DataFlint instrumentation last via postColumnarTransitions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ApplyColumnarRulesAndInsertTransitions applies pre rules in registration order then post rules in **reverse** registration order (Columnar.scala:570-576). The DataFlint plugin's init() runs first and auto-registers its SessionExtension first, so the DataFlint columnar rule sits at index 0 in columnarRules. Moving the TimedExec wrap from preColumnarTransitions to postColumnarTransitions makes it run **last**, after every accelerator's pre-phase substitution. By that point Gluten/Comet/RAPIDS/Photon have already swapped FilterExec → FilterExecTransformer (and the Comet/Gpu/Photon equivalents) in their preColumnarTransitions. DataFlint's class-name match list contains only Spark-native classes, so transformed nodes naturally don't match — no explicit accelerator skip list needed, and the result is robust to plugin registration order. The explicit Accelerator-aware skip in enabledNodeNames is removed; only an informational log naming detected accelerators remains, so users see the coexistence path is engaged. DataFlintPythonExecSpec swapped its 12 direct preColumnarTransitions test calls to postColumnarTransitions; DataFlintWindowExecSpec's comment about the injection phase was updated. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DataFlintInstrumentationExtension.scala | 73 ++++++++++++------- .../dataflint/DataFlintPythonExecSpec.scala | 24 +++--- .../dataflint/DataFlintWindowExecSpec.scala | 2 +- 3 files changed, 58 insertions(+), 41 deletions(-) diff --git a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala index 320ecce5..4e45ca29 100644 --- a/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala +++ b/spark-plugin/pluginspark3/src/main/scala/org/apache/spark/dataflint/DataFlintInstrumentationExtension.scala @@ -21,21 +21,47 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) /** * A ColumnarRule that wraps instrumented physical plan nodes with TimedExec to add a `duration` - * metric. Runs in preColumnarTransitions so it sees the fully-planned physical tree. + * metric. * - * Exchange nodes (ShuffleExchangeExec, BroadcastExchangeExec) are never wrapped. + * ## Why postColumnarTransitions, not pre * - * Version-specific class names (e.g. PythonMapInArrowExec, added in Spark 3.3) are matched by - * simple class name string to avoid NoClassDefFoundError on Spark 3.0/3.1 at load time. + * Native accelerators (Gluten/Velox, Comet, RAPIDS, Photon) replace Spark operators with their + * own variants via `injectColumnar`. TimedExec is transparent on Spark 3.2+ + * (`children = child.children`), which hides the wrapped operator from any other rule's plan + * traversal — so wrapping `FilterExec` in `preColumnarTransitions` makes the accelerator's + * transformation rule walk past it and the plan silently falls back to the Spark JVM path. * - * The !isInstanceOf[TimedExec] guard on the child makes the rule idempotent — safe to re-run - * under AQE prepareForExecution. + * `ApplyColumnarRulesAndInsertTransitions.apply` (Spark `Columnar.scala`) applies `pre` rules + * in registration order then `post` rules in **reverse** registration order: + * + * {{{ + * columnarRules.foreach(r => plan = r.preColumnarTransitions(plan)) + * plan = insertTransitions(plan) + * columnarRules.reverse.foreach(r => plan = r.postColumnarTransitions(plan)) + * }}} + * + * Since the DataFlint plugin's `init()` runs first (it auto-registers this extension before + * the accelerator's plugin appends its own), DataFlint sits at index 0 in `columnarRules` and + * therefore runs **last** in post. By that point every accelerator has already substituted + * its `*ExecTransformer` / `Comet*` / `Gpu*` / `Photon*` variants in `pre`, so the Spark + * class names in `enabledNodeNames` only match operators no accelerator claimed (writes, + * Python UDFs, fallback ops) — exactly what we want to instrument. + * + * ## Idempotence + * + * AQE re-runs `prepareForExecution` for each new query stage, so this rule may run more than + * once over the same subtree. The `!isInstanceOf[TimedExec]` guard makes re-application a + * no-op. + * + * ## Spark 3.0/3.1 + * + * Version-specific class names (e.g. `PythonMapInArrowExec`, added in Spark 3.3) are matched + * by simple class name string to avoid NoClassDefFoundError at load time. */ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends ColumnarRule with Logging { - // Eagerly compute the set of node simple-class-names to wrap, respecting per-type flags. - // When the global flag is on everything is enabled; otherwise only nodes whose specific - // flag is enabled are included. + // Set of physical operator simple class names this rule will wrap with TimedExec. + // Respects the per-feature flags; the global INSTRUMENT_SPARK_ENABLED implies all. private val enabledNodeNames: Set[String] = { val conf = session.sparkContext.conf val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false) @@ -56,7 +82,7 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C "WindowExec", "WindowInPandasExec" ) ++ sqlNodes - val base = if (globalEnabled) all + if (globalEnabled) all else { (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SQL_NODES_ENABLED, false)) sqlNodes else Set.empty[String]) ++ (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_BATCH_EVAL_PYTHON_ENABLED, false)) Set("BatchEvalPythonExec") else Set.empty[String]) ++ @@ -67,28 +93,19 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_FLAT_MAP_COGROUPS_PANDAS_ENABLED, false)) Set("FlatMapCoGroupsInPandasExec") else Set.empty[String]) ++ (if (conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_WINDOW_ENABLED, false)) Set("WindowExec", "WindowInPandasExec") else Set.empty[String]) } - // Native-acceleration compatibility: TimedExec uses transparent children on Spark 3.2+ - // (children = child.children) so the wrapped operator is hidden from any other rule's - // plan-transform traversal. Each native accelerator (Gluten/Velox, Comet/DataFusion, - // RAPIDS GPU, Photon) replaces Spark physical operators with its own variants; wrapping - // those operators with TimedExec hides them and the accelerator silently falls back to - // the JVM path. For every active accelerator, drop its transformable operators from - // the wrap set. Python UDFs and DataWritingCommandExec are never claimed by these - // accelerators, so they stay wrappable. + } + + // Surface detected accelerators in logs so users know the coexistence path is engaged. + if (enabledNodeNames.nonEmpty) { val active = Accelerator.active if (active.nonEmpty) { - val transformable: Set[String] = active.flatMap(_.transformableExecs).toSet - val dropped = base intersect transformable - if (dropped.nonEmpty) { - logInfo( - s"DataFlint: ${active.map(_.name).mkString(", ")} detected — skipping instrumentation " + - s"for nodes the accelerator will transform: ${dropped.toSeq.sorted.mkString(", ")}") - } - base -- transformable - } else base + logInfo( + s"DataFlint: native accelerator(s) detected — ${active.map(_.name).mkString(", ")}. " + + "DataFlint runs in postColumnarTransitions and only wraps operators no accelerator transformed.") + } } - override def preColumnarTransitions: Rule[SparkPlan] = { plan => + override def postColumnarTransitions: Rule[SparkPlan] = { plan => if (enabledNodeNames.isEmpty) plan else plan.transformUp { case node if enabledNodeNames.contains(node.getClass.getSimpleName) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintPythonExecSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintPythonExecSpec.scala index ef782e50..5886211e 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintPythonExecSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintPythonExecSpec.scala @@ -54,7 +54,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_MAP_PANDAS_ITER_UDF) val original = MapInPandasExec(udf, Seq.empty, emptyChild, false) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[MapInPandasExec] } @@ -63,7 +63,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_MAP_PANDAS_ITER_UDF) val original = MapInPandasExec(udf, Seq.empty, emptyChild, false) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } @@ -73,7 +73,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_MAP_ARROW_ITER_UDF) val original = PythonMapInArrowExec(udf, Seq.empty, emptyChild, false) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[PythonMapInArrowExec] } @@ -82,7 +82,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_MAP_ARROW_ITER_UDF) val original = PythonMapInArrowExec(udf, Seq.empty, emptyChild, false) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } @@ -92,7 +92,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_SCALAR_PANDAS_UDF) val original = ArrowEvalPythonExec(Seq(udf), Seq.empty, emptyChild, PythonEvalType.SQL_SCALAR_PANDAS_UDF) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[ArrowEvalPythonExec] } @@ -101,7 +101,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_SCALAR_PANDAS_UDF) val original = ArrowEvalPythonExec(Seq(udf), Seq.empty, emptyChild, PythonEvalType.SQL_SCALAR_PANDAS_UDF) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } @@ -111,7 +111,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) val original = FlatMapGroupsInPandasExec(Seq.empty, udf, Seq.empty, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[FlatMapGroupsInPandasExec] } @@ -120,7 +120,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) val original = FlatMapGroupsInPandasExec(Seq.empty, udf, Seq.empty, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } @@ -130,7 +130,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_BATCHED_UDF) val original = BatchEvalPythonExec(Seq(udf), Seq.empty, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[BatchEvalPythonExec] } @@ -139,7 +139,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_BATCHED_UDF) val original = BatchEvalPythonExec(Seq(udf), Seq.empty, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } @@ -149,7 +149,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF) val original = FlatMapCoGroupsInPandasExec(Seq.empty, Seq.empty, udf, Seq.empty, emptyChild, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original) + val result = rule.postColumnarTransitions(original) result shouldBe a[TimedExec] result.asInstanceOf[TimedExec].child shouldBe a[FlatMapCoGroupsInPandasExec] } @@ -158,7 +158,7 @@ class DataFlintPythonExecSpec extends AnyFunSuite with Matchers with BeforeAndAf val udf = fakePythonUDF(PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF) val original = FlatMapCoGroupsInPandasExec(Seq.empty, Seq.empty, udf, Seq.empty, emptyChild, emptyChild) val rule = DataFlintInstrumentationColumnarRule(spark) - val result = rule.preColumnarTransitions(original).asInstanceOf[TimedExec] + val result = rule.postColumnarTransitions(original).asInstanceOf[TimedExec] result.metrics should contain key "duration" } } diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala index 24fb08df..3dd379fd 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWindowExecSpec.scala @@ -51,7 +51,7 @@ class DataFlintWindowExecSpec extends AnyFunSuite with Matchers with BeforeAndAf } // With AQE, executedPlan is AdaptiveSparkPlanExec. After collect(), finalPhysicalPlan holds - // the fully optimised plan. TimedExec is injected by the ColumnarRule (preColumnarTransitions), + // the fully optimised plan. TimedExec is injected by the ColumnarRule (postColumnarTransitions), // so it only appears after execution (in finalPhysicalPlan), not in sparkPlan. private def finalPlan(df: DataFrame) = df.queryExecution.executedPlan match { case aqe: AdaptiveSparkPlanExec => aqe.finalPhysicalPlan From 7e67c3f357e1a92020a75071c3609031106c3ecc Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 18 May 2026 11:30:55 +0300 Subject: [PATCH 5/6] Add docker setup for DataFusion Comet example Mirrors docker/gluten/ for Apache DataFusion Comet 0.4.0: - Dockerfile based on apache/spark:3.5.7 with Comet plugin + shuffle manager wired in spark-defaults.conf and the JDK 11+ --add-opens flags Comet's off-heap memory path requires. - run-comet-example.sh downloads the Comet jar from Maven Central (cached), builds the DataFlint UI + plugin + example jar, copies them into the docker context, and brings up docker compose. - The released Comet jar bundles linux_amd64 + linux_aarch64 natives in a single artifact, so no architecture-specific download is needed (the --amd64 flag is still there to force Rosetta on Apple Silicon if you want x86_64). macOS users go through Docker because the upstream jar does not include darwin natives. DataFusionCometExample.scala: lowered offHeap to 4g (was 16g, too large for a typical docker container), enabled event logging + DataFlint instrumentation, and replaced the interactive readLine + spark.stop with Thread.sleep(Long.MaxValue) so the Spark UI stays reachable on port 10000 when running detached. Both run scripts now pick the example jar by mtime (`ls -t ... | head -1`) instead of `find ... | head -1`, which was returning a stale jar when many timestamped sbt-dynver outputs accumulated in target/. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/comet/.gitignore | 3 + docker/comet/Dockerfile | 61 ++++++++ docker/comet/docker-compose.yml | 16 ++ docker/comet/run-comet-example.sh | 146 ++++++++++++++++++ docker/gluten/run-gluten-example.sh | 2 +- .../example/DataFusionCometExample.scala | 10 +- 6 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 docker/comet/.gitignore create mode 100644 docker/comet/Dockerfile create mode 100644 docker/comet/docker-compose.yml create mode 100755 docker/comet/run-comet-example.sh diff --git a/docker/comet/.gitignore b/docker/comet/.gitignore new file mode 100644 index 00000000..fd69d4c9 --- /dev/null +++ b/docker/comet/.gitignore @@ -0,0 +1,3 @@ +jars/ +test_data/ +spark-events/ \ No newline at end of file diff --git a/docker/comet/Dockerfile b/docker/comet/Dockerfile new file mode 100644 index 00000000..708340e4 --- /dev/null +++ b/docker/comet/Dockerfile @@ -0,0 +1,61 @@ +# Spark + Apache DataFusion Comet + DataFlint example runner +# +# Build arguments: +# SPARK_VERSION - Spark version (default: 3.5.7) +# +# Usage: +# ./run-comet-example.sh (recommended — builds everything and runs) +# docker compose up --build (if jars are already in jars/) + +ARG SPARK_VERSION=3.5.7 + +FROM apache/spark:${SPARK_VERSION} + +ARG SPARK_VERSION=3.5.7 + +USER root + +# Create directories for event logs and test data +RUN mkdir -p /tmp/spark-events && \ + chown -R spark:spark /tmp/spark-events && \ + mkdir -p /opt/spark/work-dir/test_data && \ + chown -R spark:spark /opt/spark/work-dir/test_data + +# Copy all jars (Comet + DataFlint plugin + example) into Spark's jars dir +COPY jars/*.jar /opt/spark/jars/ + +# Copy test data +COPY test_data/ /opt/spark/work-dir/test_data/ + +# Configure Spark defaults for Comet +# The --add-opens flags match what Apache Comet documents as required on Java 11+ for its +# off-heap memory access path (sun.misc.Unsafe, DirectByteBuffer, jdk.internal.misc). +RUN mkdir -p /opt/spark/conf && \ + echo "spark.plugins=io.dataflint.spark.SparkDataflintPlugin,org.apache.spark.CometPlugin" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.comet.explainFallback.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.memory.offHeap.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.memory.offHeap.size=4g" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.eventLog.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.eventLog.dir=/tmp/spark-events" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.ui.port=10000" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.dataflint.telemetry.enabled=false" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.sql.maxMetadataStringLength=10000" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf && \ + echo "spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf + +USER spark + +EXPOSE 10000 + +WORKDIR /opt/spark/work-dir + +ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" + +# Run the Comet example via spark-submit +CMD ["/opt/spark/bin/spark-submit", \ + "--master", "local[*]", \ + "--class", "io.dataflint.example.DataFusionCometExample", \ + "--driver-memory", "2g", \ + "/opt/spark/jars/example.jar"] \ No newline at end of file diff --git a/docker/comet/docker-compose.yml b/docker/comet/docker-compose.yml new file mode 100644 index 00000000..89027ddf --- /dev/null +++ b/docker/comet/docker-compose.yml @@ -0,0 +1,16 @@ +services: + spark-comet-example: + build: + context: . + dockerfile: Dockerfile + args: + SPARK_VERSION: ${SPARK_VERSION:-3.5.7} + image: dataflint-comet-example:${SPARK_VERSION:-3.5.7} + container_name: dataflint-comet-example + ports: + - "${SPARK_UI_PORT:-10000}:10000" + volumes: + - ${SPARK_EVENTS_DIR:-./spark-events}:/tmp/spark-events + environment: + - SPARK_NO_DAEMONIZE=true + restart: "no" \ No newline at end of file diff --git a/docker/comet/run-comet-example.sh b/docker/comet/run-comet-example.sh new file mode 100755 index 00000000..48d16e2a --- /dev/null +++ b/docker/comet/run-comet-example.sh @@ -0,0 +1,146 @@ +#!/bin/bash +set -e + +# Run DataFlint Apache DataFusion Comet Example +# +# This script: +# 1. Downloads the Apache Comet jar from Maven Central (cached) +# 2. Builds the DataFlint UI and plugin jar +# 3. Packages the Comet example app +# 4. Builds and runs the Docker container +# +# Apache Comet ships native libraries inside the single Maven jar (Linux x86_64 + aarch64). +# Running on macOS goes through Docker (Linux container) because the released jar does not +# include darwin natives — local `sbt run` will fail on macOS. +# +# Prerequisites: Node.js 20+, Java 8+, sbt, Docker +# +# Usage: +# ./run-comet-example.sh # full build + run +# ./run-comet-example.sh --skip-build # skip sbt/npm, just rebuild Docker +# ./run-comet-example.sh --amd64 # force x86_64 (Rosetta 2 emulation) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +JARS_DIR="$SCRIPT_DIR/jars" +TEST_DATA_DIR="$SCRIPT_DIR/test_data" +SPARK_EVENTS_DIR="$SCRIPT_DIR/spark-events" + +SPARK_VERSION="${SPARK_VERSION:-3.5.7}" +SCALA_VERSION="${SCALA_VERSION:-2.12}" +COMET_VERSION="${COMET_VERSION:-0.4.0}" + +SKIP_BUILD=false +FORCE_AMD64=false + +for arg in "$@"; do + case $arg in + --skip-build) SKIP_BUILD=true ;; + --amd64) FORCE_AMD64=true ;; + esac +done + +# Apple Silicon can run the aarch64 natives that ship inside the Comet jar; --amd64 forces +# Rosetta emulation if you specifically need x86_64. +ARCH=$(uname -m) +if [ "$FORCE_AMD64" = true ]; then + DOCKER_PLATFORM="--platform linux/amd64" +elif [ "$ARCH" = "arm64" ] || [ "$ARCH" = "aarch64" ]; then + DOCKER_PLATFORM="" +else + DOCKER_PLATFORM="" +fi + +COMET_JAR_NAME="comet-spark-spark3.5_${SCALA_VERSION}-${COMET_VERSION}.jar" +COMET_JAR_URL="https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_${SCALA_VERSION}/${COMET_VERSION}/${COMET_JAR_NAME}" + +echo "=== DataFlint Apache Comet Example ===" +echo "Project root: $PROJECT_ROOT" +echo "Spark version: $SPARK_VERSION" +echo "Comet version: $COMET_VERSION" +echo "Comet jar: $COMET_JAR_NAME" +echo "" + +mkdir -p "$JARS_DIR" +mkdir -p "$SPARK_EVENTS_DIR" + +# --- Step 1: Download Comet jar (cached) --- +echo "=== Step 1: Downloading Comet jar ===" +if [ -f "$JARS_DIR/$COMET_JAR_NAME" ]; then + echo "Comet jar already cached: $JARS_DIR/$COMET_JAR_NAME" +else + echo "Downloading: $COMET_JAR_URL" + curl -fSL -o "$JARS_DIR/$COMET_JAR_NAME" "$COMET_JAR_URL" + echo "Downloaded successfully." +fi + +if [ "$SKIP_BUILD" = false ]; then + # --- Step 2: Build DataFlint UI --- + echo "" + echo "=== Step 2: Building DataFlint UI ===" + cd "$PROJECT_ROOT/spark-ui" + if [ ! -d "node_modules" ]; then + echo "Installing npm dependencies..." + npm ci + fi + echo "Building and deploying UI into plugin resources..." + npm run deploy + + # --- Step 3: Build DataFlint plugin jar --- + echo "" + echo "=== Step 3: Building DataFlint plugin jar ===" + cd "$PROJECT_ROOT/spark-plugin" + export SBT_OPTS="-Xmx4G -Xss2M -XX:+UseG1GC" + sbt "pluginspark3/assembly" + + # --- Step 4: Package example jar --- + echo "" + echo "=== Step 4: Packaging example jar ===" + sbt "example_3_5_1/package" +fi + +# --- Step 5: Copy jars to docker context --- +echo "" +echo "=== Step 5: Copying jars to Docker context ===" + +# DataFlint plugin jar +PLUGIN_JAR=$(find "$PROJECT_ROOT/spark-plugin/pluginspark3/target/scala-${SCALA_VERSION}" -name "spark_${SCALA_VERSION}-*.jar" -type f | head -1) +if [ -z "$PLUGIN_JAR" ]; then + echo "ERROR: DataFlint plugin jar not found. Run without --skip-build first." + exit 1 +fi +cp "$PLUGIN_JAR" "$JARS_DIR/dataflint-plugin.jar" +echo "Copied DataFlint plugin: $(basename "$PLUGIN_JAR")" + +# Example jar +EXAMPLE_JAR=$(ls -t "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}"/dataflintsparkexample351_${SCALA_VERSION}-*.jar 2>/dev/null | head -1) +if [ -z "$EXAMPLE_JAR" ]; then + echo "ERROR: Example jar not found. Run without --skip-build first." + exit 1 +fi +cp "$EXAMPLE_JAR" "$JARS_DIR/example.jar" +echo "Copied example jar: $(basename "$EXAMPLE_JAR")" + +echo "Comet jar: $COMET_JAR_NAME" + +# --- Step 6: Copy test data --- +echo "" +echo "=== Step 6: Copying test data ===" +rm -rf "$TEST_DATA_DIR" +cp -r "$PROJECT_ROOT/spark-plugin/test_data" "$TEST_DATA_DIR" +echo "Copied test_data/" + +# --- Step 7: Build and run Docker --- +echo "" +echo "=== Step 7: Building and running Docker container ===" +cd "$SCRIPT_DIR" + +# Stop any previous container +docker compose down 2>/dev/null || true + +# Build with platform flag if needed +if [ -n "$DOCKER_PLATFORM" ]; then + DOCKER_DEFAULT_PLATFORM=linux/amd64 docker compose up --build +else + docker compose up --build +fi \ No newline at end of file diff --git a/docker/gluten/run-gluten-example.sh b/docker/gluten/run-gluten-example.sh index 8afaf1c8..f958ecd4 100755 --- a/docker/gluten/run-gluten-example.sh +++ b/docker/gluten/run-gluten-example.sh @@ -110,7 +110,7 @@ cp "$PLUGIN_JAR" "$JARS_DIR/dataflint-plugin.jar" echo "Copied DataFlint plugin: $(basename "$PLUGIN_JAR")" # Example jar -EXAMPLE_JAR=$(find "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}" -iname "dataflintsparkexample351_${SCALA_VERSION}-*.jar" -type f | head -1) +EXAMPLE_JAR=$(ls -t "$PROJECT_ROOT/spark-plugin/example_3_5_1/target/scala-${SCALA_VERSION}"/dataflintsparkexample351_${SCALA_VERSION}-*.jar 2>/dev/null | head -1) if [ -z "$EXAMPLE_JAR" ]; then echo "ERROR: Example jar not found. Run without --skip-build first." exit 1 diff --git a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala index 38e3d072..4cabcf29 100644 --- a/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala +++ b/spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala @@ -19,9 +19,12 @@ object DataFusionCometExample extends App { .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") .config("spark.comet.explainFallback.enabled", "true") .config("spark.memory.offHeap.enabled", "true") - .config("spark.memory.offHeap.size", "16g") + .config("spark.memory.offHeap.size", "4g") .config("spark.ui.port", "10000") + .config("spark.eventLog.enabled", "true") + .config("spark.eventLog.dir", "/tmp/spark-events") .config("spark.dataflint.telemetry.enabled", value = false) + .config("spark.dataflint.instrument.spark.enabled", value = true) .config("spark.sql.maxMetadataStringLength", "10000") .master("local[*]") .getOrCreate() @@ -45,6 +48,7 @@ object DataFusionCometExample extends App { // spark.read.load("/Users/menishmueli/Documents/GitHub/spark-sql-perf/data/store_sales").filter($"ss_quantity" > 1).count() - scala.io.StdIn.readLine() - spark.stop() + println("DataFusionCometExample completed. Spark UI available at http://localhost:10000") + println("Press Ctrl+C to stop.") + Thread.sleep(Long.MaxValue) } From d1d2bab5beda19876bb6c2bfb8c3069bfab5395b Mon Sep 17 00:00:00 2001 From: Avi Minsky Date: Mon, 18 May 2026 12:07:44 +0300 Subject: [PATCH 6/6] Address Copilot PR review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SqlReducer.calcBroadcastExchangeDuration: chain all three broadcast durations (broadcast + build + collect) into a single sum. The previous code left `+(getMetricDuration("time to build", metrics))...` as a dead expression statement after the prior statement terminated at `;`, so only "time to broadcast" was returned. - SqlReducer.buildFallbackPlanDescriptions: capture the rest of the section header line (`(.+?)\s*$`) instead of the first token (`\S+`). Without this, node names with spaces ("Scan csv", "WholeStageCodegenTransformer (1)") never matched the fallback section and their plan descriptions were dropped. - GlutenStageAssignment.spec.ts: removed the per-node console.log debug loop. Test assertions already report enough on failure. - docker/gluten + docker/comet Dockerfiles: removed the broken `--add-opens=java.base/sun.misc=ALL-UNNAMED` flag (sun.misc.Unsafe lives in jdk.unsupported, not java.base — the JVM was emitting `WARNING: package sun.misc not in java.base` on every run). Also dropped the `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` lines entirely; the bottom-of-file `_JAVA_OPTIONS` env already supplies the same flags to driver and executor JVMs. Verified Velox/Comet still run end-to-end after the cleanup. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/comet/Dockerfile | 12 +++++------- docker/gluten/Dockerfile | 12 +++++------- spark-ui/src/reducers/SqlReducer.ts | 9 ++++++--- .../reducers/__tests__/GlutenStageAssignment.spec.ts | 10 ---------- 4 files changed, 16 insertions(+), 27 deletions(-) diff --git a/docker/comet/Dockerfile b/docker/comet/Dockerfile index 708340e4..9bb56342 100644 --- a/docker/comet/Dockerfile +++ b/docker/comet/Dockerfile @@ -27,9 +27,9 @@ COPY jars/*.jar /opt/spark/jars/ # Copy test data COPY test_data/ /opt/spark/work-dir/test_data/ -# Configure Spark defaults for Comet -# The --add-opens flags match what Apache Comet documents as required on Java 11+ for its -# off-heap memory access path (sun.misc.Unsafe, DirectByteBuffer, jdk.internal.misc). +# Configure Spark defaults for Comet. +# JVM module-access flags live in the _JAVA_OPTIONS env at the bottom of this Dockerfile; +# they apply to all JVMs spawned (driver + executors) on Java 11+. RUN mkdir -p /opt/spark/conf && \ echo "spark.plugins=io.dataflint.spark.SparkDataflintPlugin,org.apache.spark.CometPlugin" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" >> /opt/spark/conf/spark-defaults.conf && \ @@ -41,9 +41,7 @@ RUN mkdir -p /opt/spark/conf && \ echo "spark.ui.port=10000" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.dataflint.telemetry.enabled=false" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.sql.maxMetadataStringLength=10000" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf + echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf USER spark @@ -51,7 +49,7 @@ EXPOSE 10000 WORKDIR /opt/spark/work-dir -ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" +ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" # Run the Comet example via spark-submit CMD ["/opt/spark/bin/spark-submit", \ diff --git a/docker/gluten/Dockerfile b/docker/gluten/Dockerfile index f78dc570..bbb48ba6 100644 --- a/docker/gluten/Dockerfile +++ b/docker/gluten/Dockerfile @@ -28,9 +28,9 @@ COPY jars/*.jar /opt/spark/jars/ # Copy test data COPY test_data/ /opt/spark/work-dir/test_data/ -# Configure Spark defaults for Gluten -# The --add-opens flags are required because the Gluten nightly (JDK8 target) uses -# sun.misc.Unsafe / DirectByteBuffer internals that are module-restricted on Java 11+. +# Configure Spark defaults for Gluten. +# JVM module-access flags live in the _JAVA_OPTIONS env at the bottom of this Dockerfile; +# they apply to all JVMs spawned (driver + executors) on Java 11+. RUN mkdir -p /opt/spark/conf && \ echo "spark.plugins=io.dataflint.spark.SparkDataflintPlugin,org.apache.gluten.GlutenPlugin" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager" >> /opt/spark/conf/spark-defaults.conf && \ @@ -41,9 +41,7 @@ RUN mkdir -p /opt/spark/conf && \ echo "spark.ui.port=10000" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.dataflint.telemetry.enabled=false" >> /opt/spark/conf/spark-defaults.conf && \ echo "spark.sql.maxMetadataStringLength=10000" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.driver.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf && \ - echo "spark.executor.extraJavaOptions=--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" >> /opt/spark/conf/spark-defaults.conf + echo "spark.sql.adaptive.enabled=true" >> /opt/spark/conf/spark-defaults.conf USER spark @@ -51,7 +49,7 @@ EXPOSE 10000 WORKDIR /opt/spark/work-dir -ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" +ENV _JAVA_OPTIONS="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" # Run the Gluten example via spark-submit CMD ["/opt/spark/bin/spark-submit", \ diff --git a/spark-ui/src/reducers/SqlReducer.ts b/spark-ui/src/reducers/SqlReducer.ts index a3b6a12b..9ade387f 100644 --- a/spark-ui/src/reducers/SqlReducer.ts +++ b/spark-ui/src/reducers/SqlReducer.ts @@ -279,7 +279,9 @@ function buildFallbackPlanDescriptions( let currentBody: string[] = []; for (const line of lines) { - const headerMatch = line.match(/^\((\d+)\)\s+(\S+)/); + // Capture the rest of the header line, not just the first token, so node names + // with spaces match (e.g. "Scan csv", "WholeStageCodegenTransformer (1)"). + const headerMatch = line.match(/^\((\d+)\)\s+(.+?)\s*$/); if (headerMatch) { if (currentName !== undefined && currentBody.length > 0) { sections.push({ name: currentName, body: currentBody.join(" ") }); @@ -818,8 +820,9 @@ function calcBroadcastExchangeDuration( metrics: EnrichedSqlMetric[], ): number | undefined { if (nodeName === "BroadcastExchange" || nodeName === "ColumnarBroadcastExchange") { - const duration = getMetricDuration("time to broadcast", metrics) ?? 0; - +(getMetricDuration("time to build", metrics) ?? 0) + + const duration = + (getMetricDuration("time to broadcast", metrics) ?? 0) + + (getMetricDuration("time to build", metrics) ?? 0) + (getMetricDuration("time to collect", metrics) ?? 0); return duration; } diff --git a/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts index 3c3dc03d..fc185353 100644 --- a/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts +++ b/spark-ui/src/reducers/__tests__/GlutenStageAssignment.spec.ts @@ -112,16 +112,6 @@ describe("Gluten/Velox Stage Assignment - SQL 4 (Sort by line count)", () => { const result = calculateSqlStage(sql, stages, jobs as any); - // Debug: print all node stages - for (const node of result.nodes) { - const stageInfo = node.stage - ? node.stage.type === "onestage" - ? `onestage:${node.stage.stageId}` - : `exchange:w=${node.stage.writeStage},r=${node.stage.readStage}` - : "NONE"; - console.log(` Node ${node.nodeId}: ${node.nodeName.padEnd(45)} stage=${stageInfo} wcid=${node.wholeStageCodegenId}`); - } - // Pre-shuffle nodes should be in stage 8 const scanNode = result.nodes.find(n => n.nodeName === "Scan csv"); expect(scanNode?.stage?.type).toBe("onestage");