diff options
author | Reynold Xin <rxin@databricks.com> | 2016-12-22 19:35:09 +0100 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-12-22 19:35:09 +0100 |
commit | 2615100055860faa5f74d3711d4d15ebae6aba25 (patch) | |
tree | a755b2fec9ec5371dd77c4c6cca4542540eac75b /sql/catalyst/src | |
parent | 76622c661fcae81eb0352c61f54a2e9e21a4fb98 (diff) | |
download | spark-2615100055860faa5f74d3711d4d15ebae6aba25.tar.gz spark-2615100055860faa5f74d3711d4d15ebae6aba25.tar.bz2 spark-2615100055860faa5f74d3711d4d15ebae6aba25.zip |
[SPARK-18973][SQL] Remove SortPartitions and RedistributeData
## What changes were proposed in this pull request?
SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.
## How was this patch tested?
Existing test cases were updated to reflect the removal of these operators.
Author: Reynold Xin <rxin@databricks.com>
Closes #16381 from rxin/SPARK-18973.
Diffstat (limited to 'sql/catalyst/src')
7 files changed, 26 insertions, 54 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 58f98d529a..73e92066b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1116,7 +1116,7 @@ class Analyzer( case p: Sort => failOnOuterReference(p) p - case p: RedistributeData => + case p: RepartitionByExpression => failOnOuterReference(p) p diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 60d9881ac9..053c8eb617 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -166,7 +166,7 @@ object UnsupportedOperationChecker { case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) => throwError("Limits are not supported on streaming DataFrames/Datasets") - case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => + case Sort(_, _, _) if !containsCompleteData(subPlan) => throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + "aggregated DataFrame/Dataset in Complete output mode") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 75d9997582..dfd66aac2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -796,7 +796,7 @@ object PushDownPredicate extends 
Rule[LogicalPlan] with PredicateHelper { case _: Distinct => true case _: Generate => true case _: Pivot => true - case _: RedistributeData => true + case _: RepartitionByExpression => true case _: Repartition => true case _: ScriptTransformation => true case _: Sort => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 6958398e03..949ccdcb45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -489,7 +489,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { case _: AppendColumns => true case _: AppendColumnsWithObject => true case _: BroadcastHint => true - case _: RedistributeData => true + case _: RepartitionByExpression => true case _: Repartition => true case _: Sort => true case _: TypedFilter => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 0de5aa8a93..d583fa31b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -765,6 +765,28 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) } /** + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * information about the number of partitions during execution. Used when a specific ordering or + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like + * `coalesce` and `repartition`. 
+ * If `numPartitions` is not specified, the number of partitions will be the number set by + * `spark.sql.shuffle.partitions`. + */ +case class RepartitionByExpression( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Option[Int] = None) extends UnaryNode { + + numPartitions match { + case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.") + case None => // Ok + } + + override def maxRows: Option[Long] = child.maxRows + override def output: Seq[Attribute] = child.output +} + +/** * A relation with one row. This is used in "SELECT ..." without a from clause. */ case object OneRowRelation extends LeafNode { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala deleted file mode 100644 index 28cbce8748..0000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalyst.plans.logical - -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} - -/** - * Performs a physical redistribution of the data. Used when the consumer of the query - * result have expectations about the distribution and ordering of partitioned input data. - */ -abstract class RedistributeData extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - -case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan) - extends RedistributeData - -/** - * This method repartitions data using [[Expression]]s into `numPartitions`, and receives - * information about the number of partitions during execution. Used when a specific ordering or - * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like - * `coalesce` and `repartition`. - * If `numPartitions` is not specified, the number of partitions will be the number set by - * `spark.sql.shuffle.partitions`. 
- */ -case class RepartitionByExpression( - partitionExpressions: Seq[Expression], - child: LogicalPlan, - numPartitions: Option[Int] = None) extends RedistributeData { - numPartitions match { - case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.") - case None => // Ok - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 94a008f4f6..d2c0f8cc9f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -213,7 +213,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite { // Other unary operations - testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort") testUnaryOperatorInStreamingPlan( "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling") testUnaryOperatorInStreamingPlan( |