[Gold Standard]: Initial code for spark only setup with a single query #384
Conversation
trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
  val conf = SQLConf.get
Note: superclasses changed due to the lack of Spark 3.0 support.
// The TPCDS queries below are based on v1.4.
// TODO: Fix build pipeline for q49 and re-enable q49.
val tpcdsQueries = Seq("q1")
Note: this is the complete list of queries that will run as part of this test. Currently only q1 is selected; in subsequent PRs, all queries will be enabled (see the sketch below for how the list drives the generated test cases).
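As a rough illustration (not necessarily the suite's exact wiring; testQuery and the test-name format are assumptions), each entry in tpcdsQueries typically becomes its own golden-file test case, so enabling the remaining queries later only requires extending the Seq:

```scala
// Illustrative sketch only; names below are assumed, not taken verbatim from the PR.
val tpcdsQueries = Seq("q1") // later PRs: Seq("q1", "q2", /* ... */ "q99")

tpcdsQueries.foreach { name =>
  test(s"check simplified (tpcds-v1.4/$name)") {
    // Runs the query and compares its normalized plan against the checked-in golden file.
    testQuery("tpcds", name)
  }
}
```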
private val tableColumns = Map(
Note: no change in tableColumns.
val tableNames: Iterable[String] = tableColumns.keys
def createTable(
Note: no change in createTable.
private val originalCBCEnabled = conf.cboEnabled
private val originalJoinReorderEnabled = conf.joinReorderEnabled
Note: removed val originalPlanStatsEnabled from the source. It is only required for stats-based tests, which are not yet supported.
override def beforeAll(): Unit = {
Note: simplified beforeAll and afterAll by removing some stats-related code which we don't support yet.
 *
 * To run the entire test suite:
 * {{{
 * sbt "test:testOnly *PlanStabilitySuite"
Note: the run command is different from Spark's because of the difference between the Spark and Hyperspace project structures. In Spark, this test lives inside the sql/ project, so the command looks slightly different. The same applies to the other run commands.
 */
// scalastyle:on filelinelengthchecker

trait PlanStabilitySuite extends TPCDSBase with Logging {
Note: superclasses changed because of the lack of Spark 3.0 support.
override def afterAll(): Unit = {
  super.afterAll()
}
Note: in beforeAll and afterAll, some Spark conf values have been changed:
- originalMaxToStringFields = conf.maxToStringFields => removed; the conf is not present in Spark 2.4
- spark.sql.crossJoin.enabled => set to true because some queries fail during the query optimization phase in the case of cross joins
A sketch of the resulting setup is shown below.
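A minimal sketch of this setup, assuming the suite toggles the conf through the SparkSession provided by SparkInvolvedSuite (the trait and variable names here are illustrative, not the PR's exact code):

```scala
// Sketch only: enable cross joins for the duration of the suite and restore afterwards.
trait CrossJoinFriendlySuite extends SparkFunSuite with SparkInvolvedSuite {
  // `spark` is assumed to come from SparkInvolvedSuite.
  private var originalCrossJoinEnabled: String = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    originalCrossJoinEnabled = spark.conf.get("spark.sql.crossJoin.enabled")
    // Some TPC-DS queries produce cross joins during optimization;
    // on Spark 2.4 they fail unless cross joins are explicitly allowed.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
  }

  override def afterAll(): Unit = {
    // Restore the original value so other suites are unaffected.
    spark.conf.set("spark.sql.crossJoin.enabled", originalCrossJoinEnabled)
    super.afterAll()
  }
}
```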
    case subquery: SubqueryExec =>
      subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1)
    case _ => -1
  }
Note: removed a couple of case match statements because of the lack of Spark 3.0 support:
- case SubqueryBroadcastExec
- case ReusedSubqueryExec

 * "sum(sr_return_amt#14)", so we remove all of these using regex
 */
def cleanUpReferences(references: AttributeSet): String = {
  referenceRegex.replaceAllIn(references.toSeq.map(_.name).sorted.mkString(","), "")
Note: added sorting of references for consistent behavior between the local and Azure build pipeline setups.
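A simplified sketch of why the sort matters (plain strings here instead of an AttributeSet, and the regex pattern is an assumption): without sorting, the attribute order can differ between environments, making the golden-file comparison flaky.

```scala
import scala.util.matching.Regex

// Assumed pattern: strips expression IDs such as "#14" from attribute references.
val referenceRegex: Regex = "#\\d+".r

def cleanUpReferences(names: Seq[String]): String =
  referenceRegex.replaceAllIn(names.sorted.mkString(","), "")

// Deterministic regardless of input order:
// cleanUpReferences(Seq("sr_return_amt#14", "sr_customer_sk#3"))
//   == "sr_customer_sk,sr_return_amt"
```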
imback82 left a comment:
Generally looking fine to me.
| s"Location.*spark-warehouse/", | ||
| "Location [not included in comparison]/{warehouse_dir}/") | ||
| } |
Note: the normalization logic changed very slightly. We depend on the df.explain() command for generating output, whereas Spark depends on QueryExecution.explainString(), which generates a different output. (to compare: link) A sketch of the location normalization is shown below.
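Assuming the two string literals in the diff above are the arguments to a replaceAll, the normalization has roughly the shape below (a sketch, not the verified PR code). Machine-specific warehouse paths are replaced with a stable token before comparing against the golden files:

```scala
// Sketch: replace absolute spark-warehouse locations with a placeholder so that
// plans generated on different machines normalize to the same golden-file text.
private def normalizeLocation(plan: String): String = {
  plan.replaceAll(
    s"Location.*spark-warehouse/",
    "Location [not included in comparison]/{warehouse_dir}/")
}
```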
  classLoader = Thread.currentThread().getContextClassLoader)
val qe = spark.sql(queryString).queryExecution
val plan = qe.executedPlan
val explain = normalizeLocation(normalizeIds(explainString(qe)))
Note: small change here: use of the private def explainString instead of qe.explainString, which is only available in Spark 3.0.
  }
}

def explainString(queryExecution: QueryExecution): String = {
Note: new method, not present in the original Spark suite. A possible shape is sketched below.
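One possible Spark 2.4-compatible shape for such a helper (a sketch under the assumption that it mirrors what df.explain() prints, not the PR's exact body):

```scala
import org.apache.spark.sql.execution.QueryExecution

// QueryExecution.explainString only exists in Spark 3.0+, so on Spark 2.4 the
// "simple" physical-plan rendering (what df.explain() prints) is produced directly.
def explainString(queryExecution: QueryExecution): String = {
  queryExecution.simpleString
}
```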
override def afterAll(): Unit = {
  super.afterAll()
}
Note: No change from this line onward, until my next comment.
Thanks @imback82, I added the fix to the TODO comment.
Bintray is down (https://status.bintray.com/), causing build failures.
Looks like the issue got fixed. The latest build succeeded. @imback82, please take a look.
imback82 left a comment:
LGTM, thanks @apoorvedave1!
What is the context for this pull request?
What changes were proposed in this pull request?
In this PR, we are introducing the code for the Spark-only (non-Hyperspace) version of the gold standard tests. This PR is also LIMITED TO ONLY QUERY 1 of the TPC-DS queries.
In the subsequent PR #337, we will push the updated plans for all the remaining queries (q2-q99). The sole aim of this PR is code validation with an example query, q1.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test