author     Josh Rosen <joshrosen@databricks.com>  2015-08-09 14:26:01 -0700
committer  Yin Huai <yhuai@databricks.com>  2015-08-09 14:26:01 -0700
commit     23cf5af08d98da771c41571c00a2f5cafedfebdd (patch)
tree       20558c64ea10635a4499668543c0d5552359bed2 /sql/catalyst
parent     a863348fd85848e0d4325c4de359da12e5f548d2 (diff)
[SPARK-9703] [SQL] Refactor EnsureRequirements to avoid certain unnecessary shuffles
This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles. As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning. This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code. Author: Josh Rosen <joshrosen@databricks.com> Closes #7988 from JoshRosen/exchange-fixes and squashes the following commits: 38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler 0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning. 8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees 1307c50 [Josh Rosen] Update conditions for requiring child compatibility. 18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan. 2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes fee65c4 [Josh Rosen] Further refinement to comments / reasoning 642b0bb [Josh Rosen] Further expand comment / reasoning 06aba0c [Josh Rosen] Add more comments 8dbc845 [Josh Rosen] Add even more tests. 4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug 0725a34 [Josh Rosen] Small assertion cleanup. 
5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions. 2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements. 752b8de [Josh Rosen] style fix c628daf [Josh Rosen] Revert accidental ExchangeSuite change. c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case. adcc742 [Josh Rosen] Move test to PlannerSuite. 0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help. cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test. 2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning.
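The SortMergeJoin scenario from the commit message can be sketched with toy types. Everything below (`ToyPartitioning`, `compatible`, `needsExchange`) is hypothetical and simplified for illustration; it is not Spark's actual planner API, only a model of the decision the fixed rule makes:

```scala
// A minimal sketch of the planning case this patch fixes, using hypothetical
// toy types rather than Spark's real planner API. A join whose children are
// both already single-partition needs sorts injected, but no exchange.
sealed trait ToyPartitioning { def numPartitions: Int }
case object ToySinglePartition extends ToyPartitioning { val numPartitions = 1 }
case class ToyHashPartitioning(keys: Seq[String], numPartitions: Int)
  extends ToyPartitioning

// compatibleWith for the toy model: a single partition is compatible with any
// other single-partition output; hash partitionings must agree on keys and
// partition count.
def compatible(a: ToyPartitioning, b: ToyPartitioning): Boolean = (a, b) match {
  case (ToySinglePartition, other) => other.numPartitions == 1
  case (ToyHashPartitioning(k1, n1), ToyHashPartitioning(k2, n2)) =>
    k1 == k2 && n1 == n2
  case _ => false
}

// The planner only needs to inject an exchange when the children's existing
// partitionings are not mutually compatible.
def needsExchange(children: Seq[ToyPartitioning]): Boolean =
  !children.sliding(2).forall {
    case Seq(a, b) => compatible(a, b) && compatible(b, a)
    case _         => true
  }
```

In this model, two SinglePartition children are mutually compatible, so `needsExchange` returns false and only the sorts remain to be injected, which is the behavior the patch restores.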
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala | 128
1 file changed, 112 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index ec659ce789..5a89a90b73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}
+/**
+ * Describes how an operator's output is split across partitions. The `compatibleWith`,
+ * `guarantees`, and `satisfies` methods describe relationships between child partitionings,
+ * target partitionings, and [[Distribution]]s. These relations are described more precisely in
+ * their individual method docs, but at a high level:
+ *
+ * - `satisfies` is a relationship between partitionings and distributions.
+ * - `compatibleWith` is a relationship between an operator's child output partitionings.
+ * - `guarantees` is a relationship between a child's existing output partitioning and a target
+ * output partitioning.
+ *
+ * Diagrammatically:
+ *
+ * +--------------+
+ * | Distribution |
+ * +--------------+
+ * ^
+ * |
+ * satisfies
+ * |
+ * +--------------+ +--------------+
+ * | Child | | Target |
+ * +----| Partitioning |----guarantees--->| Partitioning |
+ * | +--------------+ +--------------+
+ * | ^
+ * | |
+ * | compatibleWith
+ * | |
+ * +------------+
+ *
+ */
sealed trait Partitioning {
/** Returns the number of partitions that the data is split across */
val numPartitions: Int
@@ -90,9 +121,66 @@ sealed trait Partitioning {
/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
* guarantees the same partitioning scheme described by `other`.
+ *
+ * Compatibility of partitionings is only checked for operators that have multiple children
+ * and that require a specific child output [[Distribution]], such as joins.
+ *
+ * Intuitively, partitionings are compatible if they route the same partitioning key to the same
+ * partition. For instance, two hash partitionings are only compatible if they produce the same
+ * number of output partitions and hash records according to the same hash function and
+ * same partitioning key schema.
+ *
+ * Put another way, two partitionings are compatible with each other if they satisfy all of the
+ * same distribution guarantees.
*/
- // TODO: Add an example once we have the `nullSafe` concept.
- def guarantees(other: Partitioning): Boolean
+ def compatibleWith(other: Partitioning): Boolean
+
+ /**
+ * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
+ * the same partitioning scheme described by `other`. If `A.guarantees(B)`, then repartitioning
+ * the child's output according to `B` will be unnecessary. `guarantees` is used as a performance
+ * optimization to allow the exchange planner to avoid redundant repartitionings. By default,
+ * a partitioning only guarantees partitionings that are equal to itself (i.e. the same number
+ * of partitions, same strategy (range or hash), etc).
+ *
+ * In order to enable more aggressive optimization, this strict equality check can be relaxed.
+ * For example, say that the planner needs to repartition all of an operator's children so that
+ * they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all children
+ * to have the [[SinglePartition]] partitioning. If one of the operator's children already happens
+ * to be hash-partitioned with a single partition then we do not need to re-shuffle this child;
+ * this repartitioning can be avoided if a single-partition [[HashPartitioning]] `guarantees`
+ * [[SinglePartition]].
+ *
+ * The SinglePartition example given above is not particularly interesting; guarantees' real
+ * value occurs for more advanced partitioning strategies. SPARK-7871 will introduce a notion
+ * of null-safe partitionings, under which partitionings can specify whether rows whose
+ * partitioning keys contain null values will be grouped into the same partition or whether they
+ * will have an unknown / random distribution. If a partitioning does not require nulls to be
+ * clustered then a partitioning which _does_ cluster nulls will guarantee the null clustered
+ * partitioning. The converse is not true, however: a partitioning which clusters nulls cannot
+ * be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a
+ * symmetric relation.
+ *
+ * Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
+ * produced by `A` could have also been produced by `B`.
+ */
+ def guarantees(other: Partitioning): Boolean = this == other
+}
+
+object Partitioning {
+ def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
+ // Note: this assumes transitivity
+ partitionings.sliding(2).map {
+ case Seq(a) => true
+ case Seq(a, b) =>
+ if (a.numPartitions != b.numPartitions) {
+ assert(!a.compatibleWith(b) && !b.compatibleWith(a))
+ false
+ } else {
+ a.compatibleWith(b) && b.compatibleWith(a)
+ }
+ }.forall(_ == true)
+ }
}
case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
@@ -101,6 +189,8 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
case _ => false
}
+ override def compatibleWith(other: Partitioning): Boolean = false
+
override def guarantees(other: Partitioning): Boolean = false
}
@@ -109,21 +199,9 @@ case object SinglePartition extends Partitioning {
override def satisfies(required: Distribution): Boolean = true
- override def guarantees(other: Partitioning): Boolean = other match {
- case SinglePartition => true
- case _ => false
- }
-}
-
-case object BroadcastPartitioning extends Partitioning {
- val numPartitions = 1
+ override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1
- override def satisfies(required: Distribution): Boolean = true
-
- override def guarantees(other: Partitioning): Boolean = other match {
- case BroadcastPartitioning => true
- case _ => false
- }
+ override def guarantees(other: Partitioning): Boolean = other.numPartitions == 1
}
/**
@@ -147,6 +225,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case _ => false
}
+ override def compatibleWith(other: Partitioning): Boolean = other match {
+ case o: HashPartitioning =>
+ this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
+ case _ => false
+ }
+
override def guarantees(other: Partitioning): Boolean = other match {
case o: HashPartitioning =>
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
@@ -185,6 +269,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case _ => false
}
+ override def compatibleWith(other: Partitioning): Boolean = other match {
+ case o: RangePartitioning => this == o
+ case _ => false
+ }
+
override def guarantees(other: Partitioning): Boolean = other match {
case o: RangePartitioning => this == o
case _ => false
@@ -229,6 +318,13 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.exists(_.satisfies(required))
/**
+ * Returns true if any `partitioning` of this collection is compatible with
+ * the given [[Partitioning]].
+ */
+ override def compatibleWith(other: Partitioning): Boolean =
+ partitionings.exists(_.compatibleWith(other))
+
+ /**
* Returns true if any `partitioning` of this collection guarantees
* the given [[Partitioning]].
*/
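The pairwise `allCompatible` check and the default `guarantees` behavior added by this patch can be exercised in a small standalone sketch. The `Part` class below is an illustrative stand-in, not one of catalyst's real types:

```scala
// Toy re-implementation of the pairwise allCompatible check introduced by the
// patch, using a simplified Part class (illustrative, not catalyst's types).
case class Part(keys: Set[String], numPartitions: Int) {
  def compatibleWith(other: Part): Boolean =
    keys == other.keys && numPartitions == other.numPartitions
  // The default from the patch: a partitioning only guarantees itself
  // (case-class equality plays the role of Partitioning equality here).
  def guarantees(other: Part): Boolean = this == other
}

// As in the patch, only adjacent pairs are compared, so correctness relies on
// compatibleWith being transitive.
def allCompatible(ps: Seq[Part]): Boolean =
  ps.sliding(2).forall {
    case Seq(a, b) => a.compatibleWith(b) && b.compatibleWith(a)
    case _         => true // zero or one partitioning is trivially compatible
  }
```

Comparing only adjacent pairs keeps the check linear in the number of children; the patch's inline note about transitivity marks the assumption that makes this shortcut sound.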