about summary refs log tree commit diff
path: root/sql/catalyst
diff options
context:
space:
mode:
author	wangzhenhua <wangzhenhua@huawei.com>	2017-03-21 08:44:09 -0700
committer	Xiao Li <gatorsmile@gmail.com>	2017-03-21 08:44:09 -0700
commit	14865d7ff78db5cf9a3e8626204c8e7ed059c353 (patch)
tree	cf632b93d086421785da9d662a5896b5a1b23bac /sql/catalyst
parent	650d03cfc9a609a2c603f9ced452d03ec8429b0d (diff)
download	spark-14865d7ff78db5cf9a3e8626204c8e7ed059c353.tar.gz
spark-14865d7ff78db5cf9a3e8626204c8e7ed059c353.tar.bz2
spark-14865d7ff78db5cf9a3e8626204c8e7ed059c353.zip
[SPARK-17080][SQL][FOLLOWUP] Improve documentation, change buildJoin method structure and add a debug log
## What changes were proposed in this pull request?

1. Improve documentation for class `Cost` and `JoinReorderDP` and method `buildJoin()`.
2. Change code structure of `buildJoin()` to make the logic clearer.
3. Add a debug-level log to record information for join reordering, including time cost, the number of items and the number of plans in memo.

## How was this patch tested?

