author     Josh Rosen <joshrosen@databricks.com>   2015-07-30 17:38:48 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-07-30 17:38:48 -0700
commit     3c66ff727d4b47220e1ff363cea215189ed64f36 (patch)
tree       51e2f06cb4916d1104a8c846d6ee14889931ed39 /sql
parent     65fa4181c35135080870c1e4c1f904ada3a8cf59 (diff)
[SPARK-9489] Remove unnecessary compatibility and requirements checks from Exchange
While reviewing yhuai's patch for SPARK-2205 (#7773), I noticed that Exchange's `compatible` check may be incorrectly returning `false` in many cases. As far as I know, this is not actually a correctness problem: the `compatible`, `meetsRequirements`, and `needsAnySort` checks serve only as short-circuit performance optimizations. To reduce code complexity, we should remove these checks and unconditionally rewrite the operator's children. This is safe because we rewrite the tree in a single bottom-up pass.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7807 from JoshRosen/SPARK-9489 and squashes the following commits:

9d76ce9 [Josh Rosen] [SPARK-9489] Remove compatibleWith, meetsRequirements, and needsAnySort checks from Exchange
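To make the false negatives concrete: in the diff below, HashPartitioning's removed override answers `compatibleWith` with an exact equality check (`case h: HashPartitioning if h == this`), so two children hash-partitioned on the same column through non-identical expression trees are reported as incompatible. A minimal, hypothetical sketch in plain Scala (not the real Catalyst classes; expressions are modeled as strings, and exprId suffixes such as "a#1" are illustrative only):

object CompatibleWithSketch extends App {
  sealed trait Partitioning {
    def compatibleWith(other: Partitioning): Boolean
  }

  case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
      extends Partitioning {
    // Mirrors the removed override: compatible only on exact equality.
    override def compatibleWith(other: Partitioning): Boolean = other match {
      case h: HashPartitioning => h == this
      case _ => false
    }
  }

  val left  = HashPartitioning(Seq("a#1"), 200) // column "a", exprId 1
  val right = HashPartitioning(Seq("a#2"), 200) // same column, different exprId
  // Both sides hash column "a" into 200 partitions, yet:
  println(left.compatibleWith(right)) // false
}

Since EnsureRequirements used `compatible` only as a short-circuit, such a false answer merely meant falling through to the rewrite path that this patch now takes unconditionally.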
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala | 35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 76
2 files changed, 17 insertions(+), 94 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 2dcfa19fec..f4d1dbaf28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -86,14 +86,6 @@ sealed trait Partitioning {
*/
def satisfies(required: Distribution): Boolean
- /**
- * Returns true iff all distribution guarantees made by this partitioning can also be made
- * for the `other` specified partitioning.
- * For example, two [[HashPartitioning HashPartitioning]]s are
- * only compatible if the `numPartitions` of them is the same.
- */
- def compatibleWith(other: Partitioning): Boolean
-
/** Returns the expressions that are used to key the partitioning. */
def keyExpressions: Seq[Expression]
}
@@ -104,11 +96,6 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
case _ => false
}
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case UnknownPartitioning(_) => true
- case _ => false
- }
-
override def keyExpressions: Seq[Expression] = Nil
}
@@ -117,11 +104,6 @@ case object SinglePartition extends Partitioning {
override def satisfies(required: Distribution): Boolean = true
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case SinglePartition => true
- case _ => false
- }
-
override def keyExpressions: Seq[Expression] = Nil
}
@@ -130,11 +112,6 @@ case object BroadcastPartitioning extends Partitioning {
override def satisfies(required: Distribution): Boolean = true
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case SinglePartition => true
- case _ => false
- }
-
override def keyExpressions: Seq[Expression] = Nil
}
@@ -159,12 +136,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case _ => false
}
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case BroadcastPartitioning => true
- case h: HashPartitioning if h == this => true
- case _ => false
- }
-
override def keyExpressions: Seq[Expression] = expressions
}
@@ -199,11 +170,5 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case _ => false
}
- override def compatibleWith(other: Partitioning): Boolean = other match {
- case BroadcastPartitioning => true
- case r: RangePartitioning if r == this => true
- case _ => false
- }
-
override def keyExpressions: Seq[Expression] = ordering.map(_.child)
}
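For context, the overrides removed above all follow the same pattern: an exact match on the partitioning class plus an equality check. The distinction they guarded (spelled out in the comment removed from Exchange.scala below) is that `satisfies` is a per-child property, while compatibility is a property across siblings: two children can each satisfy the same required distribution while still disagreeing on where a given key lives. A toy sketch under simplified stand-in types, not the Catalyst originals:

object SatisfiesVsCompatible extends App {
  sealed trait Distribution
  case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

  sealed trait Partitioning { def satisfies(required: Distribution): Boolean }

  case class HashPartitioning(expressions: Seq[String], n: Int) extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case ClusteredDistribution(c) => expressions.nonEmpty && expressions.forall(c.contains)
    }
  }
  case class RangePartitioning(ordering: Seq[String], n: Int) extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case ClusteredDistribution(c) => ordering.nonEmpty && ordering.forall(c.contains)
    }
  }

  val required = ClusteredDistribution(Seq("a"))
  val left  = HashPartitioning(Seq("a"), 100)  // rows routed by hash of "a"
  val right = RangePartitioning(Seq("a"), 100) // rows routed by range of "a"
  // Each child satisfies the required distribution on its own...
  println(left.satisfies(required) && right.satisfies(required)) // true
  // ...but equal values of "a" land in different partitions on each side,
  // so operators expecting co-partitioned inputs still need a shuffle.
}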
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 70e5031fb6..6bd57f010a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -202,41 +202,6 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan =>
- // True iff every child's outputPartitioning satisfies the corresponding
- // required data distribution.
- def meetsRequirements: Boolean =
- operator.requiredChildDistribution.zip(operator.children).forall {
- case (required, child) =>
- val valid = child.outputPartitioning.satisfies(required)
- logDebug(
- s"${if (valid) "Valid" else "Invalid"} distribution," +
- s"required: $required current: ${child.outputPartitioning}")
- valid
- }
-
- // True iff any of the children are incorrectly sorted.
- def needsAnySort: Boolean =
- operator.requiredChildOrdering.zip(operator.children).exists {
- case (required, child) => required.nonEmpty && required != child.outputOrdering
- }
-
- // True iff outputPartitionings of children are compatible with each other.
- // It is possible that every child satisfies its required data distribution
- // but two children have incompatible outputPartitionings. For example,
- // A dataset is range partitioned by "a.asc" (RangePartitioning) and another
- // dataset is hash partitioned by "a" (HashPartitioning). Tuples in these two
- // datasets are both clustered by "a", but these two outputPartitionings are not
- // compatible.
- // TODO: ASSUMES TRANSITIVITY?
- def compatible: Boolean =
- operator.children
- .map(_.outputPartitioning)
- .sliding(2)
- .forall {
- case Seq(a) => true
- case Seq(a, b) => a.compatibleWith(b)
- }
-
// Adds Exchange or Sort operators as required
def addOperatorsIfNecessary(
partitioning: Partitioning,
@@ -269,33 +234,26 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
addSortIfNecessary(addShuffleIfNecessary(child))
}
- if (meetsRequirements && compatible && !needsAnySort) {
- operator
- } else {
- // At least one child does not satisfies its required data distribution or
- // at least one child's outputPartitioning is not compatible with another child's
- // outputPartitioning. In this case, we need to add Exchange operators.
- val requirements =
- (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
+ val requirements =
+ (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
- val fixedChildren = requirements.zipped.map {
- case (AllTuples, rowOrdering, child) =>
- addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
- case (ClusteredDistribution(clustering), rowOrdering, child) =>
- addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
- case (OrderedDistribution(ordering), rowOrdering, child) =>
- addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
+ val fixedChildren = requirements.zipped.map {
+ case (AllTuples, rowOrdering, child) =>
+ addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
+ case (ClusteredDistribution(clustering), rowOrdering, child) =>
+ addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
+ case (OrderedDistribution(ordering), rowOrdering, child) =>
+ addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
- case (UnspecifiedDistribution, Seq(), child) =>
- child
- case (UnspecifiedDistribution, rowOrdering, child) =>
- sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
+ case (UnspecifiedDistribution, Seq(), child) =>
+ child
+ case (UnspecifiedDistribution, rowOrdering, child) =>
+ sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
- case (dist, ordering, _) =>
- sys.error(s"Don't know how to ensure $dist with ordering $ordering")
- }
-
- operator.withNewChildren(fixedChildren)
+ case (dist, ordering, _) =>
+ sys.error(s"Don't know how to ensure $dist with ordering $ordering")
}
+
+ operator.withNewChildren(fixedChildren)
}
}
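The safety argument for dropping the guard is that each per-child fixup is already a no-op when nothing needs fixing, so running the rewrite unconditionally in a single bottom-up pass cannot stack redundant operators. A self-contained toy model of that shape (hypothetical mini plan types, not Spark's SparkPlan API):

object EnsureRequirementsToy extends App {
  sealed trait Plan
  case class Scan(name: String, partitioning: String) extends Plan
  case class Exchange(target: String, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, required: String) extends Plan

  def outputPartitioning(p: Plan): String = p match {
    case Scan(_, part)  => part
    case Exchange(t, _) => t
    case Join(l, _, _)  => outputPartitioning(l)
  }

  // The "if necessary" lives here: a no-op when the child already has the
  // target partitioning, so calling it unconditionally is safe.
  def addExchangeIfNecessary(target: String, child: Plan): Plan =
    if (outputPartitioning(child) == target) child else Exchange(target, child)

  // Single bottom-up pass with no meetsRequirements/compatible/needsAnySort
  // guard: every operator's children are always rewritten.
  def ensureRequirements(plan: Plan): Plan = plan match {
    case Join(l, r, req) =>
      Join(addExchangeIfNecessary(req, ensureRequirements(l)),
           addExchangeIfNecessary(req, ensureRequirements(r)),
           req)
    case other => other
  }

  val plan = Join(Scan("t1", "hash(a)"), Scan("t2", "range(a)"), "hash(a)")
  // Only the mis-partitioned child gains an Exchange:
  println(ensureRequirements(plan))
  // Join(Scan(t1,hash(a)),Exchange(hash(a),Scan(t2,range(a))),hash(a))
}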