Add Apache Gluten/Velox support to DataFlint UI #59
- Add Gluten/Velox node type classification, display names, and accelerator badges (Velox, Photon, RAPIDS, DataFusion) in the SQL plan flow view
- Fix stage identification for Gluten's WholeStageCodegenTransformer nodes by inferring codegen-to-node mapping and handling AQE codegen renumbering
- Split ColumnarExchange into write/read visual nodes across stage boundaries
- Propagate stages through Gluten-specific boundary nodes (VeloxResizeBatches, RowToVeloxColumnar, TakeOrderedAndProjectExecTransformer, etc.)
- Show Velox native timing metrics (aggregation/filter/sort/window time, peak memory, spill) on plan nodes
- Strip Gluten class name prefixes from plan descriptions in parsers
- Add Docker environment and example app for running Gluten/Velox on Spark 3.5
- Add unit test for Gluten stage assignment with real fixture data
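To illustrate the badge classification in the first bullet, here is a minimal sketch of a node-name-to-accelerator lookup. The function name, the type, and the prefix/suffix heuristics are assumptions for illustration only; the actual lookup in `SqlReducerUtils.ts` may use explicit dictionaries instead.

```typescript
// Hypothetical helper: names and heuristics are illustrative, not taken from the DataFlint source.
type Accelerator = "Velox" | "Photon" | "RAPIDS" | "DataFusion";

// Gluten exchange names mentioned in this PR that carry no "Velox" or "Transformer" marker.
const glutenExchangeNames = new Set(["ColumnarExchange", "ColumnarBroadcastExchange"]);

function acceleratorForNode(nodeName: string): Accelerator | undefined {
  // Check vendor prefixes first so CometColumnarExchange / GpuColumnarExchange
  // are not mistaken for Gluten nodes.
  if (nodeName.startsWith("Photon")) return "Photon";
  if (nodeName.startsWith("Gpu")) return "RAPIDS";
  if (nodeName.startsWith("Comet")) return "DataFusion";
  // Assumption: Gluten operators end in "Transformer" or contain "Velox".
  if (
    nodeName.endsWith("Transformer") ||
    nodeName.includes("Velox") ||
    glutenExchangeNames.has(nodeName)
  ) {
    return "Velox";
  }
  return undefined; // plain Spark operator: no badge
}
```

Checking vendor prefixes before the Gluten heuristics keeps the match unambiguous for names that share the `Columnar` fragment.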
Screenshots
Status Page: GlutenVeloxExample with 16 SQL Queries
The DataFlint status page correctly identifies the Gluten/Velox application.
GroupBy Aggregation (SQL 3): Two stages with split exchange
Shows Stage 5 (write side) and Stage 7 (read side) with the ColumnarExchange visually split. All Velox-accelerated nodes have orange Velox badges. Native metrics like Aggregation Time and Peak Memory are displayed.
Window Functions (SQL 10): Three stages with window partition fields
Shows Stage 47, Stage 49, and Stage 52 with two split ColumnarExchange nodes. Window nodes display Partition Fields (play_name), Sort Fields, and Select Fields parsed from the physical plan.
- Add CometExchange, CometColumnarExchange, GpuColumnarExchange to exchange visual split, stage assignment, and shuffle metrics calculation
- Add CometHashAggregate to aggregate node parsing and naming
- Support Comet plan description format (Keys:/Functions:) in parser
- Re-add fallback plan description parsing from SQL-level planDescription for native engines where DataFlint custom endpoint returns empty
…support
# Conflicts:
#   spark-ui/src/reducers/PlanParsers/WindowParser.ts
#   spark-ui/src/reducers/SQLNodeStageReducer.ts
#   spark-ui/src/reducers/SqlReducer.ts
#   spark-ui/src/reducers/SqlReducerUtils.ts
Pull request overview
Adds Apache Gluten/Velox (and related accelerators) support to the DataFlint UI’s SQL plan flow view, including new node classifications/labels, metrics, accelerator badges, and stage-assignment improvements, plus a runnable Docker + example app environment for validation.
Changes:
- Extend SQL node typing/renaming and metric handling to cover Gluten/Velox operators and Velox-native metrics.
- Improve stage identification for Gluten WholeStageCodegenTransformer and broaden exchange/stage-boundary handling.
- Add UI accelerator badges and a Docker + Scala example app for running/validating Gluten/Velox plans.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| spark-ui/src/reducers/SqlReducerUtils.ts | Add Velox/Gluten node mappings, metric allowlists/transformers/renames, exchange/aggregate lists, and accelerator lookup. |
| spark-ui/src/reducers/SqlReducer.ts | Add fallback plan-description parsing, Gluten codegen-id inference, and extend exchange/broadcast metric handling. |
| spark-ui/src/reducers/PlanParsers/ProjectParser.ts | Support parsing ProjectExecTransformer. |
| spark-ui/src/reducers/PlanParsers/hashAggregateParser.ts | Support alternate Keys: / Functions [...] formats. |
| spark-ui/src/reducers/PlanParsers/FilterParser.ts | Support parsing FilterExecTransformer. |
| spark-ui/src/reducers/PlanParsers/ExchangeParser.ts | Broaden exchange type parsing to include Columnar/Broadcast exchange variants. |
| spark-ui/src/reducers/PlanGraphUtils.ts | Extend “skip-through” node list for input-row backtracking with Velox/Gluten node names. |
| spark-ui/src/reducers/tests/GlutenStageAssignment.spec.ts | New test validating stage assignment with a real Gluten fixture. |
| spark-ui/src/reducers/tests/gluten-sql4-fixture.json | New fixture data for Gluten stage assignment test. |
| spark-ui/src/components/SqlFlow/StageNode.tsx | Show accelerator badge (Velox/Photon/RAPIDS/DataFusion) on nodes. |
| spark-ui/src/components/SqlFlow/SqlLayoutService.ts | Treat Columnar/Comet/GPU exchanges as split exchange nodes in layout. |
| spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/GlutenVeloxExample.scala | New Scala example app exercising common operators under Gluten/Velox. |
| spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example/DataFusionCometExample.scala | Comment out a hardcoded local-path example line. |
| docker/gluten/run-gluten-example.sh | Automation script to build artifacts, download Gluten bundle, and run Docker. |
| docker/gluten/Dockerfile | New container image to run Spark + Gluten/Velox + DataFlint example. |
| docker/gluten/docker-compose.yml | Compose definition for running the Gluten example container. |
| docker/gluten/.gitignore | Ignore generated jars/test data/event logs for the Docker example. |
Comments suppressed due to low confidence (1)
spark-ui/src/reducers/SqlReducer.ts:824
- In `calcBroadcastExchangeDuration`, the `time to build` / `time to collect` durations are currently not added to `duration` because they're on a separate statement starting with unary `+`, so the function always returns only `time to broadcast`. Combine these into a single summed expression (or `duration += ...`) so the returned duration reflects all broadcast phases.
if (nodeName === "BroadcastExchange" || nodeName === "ColumnarBroadcastExchange") {
const duration = getMetricDuration("time to broadcast", metrics) ?? 0;
+(getMetricDuration("time to build", metrics) ?? 0) +
(getMetricDuration("time to collect", metrics) ?? 0);
return duration;
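A minimal sketch of the single-expression form suggested in the comment above. `Metric`, the simplified `getMetricDuration`, and the function name `broadcastDuration` are hypothetical stand-ins so the snippet runs on its own; the real helper in `SqlReducer.ts` may parse richer value formats.

```typescript
// Hypothetical stand-in for the reducer's metric shape.
interface Metric {
  name: string;
  value: string;
}

// Assumption: metric values are plain millisecond strings; the real helper may differ.
function getMetricDuration(name: string, metrics: Metric[]): number | undefined {
  const metric = metrics.find((m) => m.name === name);
  if (metric === undefined) return undefined;
  const ms = Number(metric.value);
  return Number.isNaN(ms) ? undefined : ms;
}

function broadcastDuration(metrics: Metric[]): number {
  // All three phases live in one expression, so no statement boundary
  // can turn the trailing terms into a discarded unary-plus expression.
  return (
    (getMetricDuration("time to broadcast", metrics) ?? 0) +
    (getMetricDuration("time to build", metrics) ?? 0) +
    (getMetricDuration("time to collect", metrics) ?? 0)
  );
}
```

Wrapping the sum in parentheses after `return` also protects against automatic semicolon insertion splitting the expression.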
When a native query accelerator (Gluten/Velox, Comet/DataFusion, RAPIDS GPU, Photon) is on the classpath, DataFlint's TimedExec wrapper hides Spark operators from the accelerator's plan-substitution rule because TimedExec uses transparent children on Spark 3.2+. The accelerator silently falls back to the JVM path.
Detect each accelerator via a classpath probe and skip wrapping any operator it would transform, so both DataFlint instrumentation and native acceleration coexist.
Other fixes pulled in along the way:
- Spark UI: GraphDurationAttribution recognizes columnar exchanges and native blocking nodes (ColumnarExchange/CometExchange/Gpu*/Photon*) as stage boundaries and blocking operators.
- GlutenStageAssignment.spec.ts: drop the unreachable third branch on the discriminated stage union that tripped tsc.
- docker/gluten/run-gluten-example.sh: case-insensitive find for the example jar (sbt lowercases normalizedName, BSD find is case-sensitive).
- GlutenVeloxExample now enables spark.dataflint.instrument.spark.enabled to exercise the new coexistence path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
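The hiding effect described in this commit message can be illustrated with a toy plan tree. This is a sketch under simplifying assumptions: `PlanNode`, `transparentWrap`, and `visibleNames` are hypothetical and model only the `children`-based traversal, not Spark's actual `SparkPlan` API.

```typescript
// Toy plan model; not Spark's API.
interface PlanNode {
  name: string;
  children: PlanNode[];
}

// A wrapper that exposes the wrapped node's children as its own,
// mimicking a "transparent children" wrapper.
function transparentWrap(wrapped: PlanNode): PlanNode {
  return { name: "TimedExec", children: wrapped.children };
}

// Collects the node names a rule sees when it walks the tree via `children`.
function visibleNames(node: PlanNode): string[] {
  const names: string[] = [node.name];
  for (const child of node.children) {
    for (const name of visibleNames(child)) {
      names.push(name);
    }
  }
  return names;
}

const scan: PlanNode = { name: "Scan", children: [] };
const filter: PlanNode = { name: "FilterExec", children: [scan] };
const project: PlanNode = { name: "ProjectExec", children: [transparentWrap(filter)] };

// The traversal reaches ProjectExec, TimedExec, and Scan, but never FilterExec.
const seen = visibleNames(project);
```

In this model a rule that pattern-matches on `FilterExec` has nothing to substitute, which corresponds to the silent fallback described above.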
ApplyColumnarRulesAndInsertTransitions applies pre rules in registration order then post rules in **reverse** registration order (Columnar.scala:570-576). The DataFlint plugin's init() runs first and auto-registers its SessionExtension first, so the DataFlint columnar rule sits at index 0 in columnarRules.
Moving the TimedExec wrap from preColumnarTransitions to postColumnarTransitions makes it run **last**, after every accelerator's pre-phase substitution. By that point Gluten/Comet/RAPIDS/Photon have already swapped FilterExec → FilterExecTransformer (and the Comet/Gpu/Photon equivalents) in their preColumnarTransitions. DataFlint's class-name match list contains only Spark-native classes, so transformed nodes naturally don't match: no explicit accelerator skip list is needed, and the result is robust to plugin registration order.
The explicit Accelerator-aware skip in enabledNodeNames is removed; only an informational log naming detected accelerators remains, so users see the coexistence path is engaged.
DataFlintPythonExecSpec swapped its 12 direct preColumnarTransitions test calls to postColumnarTransitions; DataFlintWindowExecSpec's comment about the injection phase was updated.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
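The ordering argument in this commit message can be sketched as a toy model. `ColumnarRule` and `executionOrder` are hypothetical names; the snippet only models "pre in registration order, post in reverse" and is not Spark's implementation.

```typescript
// Toy model of the rule ordering described above; not Spark's implementation.
interface ColumnarRule {
  name: string;
}

// Returns the order in which rule phases would run:
// pre rules in registration order, then post rules in reverse registration order.
function executionOrder(registered: ColumnarRule[]): string[] {
  const pre = registered.map((r) => `${r.name}.pre`);
  const post = registered
    .slice()
    .reverse()
    .map((r) => `${r.name}.post`);
  return pre.concat(post);
}

// DataFlint registers first (index 0), an accelerator registers second.
const order = executionOrder([{ name: "DataFlint" }, { name: "Gluten" }]);
// order: ["DataFlint.pre", "Gluten.pre", "Gluten.post", "DataFlint.post"]
```

Under this model the rule registered first has its post phase run last, which is why wrapping in the post phase happens after every accelerator's pre-phase substitution.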
Mirrors docker/gluten/ for Apache DataFusion Comet 0.4.0:
- Dockerfile based on apache/spark:3.5.7 with Comet plugin + shuffle manager wired in spark-defaults.conf and the JDK 11+ --add-opens flags Comet's off-heap memory path requires.
- run-comet-example.sh downloads the Comet jar from Maven Central (cached), builds the DataFlint UI + plugin + example jar, copies them into the docker context, and brings up docker compose.
- The released Comet jar bundles linux_amd64 + linux_aarch64 natives in a single artifact, so no architecture-specific download is needed (the --amd64 flag is still there to force Rosetta on Apple Silicon if you want x86_64). macOS users go through Docker because the upstream jar does not include darwin natives.
DataFusionCometExample.scala: lowered offHeap to 4g (was 16g, too large for a typical docker container), enabled event logging + DataFlint instrumentation, and replaced the interactive readLine + spark.stop with Thread.sleep(Long.MaxValue) so the Spark UI stays reachable on port 10000 when running detached.
Both run scripts now pick the example jar by mtime (`ls -t ... | head -1`) instead of `find ... | head -1`, which was returning a stale jar when many timestamped sbt-dynver outputs accumulated in target/.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Re Copilot's low-confidence note about |
- SqlReducer.calcBroadcastExchangeDuration: chain all three broadcast
durations (broadcast + build + collect) into a single sum. The
previous code left `+(getMetricDuration("time to build", metrics))...`
as a dead expression statement after the prior statement terminated
at `;`, so only "time to broadcast" was returned.
- SqlReducer.buildFallbackPlanDescriptions: capture the rest of the
section header line (`(.+?)\s*$`) instead of the first token
(`\S+`). Without this, node names with spaces ("Scan csv",
"WholeStageCodegenTransformer (1)") never matched the fallback
section and their plan descriptions were dropped.
- GlutenStageAssignment.spec.ts: removed the per-node console.log debug
loop. Test assertions already report enough on failure.
- docker/gluten + docker/comet Dockerfiles: removed the broken
`--add-opens=java.base/sun.misc=ALL-UNNAMED` flag (sun.misc.Unsafe
lives in jdk.unsupported, not java.base — the JVM was emitting
`WARNING: package sun.misc not in java.base` on every run). Also
dropped the `spark.driver.extraJavaOptions` and
`spark.executor.extraJavaOptions` lines entirely; the bottom-of-file
`_JAVA_OPTIONS` env already supplies the same flags to driver and
executor JVMs. Verified Velox/Comet still run end-to-end after the
cleanup.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
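The section-header fix from the commit above (first token versus rest of line) can be illustrated with a small comparison. The `(N) Name` header shape, the two pattern constants, and `headerName` are assumptions for illustration; the exact regex in `buildFallbackPlanDescriptions` may differ.

```typescript
// Assumption: section headers look like "(3) Scan csv"; the real format may differ.
// Old behaviour: capture only the first whitespace-delimited token.
const firstTokenHeader = /^\((\d+)\)\s+(\S+)/;
// New behaviour: capture the rest of the line, minus trailing whitespace.
const restOfLineHeader = /^\((\d+)\)\s+(.+?)\s*$/;

function headerName(line: string, pattern: RegExp): string | undefined {
  const match = pattern.exec(line);
  return match ? match[2] : undefined;
}

const scanLine = "(3) Scan csv  ";
const before = headerName(scanLine, firstTokenHeader); // "Scan"
const after = headerName(scanLine, restOfLineHeader); // "Scan csv"

const codegenLine = "(5) WholeStageCodegenTransformer (1)";
const codegenName = headerName(codegenLine, restOfLineHeader); // "WholeStageCodegenTransformer (1)"
```

The lazy `.+?` combined with the anchored `\s*$` keeps internal spaces while dropping trailing ones, so multi-word node names can match their graph nodes.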
Summary
Adds first-class support for native query accelerators in DataFlint, both on the Spark UI side (node classification, parsers, stage attribution) and in the JVM-side plugin (instrumentation that no longer fights the accelerator's plan substitution). Includes runnable Docker examples for Apache Gluten/Velox and Apache DataFusion Comet.
What's in
UI / SqlFlow
- `nodeTypeDict`, `nodeRenamerDict`, exchange/blocking sets in `GraphDurationAttribution.ts`, and accelerator badges on `StageNode`.
- `WindowParser`, `FilterParser`, `ProjectParser`, `ExchangeParser`, `hashAggregateParser` accept transformer/columnar variants (e.g. `WindowExecTransformer`, `ColumnarExchange`, `CometExchange`).
- `SqlReducer` infers `wholeStageCodegenId` for Gluten's `WholeStageCodegenTransformer` (no parent edges in the graph) by an ID-ordered scan up to stage boundaries; AQE codegen renumbering is reconciled at the stage-attribution step.
- Velox native metrics: `time of aggregation/filter/sort/window/generate/hash build/probe`, `number of hash build/probe input rows`, `number of spilled bytes`, `peak memory bytes`. Existing metrics like `agg time`, `build time`, `duration` continue to work.
- `GlutenStageAssignment.spec.ts` with a real fixture exercising the full stage-attribution pipeline against a Gluten plan.
JVM Plugin
- `Accelerator.scala` (new, in shared `plugin/` module): registry of known accelerators (Gluten/Velox, Comet/DataFusion, RAPIDS GPU, Photon), each with its detection classes and the set of Spark `*Exec` operators it substitutes. `Accelerator.active` probes the classpath once and caches the result.
- `postColumnarTransitions`: `ApplyColumnarRulesAndInsertTransitions` applies pre rules in registration order then post rules in reverse order. The DataFlint plugin auto-registers its `SessionExtension` first, so its columnar rule sits at index 0 and naturally runs last in post. By that point every accelerator has already swapped `FilterExec → FilterExecTransformer` etc. in pre, and DataFlint's Spark-class match list misses the transformer variants naturally: no explicit skip list is needed, and the result is robust to plugin registration order.
- A `!isInstanceOf[TimedExec]` guard, so the rule re-runs cleanly under AQE.
Examples / Docker
- `docker/gluten/`: runnable end-to-end script that downloads the Gluten nightly bundle (cached), builds the DataFlint UI/plugin/example, copies jars into the Docker context, and brings up docker compose. `GlutenVeloxExample.scala` exercises filter/project/agg/sort/joins/window/explode/union with DataFlint instrumentation enabled. Spark UI on `localhost:10000`.
- `docker/comet/`: mirrors the Gluten setup for Apache DataFusion Comet 0.4.0. Comet's Maven jar bundles Linux x86_64/aarch64 natives in a single artifact; macOS users go through the Linux container.
- Both run scripts pick the example jar by mtime (`ls -t ... | head -1`) so stale sbt-dynver outputs in `target/` don't get used.
How it solves the coexistence problem
`TimedExec` is transparent on Spark 3.2+ (`children = child.children`), which hides the wrapped operator from any other rule's plan traversal. Before this PR, wrapping `FilterExec` in `preColumnarTransitions` made the accelerator's rule walk past it, and the query silently fell back to the Spark JVM path. Moving the wrap to `postColumnarTransitions` exploits Spark's reverse iteration of post rules, so the accelerator substitutes operators first and DataFlint instruments whatever's left (fallback ops, writes, Python UDFs).
Test plan
- `GlutenStageAssignment.spec.ts` (with a real Gluten fixture)
- `DataFlintPythonExecSpec`, `DataFlintWindowExecSpec`, `DataFlintSqlNodesSpec`, `TimedExecMetricsSpec`
- `./docker/gluten/run-gluten-example.sh` brings up Spark UI at `localhost:10000`; log shows `Gluten/Velox detected`, Velox ops execute, DataFlint wraps the fallback scans
- `./docker/comet/run-comet-example.sh` brings up Spark UI; log shows `Comet/DataFusion detected`, Comet native runtime initializes, DataFlint wraps the fallback ops
🤖 Generated with Claude Code