Not related.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17353 from wzhfy/reorderFollow.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala	| 109
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala	| 1
2 files changed, 68 insertions, 42 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 521c468fe1..fc37720809 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.collection.mutable
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan, Project}
@@ -51,7 +52,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
}
}
- def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
+ private def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
// TODO: Compute the set of star-joins and use them in the join enumeration
// algorithm to prune un-optimal plan choices.
@@ -69,7 +70,7 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
}
/**
- * Extract consecutive inner joinable items and join conditions.
+ * Extracts items of consecutive inner joins and join conditions.
* This method works for bushy trees and left/right deep trees.
*/
private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
@@ -119,18 +120,21 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr
* When building m-way joins, we only keep the best plan (with the lowest cost) for the same set
* of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among
* plans (A J B) J C, (A J C) J B and (B J C) J A.
- *
- * Thus the plans maintained for each level when reordering four items A, B, C, D are as follows:
+ * We also prune cartesian product candidates when building a new plan if there exists no join
+ * condition involving references from both left and right. This pruning strategy significantly
+ * reduces the search space.
+ * E.g., given A J B J C J D with join conditions A.k1 = B.k1 and B.k2 = C.k2 and C.k3 = D.k3,
+ * plans maintained for each level are as follows:
* level 0: p({A}), p({B}), p({C}), p({D})
- * level 1: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
- * level 2: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
+ * level 1: p({A, B}), p({B, C}), p({C, D})
+ * level 2: p({A, B, C}), p({B, C, D})
* level 3: p({A, B, C, D})
* where p({A, B, C, D}) is the final output plan.
*
* For cost evaluation, since physical costs for operators are not available currently, we use
* cardinalities and sizes to compute costs.
*/
-object JoinReorderDP extends PredicateHelper {
+object JoinReorderDP extends PredicateHelper with Logging {
def search(
conf: SQLConf,
@@ -138,6 +142,7 @@ object JoinReorderDP extends PredicateHelper {
conditions: Set[Expression],
topOutput: AttributeSet): LogicalPlan = {
+ val startTime = System.nanoTime()
// Level i maintains all found plans for i + 1 items.
// Create the initial plans: each plan is a single item with zero cost.
val itemIndex = items.zipWithIndex
@@ -152,6 +157,10 @@ object JoinReorderDP extends PredicateHelper {
foundPlans += searchLevel(foundPlans, conf, conditions, topOutput)
}
+ val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+ logDebug(s"Join reordering finished. Duration: $durationInMs ms, number of items: " +
+ s"${items.length}, number of plans in memo: ${foundPlans.map(_.size).sum}")
+
// The last level must have one and only one plan, because all items are joinable.
assert(foundPlans.size == items.length && foundPlans.last.size == 1)
foundPlans.last.head._2.plan
@@ -183,18 +192,15 @@ object JoinReorderDP extends PredicateHelper {
}
otherSideCandidates.foreach { otherSidePlan =>
- // Should not join two overlapping item sets.
- if (oneSidePlan.itemIds.intersect(otherSidePlan.itemIds).isEmpty) {
- val joinPlan = buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput)
- if (joinPlan.isDefined) {
- val newJoinPlan = joinPlan.get
+ buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+ case Some(newJoinPlan) =>
// Check if it's the first plan for the item set, or it's a better plan than
// the existing one due to lower cost.
val existingPlan = nextLevel.get(newJoinPlan.itemIds)
if (existingPlan.isEmpty || newJoinPlan.betterThan(existingPlan.get, conf)) {
nextLevel.update(newJoinPlan.itemIds, newJoinPlan)
}
- }
+ case None =>
}
}
}
@@ -203,7 +209,17 @@ object JoinReorderDP extends PredicateHelper {
nextLevel.toMap
}
- /** Build a new join node. */
+ /**
+ * Builds a new JoinPlan when both conditions hold:
+ * - the sets of items contained in left and right sides do not overlap.
+ * - there exists at least one join condition involving references from both sides.
+ * @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
+ * @param otherJoinPlan The other side JoinPlan for building a new join node.
+ * @param conf SQLConf for statistics computation.
+ * @param conditions The overall set of join conditions.
+ * @param topOutput The output attributes of the final plan.
+ * @return Builds and returns a new JoinPlan if both conditions hold. Otherwise, returns None.
+ */
private def buildJoin(
oneJoinPlan: JoinPlan,
otherJoinPlan: JoinPlan,
@@ -211,6 +227,11 @@ object JoinReorderDP extends PredicateHelper {
conditions: Set[Expression],
topOutput: AttributeSet): Option[JoinPlan] = {
+ if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
+ // Should not join two overlapping item sets.
+ return None
+ }
+
val onePlan = oneJoinPlan.plan
val otherPlan = otherJoinPlan.plan
val joinConds = conditions
@@ -220,33 +241,33 @@ object JoinReorderDP extends PredicateHelper {
if (joinConds.isEmpty) {
// Cartesian product is very expensive, so we exclude them from candidate plans.
// This also significantly reduces the search space.
- None
+ return None
+ }
+
+ // Put the deeper side on the left, tend to build a left-deep tree.
+ val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
+ (onePlan, otherPlan)
} else {
- // Put the deeper side on the left, tend to build a left-deep tree.
- val (left, right) = if (oneJoinPlan.itemIds.size >= otherJoinPlan.itemIds.size) {
- (onePlan, otherPlan)
+ (otherPlan, onePlan)
+ }
+ val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
+ val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
+ val remainingConds = conditions -- collectedJoinConds
+ val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
+ val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
+ val newPlan =
+ if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
+ Project(neededFromNewJoin.toSeq, newJoin)
} else {
- (otherPlan, onePlan)
+ newJoin
}
- val newJoin = Join(left, right, Inner, joinConds.reduceOption(And))
- val collectedJoinConds = joinConds ++ oneJoinPlan.joinConds ++ otherJoinPlan.joinConds
- val remainingConds = conditions -- collectedJoinConds
- val neededAttr = AttributeSet(remainingConds.flatMap(_.references)) ++ topOutput
- val neededFromNewJoin = newJoin.outputSet.filter(neededAttr.contains)
- val newPlan =
- if ((newJoin.outputSet -- neededFromNewJoin).nonEmpty) {
- Project(neededFromNewJoin.toSeq, newJoin)
- } else {
- newJoin
- }
- val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
- // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
- // item), so the cost of the new join should also include its own cost.
- val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
- otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
- Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
- }
+ val itemIds = oneJoinPlan.itemIds.union(otherJoinPlan.itemIds)
+ // Now the root node of onePlan/otherPlan becomes an intermediate join (if it's a non-leaf
+ // item), so the cost of the new join should also include its own cost.
+ val newPlanCost = oneJoinPlan.planCost + oneJoinPlan.rootCost(conf) +
+ otherJoinPlan.planCost + otherJoinPlan.rootCost(conf)
+ Some(JoinPlan(itemIds, newPlan, collectedJoinConds, newPlanCost))
}
/** Map[set of item ids, join plan for these items] */
@@ -278,10 +299,10 @@ object JoinReorderDP extends PredicateHelper {
}
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
- if (other.planCost.rows == 0 || other.planCost.size == 0) {
+ if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
- val relativeRows = BigDecimal(this.planCost.rows) / BigDecimal(other.planCost.rows)
+ val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
relativeSize * (1 - conf.joinReorderCardWeight) < 1
@@ -290,7 +311,11 @@ object JoinReorderDP extends PredicateHelper {
}
}
-/** This class defines the cost model. */
-case class Cost(rows: BigInt, size: BigInt) {
- def +(other: Cost): Cost = Cost(this.rows + other.rows, this.size + other.size)
+/**
+ * This class defines the cost model for a plan.
+ * @param card Cardinality (number of rows).
+ * @param size Size in bytes.
+ */
+case class Cost(card: BigInt, size: BigInt) {
+ def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b6e0b8ccbe..d5006c1646 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -708,6 +708,7 @@ object SQLConf {
buildConf("spark.sql.cbo.joinReorder.dp.threshold")
.doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
.intConf
+ .checkValue(number => number > 0, "The maximum number must be a positive integer.")
.createWithDefault(12)
val JOIN_REORDER_CARD_WEIGHT =