diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index b415fb548..ee8289159 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -52,6 +52,10 @@ object IndexConstants {
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
   val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
 
+  // Config used to enable the join v2 rule.
+  val INDEX_JOIN_V2_RULE_ENABLED = "spark.hyperspace.index.joinv2.enabled"
+  val INDEX_JOIN_V2_RULE_ENABLED_DEFAULT = "false"
+
   // TODO: Remove dev config when nested column is fully supported.
   val DEV_NESTED_COLUMN_ENABLED = "spark.hyperspace.dev.index.nestedColumn.enabled"
   val DEV_NESTED_COLUMN_ENABLED_DEFAULT = "false"
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
index 01a3a80a5..c1f3b6d5b 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala
@@ -16,18 +16,10 @@
 
 package com.microsoft.hyperspace.index.covering
 
-import scala.collection.mutable
-import scala.util.Try
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
-import org.apache.spark.sql.catalyst.analysis.CleanupAliases
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-import org.apache.spark.sql.execution.joins.SortMergeJoinExec
-import org.apache.spark.sql.hyperspace.shim.SparkPlannerShim
-
-import com.microsoft.hyperspace.Hyperspace
 import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
-import com.microsoft.hyperspace.index.covering.JoinAttributeFilter.extractConditions
 import com.microsoft.hyperspace.index.plananalysis.FilterReasons
 import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
 import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
@@ -42,9 +34,10 @@ import com.microsoft.hyperspace.util.ResolverUtils.resolve
  * 1-1) Join does not have condition.
  * 1-2) Left or Right child is not linear plan.
  * 1-3) Join condition is not eligible - only Equi-joins and simple CNF form are supported.
+ * 1-4) Join is not executed as a SortMergeJoin.
  * 2) the source plan of indexes is not part of the join (neither Left nor Right).
*/ -object JoinPlanNodeFilter extends QueryPlanIndexFilter { +object JoinPlanNodeFilter extends JoinQueryPlanIndexFilter { override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { if (candidateIndexes.isEmpty) { return Map.empty @@ -65,18 +58,18 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { candidateIndexes.getOrElse(left.get.plan, Nil) ++ candidateIndexes .getOrElse(right.get.plan, Nil) - val joinConditionCond = withFilterReasonTag( + val sortMergeJoinCond = withFilterReasonTag( plan, leftAndRightIndexes, - FilterReasons.NotEligibleJoin("Non equi-join or has literal")) { - isJoinConditionSupported(condition) + FilterReasons.NotEligibleJoin("Not SortMergeJoin")) { + isSortMergeJoin(spark, plan) } - val sortMergeJoinCond = withFilterReasonTag( + val joinConditionCond = withFilterReasonTag( plan, leftAndRightIndexes, - FilterReasons.NotEligibleJoin("Not SortMergeJoin")) { - isSortMergeJoin(plan) + FilterReasons.NotEligibleJoin("Non equi-join or has literal")) { + isJoinConditionSupported(condition) } val leftPlanLinearCond = @@ -118,56 +111,6 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { Map.empty } } - - private def isSortMergeJoin(join: LogicalPlan): Boolean = { - val execJoin = new SparkPlannerShim(spark).JoinSelection(join) - execJoin.head.isInstanceOf[SortMergeJoinExec] - } - - /** - * Checks whether a logical plan is linear. Linear means starting at the top, each node in the - * plan has at most one child. - * - * Note: More work needs to be done to support arbitrary logical plans. Until then, this check - * is in place to avoid unexplored scenarios. - * - * The non-linearity can affect when the logical plan signature conflicts happen. - * Assume plan: Join(A, Join(B, C)) -> - * Here, - * left = A - * right = Join(B, C) - * - * If by any chance, signature(right) matches with some signature(some other table D), then - * we might use indexes of table D as a replacement of right - * This problem is possible because currently the signature function depends on the data files - * only. It doesn't incorporate the logical plan structure. - * Once the signature function evolves to take the plan structure into account, we can remove - * the linearity check completely. - * - * @param plan logical plan - * @return true if the plan is linear. False otherwise - */ - private def isPlanLinear(plan: LogicalPlan): Boolean = - plan.children.length <= 1 && plan.children.forall(isPlanLinear) - - /** - * Check for supported Join Conditions. Equi-Joins in simple CNF form are supported. - * - * Predicates should be of the form (A = B and C = D and E = F and...). OR based conditions - * are not supported. E.g. (A = B OR C = D) is not supported - * - * TODO: Investigate whether OR condition can use bucketing info for optimization - * - * @param condition Join condition - * @return True if the condition is supported. False otherwise. - */ - private def isJoinConditionSupported(condition: Expression): Boolean = { - condition match { - case EqualTo(_: AttributeReference, _: AttributeReference) => true - case And(left, right) => isJoinConditionSupported(left) && isJoinConditionSupported(right) - case _ => false - } - } } /** @@ -176,7 +119,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter { * 2) attributes from left plan must exclusively have one-to-one mapping with attribute * from attributes from right plan. 
*/ -object JoinAttributeFilter extends QueryPlanIndexFilter { +object JoinAttributeFilter extends JoinQueryPlanIndexFilter { override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { if (candidateIndexes.isEmpty || candidateIndexes.size != 2) { return Map.empty @@ -197,124 +140,6 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { } } - /** - * Requirements to support join optimizations using join indexes are as follows: - * - * 1. All Join condition attributes, i.e. attributes referenced in join condition - * must directly come from base relation attributes. - * E.g. for condition (A = B) => Both A and B should come directly from the base tables. - * - * This will not be true if say, A is an alias of some other column C from the base table. We - * don't optimize Join queries for such cases. - * E.g. The following query is not supported: - * WITH newT1 AS (SELECT a AS aliasCol FROM T1) - * SELECT aliasCol, b - * FROM newT1, T2 - * WHERE newT1.aliasCol = T2.b - * Here, aliasCol is not directly from the base relation T1 - * - * TODO: add alias resolver for supporting aliases in join condition. Until then, - * make sure this scenario isn't supported - * - * 2. For each equality condition in the join predicate, one attribute must belong to the left - * subplan, and another from right subplan. - * E.g. A = B => A should come from left and B should come from right or vice versa. - * - * 3. Exclusivity Check: Join Condition Attributes from left subplan must exclusively have - * one-to-one mapping with join condition attributes from right subplan. - * E.g. (A = B and C = D) is supported. A maps with B, C maps with D exclusively. - * E.g. (A = B and A = D) is not supported. A maps with both B and D. There isn't a one-to-one - * mapping. - * - * Background knowledge: - * An alias in a query plan is represented as [[Alias]] at the time of - * its creation. Unnecessary aliases get resolved and removed during query analysis phase by - * [[CleanupAliases]] rule. Some nodes still remain with alias definitions. E.g. [[Project]]. - * - * Secondly, the output of a logical plan is an [[AttributeSet]]. Alias objects get converted - * to [[AttributeReference]]s at plan boundaries. - * - * From looking at the join condition, we can't know whether the attributes used in the - * condition were original columns from the base table, or were alias-turned-attributes. - * - * Algorithm: - * 1. Collect base table attributes from the supported relation nodes. These are the output set - * from leaf nodes. - * 2. Maintain a mapping of join condition attributes. - * 3. For every equality condition (A == B), Ensure one-to-one mapping between A and B. A - * should not be compared against any other attribute than B. B should not be compared with any - * other attribute than A. Steps: - * a. A and B should belong to base tables from left and right subplans. - * b. If A belongs to left subplan, B should be from right, or vice-versa. - * c. If A and B are never seen in the past (check the mapping), add them to the map with - * (A -> B) and (B -> A) entries - * d. If A exists in the map, make sure A maps to B AND B maps to A back. - * e. If A doesn't exist in the map, B should not exist in the map - * - * Note: this check might affect performance of query optimizer for very large query plans, - * because of multiple collectLeaves calls. We call this method as late as possible. 
- * - * @param l Left relation - * @param r Right relation - * @param condition Join condition - * @return True if all attributes in join condition are from base relation nodes. - */ - private def ensureAttributeRequirements( - l: FileBasedRelation, - r: FileBasedRelation, - condition: Expression): Boolean = { - // Output attributes from base relations. Join condition attributes must belong to these - // attributes. We work on canonicalized forms to make sure we support case-sensitivity. - val lBaseAttrs = l.plan.output.map(_.canonicalized) - val rBaseAttrs = r.plan.output.map(_.canonicalized) - - def fromDifferentBaseRelations(c1: Expression, c2: Expression): Boolean = { - (lBaseAttrs.contains(c1) && rBaseAttrs.contains(c2)) || - (lBaseAttrs.contains(c2) && rBaseAttrs.contains(c1)) - } - - // Map to maintain and check one-to-one relation between join condition attributes. For join - // condition attributes, we store their corresponding base relation attributes in the map. - // This ensures uniqueness of attributes in case of case-insensitive system. E.g. We want to - // store just one copy of column 'A' when join condition contains column 'A' as well as 'a'. - val attrMap = new mutable.HashMap[Expression, Expression]() - - extractConditions(condition).forall { - case EqualTo(e1, e2) => - val (c1, c2) = (e1.canonicalized, e2.canonicalized) - // Check 1: c1 and c2 should belong to l and r respectively, or r and l respectively. - if (!fromDifferentBaseRelations(c1, c2)) { - return false - } - // Check 2: c1 is compared only against c2 and vice versa. - if (attrMap.contains(c1) && attrMap.contains(c2)) { - attrMap(c1).equals(c2) && attrMap(c2).equals(c1) - } else if (!attrMap.contains(c1) && !attrMap.contains(c2)) { - attrMap.put(c1, c2) - attrMap.put(c2, c1) - true - } else { - false - } - case _ => throw new IllegalStateException("Unsupported condition found.") - } - } - - /** - * Simple single equi-join conditions and composite AND based conditions will be optimized. - * OR-based conditions will not be optimized. - * - * @param condition Join condition - * @return Sequence of simple conditions from original condition - */ - private[hyperspace] def extractConditions(condition: Expression): Seq[Expression] = - condition match { - case EqualTo(_: AttributeReference, _: AttributeReference) => - Seq(condition) - case And(left, right) => - extractConditions(left) ++ extractConditions(right) - case _ => throw new IllegalStateException("Unsupported condition found") - } } /** @@ -322,7 +147,7 @@ object JoinAttributeFilter extends QueryPlanIndexFilter { * 1) an index does not contain all required columns * 2) all join column should be the indexed columns of an index */ -object JoinColumnFilter extends QueryPlanIndexFilter { +object JoinColumnFilter extends JoinQueryPlanIndexFilter { override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { if (candidateIndexes.isEmpty || candidateIndexes.size != 2) { return Map.empty @@ -381,135 +206,6 @@ object JoinColumnFilter extends QueryPlanIndexFilter { } } } - - /** - * Returns a one-to-one column mapping from the predicates. E.g. for predicate. - * T1.A = T2.B and T2.D = T1.C - * it returns a mapping (A -> B), (C -> D), assuming T1 is left table and t2 is right - * - * This mapping is used to find compatible indexes of T1 and T2. 
- * - * @param leftBaseAttrs required indexed columns from left plan - * @param rightBaseAttrs required indexed columns from right plan - * @param condition join condition which will be used to find the left-right column mapping - * @return Mapping of corresponding columns from left and right, depending on the join - * condition. The keys represent columns from left subplan. The values are columns from - * right subplan. - */ - private[hyperspace] def getLRColumnMapping( - leftBaseAttrs: Seq[String], - rightBaseAttrs: Seq[String], - condition: Expression): Map[String, String] = { - extractConditions(condition).map { - case EqualTo(attr1: AttributeReference, attr2: AttributeReference) => - Try { - ( - resolve(spark, attr1.name, leftBaseAttrs).get, - resolve(spark, attr2.name, rightBaseAttrs).get) - }.getOrElse { - Try { - ( - resolve(spark, attr2.name, leftBaseAttrs).get, - resolve(spark, attr1.name, rightBaseAttrs).get) - }.getOrElse { - throw new IllegalStateException("Unexpected exception while using join rule") - } - } - }.toMap - } - - /** - * Get usable indexes which satisfy indexed and included column requirements. - * - * Pre-requisite: the indexed and included columns required must be already resolved with their - * corresponding base relation columns at this point. - * - * @param plan Query plan - * @param indexes All available indexes for the logical plan - * @param requiredIndexCols required indexed columns resolved with their base relation column. - * @param allRequiredCols required included columns resolved with their base relation column. - * @return Indexes which satisfy the indexed and covering column requirements from the logical - * plan and join condition - */ - private def getUsableIndexes( - plan: LogicalPlan, - indexes: Seq[IndexLogEntry], - requiredIndexCols: Seq[String], - allRequiredCols: Seq[String], - leftOrRight: String): Seq[IndexLogEntry] = { - indexes.filter { idx => - val allCols = idx.derivedDataset.referencedColumns - // All required index columns should match one-to-one with all indexed columns and - // vice-versa. All required columns must be present in the available index columns. - withFilterReasonTag( - plan, - idx, - FilterReasons.NotAllJoinColIndexed( - leftOrRight, - requiredIndexCols.mkString(","), - idx.indexedColumns.mkString(","))) { - requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) - } && - withFilterReasonTag( - plan, - idx, - FilterReasons.MissingIndexedCol( - leftOrRight, - allRequiredCols.mkString(","), - idx.indexedColumns.mkString(","))) { - allRequiredCols.forall(allCols.contains) - } - } - } - - /** - * Returns list of column names which must be present in either the indexed or the included - * columns list of a selected index. For this, collect all columns referenced in the plan - * EXCEPT for the logical relation (i.e. scan node). - * - * E.g. Project(A, B) -> Filter (C = 10) -> Scan T(A,B,C,D,E) - * In the above scenario, A, B, C must be part of either the indexed or the included columns - * requirement. D,E are not part of the requirement. - * - * E.g Filter (C = 10) -> Scan T(A,B,C,D,E) - * In this example, it turns out, all columns A,B,C,D,E are required columns. The Filter - * node doesn't do column pruning. This query plan corresponds to a sql query: - * SELECT * FROM T WHERE C = 10 - * We need to make sure all columns are part of the required columns - * - * Algorithm: - * 1. allReferences: Collect all referenced columns from nodes as required columns. These - * columns (i.e. 
`LogicalPlan.references`) represent the columns used in the plan. For e.g.
-   * Filter(a = 10 && b > 20) contains column 'a' and 'b' as referenced columns.
-   *
-   * 2. topLevelOutputs: Collect the top level output columns from the plan. These are required
-   * as these will be used later on in the original query. This is basically a shortened list
-   * of columns which did not get masked in intermediate Project nodes.
-   * E.g.1 Filter (C = 10) -> Scan T(A,B,C,D,E)
-   * Here, top level output will contain all columns A,B,C,D,E
-   *
-   * E.g.2 Project (A) -> Filter(c=10) -> Scan T(A,B,C,D,E)
-   * Here, top level output will contain only column A
-   *
-   * 3. return the distinct set of column names from 1 and 2 combined.
-   *
-   * @param plan logical plan
-   * @return list of column names from this plan which must be part of either indexed or included
-   * columns in a chosen index
-   */
-  private def allRequiredCols(plan: LogicalPlan): Seq[String] = {
-    val provider = Hyperspace.getContext(spark).sourceProviderManager
-    val cleaned = CleanupAliases(plan)
-    val allReferences = cleaned.collect {
-      case l: LeafNode if provider.isSupportedRelation(l) => Seq()
-      case other => other.references
-    }.flatten
-    val topLevelOutputs = cleaned.outputSet.toSeq
-
-    (allReferences ++ topLevelOutputs).distinct.collect {
-      case attr: AttributeReference => attr.name
-    }
-  }
 }
 
 /**
diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2Rule.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2Rule.scala
new file mode 100644
index 000000000..da260c936
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2Rule.scala
@@ -0,0 +1,350 @@
+/*
+ * Copyright (2021) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.covering
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
+import com.microsoft.hyperspace.index.plananalysis.FilterReasons
+import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
+import com.microsoft.hyperspace.index.sources.FileBasedRelation
+import com.microsoft.hyperspace.shim.JoinWithoutHint
+import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
+import com.microsoft.hyperspace.util.HyperspaceConf
+import com.microsoft.hyperspace.util.ResolverUtils.resolve
+
+/**
+ * JoinV2PlanNodeFilter filters indexes if
+ * 1) the given plan is not an eligible join plan.
+ * 1-1) Join does not have condition.
+ * 1-2) Neither the left nor the right child is a linear plan.
+ * 1-3) Join condition is not eligible - only equi-joins in simple CNF form are supported.
+ * 1-4) Join is not executed as a SortMergeJoin.
+ * 2) the source plan of indexes is not part of the join (neither Left nor Right). + */ +object JoinV2PlanNodeFilter extends QueryPlanIndexFilter with JoinQueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + if (candidateIndexes.isEmpty) { + return Map.empty + } + + plan match { + case JoinWithoutHint(l, r, _, Some(condition)) => + val left = RuleUtils.getRelation(spark, l) + val right = RuleUtils.getRelation(spark, r) + + if (!(left.isDefined && right.isDefined && !RuleUtils.isIndexApplied( + left.get) && !RuleUtils + .isIndexApplied(right.get))) { + return Map.empty + } + + val leftAndRightIndexes = + candidateIndexes.getOrElse(left.get.plan, Nil) ++ candidateIndexes + .getOrElse(right.get.plan, Nil) + + val sortMergeJoinCond = withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.NotEligibleJoin("Not SortMergeJoin")) { + isSortMergeJoin(spark, plan) + } + + val joinConditionCond = withFilterReasonTag( + plan, + leftAndRightIndexes, + FilterReasons.NotEligibleJoin("Non equi-join or has literal")) { + isJoinConditionSupported(condition) + } + + val leftPlanLinearCond = isPlanLinear(l) + val rightPlanLinearCond = isPlanLinear(r) + + if (sortMergeJoinCond && joinConditionCond && (leftPlanLinearCond || rightPlanLinearCond)) { + // check left or right + if (leftPlanLinearCond) { + JoinIndexV2Rule.leftRelation.set(left.get) + } + if (rightPlanLinearCond) { + JoinIndexV2Rule.rightRelation.set(right.get) + } + + JoinIndexV2Rule.joinCondition.set(condition) + (candidateIndexes.get(left.get.plan).map(lIndexes => left.get.plan -> lIndexes) ++ + candidateIndexes + .get(right.get.plan) + .map(rIndexes => right.get.plan -> rIndexes)).toMap + } else { + Map.empty + } + case JoinWithoutHint(_, _, _, None) => + setFilterReasonTag( + plan, + candidateIndexes.values.flatten.toSeq, + FilterReasons.NotEligibleJoin("No join condition")) + Map.empty + case _ => + Map.empty + } + } +} + +/** + * JoinV2AttributeFilter filters indexes out if + * 1) each join condition column should com from relations directly + * 2) attributes from left plan must exclusively have one-to-one mapping with attribute + * from attributes from right plan. 
+ */ +object JoinV2AttributeFilter extends JoinQueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + if (candidateIndexes.isEmpty) { + return Map.empty + } + + if (withFilterReasonTag( + plan, + candidateIndexes.flatMap(_._2).toSeq, + FilterReasons.NotEligibleJoin("incompatible left and right join columns")) { + ensureAttributeRequirements( + JoinIndexV2Rule.leftRelation.get, + JoinIndexV2Rule.rightRelation.get, + JoinIndexV2Rule.joinCondition.get) + }) { + candidateIndexes + } else { + Map.empty + } + } +} + +/** + * JoinV2ColumnFilter filters indexes out if + * 1) an index does not contain all required columns + * 2) all join column should be the indexed columns of an index + */ +object JoinV2ColumnFilter extends JoinQueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + if (candidateIndexes.isEmpty || candidateIndexes.size != 2) { + return Map.empty + } + + val leftRelation = JoinIndexV2Rule.leftRelation.get + val rightRelation = JoinIndexV2Rule.rightRelation.get + + val lBaseAttrs = leftRelation.plan.output.map(_.name) + val rBaseAttrs = rightRelation.plan.output.map(_.name) + + // Map of left resolved columns with their corresponding right resolved + // columns from condition. + val lRMap = getLRColumnMapping(lBaseAttrs, rBaseAttrs, JoinIndexV2Rule.joinCondition.get) + JoinIndexV2Rule.leftToRightColumnMap.set(lRMap) + val lRequiredIndexedCols = lRMap.keys.toSeq + val rRequiredIndexedCols = lRMap.values.toSeq + + plan match { + case JoinWithoutHint(l, r, _, _) => + // All required columns resolved with base relation. + val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get + val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get + + // Make sure required indexed columns are subset of all required columns. + assert( + resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined && + resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined) + + val lIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(leftRelation.plan, Nil), + lRequiredIndexedCols, + lRequiredAllCols, + "left") + val rIndexes = + getUsableIndexes( + plan, + candidateIndexes.getOrElse(rightRelation.plan, Nil), + rRequiredIndexedCols, + rRequiredAllCols, + "right") + + Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes) + } + } +} + +/** + * JoinV2RankFilter selected the best applicable indexes. + */ +object JoinV2RankFilter extends IndexRankFilter { + override def apply(plan: LogicalPlan, indexes: PlanToIndexesMap): PlanToSelectedIndexMap = { + if (indexes.isEmpty) { + return Map.empty + } + + val rPlan = JoinIndexV2Rule.rightRelation.get.plan + val lPlan = JoinIndexV2Rule.leftRelation.get.plan + + // Pick one index with the largest coverage. + val lIndexes = indexes.get(lPlan) + val rIndexes = indexes.get(rPlan) + + if (lIndexes.isEmpty) { + val candidate = indexWithMostCommonBytes(rPlan, rIndexes.get) + setFilterReasonTagForRank(plan, rIndexes.get, candidate) + Map(rPlan -> indexWithMostCommonBytes(rPlan, rIndexes.get)) + } else if (rIndexes.isEmpty) { + val candidate = indexWithMostCommonBytes(lPlan, lIndexes.get) + setFilterReasonTagForRank(plan, lIndexes.get, candidate) + Map(lPlan -> candidate) + } else { + // Apply one index which has a larger data to reduce the shuffle/sorting time. 
+ // If we apply for both left having a different indexed columns and bucket number, + // Spark optimizer might remove exchange of small side, and add a shuffle for large dataset. + val lSize = JoinIndexV2Rule.leftRelation.get.allFileSizeInBytes + val rSize = JoinIndexV2Rule.rightRelation.get.allFileSizeInBytes + if (lSize > rSize) { + val candidate = indexWithMostCommonBytes(lPlan, lIndexes.get) + setFilterReasonTagForRank(plan, rIndexes.get, candidate) + Map(lPlan -> candidate) + } else { + val candidate = indexWithMostCommonBytes(rPlan, rIndexes.get) + setFilterReasonTagForRank(plan, lIndexes.get, candidate) + Map(rPlan -> candidate) + } + } + } + + private def indexWithMostCommonBytes( + plan: LogicalPlan, + candidates: Seq[IndexLogEntry]): IndexLogEntry = { + // On the other side, there's no applicable index. + if (HyperspaceConf.hybridScanEnabled(spark)) { + candidates.maxBy( + _.getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES).getOrElse(0L)) + } else { + candidates.maxBy(_.indexFilesSizeInBytes) + } + } +} + +/** + * Rule to optimize a join by applying index either of left or right plan. + * + * This rule improves a SortMergeJoin performance by replacing data files with index files. + * The index files being bucketed and sorted, will eliminate a full shuffle of the data + * during a sort-merge-join operation. + * + * For e.g. + * SELECT T1.A, T1.B, T2.C, T2.D FROM T1, T2 WHERE T1.A = T2.C + * The above query can be optimized to use indexes if one of the following indexes exists: + * Index1: indexedColumns: T1.A, includedColumns: T1.B + * Index2: indexedColumns: T2.C, includedColumns: T2.D + * + * These indexes are indexed by the join columns and can improve the query performance by + * avoiding full shuffling of T1 or T2. + */ +object JoinIndexV2Rule extends HyperspaceRule with HyperspaceEventLogging { + + override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = + IndexTypeFilter[CoveringIndex]() :: + JoinV2PlanNodeFilter :: + JoinV2AttributeFilter :: + JoinV2ColumnFilter :: + Nil + + override val indexRanker: IndexRankFilter = JoinV2RankFilter + + // Execution context + var leftRelation: ThreadLocal[FileBasedRelation] = new ThreadLocal[FileBasedRelation] + var rightRelation: ThreadLocal[FileBasedRelation] = new ThreadLocal[FileBasedRelation] + var joinCondition: ThreadLocal[Expression] = new ThreadLocal[Expression] + var leftToRightColumnMap: ThreadLocal[Map[String, String]] = + new ThreadLocal[Map[String, String]] + + override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = { + if (indexes.size != 1) { + return plan + } + + plan match { + case join @ JoinWithoutHint(l, r, _, _) => + val updatedPlan = if (indexes.contains(leftRelation.get.plan)) { + val lIndex = indexes(leftRelation.get.plan) + join + .copy(left = CoveringIndexRuleUtils.transformPlanToUseIndex( + spark, + lIndex, + l, + useBucketSpec = true, + useBucketUnionForAppended = true)) + + } else { + val rIndex = indexes(rightRelation.get.plan) + join + .copy(right = CoveringIndexRuleUtils.transformPlanToUseIndex( + spark, + rIndex, + r, + useBucketSpec = true, + useBucketUnionForAppended = true)) + } + + logEvent( + HyperspaceIndexUsageEvent( + AppInfo(sparkContext.sparkUser, sparkContext.applicationId, sparkContext.appName), + Seq(indexes.values.toSeq.head), + join.toString, + updatedPlan.toString, + "Join index v2 rule applied.")) + updatedPlan + } + } + + override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = { + if (indexes.size != 1) { + 
return 0 + } + + val targetRel = if (indexes.contains(leftRelation.get.plan)) { + leftRelation.get + } else { + rightRelation.get + } + + def getCommonBytes(index: IndexLogEntry, relation: FileBasedRelation): Long = { + index + .getTagValue(relation.plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES) + .getOrElse { + relation.allFileInfos.foldLeft(0L) { (res, f) => + if (index.sourceFileInfoSet.contains(f)) { + res + f.size // count, total bytes + } else { + res + } + } + } + } + + val commonBytes = getCommonBytes(indexes.head._2, targetRel) + + // TODO Enhance scoring function. + // See https://github.com/microsoft/hyperspace/issues/444 + (60 * (commonBytes.toFloat / targetRel.allFileSizeInBytes)).round + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/covering/JoinQueryPlanIndexFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinQueryPlanIndexFilter.scala new file mode 100644 index 000000000..5fa16bb4a --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/covering/JoinQueryPlanIndexFilter.scala @@ -0,0 +1,336 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.covering + +import scala.collection.mutable +import scala.util.Try + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.CleanupAliases +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.hyperspace.shim.SparkPlannerShim + +import com.microsoft.hyperspace.Hyperspace +import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.plananalysis.FilterReasons +import com.microsoft.hyperspace.index.rules.QueryPlanIndexFilter +import com.microsoft.hyperspace.index.sources.FileBasedRelation +import com.microsoft.hyperspace.util.ResolverUtils.resolve + +trait JoinQueryPlanIndexFilter extends QueryPlanIndexFilter { + + /** + * Checks whether a logical plan is linear. Linear means starting at the top, each node in the + * plan has at most one child. + * + * Note: More work needs to be done to support arbitrary logical plans. Until then, this check + * is in place to avoid unexplored scenarios. + * + * The non-linearity can affect when the logical plan signature conflicts happen. + * Assume plan: Join(A, Join(B, C)) -> + * Here, + * left = A + * right = Join(B, C) + * + * If by any chance, signature(right) matches with some signature(some other table D), then + * we might use indexes of table D as a replacement of right + * This problem is possible because currently the signature function depends on the data files + * only. It doesn't incorporate the logical plan structure. + * Once the signature function evolves to take the plan structure into account, we can remove + * the linearity check completely. 
+ * + * @param plan logical plan + * @return true if the plan is linear. False otherwise + */ + protected def isPlanLinear(plan: LogicalPlan): Boolean = + plan.children.length <= 1 && plan.children.forall(isPlanLinear) + + /** + * Check for supported Join Conditions. Equi-Joins in simple CNF form are supported. + * + * Predicates should be of the form (A = B and C = D and E = F and...). OR based conditions + * are not supported. E.g. (A = B OR C = D) is not supported + * + * TODO: Investigate whether OR condition can use bucketing info for optimization + * + * @param condition Join condition + * @return True if the condition is supported. False otherwise. + */ + protected def isJoinConditionSupported(condition: Expression): Boolean = { + condition match { + case EqualTo(_: AttributeReference, _: AttributeReference) => true + case And(left, right) => isJoinConditionSupported(left) && isJoinConditionSupported(right) + case _ => false + } + } + + protected def isSortMergeJoin(spark: SparkSession, join: LogicalPlan): Boolean = { + val execJoin = new SparkPlannerShim(spark).JoinSelection(join) + execJoin.head.isInstanceOf[SortMergeJoinExec] + } + + /** + * Requirements to support join optimizations using join indexes are as follows: + * + * 1. All Join condition attributes, i.e. attributes referenced in join condition + * must directly come from base relation attributes. + * E.g. for condition (A = B) => Both A and B should come directly from the base tables. + * + * This will not be true if say, A is an alias of some other column C from the base table. We + * don't optimize Join queries for such cases. + * E.g. The following query is not supported: + * WITH newT1 AS (SELECT a AS aliasCol FROM T1) + * SELECT aliasCol, b + * FROM newT1, T2 + * WHERE newT1.aliasCol = T2.b + * Here, aliasCol is not directly from the base relation T1 + * + * TODO: add alias resolver for supporting aliases in join condition. Until then, + * make sure this scenario isn't supported + * + * 2. For each equality condition in the join predicate, one attribute must belong to the left + * subplan, and another from right subplan. + * E.g. A = B => A should come from left and B should come from right or vice versa. + * + * 3. Exclusivity Check: Join Condition Attributes from left subplan must exclusively have + * one-to-one mapping with join condition attributes from right subplan. + * E.g. (A = B and C = D) is supported. A maps with B, C maps with D exclusively. + * E.g. (A = B and A = D) is not supported. A maps with both B and D. There isn't a one-to-one + * mapping. + * + * Background knowledge: + * An alias in a query plan is represented as [[Alias]] at the time of + * its creation. Unnecessary aliases get resolved and removed during query analysis phase by + * [[CleanupAliases]] rule. Some nodes still remain with alias definitions. E.g. [[Project]]. + * + * Secondly, the output of a logical plan is an [[AttributeSet]]. Alias objects get converted + * to [[AttributeReference]]s at plan boundaries. + * + * From looking at the join condition, we can't know whether the attributes used in the + * condition were original columns from the base table, or were alias-turned-attributes. + * + * Algorithm: + * 1. Collect base table attributes from the supported relation nodes. These are the output set + * from leaf nodes. + * 2. Maintain a mapping of join condition attributes. + * 3. For every equality condition (A == B), Ensure one-to-one mapping between A and B. A + * should not be compared against any other attribute than B. 
B should not be compared with any + * other attribute than A. Steps: + * a. A and B should belong to base tables from left and right subplans. + * b. If A belongs to left subplan, B should be from right, or vice-versa. + * c. If A and B are never seen in the past (check the mapping), add them to the map with + * (A -> B) and (B -> A) entries + * d. If A exists in the map, make sure A maps to B AND B maps to A back. + * e. If A doesn't exist in the map, B should not exist in the map + * + * Note: this check might affect performance of query optimizer for very large query plans, + * because of multiple collectLeaves calls. We call this method as late as possible. + * + * @param l Left relation + * @param r Right relation + * @param condition Join condition + * @return True if all attributes in join condition are from base relation nodes. + */ + protected def ensureAttributeRequirements( + l: FileBasedRelation, + r: FileBasedRelation, + condition: Expression): Boolean = { + // Output attributes from base relations. Join condition attributes must belong to these + // attributes. We work on canonicalized forms to make sure we support case-sensitivity. + val lBaseAttrs = l.plan.output.map(_.canonicalized) + val rBaseAttrs = r.plan.output.map(_.canonicalized) + + def fromDifferentBaseRelations(c1: Expression, c2: Expression): Boolean = { + (lBaseAttrs.contains(c1) && rBaseAttrs.contains(c2)) || + (lBaseAttrs.contains(c2) && rBaseAttrs.contains(c1)) + } + + // Map to maintain and check one-to-one relation between join condition attributes. For join + // condition attributes, we store their corresponding base relation attributes in the map. + // This ensures uniqueness of attributes in case of case-insensitive system. E.g. We want to + // store just one copy of column 'A' when join condition contains column 'A' as well as 'a'. + val attrMap = new mutable.HashMap[Expression, Expression]() + + extractConditions(condition).forall { + case EqualTo(e1, e2) => + val (c1, c2) = (e1.canonicalized, e2.canonicalized) + // Check 1: c1 and c2 should belong to l and r respectively, or r and l respectively. + if (!fromDifferentBaseRelations(c1, c2)) { + return false + } + // Check 2: c1 is compared only against c2 and vice versa. + if (attrMap.contains(c1) && attrMap.contains(c2)) { + attrMap(c1).equals(c2) && attrMap(c2).equals(c1) + } else if (!attrMap.contains(c1) && !attrMap.contains(c2)) { + attrMap.put(c1, c2) + attrMap.put(c2, c1) + true + } else { + false + } + case _ => throw new IllegalStateException("Unsupported condition found.") + } + } + + /** + * Simple single equi-join conditions and composite AND based conditions will be optimized. + * OR-based conditions will not be optimized. + * + * @param condition Join condition + * @return Sequence of simple conditions from original condition + */ + protected def extractConditions(condition: Expression): Seq[Expression] = + condition match { + case EqualTo(_: AttributeReference, _: AttributeReference) => + Seq(condition) + case And(left, right) => + extractConditions(left) ++ extractConditions(right) + case _ => throw new IllegalStateException("Unsupported condition found") + } + + /** + * Returns a one-to-one column mapping from the predicates. E.g. for predicate. + * T1.A = T2.B and T2.D = T1.C + * it returns a mapping (A -> B), (C -> D), assuming T1 is left table and t2 is right + * + * This mapping is used to find compatible indexes of T1 and T2. 
+ * + * @param leftBaseAttrs required indexed columns from left plan + * @param rightBaseAttrs required indexed columns from right plan + * @param condition join condition which will be used to find the left-right column mapping + * @return Mapping of corresponding columns from left and right, depending on the join + * condition. The keys represent columns from left subplan. The values are columns from + * right subplan. + */ + protected def getLRColumnMapping( + leftBaseAttrs: Seq[String], + rightBaseAttrs: Seq[String], + condition: Expression): Map[String, String] = { + extractConditions(condition).map { + case EqualTo(attr1: AttributeReference, attr2: AttributeReference) => + Try { + ( + resolve(spark, attr1.name, leftBaseAttrs).get, + resolve(spark, attr2.name, rightBaseAttrs).get) + }.getOrElse { + Try { + ( + resolve(spark, attr2.name, leftBaseAttrs).get, + resolve(spark, attr1.name, rightBaseAttrs).get) + }.getOrElse { + throw new IllegalStateException("Unexpected exception while using join rule") + } + } + }.toMap + } + + /** + * Get usable indexes which satisfy indexed and included column requirements. + * + * Pre-requisite: the indexed and included columns required must be already resolved with their + * corresponding base relation columns at this point. + * + * @param plan Query plan + * @param indexes All available indexes for the logical plan + * @param requiredIndexCols required indexed columns resolved with their base relation column. + * @param allRequiredCols required included columns resolved with their base relation column. + * @return Indexes which satisfy the indexed and covering column requirements from the logical + * plan and join condition + */ + protected def getUsableIndexes( + plan: LogicalPlan, + indexes: Seq[IndexLogEntry], + requiredIndexCols: Seq[String], + allRequiredCols: Seq[String], + leftOrRight: String): Seq[IndexLogEntry] = { + indexes.filter { idx => + val allCols = idx.derivedDataset.referencedColumns + // All required index columns should match one-to-one with all indexed columns and + // vice-versa. All required columns must be present in the available index columns. + withFilterReasonTag( + plan, + idx, + FilterReasons.NotAllJoinColIndexed( + leftOrRight, + requiredIndexCols.mkString(","), + idx.indexedColumns.mkString(","))) { + requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) + } && + withFilterReasonTag( + plan, + idx, + FilterReasons.MissingIndexedCol( + leftOrRight, + allRequiredCols.mkString(","), + idx.indexedColumns.mkString(","))) { + allRequiredCols.forall(allCols.contains) + } + } + } + + /** + * Returns list of column names which must be present in either the indexed or the included + * columns list of a selected index. For this, collect all columns referenced in the plan + * EXCEPT for the logical relation (i.e. scan node). + * + * E.g. Project(A, B) -> Filter (C = 10) -> Scan T(A,B,C,D,E) + * In the above scenario, A, B, C must be part of either the indexed or the included columns + * requirement. D,E are not part of the requirement. + * + * E.g Filter (C = 10) -> Scan T(A,B,C,D,E) + * In this example, it turns out, all columns A,B,C,D,E are required columns. The Filter + * node doesn't do column pruning. This query plan corresponds to a sql query: + * SELECT * FROM T WHERE C = 10 + * We need to make sure all columns are part of the required columns + * + * Algorithm: + * 1. allReferences: Collect all referenced columns from nodes as required columns. These + * columns (i.e. 
`LogicalPlan.references`) represent the columns used in the plan. For e.g. + * Filter(a = 10 && b > 20) contains column 'a' and 'b' as referenced columns. + * + * 2. topLevelOutputs: Collect the top level output columns from the plan. These are required + * as these will be used later on in the original query. This is basically a shortened list + * of columns which did not get masked in intermediate Project nodes. + * E.g.1 Filter (C = 10) -> Scan T(A,B,C,D,E) + * Here, top level output will contain all columns A,B,C,D,E + * + * E.g.2 Project (A) -> Filter(c=10) -> Scan T(A,B,C,D,E) + * Here, top level output will contain only column A + * + * 3. return the distinct set of column names from 1 and 2 combined. + * + * @param plan logical plan + * @return list of column names from this plan which must be part of either indexed or included + * columns in a chosen index + */ + protected def allRequiredCols(plan: LogicalPlan): Seq[String] = { + val provider = Hyperspace.getContext(spark).sourceProviderManager + val cleaned = CleanupAliases(plan) + val allReferences = cleaned.collect { + case l: LeafNode if provider.isSupportedRelation(l) => Seq() + case other => other.references + }.flatten + val topLevelOutputs = cleaned.outputSet.toSeq + + (allReferences ++ topLevelOutputs).distinct.collect { + case attr: AttributeReference => attr.name + } + } + +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala index 13b8f7fb8..314450033 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/CandidateIndexAnalyzer.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.hyperspace.utils.DataFrameUtils import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags} -import com.microsoft.hyperspace.index.rules.{CandidateIndexCollector, ScoreBasedIndexPlanOptimizer} +import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector, ScoreBasedIndexPlanOptimizer} object CandidateIndexAnalyzer extends Logging { def whyNotIndexString( @@ -331,7 +331,8 @@ object CandidateIndexAnalyzer extends Logging { try { prepareTagsForAnalysis(indexes) val candidateIndexes = CandidateIndexCollector.apply(plan, indexes) - val transformedPlan = new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes) + val transformedPlan = new ScoreBasedIndexPlanOptimizer(ApplyHyperspace.availableRules) + .apply(plan, candidateIndexes) ( transformedPlan, indexes diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index a9ae97acd..b2d2f70e2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -23,7 +23,10 @@ import org.apache.spark.sql.catalyst.rules.Rule import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.index.covering.{FilterIndexRule, JoinIndexRule, JoinIndexV2Rule} +import com.microsoft.hyperspace.index.dataskipping.rules.ApplyDataSkippingIndex import 
com.microsoft.hyperspace.telemetry.HyperspaceEventLogging +import com.microsoft.hyperspace.util.HyperspaceConf /** * Transform the given plan to use Hyperspace indexes. @@ -40,6 +43,17 @@ object ApplyHyperspace // Flag to disable ApplyHyperspace rule during index maintenance jobs such as createIndex, // refreshIndex and optimizeIndex. private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean] + private val rules: Seq[HyperspaceRule] = + Seq(FilterIndexRule, JoinIndexRule, ApplyDataSkippingIndex, NoOpRule) + + def availableRules: Seq[HyperspaceRule] = { + val optionalRules = if (HyperspaceConf.joinV2RuleEnabled(spark)) { + JoinIndexV2Rule :: Nil + } else { + Nil + } + rules ++ optionalRules + } override def apply(plan: LogicalPlan): LogicalPlan = { if (disableForIndexMaintenance.get) { @@ -55,7 +69,7 @@ object ApplyHyperspace } else { try { val candidateIndexes = CandidateIndexCollector(plan, allIndexes) - new ScoreBasedIndexPlanOptimizer().apply(plan, candidateIndexes) + new ScoreBasedIndexPlanOptimizer(availableRules).apply(plan, candidateIndexes) } catch { case e: Exception => logWarning("Cannot apply Hyperspace indexes: " + e.getMessage) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala index ab017cf8f..2a44f690f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/HyperspaceRule.scala @@ -66,7 +66,7 @@ trait HyperspaceRule extends ActiveSparkSession { val applicableIndexes = filtersOnQueryPlan .foldLeft(candidateIndexes) { (pti, filter) => - filter(plan, pti) + filter(plan, pti).filterNot(_._2.isEmpty) } if (applicableIndexes.nonEmpty) { diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala index cfea96bd5..91d22006b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/IndexRankFilter.scala @@ -47,7 +47,7 @@ trait IndexRankFilter extends IndexFilter { plan: LogicalPlan, indexes: Seq[IndexLogEntry], selectedIndex: IndexLogEntry): Unit = { - indexes.foreach { index => + indexes.filterNot(_.name.equals(selectedIndex.name)).foreach { index => setFilterReasonTag( selectedIndex.name.equals(index.name), plan, diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala index b2faa7427..25a0fedbe 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala @@ -20,16 +20,12 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import com.microsoft.hyperspace.index.covering.{FilterIndexRule, JoinIndexRule} -import com.microsoft.hyperspace.index.dataskipping.rules.ApplyDataSkippingIndex import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap /** * Apply Hyperspace indexes based on the score of each index application. */ -class ScoreBasedIndexPlanOptimizer { - private val rules: Seq[HyperspaceRule] = - Seq(FilterIndexRule, JoinIndexRule, ApplyDataSkippingIndex, NoOpRule) +class ScoreBasedIndexPlanOptimizer(rules: Seq[HyperspaceRule]) { // Map for memoization. 
The key is the logical plan before applying [[HyperspaceRule]]s // and its value is a pair of best transformed plan and its score. diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 2fcc56519..c61cfc4e5 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -69,6 +69,14 @@ object HyperspaceConf { .toBoolean } + def joinV2RuleEnabled(spark: SparkSession): Boolean = { + spark.conf + .get( + IndexConstants.INDEX_JOIN_V2_RULE_ENABLED, + IndexConstants.INDEX_JOIN_V2_RULE_ENABLED_DEFAULT) + .toBoolean + } + def numBucketsForIndex(spark: SparkSession): Int = { getConfStringWithMultipleKeys( spark, diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index ec1c2841f..e857fba16 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -46,6 +46,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { super.beforeAll() spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + spark.conf.set(IndexConstants.INDEX_JOIN_V2_RULE_ENABLED, "true") hyperspace = new Hyperspace(spark) fileSystem.delete(new Path(testDir), true) @@ -226,17 +227,79 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { getIndexFilesPath(rightDfIndexConfig.indexName)) } - test("E2E test for join query with alias columns is not supported.") { - def verifyNoChange(f: () => DataFrame): Unit = { - spark.enableHyperspace() - val plan = f().queryExecution.optimizedPlan - val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) - val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes) - val (updatedPlan, score) = JoinIndexRule.apply(plan, candidateIndexes) - assert(updatedPlan.equals(plan)) - assert(score == 0) + test("Check v2 rule can be applied with one side of index.") { + val leftDf = spark.read.parquet(nonPartitionedDataPath) + + val rightDf = spark.read.parquet(nonPartitionedDataPath) + val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c3"), Seq("C4")) + hyperspace.createIndex(rightDf, rightDfIndexConfig) + + def query(): DataFrame = { + leftDf.join(rightDf, leftDf("c3") === rightDf("C3")).select(leftDf("C1"), rightDf("c4")) } + withSQLConf(IndexConstants.INDEX_JOIN_V2_RULE_ENABLED -> "true") { + verifyIndexUsage( + query, + getAllRootPaths(leftDf.queryExecution.optimizedPlan) ++ getIndexFilesPath( + rightDfIndexConfig.indexName)) + } + } + + test("If join v2 rule applied for a child, filter rule is applied for another if eligible.") { + val leftDf = spark.read.parquet(nonPartitionedDataPath) + val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3", "c4"), Seq("C5")) + hyperspace.createIndex(leftDf, leftDfIndexConfig) + + val rightDf = spark.read.parquet(nonPartitionedDataPath) + val rightDfIndexConfig = IndexConfig("rightIndex", Seq("c4", "c3"), Seq("C1")) + hyperspace.createIndex(rightDf, rightDfIndexConfig) + + def query(): DataFrame = { + leftDf.join(rightDf, Seq("c4", "c3")).select(leftDf("C5"), rightDf("c1")) + } + + verifyNoChange(query) + + withSQLConf( + IndexConstants.INDEX_JOIN_V2_RULE_ENABLED -> "true", + "spark.sql.legacy.bucketedTableScan.outputOrdering" -> "true") { + verifyIndexUsage( + query, + 
getIndexFilesPath(leftDfIndexConfig.indexName) ++ getIndexFilesPath( + rightDfIndexConfig.indexName)) + + // Check number of exchange & sort nodes. + { + spark.disableHyperspace() + val dfWithHyperspaceDisabled = query() + val shuffleNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffleNodes.size == 2) + val sortNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { + case s: SortExec => s + } + assert(sortNodes.size == 2) + } + + { + spark.enableHyperspace() + val dfWithHyperspaceEnabled = query() + val shuffleNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffleNodes.size == 1) + val sortNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { + case s: SortExec => s + } + assert(sortNodes.size == 1) + } + } + } + + test("E2E test for join query with alias columns is not supported.") { + withView("t1", "t2") { val leftDf = spark.read.parquet(nonPartitionedDataPath) val leftDfIndexConfig = IndexConfig("leftIndex", Seq("c3"), Seq("c1")) @@ -1061,6 +1124,16 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { } } + private def verifyNoChange(f: () => DataFrame): Unit = { + spark.enableHyperspace() + val plan = f().queryExecution.optimizedPlan + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val candidateIndexes = CandidateIndexCollector.apply(plan, allIndexes) + val (updatedPlan, score) = JoinIndexRule.apply(plan, candidateIndexes) + assert(updatedPlan.equals(plan)) + assert(score == 0) + } + /** * Gets the sorted rows from the given dataframe to make it easy to compare with * other dataframe. diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala index fcb307436..014322b8d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexRuleTest.scala @@ -128,7 +128,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) } - test("Join rule doesn't update plan if it's broadcast join.") { + test("Join rule doesn't update plan if it's not SortMergeJoin.") { withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "10241024") { val joinCondition = EqualTo(t1c1, t2c1) val originalPlan = @@ -140,7 +140,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) assert(reasons.isDefined) val msg = reasons.get.map(_.verboseStr) - assert(msg.exists(_.contains("Join condition is not eligible. Reason: Not SortMergeJoin"))) + assert( + msg.exists(_.contains("Join condition is not eligible. 
Reason: Not SortMergeJoin"))) } } } @@ -160,7 +161,8 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper { test("Join rule does not update plan if index location is not set.") { withSQLConf(IndexConstants.INDEX_SYSTEM_PATH -> "") { val joinCondition = EqualTo(t1c1, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) try { applyJoinIndexRuleHelper(originalPlan) diff --git a/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2RuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2RuleTest.scala new file mode 100644 index 000000000..41cc8f19b --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/covering/JoinIndexV2RuleTest.scala @@ -0,0 +1,613 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.covering + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.{IntegerType, StringType} + +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.rules.CandidateIndexCollector +import com.microsoft.hyperspace.shim.{JoinWithoutHint => Join} +import com.microsoft.hyperspace.util.{FileUtils, SparkTestShims} + +class JoinIndexV2RuleTest extends HyperspaceRuleSuite with SQLHelper { + override val indexLocationDirName = "joinIndexV2RuleTest" + + val t1c1 = AttributeReference("t1c1", IntegerType)() + val t1c2 = AttributeReference("t1c2", StringType)() + val t1c3 = AttributeReference("t1c3", IntegerType)() + val t1c4 = AttributeReference("t1c4", StringType)() + val t2c1 = AttributeReference("t2c1", IntegerType)() + val t2c2 = AttributeReference("t2c2", StringType)() + val t2c3 = AttributeReference("t2c3", IntegerType)() + val t2c4 = AttributeReference("t2c4", StringType)() + + val t1Schema = schemaFromAttributes(t1c1, t1c2, t1c3, t1c4) + val t2Schema = schemaFromAttributes(t2c1, t2c2, t2c3, t2c4) + + var t1Relation: HadoopFsRelation = _ + var t2Relation: HadoopFsRelation = _ + var t1ScanNode: LogicalRelation = _ + var t2ScanNode: LogicalRelation = _ + var t1FilterNode: Filter = _ + var t2FilterNode: Filter = _ + var t1ProjectNode: Project = _ + var t2ProjectNode: Project = _ + + /** + * Test Setup: + * + * The basic scenario tested here is a [[Join]] logical plan node which consists of two children + * Left child is a [[Project]] -> [[Filter]] -> [[LogicalRelation]] subplan which reads data + * from files on disk. + * Right child is also a [[Project]] -> [[Filter]] -> [[LogicalRelation]] subplan same as left. 
+ * + * If the Join node satisfies the requirements for [[JoinIndexRule]], the plan must get updated + * to use available indexes from the system. If not, the plan should remain unaffected on + * application of the rule. + */ + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + + val t1Location = + new InMemoryFileIndex(spark, Seq(new Path("t1")), Map.empty, Some(t1Schema), NoopCache) + val t2Location = + new InMemoryFileIndex(spark, Seq(new Path("t2")), Map.empty, Some(t2Schema), NoopCache) + + t1Relation = baseRelation(t1Location, t1Schema) + t2Relation = baseRelation(t2Location, t2Schema) + + t1ScanNode = LogicalRelation(t1Relation, Seq(t1c1, t1c2, t1c3, t1c4), None, false) + t2ScanNode = LogicalRelation(t2Relation, Seq(t2c1, t2c2, t2c3, t2c4), None, false) + + t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode) + t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode) + + t1ProjectNode = Project(Seq(t1c1, t1c3), t1FilterNode) + // Project [t1c1#0, t1c3#2] + // +- Filter isnotnull(t1c1#0) + // +- Relation[t1c1#0,t1c2#1,t1c3#2,t1c4#3] parquet + + t2ProjectNode = Project(Seq(t2c1, t2c3), t2FilterNode) + // Project [t2c1#4, t2c3#6] + // +- Filter isnotnull(t2c1#4) + // +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet + + createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) + } + + before { + clearCache() + } + + def applyJoinIndexRuleHelper( + plan: LogicalPlan, + allIndexes: Seq[IndexLogEntry] = Nil): (LogicalPlan, Int) = { + val indexes = if (allIndexes.isEmpty) { + IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + } else { + allIndexes.foreach(_.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true)) + allIndexes + } + val candidateIndexes = CandidateIndexCollector(plan, indexes) + JoinIndexV2Rule.apply(plan, candidateIndexes) + } + + test("Join v2 rule works if indexes exist and configs are set correctly.") { + val joinCondition = EqualTo(t1c1, t2c1) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + // When left and right dataset size is the same, Join v2 rule applies the index for right only. + val indexPaths = Seq(getIndexDataFilesPaths("t2i1")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule doesn't update plan if it's not SortMergeJoin.") { + withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "10241024") { + val joinCondition = EqualTo(t1c1, t2c1) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert( + msg.exists(_.contains("Join condition is not eligible. 
Reason: Not SortMergeJoin"))) + } + } + } + + test("Join v2 rule works if indexes exist for case insensitive index and query.") { + val t1c1Caps = t1c1.withName("T1C1") + + val joinCondition = EqualTo(t1c1Caps, t2c1) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + // When left and right dataset size is the same, Join v2 rule applies the index for right only. + val indexPaths = Seq(getIndexDataFilesPaths("t2i1")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule does not update plan if index location is not set.") { + withSQLConf(IndexConstants.INDEX_SYSTEM_PATH -> "") { + val joinCondition = EqualTo(t1c1, t2c1) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + + try { + applyJoinIndexRuleHelper(originalPlan) + assert(false, "An exception should be thrown.") + } catch { + case e: Throwable => + assert(e.getMessage.contains("Can not create a Path from an empty string")) + } + } + } + + test("Join v2 rule does not update plan if join condition does not exist.") { + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert(msg.exists(_.contains("Join condition is not eligible. Reason: No join condition"))) + } + } + + test("Join v2 rule does not update plan if join condition is not equality.") { + val joinCondition = GreaterThan(t1c1, t2c1) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert( + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) + } + } + + test("Join v2 rule does not update plan if join condition contains Or.") { + val joinCondition = Or(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert( + msg.exists( + _.contains("Join condition is not eligible. 
Reason: Non equi-join or has literal"))) + } + } + + test("Join v2 rule does not update plan if join condition contains Literals.") { + val joinCondition = EqualTo(t1c2, Literal(10, IntegerType)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert( + msg.exists( + _.contains("Join condition is not eligible. Reason: Non equi-join or has literal"))) + } + } + + test("Join v2 rule should update plan even if index doesn't exist for either table.") { + val t1FilterNode = Filter(IsNotNull(t1c2), t1ScanNode) + val t2FilterNode = Filter(IsNotNull(t2c2), t2ScanNode) + + val t1ProjectNode = Project(Seq(t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c2, t2c3), t2FilterNode) + + // Index exists with t1c2 as indexed columns but not for t2c2. + val joinCondition = EqualTo(t1c2, t2c2) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(!updatedPlan.equals(originalPlan)) + + allIndexes.filterNot(_.name.equals("t1i3")).foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + index.name match { + case "t1i1" => + assert( + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1]")), + msg) + case "t1i2" => + assert( + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t1c2], Indexed columns for left subplan: [t1c1,t1c2]")), + msg) + case "t2i1" => + assert( + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1]")), + msg) + case "t2i2" => + assert( + msg.toSet.equals( + Set( + "All join condition column and indexed column should be the same. " + + "Join columns: [t2c2], Indexed columns for right subplan: [t2c1,t2c2]")), + msg) + } + } + } + + test( + "Join v2 rule does not update plan if index doesn't satisfy included columns.") { + val t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode) + val t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode) + + val t1ProjectNode = Project(Seq(t1c1, t1c4), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c4), t2FilterNode) + val joinCondition = EqualTo(t1c1, t2c1) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + // The plan requires t1c4 and t4c4 columns for projection. These columns are not part of any + // index. Since no index satisfies the requirement, the plan should not change. 
+ + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + + index.name match { + case "t1i1" => + assert( + msg.toSet.equals( + Set( + "Index does not contain required columns for left subplan. " + + "Required indexed columns: [t1c1,t1c4], Indexed columns: [t1c1]")), + msg) + case "t2i1" => + assert( + msg.toSet.equals( + Set( + "Index does not contain required columns for right subplan. " + + "Required indexed columns: [t2c1,t2c4], Indexed columns: [t2c1]")), + msg) + case _ => + } + } + } + + test("Join v2 rule correctly handles implicit output columns.") { + val t1FilterNode = Filter(IsNotNull(t1c1), t1ScanNode) + val t2FilterNode = Filter(IsNotNull(t2c1), t2ScanNode) + val joinCondition = EqualTo(t1c1, t2c1) + + // Implicit output set of columns containing all of the following columns + // t1c1, t1c2, t1c3, t1c4, t2c1, t2c2, t2c3, t2c4. + // The below query is same as + // SELECT * FROM T1, T2 WHERE T1.C1 = T2.C1 + val originalPlan = Join(t1FilterNode, t2FilterNode, JoinType("inner"), Some(joinCondition)) + + { + // Test: should not update plan if no index exist to cover all implicit columns + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + } + + { + // Test: should update plan if index exists to cover all implicit columns + val t1TestIndex = + createIndexLogEntry("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode) + + // clear cache so the new indexes gets added to it + clearCache() + + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = + Seq(getIndexDataFilesPaths("t1Idx")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + + // Cleanup created indexes after test + FileUtils.delete(getIndexRootPath(t1TestIndex.name)) + } + } + + test("Join v2 rule does not update plan if join condition contains aliased column names.") { + val t1c1Alias = Alias(t1c1, "t1c1Alias")() + val t1ProjectNode = Project(Seq(t1c1Alias, t1c3), t1FilterNode) + + val joinCondition = EqualTo(t1c1Alias.toAttribute, t2c1) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(updatedPlan.equals(originalPlan)) + } + + test( + "Join v2 rule does not update plan if join condition contains columns from " + + "non-LogicalRelation leaf nodes.") { + // Creating a LocalRelation for join + val localCol1 = AttributeReference("lc1", IntegerType)() + val localCol2 = AttributeReference("lc2", StringType)() + val localData: Seq[Row] = Seq((1, "a"), (2, "b"), (3, "c")).map(Row(_)) + val localRelation: LocalRelation = + LocalRelation.fromExternalRows(Seq(localCol1, localCol2), localData) + val localFilterNode = Filter(IsNotNull(localCol1), localRelation) + val localProjectNode = Project(Seq(localCol1, localCol2), localFilterNode) + + // Here, join condition contains a column from a LocalRelation and one from a LogicalRelation + 
val joinCondition = EqualTo(t1c1, localCol1) + val originalPlan = + Join(t1ProjectNode, localProjectNode, JoinType("inner"), Some(joinCondition)) + + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(updatedPlan.equals(originalPlan)) + } + + test("Join v2 rule updates plan for composite query (AND based Equi-Join).") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t2c1 and t1c2 = t2c2 + val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i2")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule updates plan for composite query with order of predicates changed.") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c2 = t2c2 and t1c1 = t2c1 >> order of predicates changed. The rule should make sure + // if any usable index can be found irrespective of order of predicates + val joinCondition = And(EqualTo(t1c2, t2c2), EqualTo(t1c1, t2c1)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i2")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule updates plan for composite query with swapped attributes.") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t2c1 and t2c2 = t1c2 >> Swapped order of query columns + val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t2c2, t1c2)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i2")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule doesn't update plan if columns don't have one-to-one mapping.") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + { + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t2c1 and t1c1 = t2c2 >> t1c1 compared against both t2c1 and t2c2 + val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c1, t2c2)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes) + assert(updatedPlan.equals(originalPlan)) + allIndexes.foreach { index => + val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS) + assert(reasons.isDefined) + val msg = reasons.get.map(_.verboseStr) + assert( + 
msg.size == 1 && msg.head.contains( + "Join condition is not eligible. Reason: incompatible left and right join columns"), + msg) + } + } + { + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t2c1 and t1c2 = t2c1 >> t2c1 compared against both t1c1 and t1c2 + val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c1)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(updatedPlan.equals(originalPlan)) + } + } + + test( + "Join v2 rule updates plan if columns have one-to-one mapping with repeated " + + "case-insensitive predicates.") { + val t1ProjectNode = Project(Seq(t1c1, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c3), t2FilterNode) + + val t1c1Caps = t1c1.withName("T1C1") + val t2c1Caps = t2c1.withName("T2C1") + + val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c1Caps, t2c1Caps)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i1")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule updates plan for composite query for repeated predicates.") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t2c1 and t1c1 = t2c2 and t1c1 = t2c1 >> one predicate repeated twice + val joinCondition = And(And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)), EqualTo(t1c1, t2c1)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i2")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule doesn't update plan if columns don't belong to either side of join node.") { + val t1ProjectNode = Project(Seq(t1c1, t1c2, t1c3), t1FilterNode) + val t2ProjectNode = Project(Seq(t2c1, t2c2, t2c3), t2FilterNode) + + // SELECT t1c1, t1c2, t1c3, t2c1, t2c2, t2c3 + // FROM t1, t2 + // WHERE t1c1 = t1c2 and t1c1 = t2c2 >> two columns of t1 compared against each other + val joinCondition = And(EqualTo(t1c1, t1c2), EqualTo(t1c2, t2c2)) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(updatedPlan.equals(originalPlan)) + } + + test( + "Join v2 rule updates plan if condition attributes contain 'qualifier' " + + "but base table attributes don't.") { + + // Attributes same as the base data source, qualified with table names (e.g. 
from a table name + // from the catalog) + val t1c1Qualified = t1c1.copy()(t1c1.exprId, Seq("Table1")) + val t2c1Qualified = t2c1.copy()(t2c1.exprId, Seq("Table2")) + + val joinCondition = EqualTo(t1c1Qualified, t2c1Qualified) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan) + assert(!updatedPlan.equals(originalPlan)) + + val indexPaths = Seq(getIndexDataFilesPaths("t2i1")).flatten + verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths) + } + + test("Join v2 rule is not applied for modified plan.") { + val joinCondition = EqualTo(t1c1, t2c1) + val plan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + + { + val (updatedPlan, _) = applyJoinIndexRuleHelper(plan) + assert(!updatedPlan.equals(plan)) + } + + // Mark the relation that the rule is applied and verify the plan does not change. + val newPlan = plan transform { + case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + } + + { + val (updatedPlan, _) = applyJoinIndexRuleHelper(newPlan) + assert(updatedPlan.equals(newPlan)) + } + } + + private def verifyUpdatedIndex( + originalPlan: Join, + updatedPlan: LogicalPlan, + indexPaths: Seq[Path]): Unit = { + assert(treeStructureEquality(originalPlan, updatedPlan)) + assert(basePaths(updatedPlan) == indexPaths) + } + + /** method to check if tree structures of two logical plans are the same. */ + private def treeStructureEquality(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { + val originalNodeCount = plan1.treeString.split("\n").length + val updatedNodeCount = plan2.treeString.split("\n").length + + if (originalNodeCount == updatedNodeCount) { + import SparkTestShims.Implicits._ + (0 until originalNodeCount).forall { i => + plan1(i) match { + // for LogicalRelation, we just check if the updated also has LogicalRelation. If the + // updated plan uses index, the root paths will be different here + case _: LogicalRelation => plan2(i).isInstanceOf[LogicalRelation] + + // for other node types, we compare exact matching between original and updated plans + case node => node.simpleStringFull.equals(plan2(i).simpleStringFull) + } + } + } else { + false + } + } + + /** Returns all root paths from a logical plan */ + private def basePaths(plan: LogicalPlan): Seq[Path] = { + plan + .collectLeaves() + .collect { + case LogicalRelation(HadoopFsRelation(location, _, _, Some(_), _, _), _, _, _) => + location.rootPaths + } + .flatten + } +}
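
Reviewer note (not part of the patch): the `applyJoinIndexRuleHelper` used throughout `JoinIndexV2RuleTest` boils down to a three-step pattern — collect ACTIVE indexes, narrow them to candidates for the plan, apply the rule — which can also be run by hand when checking whether the v2 rule fires on a given plan. Below is a minimal sketch of that pattern, under the assumptions that it runs inside a Hyperspace test suite where `spark` and an index system path are configured, that `JoinIndexV2Rule` lives in the `covering` package alongside `JoinIndexRule` (as the new test's package suggests), and that `someJoinPlan` is a placeholder for any optimized join plan:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    import com.microsoft.hyperspace.actions.Constants
    import com.microsoft.hyperspace.index.IndexCollectionManager
    import com.microsoft.hyperspace.index.covering.JoinIndexV2Rule
    import com.microsoft.hyperspace.index.rules.CandidateIndexCollector

    // Sketch only; `spark` is assumed to be the suite's SparkSession.
    def applyV2(plan: LogicalPlan): (LogicalPlan, Int) = {
      // 1. Collect all indexes currently in the ACTIVE state.
      val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
      // 2. Keep only the indexes whose source plan matches part of `plan`.
      val candidateIndexes = CandidateIndexCollector(plan, allIndexes)
      // 3. Apply the v2 rule; it returns the (possibly rewritten) plan and a score.
      JoinIndexV2Rule.apply(plan, candidateIndexes)
    }

    // `someJoinPlan` is hypothetical. The tests above treat an unchanged plan
    // returned together with score 0 as "rule not applied".
    val (updatedPlan, score) = applyV2(someJoinPlan)

This mirrors what `verifyNoChange` in the E2E suite does for the v1 `JoinIndexRule`, so the same negative-case checks can be reused for the v2 rule.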