aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-07-14 18:55:34 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-07-14 18:55:34 -0700
commitcc57d705e732aefc2f3d3f438e84d71705b2eb65 (patch)
tree12a6b2b2a2cb580bea942765e907a23e0333d96c /sql
parente965a798d09a9fba61b104c5cc0b65cdc28d27f6 (diff)
downloadspark-cc57d705e732aefc2f3d3f438e84d71705b2eb65.tar.gz
spark-cc57d705e732aefc2f3d3f438e84d71705b2eb65.tar.bz2
spark-cc57d705e732aefc2f3d3f438e84d71705b2eb65.zip
[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)
SPARK-8317 changed the SQL Exchange operator so that it no longer pushed sorting into Spark's shuffle layer, a change which allowed more efficient SQL-specific sorters to be used. This patch performs some leftover cleanup based on those changes: - Exchange's constructor should no longer accept a `newOrdering` since it's no longer used and no longer works as expected. - `addOperatorsIfNecessary` looked at shuffle input's output ordering to decide whether to sort, but this is the wrong node to be examining: it needs to look at whether the post-shuffle node has the right ordering, since shuffling will not preserve row orderings. Thanks to davies for spotting this. Author: Josh Rosen <joshrosen@databricks.com> Closes #7407 from JoshRosen/SPARK-9050 and squashes the following commits: e70be50 [Josh Rosen] No need to wrap line e866494 [Josh Rosen] Refactor addOperatorsIfNecessary to make code clearer 2e467da [Josh Rosen] Remove `newOrdering` from Exchange.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala37
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala3
2 files changed, 16 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4b783e30d9..feea4f239c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -35,21 +35,13 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn
/**
* :: DeveloperApi ::
- * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
- * resulting partition based on expressions from the partition key. It is invalid to construct an
- * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.
+ * Performs a shuffle that will result in the desired `newPartitioning`.
*/
@DeveloperApi
-case class Exchange(
- newPartitioning: Partitioning,
- newOrdering: Seq[SortOrder],
- child: SparkPlan)
- extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
override def outputPartitioning: Partitioning = newPartitioning
- override def outputOrdering: Seq[SortOrder] = newOrdering
-
override def output: Seq[Attribute] = child.output
/**
@@ -279,23 +271,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
partitioning: Partitioning,
rowOrdering: Seq[SortOrder],
child: SparkPlan): SparkPlan = {
- val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
- val needsShuffle = child.outputPartitioning != partitioning
- val withShuffle = if (needsShuffle) {
- Exchange(partitioning, Nil, child)
- } else {
- child
+ def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
+ if (child.outputPartitioning != partitioning) {
+ Exchange(partitioning, child)
+ } else {
+ child
+ }
}
- val withSort = if (needSort) {
- sqlContext.planner.BasicOperators.getSortOperator(
- rowOrdering, global = false, withShuffle)
- } else {
- withShuffle
+ def addSortIfNecessary(child: SparkPlan): SparkPlan = {
+ if (rowOrdering.nonEmpty && child.outputOrdering != rowOrdering) {
+ sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
+ } else {
+ child
+ }
}
- withSort
+ addSortIfNecessary(addShuffleIfNecessary(child))
}
if (meetsRequirements && compatible && !needsAnySort) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce25af58b6..73b463471e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.OneRowRelation =>
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
case logical.RepartitionByExpression(expressions, child) =>
- execution.Exchange(
- HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
+ execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil