From 72c1b90eb132754ab3d57acd3042676507f3b55e Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 15:32:53 +0000 Subject: [PATCH 01/16] feat: capture full IO paths via QueryExecutionListener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds DataflintIOListener that walks the typed analyzed logical plan after every action and emits DataflintIOEvent with full paths/table identifiers extracted from typed plan nodes — bypassing SQLConf.maxToStringFields truncation that affects the existing plan-description regex extraction. Reads: LogicalRelation+HadoopFsRelation, DataSourceV2Relation. Writes: InsertIntoHadoopFsRelationCommand, SaveIntoDataSourceCommand, CreateDataSourceTableAsSelectCommand, V2WriteCommand. Gated behind spark.dataflint.io.tracking.enabled (default false); registers the listener via spark.sql.queryExecutionListeners so Spark instantiates it when the SparkSession is built. DataflintListener dispatches the new event into the existing ElementTrackingStore. Integration spec asserts the captured write/read paths are not truncated under aggressive spark.sql.maxPlanStringLength. 
https://claude.ai/code/session_01DrzKAGkGTXdfbNYVGQdCwB --- .../DataflintSparkUICommonLoader.scala | 33 +++++ .../listener/DataflintIOExtractor.scala | 132 ++++++++++++++++++ .../listener/DataflintIOListener.scala | 47 +++++++ .../listener/DataflintListener.scala | 4 + .../spark/dataflint/listener/model.scala | 28 ++++ .../dataflint/DataflintIOListenerSpec.scala | 99 +++++++++++++ 6 files changed, 343 insertions(+) create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala create mode 100644 spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 58cbdd23..867e2888 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -53,6 +53,10 @@ class DataflintSparkUICommonInstaller extends Logging { val deltaLakeCacheZindexFieldsToProperties = context.conf.getBoolean("spark.dataflint.instrument.deltalake.cacheZindexFieldsToProperties", defaultValue = true) val deltaLakeHistoryLimit = context.conf.getInt("spark.dataflint.instrument.deltalake.historyLimit", defaultValue = 1000) val icebergAuthCatalogDiscovery = context.conf.getBoolean("spark.dataflint.iceberg.autoCatalogDiscovery", defaultValue = false) + val ioTrackingEnabled = context.conf.getBoolean(DataflintSparkUICommonLoader.IO_TRACKING_ENABLED, defaultValue = false) + if (ioTrackingEnabled) { + DataflintSparkUICommonLoader.registerIOListener(context) + } if(icebergInstalled && icebergEnabled) { if(icebergAuthCatalogDiscovery && 
isMetricLoaderInRightClassLoader()) { context.conf.getAll.filter(_._1.startsWith("spark.sql.catalog")).filter(keyValue => keyValue._2 == "org.apache.iceberg.spark.SparkCatalog" || keyValue._2 == "org.apache.iceberg.spark.SparkSessionCatalog").foreach(keyValue => { @@ -135,6 +139,8 @@ class DataflintSparkUICommonInstaller extends Logging { object DataflintSparkUICommonLoader extends Logging { private val DATAFLINT_EXTENSION_CLASS = "org.apache.spark.dataflint.DataFlintInstrumentationExtension" + private val DATAFLINT_IO_LISTENER_CLASS = "org.apache.spark.dataflint.listener.DataflintIOListener" + val IO_TRACKING_ENABLED = "spark.dataflint.io.tracking.enabled" val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled" val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled" val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled" @@ -205,4 +211,31 @@ object DataflintSparkUICommonLoader extends Logging { logWarning("Could not register DataFlint instrumentation extension", e) } } + + /** + * Append [[org.apache.spark.dataflint.listener.DataflintIOListener]] to + * `spark.sql.queryExecutionListeners` so Spark instantiates it when the + * SparkSession is built. The listener captures full read/write/save paths + * and table identifiers from the typed analyzed plan (bypassing + * SQLConf.maxToStringFields truncation of plan descriptions). + * + * This is in the org.apache.spark.dataflint package to access SparkContext.conf + * (which is private[spark]). 
+ */ + def registerIOListener(sc: SparkContext): Unit = { + try { + val current = sc.conf.get("spark.sql.queryExecutionListeners", "") + if (current.contains(DATAFLINT_IO_LISTENER_CLASS)) { + logInfo("DataflintIOListener already registered in spark.sql.queryExecutionListeners") + } else { + val updated = if (current.isEmpty) DATAFLINT_IO_LISTENER_CLASS + else s"$current,$DATAFLINT_IO_LISTENER_CLASS" + sc.conf.set("spark.sql.queryExecutionListeners", updated) + logInfo("Registered DataflintIOListener in spark.sql.queryExecutionListeners") + } + } catch { + case e: Throwable => + logWarning("Could not register DataflintIOListener", e) + } + } } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala new file mode 100644 index 00000000..acf3f59d --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala @@ -0,0 +1,132 @@ +package org.apache.spark.dataflint.listener + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} +import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +import scala.util.Try + +/** + * Walks an analyzed [[LogicalPlan]] and pulls out read/write/save targets from + * the typed nodes (not from `simpleString` descriptions), so paths and table + * identifiers are captured at their full length regardless of + * SQLConf.maxToStringFields truncation. + */ +object DataflintIOExtractor extends Logging { + + /** Extract all read+write targets from an analyzed logical plan. 
*/ + def extract(plan: LogicalPlan): Seq[DataflintIOTarget] = { + val out = collection.mutable.ArrayBuffer.empty[DataflintIOTarget] + Try { + plan.foreach { node => + Try(extractFromNode(node)).toOption.flatten.foreach(out += _) + } + }.recover { + case t: Throwable => logWarning("DataflintIOExtractor: failed to walk plan", t) + } + out.toSeq + } + + private def extractFromNode(node: LogicalPlan): Option[DataflintIOTarget] = node match { + + // --- READS --------------------------------------------------------------- + case lr: LogicalRelation => + val (paths, format) = lr.relation match { + case h: HadoopFsRelation => + (h.location.rootPaths.map(_.toString), h.fileFormat.getClass.getSimpleName) + case other => + (Seq.empty[String], other.getClass.getSimpleName) + } + val tableName = lr.catalogTable.map(_.identifier.unquotedString) + Some(DataflintIOTarget( + operation = "read", + format = format, + paths = paths, + tableName = tableName, + saveMode = None, + partitionColumns = lr.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty), + options = Map.empty + )) + + case d: DataSourceV2Relation => + val tableName = d.identifier.map(_.toString).orElse(Option(d.table).map(_.name)) + val format = Option(d.table).map(_.getClass.getSimpleName).getOrElse("DataSourceV2") + val opts = safeOptionsMap(d.options.asCaseSensitiveMap()) + Some(DataflintIOTarget( + operation = "read", + format = format, + paths = optionalPath(opts), + tableName = tableName, + saveMode = None, + partitionColumns = Seq.empty, + options = opts + )) + + // --- WRITES (V1) --------------------------------------------------------- + case c: InsertIntoHadoopFsRelationCommand => + Some(DataflintIOTarget( + operation = "write", + format = c.fileFormat.getClass.getSimpleName, + paths = Seq(c.outputPath.toString), + tableName = c.catalogTable.map(_.identifier.unquotedString), + saveMode = Some(c.mode.toString), + partitionColumns = c.partitionColumns.map(_.name), + options = c.options + )) + + case 
c: SaveIntoDataSourceCommand => + val tableName = c.options.get("dbtable").orElse(c.options.get("table")) + Some(DataflintIOTarget( + operation = "write", + format = c.dataSource.getClass.getSimpleName, + paths = c.options.get("path").toSeq, + tableName = tableName, + saveMode = Some(c.mode.toString), + partitionColumns = Seq.empty, + options = c.options + )) + + case c: CreateDataSourceTableAsSelectCommand => + Some(DataflintIOTarget( + operation = "write", + format = c.table.provider.getOrElse(""), + paths = c.table.storage.locationUri.map(_.toString).toSeq, + tableName = Some(c.table.identifier.unquotedString), + saveMode = Some(c.mode.toString), + partitionColumns = c.table.partitionColumnNames, + options = c.table.storage.properties + )) + + // --- WRITES (V2: AppendData / OverwriteByExpression / OverwritePartitionsDynamic / ReplaceData) --- + case w: V2WriteCommand => + val named = Try(w.table).toOption + val tableName = named.flatMap(t => Option(t.name)) + val format = named.map(_.getClass.getSimpleName).getOrElse("V2Write") + Some(DataflintIOTarget( + operation = "write", + format = format, + paths = Seq.empty, + tableName = tableName, + saveMode = Some(w.getClass.getSimpleName), + partitionColumns = Seq.empty, + options = Map.empty + )) + + case _ => None + } + + /** Defensive conversion of a Spark CaseInsensitiveStringMap to a Scala Map. 
*/ + private def safeOptionsMap(m: java.util.Map[String, String]): Map[String, String] = { + Try { + val it = m.entrySet().iterator() + val b = Map.newBuilder[String, String] + while (it.hasNext) { val e = it.next(); b += (e.getKey -> e.getValue) } + b.result() + }.getOrElse(Map.empty) + } + + private def optionalPath(opts: Map[String, String]): Seq[String] = + opts.get("path").orElse(opts.get("paths")).toSeq +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala new file mode 100644 index 00000000..896d5076 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala @@ -0,0 +1,47 @@ +package org.apache.spark.dataflint.listener + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +import scala.util.Try + +/** + * QueryExecutionListener that walks the analyzed logical plan after every action + * and posts a [[DataflintIOEvent]] with the full (untruncated) paths and table + * identifiers extracted from typed plan nodes. + * + * Registered automatically by [[org.apache.spark.dataflint.DataflintSparkUICommonLoader]] + * via `spark.sql.queryExecutionListeners` when `spark.dataflint.io.tracking.enabled=true`. + * + * Must have a no-arg constructor: Spark instantiates listeners listed in + * `spark.sql.queryExecutionListeners` via `Class.getConstructor()` reflection. + */ +class DataflintIOListener extends QueryExecutionListener with Logging { + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + process(funcName, qe, durationNs) + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + // Still try — the plan was constructed even if execution failed. 
+ process(funcName, qe, 0L) + } + + private def process(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + Try { + val targets = DataflintIOExtractor.extract(qe.analyzed) + if (targets.nonEmpty) { + val info = DataflintIOInfo( + executionId = qe.id, + sqlFuncName = funcName, + durationMs = durationNs / 1000000L, + targets = targets + ) + qe.sparkSession.sparkContext.listenerBus.post(DataflintIOEvent(info)) + } + }.recover { + case t: Throwable => logWarning("DataflintIOListener: failed to process query", t) + } + } +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala index 158ea192..91018df5 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala @@ -26,6 +26,10 @@ class DataflintListener(store: ElementTrackingStore) extends SparkListener with val wrapper = new DataflintDeltaLakeScanInfoWrapper(e.scanInfo) store.write(wrapper) } + case e: DataflintIOEvent => { + val wrapper = new DataflintIOInfoWrapper(e.ioInfo) + store.write(wrapper) + } case _ => {} } } catch { diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 16494aae..0396f171 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -91,3 +91,31 @@ class DataflintDeltaLakeScanInfoWrapper(val info: DataflintDeltaLakeScanInfo) { @JsonIgnore def id: String = s"${info.minExecutionId}_${info.tablePath.replaceAll(" ", "")}" } + +// Captured from the typed logical plan via QueryExecutionListener so paths/table +// identifiers are the 
full untruncated values (not the simpleString form Spark uses +// in plan descriptions, which gets cut at SQLConf.maxToStringFields). +case class DataflintIOTarget( + operation: String, // "read" | "write" + format: String, // "parquet", "delta", "iceberg", "kafka", ... + paths: Seq[String], + tableName: Option[String], + saveMode: Option[String], // SaveMode.toString for writes + partitionColumns: Seq[String], + options: Map[String, String] + ) + +case class DataflintIOInfo( + executionId: Long, + sqlFuncName: String, + durationMs: Long, + targets: Seq[DataflintIOTarget] + ) + +case class DataflintIOEvent(ioInfo: DataflintIOInfo) extends SparkListenerEvent + +class DataflintIOInfoWrapper(val info: DataflintIOInfo) { + @KVIndex + @JsonIgnore + def id: String = info.executionId.toString +} diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala new file mode 100644 index 00000000..5d3d7740 --- /dev/null +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -0,0 +1,99 @@ +package org.apache.spark.dataflint + +import java.util.concurrent.CopyOnWriteArrayList + +import org.apache.spark.dataflint.listener.{DataflintIOEvent, DataflintIOInfo, DataflintIOListener} +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Seconds, Span} + +/** + * Integration test: enable DataflintIOListener via spark.sql.queryExecutionListeners, + * run a write+read against a deeply-nested temp path, and verify that the listener + * captured the FULL path — i.e. 
nothing got cut by Spark's maxToStringFields-driven + * plan description truncation. + */ +class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll with Eventually { + + private var spark: SparkSession = _ + private val captured = new CopyOnWriteArrayList[DataflintIOInfo]() + + private val collector = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: DataflintIOEvent => captured.add(e.ioInfo) + case _ => + } + } + + override def beforeAll(): Unit = { + spark = SparkSession.builder() + .master("local[1]") + .appName("DataflintIOListenerSpec") + // Drive plan-description truncation hard, so a regression-test failure would + // be visible as a "..." path coming through. + .config("spark.sql.maxPlanStringLength", "32") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + .config("spark.sql.queryExecutionListeners", classOf[DataflintIOListener].getName) + .getOrCreate() + spark.sparkContext.addSparkListener(collector) + } + + override def afterAll(): Unit = { + if (spark != null) { + spark.sparkContext.removeSparkListener(collector) + spark.stop() + } + } + + private def deleteRecursively(f: java.io.File): Unit = { + if (f.isDirectory) Option(f.listFiles).foreach(_.foreach(deleteRecursively)) + f.delete() + } + + test("captures full write path through InsertIntoHadoopFsRelationCommand without truncation") { + val baseDir = java.nio.file.Files.createTempDirectory("dataflint-io").toFile + // Deliberately deep+long path so it would be truncated in plan descriptions. 
+ val deepPath = new java.io.File(baseDir, + "a-very-long-and-deeply-nested-folder-name-aaaaaaaaaaaa/" + + "another-long-segment-bbbbbbbbbbbbbbbbbbbb/" + + "final-leaf-cccccccccccccccccccccccccccccc") + deepPath.getParentFile.mkdirs() + try { + captured.clear() + + val session = spark + import session.implicits._ + Seq(("a", 1), ("b", 2)).toDF("key", "value") + .write.mode("overwrite").parquet(deepPath.getAbsolutePath) + + val writeInfo = eventually(timeout(Span(10, Seconds))) { + val w = (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == "write")) + w.getOrElse(fail(s"No write event captured. Got ${captured.size()} events.")) + } + + val writeTarget = writeInfo.targets.find(_.operation == "write").get + writeTarget.paths should have size 1 + // Crucial assertion: full path, no truncation marker. + writeTarget.paths.head should include (deepPath.getName) + writeTarget.paths.head should not include "..." + writeTarget.saveMode shouldBe Some("Overwrite") + + // Now read it back and verify the read target carries the full path too. 
+ spark.read.parquet(deepPath.getAbsolutePath).collect() + + val readInfo = eventually(timeout(Span(10, Seconds))) { + val r = (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == "read")) + r.getOrElse(fail("No read event captured.")) + } + val readTarget = readInfo.targets.find(_.operation == "read").get + readTarget.paths.exists(p => p.contains(deepPath.getName) && !p.endsWith("...")) shouldBe true + } finally { + deleteRecursively(baseDir) + } + } +} From ef8063c7a19ea4ae6d4db1282ac19da9fda1c678 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 16:54:44 +0000 Subject: [PATCH 02/16] test: run DataflintIOListenerSpec under pluginspark4 as well MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The spec only touches version-stable Spark surface (SparkSession, DataFrameWriter.parquet, SparkListener.onOtherEvent, QueryExecutionListener via spark.sql.queryExecutionListeners), so add it to the cross-version test source list alongside DataFlintCodegenFallbackSpec — gives us the same truncation-resistance guarantee against Spark 4.0.1. 
https://claude.ai/code/session_01DrzKAGkGTXdfbNYVGQdCwB --- spark-plugin/build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index 5715a85a..453aa90e 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -183,7 +183,8 @@ lazy val pluginspark4 = (project in file("pluginspark4")) Test / unmanagedSources ++= { val pluginspark3Tests = (pluginspark3 / Test / sourceDirectory).value / "scala" Seq( - pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataFlintCodegenFallbackSpec.scala" + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataFlintCodegenFallbackSpec.scala", + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala" ) }, From 80924fa2f32b38d9fcf372462ea7f1d899dc4283 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 17:05:55 +0000 Subject: [PATCH 03/16] test: drop scalatest-concurrent dependency from DataflintIOListenerSpec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pluginspark3/4 only pull scalatest-funsuite + scalatest-shouldmatchers; neither transitively brings org.scalatest.concurrent.Eventually, so the spec failed to compile in CI. Replace eventually(timeout(...)) with LiveListenerBus.waitUntilEmpty (the standard Spark test-drain primitive), keeping the same intent — wait for the async listener queue to flush before asserting. 
--- .../dataflint/DataflintIOListenerSpec.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala index 5d3d7740..ca84de2a 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -6,10 +6,8 @@ import org.apache.spark.dataflint.listener.{DataflintIOEvent, DataflintIOInfo, D import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.SparkSession import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{Seconds, Span} /** * Integration test: enable DataflintIOListener via spark.sql.queryExecutionListeners, @@ -17,7 +15,7 @@ import org.scalatest.time.{Seconds, Span} * captured the FULL path — i.e. nothing got cut by Spark's maxToStringFields-driven * plan description truncation. */ -class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll with Eventually { +class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { private var spark: SparkSession = _ private val captured = new CopyOnWriteArrayList[DataflintIOInfo]() @@ -55,6 +53,14 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf f.delete() } + /** Drain the async listener bus so any pending DataflintIOEvent is delivered to the collector. 
*/ + private def drain(): Unit = { + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + } + + private def find(op: String): Option[DataflintIOInfo] = + (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == op)) + test("captures full write path through InsertIntoHadoopFsRelationCommand without truncation") { val baseDir = java.nio.file.Files.createTempDirectory("dataflint-io").toFile // Deliberately deep+long path so it would be truncated in plan descriptions. @@ -71,10 +77,9 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf Seq(("a", 1), ("b", 2)).toDF("key", "value") .write.mode("overwrite").parquet(deepPath.getAbsolutePath) - val writeInfo = eventually(timeout(Span(10, Seconds))) { - val w = (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == "write")) - w.getOrElse(fail(s"No write event captured. Got ${captured.size()} events.")) - } + drain() + val writeInfo = find("write") + .getOrElse(fail(s"No write event captured. Got ${captured.size()} events.")) val writeTarget = writeInfo.targets.find(_.operation == "write").get writeTarget.paths should have size 1 @@ -86,10 +91,8 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf // Now read it back and verify the read target carries the full path too. 
spark.read.parquet(deepPath.getAbsolutePath).collect() - val readInfo = eventually(timeout(Span(10, Seconds))) { - val r = (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == "read")) - r.getOrElse(fail("No read event captured.")) - } + drain() + val readInfo = find("read").getOrElse(fail("No read event captured.")) val readTarget = readInfo.targets.find(_.operation == "read").get readTarget.paths.exists(p => p.contains(deepPath.getName) && !p.endsWith("...")) shouldBe true } finally { From 8323a3686efae64c1f21d84d60eadd3c60a930e5 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 17:32:48 +0000 Subject: [PATCH 04/16] feat: generic plan-node translation for truncated descriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds DataflintPlanTranslationListener — a QueryExecutionListener that walks every node in qe.executedPlan, compares node.simpleString(maxToStringFields) (what SparkUI shows) to node.simpleString(Int.MaxValue) (truncation disabled), and emits a DataflintPlanTranslationsEvent per query carrying the full rendering for each node whose description got cut. Format-agnostic: no per-node-type extraction code, works for any plan node that implements simpleString correctly. Sparse: nodes that fit emit nothing and cost one String#equals. Pre-order nodeIndex matches SparkPlanGraph's indexing so consumers can match translations back to the SparkUI plan view. Gated behind spark.dataflint.plan.translation.enabled (default false); registers via spark.sql.queryExecutionListeners using the shared appendQueryExecutionListener helper extracted from registerIOListener. DataflintListener dispatches the new event into the existing ElementTrackingStore keyed by executionId. 
Integration spec drives spark.sql.maxToStringFields=2 + a 20-column Project and asserts the listener emitted a translation whose full form contains every column name while the truncated form does not, also included in pluginspark4 cross-version test set. https://claude.ai/code/session_01DrzKAGkGTXdfbNYVGQdCwB --- spark-plugin/build.sbt | 3 +- .../DataflintSparkUICommonLoader.scala | 30 +++++-- .../listener/DataflintListener.scala | 4 + .../DataflintPlanTranslationListener.scala | 64 ++++++++++++++ .../spark/dataflint/listener/model.scala | 24 +++++ ...DataflintPlanTranslationListenerSpec.scala | 88 +++++++++++++++++++ 6 files changed, 205 insertions(+), 8 deletions(-) create mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala create mode 100644 spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index 453aa90e..16f419d4 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -184,7 +184,8 @@ lazy val pluginspark4 = (project in file("pluginspark4")) val pluginspark3Tests = (pluginspark3 / Test / sourceDirectory).value / "scala" Seq( pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataFlintCodegenFallbackSpec.scala", - pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala" + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala", + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintPlanTranslationListenerSpec.scala" ) }, diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 867e2888..84e581f6 100644 --- 
a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -57,6 +57,10 @@ class DataflintSparkUICommonInstaller extends Logging { if (ioTrackingEnabled) { DataflintSparkUICommonLoader.registerIOListener(context) } + val planTranslationEnabled = context.conf.getBoolean(DataflintSparkUICommonLoader.PLAN_TRANSLATION_ENABLED, defaultValue = false) + if (planTranslationEnabled) { + DataflintSparkUICommonLoader.registerPlanTranslationListener(context) + } if(icebergInstalled && icebergEnabled) { if(icebergAuthCatalogDiscovery && isMetricLoaderInRightClassLoader()) { context.conf.getAll.filter(_._1.startsWith("spark.sql.catalog")).filter(keyValue => keyValue._2 == "org.apache.iceberg.spark.SparkCatalog" || keyValue._2 == "org.apache.iceberg.spark.SparkSessionCatalog").foreach(keyValue => { @@ -140,7 +144,9 @@ object DataflintSparkUICommonLoader extends Logging { private val DATAFLINT_EXTENSION_CLASS = "org.apache.spark.dataflint.DataFlintInstrumentationExtension" private val DATAFLINT_IO_LISTENER_CLASS = "org.apache.spark.dataflint.listener.DataflintIOListener" + private val DATAFLINT_PLAN_TRANSLATION_LISTENER_CLASS = "org.apache.spark.dataflint.listener.DataflintPlanTranslationListener" val IO_TRACKING_ENABLED = "spark.dataflint.io.tracking.enabled" + val PLAN_TRANSLATION_ENABLED = "spark.dataflint.plan.translation.enabled" val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled" val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled" val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled" @@ -222,20 +228,30 @@ object DataflintSparkUICommonLoader extends Logging { * This is in the org.apache.spark.dataflint package to access SparkContext.conf * (which is private[spark]). 
*/ - def registerIOListener(sc: SparkContext): Unit = { + def registerIOListener(sc: SparkContext): Unit = + appendQueryExecutionListener(sc, DATAFLINT_IO_LISTENER_CLASS, "DataflintIOListener") + + /** + * Append [[org.apache.spark.dataflint.listener.DataflintPlanTranslationListener]] + * to `spark.sql.queryExecutionListeners`. Captures full simpleString rendering + * for any plan node whose description got cut by SQLConf.maxToStringFields. + */ + def registerPlanTranslationListener(sc: SparkContext): Unit = + appendQueryExecutionListener(sc, DATAFLINT_PLAN_TRANSLATION_LISTENER_CLASS, "DataflintPlanTranslationListener") + + private def appendQueryExecutionListener(sc: SparkContext, className: String, displayName: String): Unit = { try { val current = sc.conf.get("spark.sql.queryExecutionListeners", "") - if (current.contains(DATAFLINT_IO_LISTENER_CLASS)) { - logInfo("DataflintIOListener already registered in spark.sql.queryExecutionListeners") + if (current.contains(className)) { + logInfo(s"$displayName already registered in spark.sql.queryExecutionListeners") } else { - val updated = if (current.isEmpty) DATAFLINT_IO_LISTENER_CLASS - else s"$current,$DATAFLINT_IO_LISTENER_CLASS" + val updated = if (current.isEmpty) className else s"$current,$className" sc.conf.set("spark.sql.queryExecutionListeners", updated) - logInfo("Registered DataflintIOListener in spark.sql.queryExecutionListeners") + logInfo(s"Registered $displayName in spark.sql.queryExecutionListeners") } } catch { case e: Throwable => - logWarning("Could not register DataflintIOListener", e) + logWarning(s"Could not register $displayName", e) } } } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala index 91018df5..250f9b89 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala +++ 
b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala @@ -30,6 +30,10 @@ class DataflintListener(store: ElementTrackingStore) extends SparkListener with val wrapper = new DataflintIOInfoWrapper(e.ioInfo) store.write(wrapper) } + case e: DataflintPlanTranslationsEvent => { + val wrapper = new DataflintPlanTranslationsInfoWrapper(e.info) + store.write(wrapper) + } case _ => {} } } catch { diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala new file mode 100644 index 00000000..850ec212 --- /dev/null +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala @@ -0,0 +1,64 @@ +package org.apache.spark.dataflint.listener + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +import scala.util.Try + +/** + * Generic plan-truncation listener: for every node in `qe.executedPlan`, compare + * `simpleString(maxToStringFields)` (the form SparkUI shows) to + * `simpleString(Int.MaxValue)` (truncation disabled). When they differ, emit the + * full rendering as a [[DataflintPlanNodeTranslation]] keyed by pre-order + * `nodeIndex` — the same indexing scheme SparkPlanGraph uses, so consumers can + * look up translations for nodes whose description got cut. + * + * Format-agnostic: works for FileScan, Project, Aggregate, Window, custom nodes — + * anything that overrides `simpleString` properly. Sparse: nodes that fit emit + * nothing and cost one String#equals. + * + * Registered automatically by [[org.apache.spark.dataflint.DataflintSparkUICommonLoader]] + * via `spark.sql.queryExecutionListeners` when + * `spark.dataflint.plan.translation.enabled=true`. 
+ * + * Must have a no-arg constructor: Spark instantiates listeners listed in + * `spark.sql.queryExecutionListeners` via `Class.getConstructor()` reflection. + */ +class DataflintPlanTranslationListener extends QueryExecutionListener with Logging { + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + process(qe) + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + process(qe) + } + + private def process(qe: QueryExecution): Unit = { + Try { + val maxFields = qe.sparkSession.sessionState.conf.maxToStringFields + val translations = collection.mutable.ArrayBuffer.empty[DataflintPlanNodeTranslation] + var idx = 0 + qe.executedPlan.foreach { node => + val truncated = Try(node.simpleString(maxFields)).getOrElse("") + val full = Try(node.simpleString(Int.MaxValue)).getOrElse(truncated) + if (truncated != full) { + translations += DataflintPlanNodeTranslation( + nodeIndex = idx, + nodeName = node.nodeName, + truncatedDescription = truncated, + fullDescription = full + ) + } + idx += 1 + } + if (translations.nonEmpty) { + val info = DataflintPlanTranslationsInfo(qe.id, translations.toSeq) + qe.sparkSession.sparkContext.listenerBus.post(DataflintPlanTranslationsEvent(info)) + } + }.recover { + case t: Throwable => logWarning("DataflintPlanTranslationListener: failed to process query", t) + } + } +} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 0396f171..fc49d13c 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -119,3 +119,27 @@ class DataflintIOInfoWrapper(val info: DataflintIOInfo) { @JsonIgnore def id: String = info.executionId.toString } + +// Per-node "untruncated rendering" for plan nodes whose 
simpleString got cut +// by SQLConf.maxToStringFields. The truncatedDescription matches the string +// SparkPlanGraph carries on its node, so a consumer can look up the translation +// by description equality and/or by nodeIndex (pre-order position). +case class DataflintPlanNodeTranslation( + nodeIndex: Int, + nodeName: String, + truncatedDescription: String, + fullDescription: String + ) + +case class DataflintPlanTranslationsInfo( + executionId: Long, + translations: Seq[DataflintPlanNodeTranslation] + ) + +case class DataflintPlanTranslationsEvent(info: DataflintPlanTranslationsInfo) extends SparkListenerEvent + +class DataflintPlanTranslationsInfoWrapper(val info: DataflintPlanTranslationsInfo) { + @KVIndex + @JsonIgnore + def id: String = info.executionId.toString +} diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala new file mode 100644 index 00000000..607a8d33 --- /dev/null +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala @@ -0,0 +1,88 @@ +package org.apache.spark.dataflint + +import java.util.concurrent.CopyOnWriteArrayList + +import org.apache.spark.dataflint.listener.{DataflintPlanTranslationListener, DataflintPlanTranslationsEvent, DataflintPlanTranslationsInfo} +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.lit +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +/** + * Drive Spark's field-list truncation hard (maxToStringFields=2), build a Project + * with many output columns, run an action, and assert the listener emitted a + * translation whose truncated form carries the SparkUI "... 
N more fields" + * marker while the full form contains every column name. + */ +class DataflintPlanTranslationListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { + + private var spark: SparkSession = _ + private val captured = new CopyOnWriteArrayList[DataflintPlanTranslationsInfo]() + + private val collector = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: DataflintPlanTranslationsEvent => captured.add(e.info) + case _ => + } + } + + override def beforeAll(): Unit = { + spark = SparkSession.builder() + .master("local[1]") + .appName("DataflintPlanTranslationListenerSpec") + // Aggressively truncate field lists so even a small Project produces a translation. + .config("spark.sql.maxToStringFields", "2") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + .config("spark.sql.queryExecutionListeners", classOf[DataflintPlanTranslationListener].getName) + .getOrCreate() + spark.sparkContext.addSparkListener(collector) + } + + override def afterAll(): Unit = { + if (spark != null) { + spark.sparkContext.removeSparkListener(collector) + spark.stop() + } + } + + private def drain(): Unit = spark.sparkContext.listenerBus.waitUntilEmpty(10000) + + test("emits full simpleString for a Project whose field list got truncated") { + captured.clear() + + val session = spark + import session.implicits._ + + val columnNames = (1 to 20).map(i => s"column_with_long_recognizable_name_$i") + val projection = columnNames.map(n => lit(0).as(n)) + Seq(1).toDF("seed").select(projection: _*).collect() + + drain() + + val info = (0 until captured.size()).map(captured.get).headOption + .getOrElse(fail(s"No translation event captured. Got ${captured.size()} events.")) + + info.translations should not be empty + + // Every emitted translation must (a) be detected as truncated and + // (b) fully render in the expanded form. 
+ info.translations.foreach { t => + t.truncatedDescription should not be t.fullDescription + t.fullDescription.length should be > t.truncatedDescription.length + } + + // At least one translation should cover the Project carrying our columns: + // the full form contains the last column name; the truncated one does not. + val lastCol = columnNames.last + val projection = info.translations.find { t => + t.fullDescription.contains(lastCol) && !t.truncatedDescription.contains(lastCol) + } + projection.getOrElse( + fail(s"No translation contains the last column name '$lastCol' only in the full form.\n" + + info.translations.map(t => s" ${t.nodeName}: trunc=${t.truncatedDescription.take(80)}... full=${t.fullDescription.take(80)}...").mkString("\n")) + ) + } +} From f2b03d5317e76600015172b982f73281fdf2fa97 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 17:52:23 +0000 Subject: [PATCH 05/16] test: clear active/default SparkSession on teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sbt runs all specs in a single forked JVM. Stopping the SparkContext via spark.stop() doesn't always clear SparkSession.activeThreadLocal / defaultSession, so a later spec calling SparkSession.builder().getOrCreate() can pick up a leaked session — making any configs we set (spark.sql.queryExecutionListeners, spark.sql.maxToStringFields=2, spark.sql.maxPlanStringLength=32) bleed into unrelated suites. Clear both sessions explicitly so each spec starts with a clean slate. 
--- .../org/apache/spark/dataflint/DataflintIOListenerSpec.scala | 2 ++ .../spark/dataflint/DataflintPlanTranslationListenerSpec.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala index ca84de2a..85fe45c1 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -45,6 +45,8 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf if (spark != null) { spark.sparkContext.removeSparkListener(collector) spark.stop() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() } } diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala index 607a8d33..5148e587 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala @@ -45,6 +45,8 @@ class DataflintPlanTranslationListenerSpec extends AnyFunSuite with Matchers wit if (spark != null) { spark.sparkContext.removeSparkListener(collector) spark.stop() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() } } From baf95825cfc3d967b96c1da5bbc8737eeab511a1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 18:22:06 +0000 Subject: [PATCH 06/16] fix: rename shadowing val in DataflintPlanTranslationListenerSpec The user-edited test introduced a `val projection: Seq[Column]` at the top of the test body; the later `val projection = 
info.translations.find` shadowed it and confused the compiler about which symbol the `select(...)` expansion referred to. Renamed the second val to projectionTranslation. --- .../dataflint/DataflintPlanTranslationListenerSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala index 5148e587..3936916b 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala @@ -79,10 +79,10 @@ class DataflintPlanTranslationListenerSpec extends AnyFunSuite with Matchers wit // At least one translation should cover the Project carrying our columns: // the full form contains the last column name; the truncated one does not. val lastCol = columnNames.last - val projection = info.translations.find { t => + val projectionTranslation = info.translations.find { t => t.fullDescription.contains(lastCol) && !t.truncatedDescription.contains(lastCol) } - projection.getOrElse( + projectionTranslation.getOrElse( fail(s"No translation contains the last column name '$lastCol' only in the full form.\n" + info.translations.map(t => s" ${t.nodeName}: trunc=${t.truncatedDescription.take(80)}... 
full=${t.fullDescription.take(80)}...").mkString("\n")) ) From ecf2179b68c9684d7afc02a7a64b5a355fe8655c Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 05:06:25 +0000 Subject: [PATCH 07/16] revert: drop DataflintPlanTranslationListener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The translation listener duplicated full plan strings into the event log (SparkListenerSQLExecutionStart already carried the truncated form, we were adding a parallel event with the full form). Users who want full plan descriptions can simply raise spark.sql.maxToStringFields — that only affects plan rendering paths and stores the full strings once. Removes the listener, event/model classes, dispatcher case, flag/loader wiring, and integration spec. Keeps the IO listener intact. --- spark-plugin/build.sbt | 3 +- .../DataflintSparkUICommonLoader.scala | 30 ++----- .../listener/DataflintListener.scala | 4 - .../DataflintPlanTranslationListener.scala | 64 ------------- .../spark/dataflint/listener/model.scala | 24 ----- ...DataflintPlanTranslationListenerSpec.scala | 90 ------------------- 6 files changed, 8 insertions(+), 207 deletions(-) delete mode 100644 spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala delete mode 100644 spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index 16f419d4..453aa90e 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -184,8 +184,7 @@ lazy val pluginspark4 = (project in file("pluginspark4")) val pluginspark3Tests = (pluginspark3 / Test / sourceDirectory).value / "scala" Seq( pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataFlintCodegenFallbackSpec.scala", - pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala", - pluginspark3Tests / "org" / "apache" 
/ "spark" / "dataflint" / "DataflintPlanTranslationListenerSpec.scala" + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala" ) }, diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala index 84e581f6..867e2888 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/DataflintSparkUICommonLoader.scala @@ -57,10 +57,6 @@ class DataflintSparkUICommonInstaller extends Logging { if (ioTrackingEnabled) { DataflintSparkUICommonLoader.registerIOListener(context) } - val planTranslationEnabled = context.conf.getBoolean(DataflintSparkUICommonLoader.PLAN_TRANSLATION_ENABLED, defaultValue = false) - if (planTranslationEnabled) { - DataflintSparkUICommonLoader.registerPlanTranslationListener(context) - } if(icebergInstalled && icebergEnabled) { if(icebergAuthCatalogDiscovery && isMetricLoaderInRightClassLoader()) { context.conf.getAll.filter(_._1.startsWith("spark.sql.catalog")).filter(keyValue => keyValue._2 == "org.apache.iceberg.spark.SparkCatalog" || keyValue._2 == "org.apache.iceberg.spark.SparkSessionCatalog").foreach(keyValue => { @@ -144,9 +140,7 @@ object DataflintSparkUICommonLoader extends Logging { private val DATAFLINT_EXTENSION_CLASS = "org.apache.spark.dataflint.DataFlintInstrumentationExtension" private val DATAFLINT_IO_LISTENER_CLASS = "org.apache.spark.dataflint.listener.DataflintIOListener" - private val DATAFLINT_PLAN_TRANSLATION_LISTENER_CLASS = "org.apache.spark.dataflint.listener.DataflintPlanTranslationListener" val IO_TRACKING_ENABLED = "spark.dataflint.io.tracking.enabled" - val PLAN_TRANSLATION_ENABLED = "spark.dataflint.plan.translation.enabled" val INSTRUMENT_SPARK_ENABLED = "spark.dataflint.instrument.spark.enabled" val 
INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled" val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled" @@ -228,30 +222,20 @@ object DataflintSparkUICommonLoader extends Logging { * This is in the org.apache.spark.dataflint package to access SparkContext.conf * (which is private[spark]). */ - def registerIOListener(sc: SparkContext): Unit = - appendQueryExecutionListener(sc, DATAFLINT_IO_LISTENER_CLASS, "DataflintIOListener") - - /** - * Append [[org.apache.spark.dataflint.listener.DataflintPlanTranslationListener]] - * to `spark.sql.queryExecutionListeners`. Captures full simpleString rendering - * for any plan node whose description got cut by SQLConf.maxToStringFields. - */ - def registerPlanTranslationListener(sc: SparkContext): Unit = - appendQueryExecutionListener(sc, DATAFLINT_PLAN_TRANSLATION_LISTENER_CLASS, "DataflintPlanTranslationListener") - - private def appendQueryExecutionListener(sc: SparkContext, className: String, displayName: String): Unit = { + def registerIOListener(sc: SparkContext): Unit = { try { val current = sc.conf.get("spark.sql.queryExecutionListeners", "") - if (current.contains(className)) { - logInfo(s"$displayName already registered in spark.sql.queryExecutionListeners") + if (current.contains(DATAFLINT_IO_LISTENER_CLASS)) { + logInfo("DataflintIOListener already registered in spark.sql.queryExecutionListeners") } else { - val updated = if (current.isEmpty) className else s"$current,$className" + val updated = if (current.isEmpty) DATAFLINT_IO_LISTENER_CLASS + else s"$current,$DATAFLINT_IO_LISTENER_CLASS" sc.conf.set("spark.sql.queryExecutionListeners", updated) - logInfo(s"Registered $displayName in spark.sql.queryExecutionListeners") + logInfo("Registered DataflintIOListener in spark.sql.queryExecutionListeners") } } catch { case e: Throwable => - logWarning(s"Could not register $displayName", e) + logWarning("Could not register DataflintIOListener", e) 
} } } diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala index 250f9b89..91018df5 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala @@ -30,10 +30,6 @@ class DataflintListener(store: ElementTrackingStore) extends SparkListener with val wrapper = new DataflintIOInfoWrapper(e.ioInfo) store.write(wrapper) } - case e: DataflintPlanTranslationsEvent => { - val wrapper = new DataflintPlanTranslationsInfoWrapper(e.info) - store.write(wrapper) - } case _ => {} } } catch { diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala deleted file mode 100644 index 850ec212..00000000 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintPlanTranslationListener.scala +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.spark.dataflint.listener - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -import scala.util.Try - -/** - * Generic plan-truncation listener: for every node in `qe.executedPlan`, compare - * `simpleString(maxToStringFields)` (the form SparkUI shows) to - * `simpleString(Int.MaxValue)` (truncation disabled). When they differ, emit the - * full rendering as a [[DataflintPlanNodeTranslation]] keyed by pre-order - * `nodeIndex` — the same indexing scheme SparkPlanGraph uses, so consumers can - * look up translations for nodes whose description got cut. 
- * - * Format-agnostic: works for FileScan, Project, Aggregate, Window, custom nodes — - * anything that overrides `simpleString` properly. Sparse: nodes that fit emit - * nothing and cost one String#equals. - * - * Registered automatically by [[org.apache.spark.dataflint.DataflintSparkUICommonLoader]] - * via `spark.sql.queryExecutionListeners` when - * `spark.dataflint.plan.translation.enabled=true`. - * - * Must have a no-arg constructor: Spark instantiates listeners listed in - * `spark.sql.queryExecutionListeners` via `Class.getConstructor()` reflection. - */ -class DataflintPlanTranslationListener extends QueryExecutionListener with Logging { - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - process(qe) - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { - process(qe) - } - - private def process(qe: QueryExecution): Unit = { - Try { - val maxFields = qe.sparkSession.sessionState.conf.maxToStringFields - val translations = collection.mutable.ArrayBuffer.empty[DataflintPlanNodeTranslation] - var idx = 0 - qe.executedPlan.foreach { node => - val truncated = Try(node.simpleString(maxFields)).getOrElse("") - val full = Try(node.simpleString(Int.MaxValue)).getOrElse(truncated) - if (truncated != full) { - translations += DataflintPlanNodeTranslation( - nodeIndex = idx, - nodeName = node.nodeName, - truncatedDescription = truncated, - fullDescription = full - ) - } - idx += 1 - } - if (translations.nonEmpty) { - val info = DataflintPlanTranslationsInfo(qe.id, translations.toSeq) - qe.sparkSession.sparkContext.listenerBus.post(DataflintPlanTranslationsEvent(info)) - } - }.recover { - case t: Throwable => logWarning("DataflintPlanTranslationListener: failed to process query", t) - } - } -} diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 
fc49d13c..0396f171 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -119,27 +119,3 @@ class DataflintIOInfoWrapper(val info: DataflintIOInfo) { @JsonIgnore def id: String = info.executionId.toString } - -// Per-node "untruncated rendering" for plan nodes whose simpleString got cut -// by SQLConf.maxToStringFields. The truncatedDescription matches the string -// SparkPlanGraph carries on its node, so a consumer can look up the translation -// by description equality and/or by nodeIndex (pre-order position). -case class DataflintPlanNodeTranslation( - nodeIndex: Int, - nodeName: String, - truncatedDescription: String, - fullDescription: String - ) - -case class DataflintPlanTranslationsInfo( - executionId: Long, - translations: Seq[DataflintPlanNodeTranslation] - ) - -case class DataflintPlanTranslationsEvent(info: DataflintPlanTranslationsInfo) extends SparkListenerEvent - -class DataflintPlanTranslationsInfoWrapper(val info: DataflintPlanTranslationsInfo) { - @KVIndex - @JsonIgnore - def id: String = info.executionId.toString -} diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala deleted file mode 100644 index 3936916b..00000000 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintPlanTranslationListenerSpec.scala +++ /dev/null @@ -1,90 +0,0 @@ -package org.apache.spark.dataflint - -import java.util.concurrent.CopyOnWriteArrayList - -import org.apache.spark.dataflint.listener.{DataflintPlanTranslationListener, DataflintPlanTranslationsEvent, DataflintPlanTranslationsInfo} -import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} -import org.apache.spark.sql.SparkSession -import 
org.apache.spark.sql.functions.lit -import org.scalatest.BeforeAndAfterAll -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -/** - * Drive Spark's field-list truncation hard (maxToStringFields=2), build a Project - * with many output columns, run an action, and assert the listener emitted a - * translation whose truncated form carries the SparkUI "... N more fields" - * marker while the full form contains every column name. - */ -class DataflintPlanTranslationListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { - - private var spark: SparkSession = _ - private val captured = new CopyOnWriteArrayList[DataflintPlanTranslationsInfo]() - - private val collector = new SparkListener { - override def onOtherEvent(event: SparkListenerEvent): Unit = event match { - case e: DataflintPlanTranslationsEvent => captured.add(e.info) - case _ => - } - } - - override def beforeAll(): Unit = { - spark = SparkSession.builder() - .master("local[1]") - .appName("DataflintPlanTranslationListenerSpec") - // Aggressively truncate field lists so even a small Project produces a translation. 
- .config("spark.sql.maxToStringFields", "2") - .config("spark.sql.adaptive.enabled", "false") - .config("spark.ui.enabled", "false") - .config("spark.sql.queryExecutionListeners", classOf[DataflintPlanTranslationListener].getName) - .getOrCreate() - spark.sparkContext.addSparkListener(collector) - } - - override def afterAll(): Unit = { - if (spark != null) { - spark.sparkContext.removeSparkListener(collector) - spark.stop() - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - } - } - - private def drain(): Unit = spark.sparkContext.listenerBus.waitUntilEmpty(10000) - - test("emits full simpleString for a Project whose field list got truncated") { - captured.clear() - - val session = spark - import session.implicits._ - - val columnNames = (1 to 20).map(i => s"column_with_long_recognizable_name_$i") - val projection = columnNames.map(n => lit(0).as(n)) - Seq(1).toDF("seed").select(projection: _*).collect() - - drain() - - val info = (0 until captured.size()).map(captured.get).headOption - .getOrElse(fail(s"No translation event captured. Got ${captured.size()} events.")) - - info.translations should not be empty - - // Every emitted translation must (a) be detected as truncated and - // (b) fully render in the expanded form. - info.translations.foreach { t => - t.truncatedDescription should not be t.fullDescription - t.fullDescription.length should be > t.truncatedDescription.length - } - - // At least one translation should cover the Project carrying our columns: - // the full form contains the last column name; the truncated one does not. - val lastCol = columnNames.last - val projectionTranslation = info.translations.find { t => - t.fullDescription.contains(lastCol) && !t.truncatedDescription.contains(lastCol) - } - projectionTranslation.getOrElse( - fail(s"No translation contains the last column name '$lastCol' only in the full form.\n" + - info.translations.map(t => s" ${t.nodeName}: trunc=${t.truncatedDescription.take(80)}... 
full=${t.fullDescription.take(80)}...").mkString("\n")) - ) - } -} From f8e27e9f21f5d9e6394f0c8fd22ef28a49261d33 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 06:05:23 +0000 Subject: [PATCH 08/16] feat: add nodeId to DataflintIOTarget matching SparkPlanGraph index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Walk qe.executedPlan with a pre-order counter and assign each captured target the pre-order index — the same numbering SparkPlanGraph uses, so consumers can correlate IO events to specific nodes in the SparkUI SQL tab. Reads come from typed physical scans (FileSourceScanExec, RowDataSourceScanExec, BatchScanExec). Writes via DataWritingCommandExec delegate to the existing logical-plan matcher on the wrapped DataWritingCommand, then overlay the physical nodeId. The logical-plan matcher still emits targets with nodeId = -1 for nodes that have no direct physical correspondence (kept private to the extractor for now). --- .../listener/DataflintIOExtractor.scala | 108 +++++++++++++----- .../listener/DataflintIOListener.scala | 2 +- .../spark/dataflint/listener/model.scala | 5 +- .../dataflint/DataflintIOListenerSpec.scala | 4 + 4 files changed, 91 insertions(+), 28 deletions(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala index acf3f59d..61902702 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala @@ -2,26 +2,34 @@ package org.apache.spark.dataflint.listener import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} -import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand +import 
org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RowDataSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommandExec} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation} import scala.util.Try /** - * Walks an analyzed [[LogicalPlan]] and pulls out read/write/save targets from - * the typed nodes (not from `simpleString` descriptions), so paths and table - * identifiers are captured at their full length regardless of - * SQLConf.maxToStringFields truncation. + * Pulls IO targets (reads/writes/saves) out of a QueryExecution by reading + * typed plan fields — not regex over `simpleString` — so paths and identifiers + * are captured at full length regardless of SQLConf.maxToStringFields truncation. + * + * Primary walk is over `qe.executedPlan` with a pre-order index assigned to each + * node; that index matches the nodeId Spark's [[org.apache.spark.sql.execution.ui.SparkPlanGraph]] + * assigns, so consumers can correlate each target back to the SparkUI SQL tab. + * For [[DataWritingCommandExec]] we delegate to the logical-plan matcher on the + * wrapped `cmd` and overlay the physical nodeId. */ object DataflintIOExtractor extends Logging { - /** Extract all read+write targets from an analyzed logical plan. */ - def extract(plan: LogicalPlan): Seq[DataflintIOTarget] = { + /** Walk the physical plan, attaching the pre-order nodeId to every captured target. 
*/ + def extract(qe: QueryExecution): Seq[DataflintIOTarget] = { val out = collection.mutable.ArrayBuffer.empty[DataflintIOTarget] Try { - plan.foreach { node => - Try(extractFromNode(node)).toOption.flatten.foreach(out += _) + var idx = 0 + qe.executedPlan.foreach { node => + Try(extractFromPhysical(node, idx)).toOption.flatten.foreach(out += _) + idx += 1 } }.recover { case t: Throwable => logWarning("DataflintIOExtractor: failed to walk plan", t) @@ -29,9 +37,59 @@ object DataflintIOExtractor extends Logging { out.toSeq } - private def extractFromNode(node: LogicalPlan): Option[DataflintIOTarget] = node match { + private def extractFromPhysical(node: SparkPlan, nodeId: Int): Option[DataflintIOTarget] = node match { + + // --- PHYSICAL READS ------------------------------------------------------ + case fsse: FileSourceScanExec => + Some(DataflintIOTarget( + nodeId = nodeId, + operation = "read", + format = fsse.relation.fileFormat.getClass.getSimpleName, + paths = fsse.relation.location.rootPaths.map(_.toString), + tableName = fsse.tableIdentifier.map(_.unquotedString), + saveMode = None, + partitionColumns = fsse.relation.partitionSchema.fieldNames.toSeq, + options = fsse.relation.options + )) + + case bse: BatchScanExec => + val tableName = Option(bse.table).map(_.name) + Some(DataflintIOTarget( + nodeId = nodeId, + operation = "read", + format = Option(bse.table).map(_.getClass.getSimpleName).getOrElse("DataSourceV2"), + paths = Seq.empty, + tableName = tableName, + saveMode = None, + partitionColumns = Seq.empty, + options = Map.empty + )) + + case rdse: RowDataSourceScanExec => + Some(DataflintIOTarget( + nodeId = nodeId, + operation = "read", + format = Try(rdse.relation.getClass.getSimpleName).getOrElse("RowDataSource"), + paths = Seq.empty, + tableName = Try(rdse.tableIdentifier.map(_.unquotedString)).getOrElse(None), + saveMode = None, + partitionColumns = Seq.empty, + options = Map.empty + )) + + // --- PHYSICAL WRITES (V1) — delegate to logical 
matcher, overlay nodeId -- + case dwce: DataWritingCommandExec => + extractFromLogical(dwce.cmd).map(_.copy(nodeId = nodeId)) + + case _ => None + } + + /** + * Logical-plan matcher for the commands [[DataWritingCommandExec]] wraps. + * Returns nodeId = -1; the physical-plan caller overlays the real id. + */ + private def extractFromLogical(node: LogicalPlan): Option[DataflintIOTarget] = node match { - // --- READS --------------------------------------------------------------- case lr: LogicalRelation => val (paths, format) = lr.relation match { case h: HadoopFsRelation => @@ -39,34 +97,33 @@ object DataflintIOExtractor extends Logging { case other => (Seq.empty[String], other.getClass.getSimpleName) } - val tableName = lr.catalogTable.map(_.identifier.unquotedString) Some(DataflintIOTarget( + nodeId = -1, operation = "read", format = format, paths = paths, - tableName = tableName, + tableName = lr.catalogTable.map(_.identifier.unquotedString), saveMode = None, partitionColumns = lr.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty), options = Map.empty )) case d: DataSourceV2Relation => - val tableName = d.identifier.map(_.toString).orElse(Option(d.table).map(_.name)) - val format = Option(d.table).map(_.getClass.getSimpleName).getOrElse("DataSourceV2") val opts = safeOptionsMap(d.options.asCaseSensitiveMap()) Some(DataflintIOTarget( + nodeId = -1, operation = "read", - format = format, + format = Option(d.table).map(_.getClass.getSimpleName).getOrElse("DataSourceV2"), paths = optionalPath(opts), - tableName = tableName, + tableName = d.identifier.map(_.toString).orElse(Option(d.table).map(_.name)), saveMode = None, partitionColumns = Seq.empty, options = opts )) - // --- WRITES (V1) --------------------------------------------------------- case c: InsertIntoHadoopFsRelationCommand => Some(DataflintIOTarget( + nodeId = -1, operation = "write", format = c.fileFormat.getClass.getSimpleName, paths = Seq(c.outputPath.toString), @@ -77,12 +134,12 @@ object 
DataflintIOExtractor extends Logging { )) case c: SaveIntoDataSourceCommand => - val tableName = c.options.get("dbtable").orElse(c.options.get("table")) Some(DataflintIOTarget( + nodeId = -1, operation = "write", format = c.dataSource.getClass.getSimpleName, paths = c.options.get("path").toSeq, - tableName = tableName, + tableName = c.options.get("dbtable").orElse(c.options.get("table")), saveMode = Some(c.mode.toString), partitionColumns = Seq.empty, options = c.options @@ -90,6 +147,7 @@ object DataflintIOExtractor extends Logging { case c: CreateDataSourceTableAsSelectCommand => Some(DataflintIOTarget( + nodeId = -1, operation = "write", format = c.table.provider.getOrElse(""), paths = c.table.storage.locationUri.map(_.toString).toSeq, @@ -99,16 +157,14 @@ object DataflintIOExtractor extends Logging { options = c.table.storage.properties )) - // --- WRITES (V2: AppendData / OverwriteByExpression / OverwritePartitionsDynamic / ReplaceData) --- case w: V2WriteCommand => val named = Try(w.table).toOption - val tableName = named.flatMap(t => Option(t.name)) - val format = named.map(_.getClass.getSimpleName).getOrElse("V2Write") Some(DataflintIOTarget( + nodeId = -1, operation = "write", - format = format, + format = named.map(_.getClass.getSimpleName).getOrElse("V2Write"), paths = Seq.empty, - tableName = tableName, + tableName = named.flatMap(t => Option(t.name)), saveMode = Some(w.getClass.getSimpleName), partitionColumns = Seq.empty, options = Map.empty diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala index 896d5076..25bdf35f 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala @@ -30,7 +30,7 @@ class DataflintIOListener extends 
QueryExecutionListener with Logging { private def process(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { Try { - val targets = DataflintIOExtractor.extract(qe.analyzed) + val targets = DataflintIOExtractor.extract(qe) if (targets.nonEmpty) { val info = DataflintIOInfo( executionId = qe.id, diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 0396f171..6e9bc20f 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -92,10 +92,13 @@ class DataflintDeltaLakeScanInfoWrapper(val info: DataflintDeltaLakeScanInfo) { def id: String = s"${info.minExecutionId}_${info.tablePath.replaceAll(" ", "")}" } -// Captured from the typed logical plan via QueryExecutionListener so paths/table +// Captured from the typed physical plan via QueryExecutionListener so paths/table // identifiers are the full untruncated values (not the simpleString form Spark uses // in plan descriptions, which gets cut at SQLConf.maxToStringFields). +// nodeId is the pre-order index in qe.executedPlan, matching SparkPlanGraph node IDs; +// -1 if extracted from a logical-plan node that has no direct physical correspondence. case class DataflintIOTarget( + nodeId: Int, operation: String, // "read" | "write" format: String, // "parquet", "delta", "iceberg", "kafka", ... 
paths: Seq[String], diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala index 85fe45c1..a76b7b91 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -89,6 +89,9 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf writeTarget.paths.head should include (deepPath.getName) writeTarget.paths.head should not include "..." writeTarget.saveMode shouldBe Some("Overwrite") + // nodeId is the pre-order index in qe.executedPlan; the write command exec + // is the root or near-root, so it should be a small non-negative integer. + writeTarget.nodeId should be >= 0 // Now read it back and verify the read target carries the full path too. spark.read.parquet(deepPath.getAbsolutePath).collect() @@ -97,6 +100,7 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf val readInfo = find("read").getOrElse(fail("No read event captured.")) val readTarget = readInfo.targets.find(_.operation == "read").get readTarget.paths.exists(p => p.contains(deepPath.getName) && !p.endsWith("...")) shouldBe true + readTarget.nodeId should be >= 0 } finally { deleteRecursively(baseDir) } From c1b37e0754f0581cd0d01023a002b50cf058c4c9 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 09:11:49 +0000 Subject: [PATCH 09/16] feat: one DataflintIOEvent per target MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously a single event carried all targets for a query. A query touching many paths could push a single SparkListenerSQLExecutionStart- adjacent event past Databricks' 2 MB per-event cap, dropping every path in the batch. 
Split it: each DataflintIOTarget rides in its own DataflintIOEvent. - DataflintIOInfo.targets: Seq → DataflintIOInfo.target: single value - Extractor returns Seq[DataflintIOTarget] per physical node (was Option); FileSourceScanExec with N rootPaths now emits N targets - Listener iterates and posts one event per target - DataflintIOInfoWrapper.id keyed by composite (executionId, nodeId, path-or-table-fingerprint) so all rows coexist in the store --- .../listener/DataflintIOExtractor.scala | 31 ++++++++++++------- .../listener/DataflintIOListener.scala | 21 ++++++++----- .../spark/dataflint/listener/model.scala | 11 +++++-- .../dataflint/DataflintIOListenerSpec.scala | 9 +++--- 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala index 61902702..fe4c6cb8 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOExtractor.scala @@ -22,13 +22,18 @@ import scala.util.Try */ object DataflintIOExtractor extends Logging { - /** Walk the physical plan, attaching the pre-order nodeId to every captured target. */ + /** + * Walk the physical plan, attaching the pre-order nodeId to every captured target. + * A single node can yield multiple targets (e.g. a FileSourceScanExec with multiple + * rootPaths emits one target per path) — keeps each target small enough that one + * Spark event per target stays well under Databricks' 2 MB per-event cap. 
+ */ def extract(qe: QueryExecution): Seq[DataflintIOTarget] = { val out = collection.mutable.ArrayBuffer.empty[DataflintIOTarget] Try { var idx = 0 qe.executedPlan.foreach { node => - Try(extractFromPhysical(node, idx)).toOption.flatten.foreach(out += _) + Try(extractFromPhysical(node, idx)).toOption.foreach(out ++= _) idx += 1 } }.recover { @@ -37,36 +42,38 @@ object DataflintIOExtractor extends Logging { out.toSeq } - private def extractFromPhysical(node: SparkPlan, nodeId: Int): Option[DataflintIOTarget] = node match { + private def extractFromPhysical(node: SparkPlan, nodeId: Int): Seq[DataflintIOTarget] = node match { // --- PHYSICAL READS ------------------------------------------------------ case fsse: FileSourceScanExec => - Some(DataflintIOTarget( + val rootPaths = fsse.relation.location.rootPaths.map(_.toString) + val base = DataflintIOTarget( nodeId = nodeId, operation = "read", format = fsse.relation.fileFormat.getClass.getSimpleName, - paths = fsse.relation.location.rootPaths.map(_.toString), + paths = Seq.empty, tableName = fsse.tableIdentifier.map(_.unquotedString), saveMode = None, partitionColumns = fsse.relation.partitionSchema.fieldNames.toSeq, options = fsse.relation.options - )) + ) + if (rootPaths.isEmpty) Seq(base) + else rootPaths.map(p => base.copy(paths = Seq(p))) case bse: BatchScanExec => - val tableName = Option(bse.table).map(_.name) - Some(DataflintIOTarget( + Seq(DataflintIOTarget( nodeId = nodeId, operation = "read", format = Option(bse.table).map(_.getClass.getSimpleName).getOrElse("DataSourceV2"), paths = Seq.empty, - tableName = tableName, + tableName = Option(bse.table).map(_.name), saveMode = None, partitionColumns = Seq.empty, options = Map.empty )) case rdse: RowDataSourceScanExec => - Some(DataflintIOTarget( + Seq(DataflintIOTarget( nodeId = nodeId, operation = "read", format = Try(rdse.relation.getClass.getSimpleName).getOrElse("RowDataSource"), @@ -79,9 +86,9 @@ object DataflintIOExtractor extends Logging { // --- 
PHYSICAL WRITES (V1) — delegate to logical matcher, overlay nodeId -- case dwce: DataWritingCommandExec => - extractFromLogical(dwce.cmd).map(_.copy(nodeId = nodeId)) + extractFromLogical(dwce.cmd).map(_.copy(nodeId = nodeId)).toSeq - case _ => None + case _ => Seq.empty } /** diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala index 25bdf35f..8e48af0a 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala @@ -32,13 +32,20 @@ class DataflintIOListener extends QueryExecutionListener with Logging { Try { val targets = DataflintIOExtractor.extract(qe) if (targets.nonEmpty) { - val info = DataflintIOInfo( - executionId = qe.id, - sqlFuncName = funcName, - durationMs = durationNs / 1000000L, - targets = targets - ) - qe.sparkSession.sparkContext.listenerBus.post(DataflintIOEvent(info)) + val durationMs = durationNs / 1000000L + val bus = qe.sparkSession.sparkContext.listenerBus + // One event per target — keeps each event small enough to stay well under + // Databricks' 2 MB per-SparkListenerEvent cap and ensures we never drop a + // path because the whole batch was too large. 
+ targets.foreach { target => + val info = DataflintIOInfo( + executionId = qe.id, + sqlFuncName = funcName, + durationMs = durationMs, + target = target + ) + bus.post(DataflintIOEvent(info)) + } } }.recover { case t: Throwable => logWarning("DataflintIOListener: failed to process query", t) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index 6e9bc20f..d260bdb8 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -112,13 +112,20 @@ case class DataflintIOInfo( executionId: Long, sqlFuncName: String, durationMs: Long, - targets: Seq[DataflintIOTarget] + target: DataflintIOTarget ) case class DataflintIOEvent(ioInfo: DataflintIOInfo) extends SparkListenerEvent class DataflintIOInfoWrapper(val info: DataflintIOInfo) { + // Composite key so multiple targets per execution / per node coexist in the store. 
+ // (executionId, nodeId, fingerprint of the resource it represents) @KVIndex @JsonIgnore - def id: String = info.executionId.toString + def id: String = { + val fingerprint = info.target.paths.headOption + .orElse(info.target.tableName) + .getOrElse(info.target.operation + ":" + info.target.format) + s"${info.executionId}_${info.target.nodeId}_${fingerprint.hashCode.toHexString}" + } } diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala index a76b7b91..36fb35de 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -61,7 +61,7 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf } private def find(op: String): Option[DataflintIOInfo] = - (0 until captured.size()).map(captured.get).find(_.targets.exists(_.operation == op)) + (0 until captured.size()).map(captured.get).find(_.target.operation == op) test("captures full write path through InsertIntoHadoopFsRelationCommand without truncation") { val baseDir = java.nio.file.Files.createTempDirectory("dataflint-io").toFile @@ -80,10 +80,10 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf .write.mode("overwrite").parquet(deepPath.getAbsolutePath) drain() - val writeInfo = find("write") + val writeTarget = find("write") .getOrElse(fail(s"No write event captured. Got ${captured.size()} events.")) + .target - val writeTarget = writeInfo.targets.find(_.operation == "write").get writeTarget.paths should have size 1 // Crucial assertion: full path, no truncation marker. 
writeTarget.paths.head should include (deepPath.getName) @@ -97,8 +97,7 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf spark.read.parquet(deepPath.getAbsolutePath).collect() drain() - val readInfo = find("read").getOrElse(fail("No read event captured.")) - val readTarget = readInfo.targets.find(_.operation == "read").get + val readTarget = find("read").getOrElse(fail("No read event captured.")).target readTarget.paths.exists(p => p.contains(deepPath.getName) && !p.endsWith("...")) shouldBe true readTarget.nodeId should be >= 0 } finally { From d67d234d0afbc53564e2fe4398b3f8e2e500ae0c Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 11:01:29 +0000 Subject: [PATCH 10/16] feat: size-based chunking of DataflintIOEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the one-event-per-target safety net with greedy size-budgeted chunks. Each DataflintIOEvent now carries Seq[DataflintIOTarget] packed up to ~1 MB of estimated JSON bytes — well under Databricks' 2 MB per-event cap — so typical queries emit a single event and only genuinely huge IO sets (hundreds of long paths) split across multiple. - DataflintIOInfo: target → targets + chunkIndex - DataflintIOInfoWrapper.id: (executionId, chunkIndex) - Listener.chunkTargets: greedy pack, never drops a target (a single oversize target gets its own chunk rather than being skipped) - Listener.estimateBytes: sums string field lengths * 1.2 (JSON overhead) + 256 B structural per-target Spec updated to flatten event.targets when looking up read/write rows. 
--- .../listener/DataflintIOListener.scala | 55 +++++++++++++++++-- .../spark/dataflint/listener/model.scala | 14 ++--- .../dataflint/DataflintIOListenerSpec.scala | 13 +++-- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala index 8e48af0a..306864b2 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala @@ -19,6 +19,13 @@ import scala.util.Try */ class DataflintIOListener extends QueryExecutionListener with Logging { + /** + * Per-event size budget for the targets payload (rough upper bound of JSON bytes). + * Databricks drops any SparkListenerEvent larger than ~2 MB; we cap chunks at + * 1 MB so the JSON envelope + base fields still fit comfortably. + */ + private val maxChunkBytes = 1024 * 1024 + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { process(funcName, qe, durationNs) } @@ -34,21 +41,59 @@ class DataflintIOListener extends QueryExecutionListener with Logging { if (targets.nonEmpty) { val durationMs = durationNs / 1000000L val bus = qe.sparkSession.sparkContext.listenerBus - // One event per target — keeps each event small enough to stay well under - // Databricks' 2 MB per-SparkListenerEvent cap and ensures we never drop a - // path because the whole batch was too large. 
- targets.foreach { target => + var chunkIndex = 0 + chunkTargets(targets).foreach { chunk => val info = DataflintIOInfo( executionId = qe.id, sqlFuncName = funcName, durationMs = durationMs, - target = target + chunkIndex = chunkIndex, + targets = chunk ) bus.post(DataflintIOEvent(info)) + chunkIndex += 1 } } }.recover { case t: Throwable => logWarning("DataflintIOListener: failed to process query", t) } } + + /** + * Greedy pack: accumulate targets in order until the next would push the chunk + * over the budget, then flush. A single target larger than the budget gets its + * own chunk (we prefer attempting a too-large event over dropping data). + */ + private[listener] def chunkTargets(targets: Seq[DataflintIOTarget]): Seq[Seq[DataflintIOTarget]] = { + val out = collection.mutable.ArrayBuffer.empty[Seq[DataflintIOTarget]] + val current = collection.mutable.ArrayBuffer.empty[DataflintIOTarget] + var currentBytes = 0 + targets.foreach { t => + val s = estimateBytes(t) + if (current.nonEmpty && currentBytes + s > maxChunkBytes) { + out += current.toSeq + current.clear() + currentBytes = 0 + } + current += t + currentBytes += s + } + if (current.nonEmpty) out += current.toSeq + out.toSeq + } + + /** Rough upper bound for the JSON-serialized size of one target. */ + private[listener] def estimateBytes(t: DataflintIOTarget): Int = { + val strBytes = + t.format.length + + t.operation.length + + t.tableName.fold(0)(_.length) + + t.saveMode.fold(0)(_.length) + + t.paths.iterator.map(_.length).sum + + t.partitionColumns.iterator.map(_.length).sum + + t.options.iterator.map { case (k, v) => k.length + v.length }.sum + // 1.2x for JSON escaping/structure + 256 B per-target constant overhead. 
+ (strBytes * 12 / 10) + 256 + } } + diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala index d260bdb8..8ff21ef8 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala @@ -112,20 +112,16 @@ case class DataflintIOInfo( executionId: Long, sqlFuncName: String, durationMs: Long, - target: DataflintIOTarget + chunkIndex: Int, + targets: Seq[DataflintIOTarget] ) case class DataflintIOEvent(ioInfo: DataflintIOInfo) extends SparkListenerEvent class DataflintIOInfoWrapper(val info: DataflintIOInfo) { - // Composite key so multiple targets per execution / per node coexist in the store. - // (executionId, nodeId, fingerprint of the resource it represents) + // (executionId, chunkIndex) — one wrapper per emitted event. Chunks are sized + // by the listener to stay under Databricks' 2 MB per-event cap. 
@KVIndex @JsonIgnore - def id: String = { - val fingerprint = info.target.paths.headOption - .orElse(info.target.tableName) - .getOrElse(info.target.operation + ":" + info.target.format) - s"${info.executionId}_${info.target.nodeId}_${fingerprint.hashCode.toHexString}" - } + def id: String = s"${info.executionId}_${info.chunkIndex}" } diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala index 36fb35de..ff1a0207 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala @@ -2,7 +2,7 @@ package org.apache.spark.dataflint import java.util.concurrent.CopyOnWriteArrayList -import org.apache.spark.dataflint.listener.{DataflintIOEvent, DataflintIOInfo, DataflintIOListener} +import org.apache.spark.dataflint.listener.{DataflintIOEvent, DataflintIOInfo, DataflintIOListener, DataflintIOTarget} import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.SparkSession import org.scalatest.BeforeAndAfterAll @@ -60,8 +60,10 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf spark.sparkContext.listenerBus.waitUntilEmpty(10000) } - private def find(op: String): Option[DataflintIOInfo] = - (0 until captured.size()).map(captured.get).find(_.target.operation == op) + private def find(op: String): Option[DataflintIOTarget] = { + val all = (0 until captured.size()).map(captured.get).flatMap(_.targets) + all.find(_.operation == op) + } test("captures full write path through InsertIntoHadoopFsRelationCommand without truncation") { val baseDir = java.nio.file.Files.createTempDirectory("dataflint-io").toFile @@ -81,8 +83,7 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf drain() 
val writeTarget = find("write") - .getOrElse(fail(s"No write event captured. Got ${captured.size()} events.")) - .target + .getOrElse(fail(s"No write target captured. Got ${captured.size()} events.")) writeTarget.paths should have size 1 // Crucial assertion: full path, no truncation marker. @@ -97,7 +98,7 @@ class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAf spark.read.parquet(deepPath.getAbsolutePath).collect() drain() - val readTarget = find("read").getOrElse(fail("No read event captured.")).target + val readTarget = find("read").getOrElse(fail("No read target captured.")) readTarget.paths.exists(p => p.contains(deepPath.getName) && !p.endsWith("...")) shouldBe true readTarget.nodeId should be >= 0 } finally { From 2a011988852708a42a7a5071dce20b42beb6bd17 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 11:02:39 +0000 Subject: [PATCH 11/16] test: unit-test DataflintIOListener.chunkTargets Pure unit test (no SparkSession) covering: - many small targets pack into one chunk - size budget is respected (split when total > 1 MB) - single oversize target gets its own chunk (never dropped) - target order is preserved across chunks - empty input yields empty output --- .../DataflintIOListenerChunkingSpec.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/listener/DataflintIOListenerChunkingSpec.scala diff --git a/spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/listener/DataflintIOListenerChunkingSpec.scala b/spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/listener/DataflintIOListenerChunkingSpec.scala new file mode 100644 index 00000000..9d96fa8a --- /dev/null +++ b/spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/listener/DataflintIOListenerChunkingSpec.scala @@ -0,0 +1,67 @@ +package org.apache.spark.dataflint.listener + +import org.scalatest.funsuite.AnyFunSuite +import 
org.scalatest.matchers.should.Matchers + +/** + * Pure unit test for the chunking logic — no SparkSession needed. Builds + * targets of known string size and asserts that DataflintIOListener.chunkTargets + * never drops a target, never produces an empty chunk, and respects the size + * budget within one over-the-budget item's worth of slack. + */ +class DataflintIOListenerChunkingSpec extends AnyFunSuite with Matchers { + + private val listener = new DataflintIOListener + + private def target(pathSize: Int, nodeId: Int = 0): DataflintIOTarget = + DataflintIOTarget( + nodeId = nodeId, + operation = "read", + format = "Parquet", + paths = Seq("a" * pathSize), + tableName = None, + saveMode = None, + partitionColumns = Seq.empty, + options = Map.empty + ) + + test("packs many small targets into a single chunk") { + val targets = (1 to 50).map(_ => target(64)) + val chunks = listener.chunkTargets(targets) + chunks should have size 1 + chunks.head should have size 50 + } + + test("splits when total estimated bytes exceed the budget") { + // ~600 KB of path per target (~720 KB estimated) — two already exceed the 1 MB budget, so each target gets its own chunk. + val targets = (1 to 10).map(_ => target(600 * 1024)) + val chunks = listener.chunkTargets(targets) + chunks.size should be > 1 + chunks.flatten should contain theSameElementsInOrderAs targets + // Each chunk respects the budget except possibly a single oversize item alone.
+ chunks.foreach { chunk => + val sum = chunk.map(listener.estimateBytes).sum + withClue(s"chunk size $sum, chunk len ${chunk.size}: ") { + (chunk.size == 1 || sum <= 1024 * 1024) shouldBe true + } + } + } + + test("an oversized target gets its own chunk rather than being dropped") { + val huge = target(3 * 1024 * 1024) // ~3 MB single target — bigger than budget + val chunks = listener.chunkTargets(Seq(huge)) + chunks should have size 1 + chunks.head should have size 1 + } + + test("preserves order of targets across chunks") { + val targets = (1 to 20).map(i => target(200 * 1024, nodeId = i)) + val chunks = listener.chunkTargets(targets) + val nodeIds = chunks.flatten.map(_.nodeId) + nodeIds should contain theSameElementsInOrderAs (1 to 20) + } + + test("empty input produces empty output") { + listener.chunkTargets(Seq.empty) shouldBe empty + } +} From efb3878f0dc6f20cc4bd0189ac5769ed3ce878f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 11:53:18 +0000 Subject: [PATCH 12/16] test: rename DataflintIOListenerSpec to ZDataflintIOListenerSpec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ScalaTest's sbt integration doesn't guarantee strict alphabetical run order — classes can be returned in classpath order. WriteMetricsSpec started failing on this branch with capturedQE null (listener never fired), which can happen if the IO listener spec runs first and leaves the shared forked JVM with a stopped SparkContext / cleared session state that the next spec's getOrCreate + listenerManager.register doesn't recover from cleanly. Prefix the class/file with Z so it sorts after WriteMetricsSpec under any plausible ordering scheme — IO spec definitively runs last in the JVM. 
--- spark-plugin/build.sbt | 2 +- ...flintIOListenerSpec.scala => ZDataflintIOListenerSpec.scala} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/{DataflintIOListenerSpec.scala => ZDataflintIOListenerSpec.scala} (97%) diff --git a/spark-plugin/build.sbt b/spark-plugin/build.sbt index 453aa90e..2481a312 100644 --- a/spark-plugin/build.sbt +++ b/spark-plugin/build.sbt @@ -184,7 +184,7 @@ lazy val pluginspark4 = (project in file("pluginspark4")) val pluginspark3Tests = (pluginspark3 / Test / sourceDirectory).value / "scala" Seq( pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataFlintCodegenFallbackSpec.scala", - pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "DataflintIOListenerSpec.scala" + pluginspark3Tests / "org" / "apache" / "spark" / "dataflint" / "ZDataflintIOListenerSpec.scala" ) }, diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/ZDataflintIOListenerSpec.scala similarity index 97% rename from spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala rename to spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/ZDataflintIOListenerSpec.scala index ff1a0207..efba1197 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataflintIOListenerSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/ZDataflintIOListenerSpec.scala @@ -15,7 +15,7 @@ import org.scalatest.matchers.should.Matchers * captured the FULL path — i.e. nothing got cut by Spark's maxToStringFields-driven * plan description truncation. 
*/ -class DataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { +class ZDataflintIOListenerSpec extends AnyFunSuite with Matchers with BeforeAndAfterAll { private var spark: SparkSession = _ private val captured = new CopyOnWriteArrayList[DataflintIOInfo]() From 3c4a1c0fc8ac00a6a3ae62db3424dd7b021c70a4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 12:24:02 +0000 Subject: [PATCH 13/16] test: drain listener bus before reading capturedQE in WriteMetricsSpec QueryExecutionListener.onSuccess is delivered asynchronously through sparkContext.listenerBus (via ExecutionListenerBus). The test was reading capturedQE immediately after .write.parquet() returned, racing the bus thread. On main the race usually wins; on this branch the JVM is slow enough (extra class loading from the new IO listener files on the classpath) that the race loses and capturedQE is null. Add a one-liner: waitUntilEmpty(10000) before reading capturedQE so the bus has drained. Pure timing fix, no change to what's asserted. --- .../apache/spark/dataflint/DataFlintWriteMetricsSpec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWriteMetricsSpec.scala b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWriteMetricsSpec.scala index ee08da4a..4b68f829 100644 --- a/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWriteMetricsSpec.scala +++ b/spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintWriteMetricsSpec.scala @@ -71,6 +71,10 @@ class DataFlintWriteMetricsSpec extends AnyFunSuite with Matchers with BeforeAnd Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)).toDF("key", "value") .write.mode("overwrite").parquet(tempDir.getAbsolutePath) + // QueryExecutionListener delivery is async via the listener bus — drain + // it so the test's onSuccess hook has set capturedQE before we read it. 
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000) + val qe = capturedQE.get() qe should not be null From a1dfced9d72414fcba5bb633fcd2a6b426a1561d Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 15:19:04 +0000 Subject: [PATCH 14/16] ci: surface sbt test failures in step summary + artifact MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Capture sbt +test output to spark-plugin/sbt-test.log via tee, then on failure dump grepped error / failed-test lines plus the last 200 lines to GITHUB_STEP_SUMMARY — visible directly in the workflow UI on the PR's checks tab without having to open the raw log. Also always upload the full log as an artifact (14-day retention) for the cases where the step summary isn't enough. Note: ci.yml is generated by sbt-github-actions but this is an additive edit and idempotent under regeneration; if needed it can be replicated in build.sbt via githubWorkflowGenerate later. --- .github/workflows/ci.yml | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c2d2a88..f2199740 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,6 +64,38 @@ jobs: working-directory: ./spark-ui - name: Build and test plugin - run: sbt +test + id: sbt-test + run: | + set -o pipefail + sbt +test 2>&1 | tee sbt-test.log working-directory: ./spark-plugin + - name: Surface failing test output + if: failure() && steps.sbt-test.conclusion == 'failure' + run: | + LOG=spark-plugin/sbt-test.log + { + echo "## sbt test failure" + echo "" + echo "### \`[error]\` / failed test lines" + echo '```' + # Grab FAILED/ABORTED markers, [error] lines and "[info] Name:" header lines (last 200 matches, no context lines).
+ grep -nE '\*\*\* (FAILED|ABORTED) \*\*\*|^\[error\]|^\[info\] [A-Z][^:]*:$' "$LOG" \ + | tail -200 || true + echo '```' + echo "" + echo "### Last 200 log lines" + echo '```' + tail -200 "$LOG" + echo '```' + } >> "$GITHUB_STEP_SUMMARY" + + - name: Upload sbt test log + if: always() + uses: actions/upload-artifact@v4 + with: + name: sbt-test-log + path: spark-plugin/sbt-test.log + if-no-files-found: ignore + retention-days: 14 + From 9266a7e4829d5c608925f7f57998e05db268d115 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 15:38:17 +0000 Subject: [PATCH 15/16] ci: post sbt test failure as a PR comment When the test task fails on a PR, post the grepped failure block + the last 300 sbt log lines as a PR comment. Lets reviewers (and tooling that can read PR comments via the GitHub API) get the failure detail without opening the workflow log. Needs pull-requests: write on the job-level permissions block. Sticky behavior (one comment per PR) can be layered later if the comment churn gets noisy. 
--- .github/workflows/ci.yml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f2199740..8897e7bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,9 @@ jobs: os: [ubuntu-latest] java: [temurin@17] runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write steps: - name: Checkout current branch (full) uses: actions/checkout@v4 @@ -99,3 +102,29 @@ jobs: if-no-files-found: ignore retention-days: 14 + - name: Post failure block as PR comment + if: failure() && steps.sbt-test.conclusion == 'failure' && github.event_name == 'pull_request' + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PR_NUMBER: ${{ github.event.pull_request.number }} + SHA: ${{ github.event.pull_request.head.sha }} + run: | + LOG=spark-plugin/sbt-test.log + { + echo "## sbt test failure — \`${SHA:0:7}\`" + echo "" + echo "### \`[error]\` / failed test lines" + echo '```' + grep -nE '\*\*\* (FAILED|ABORTED) \*\*\*|^\[error\]|^\[info\] [A-Z][^:]*:$' "$LOG" \ + | tail -200 || true + echo '```' + echo "" + echo "
<details><summary>Last 300 lines of sbt output</summary>" + echo "" + echo '```' + tail -300 "$LOG" + echo '```' + echo "</details>
" + } > /tmp/ci-failure-comment.md + gh pr comment "$PR_NUMBER" --body-file /tmp/ci-failure-comment.md + From 2acc7b869dad45d52d75d48d1e6bf210ee090e54 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 24 May 2026 15:42:48 +0000 Subject: [PATCH 16/16] fix: cross-Scala-version ArrayBuffer aliasing bug in chunkTargets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Scala 2.12 mutable.ArrayBuffer.toSeq returns `this` (no copy) because ArrayBuffer IS-A scala.collection.Seq in 2.12. Our chunker did: out += current.toSeq current.clear() — which on 2.12 cleared the buffer we just stored in `out`, so every emitted chunk ended up empty and the order-preservation unit test failed ("preserves order of targets across chunks"). On Scala 2.13 toSeq makes an immutable copy so the bug was latent. Use toList for an eager immutable copy in both versions. --- .../spark/dataflint/listener/DataflintIOListener.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala index 306864b2..1e5578d0 100644 --- a/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala +++ b/spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintIOListener.scala @@ -71,15 +71,17 @@ class DataflintIOListener extends QueryExecutionListener with Logging { targets.foreach { t => val s = estimateBytes(t) if (current.nonEmpty && currentBytes + s > maxChunkBytes) { - out += current.toSeq + // toList — eager immutable copy. ArrayBuffer.toSeq in Scala 2.12 returns + // `this`, so a later current.clear() would empty the seq we just stored. 
+ out += current.toList current.clear() currentBytes = 0 } current += t currentBytes += s } - if (current.nonEmpty) out += current.toSeq - out.toSeq + if (current.nonEmpty) out += current.toList + out.toList } /** Rough upper bound for the JSON-serialized size of one target. */