about | summary | refs | log | tree | commit | diff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2016-12-22 19:35:09 +0100
committer: Herman van Hovell <hvanhovell@databricks.com> 2016-12-22 19:35:09 +0100
commit: 2615100055860faa5f74d3711d4d15ebae6aba25 (patch)
tree: a755b2fec9ec5371dd77c4c6cca4542540eac75b /sql/catalyst/src
parent: 76622c661fcae81eb0352c61f54a2e9e21a4fb98 (diff)
download: spark-2615100055860faa5f74d3711d4d15ebae6aba25.tar.gz
spark-2615100055860faa5f74d3711d4d15ebae6aba25.tar.bz2
spark-2615100055860faa5f74d3711d4d15ebae6aba25.zip
[SPARK-18973][SQL] Remove SortPartitions and RedistributeData
## What changes were proposed in this pull request?

SortPartitions and RedistributeData logical operators are not actually used and can be removed. Note that we do have a Sort operator (with global flag false) that subsumed SortPartitions.

## How was this patch tested?

Also updated test cases to reflect the removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #16381 from rxin/SPARK-18973.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 22
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala | 49
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 1
7 files changed, 26 insertions, 54 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 58f98d529a..73e92066b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1116,7 +1116,7 @@ class Analyzer(
case p: Sort =>
failOnOuterReference(p)
p
- case p: RedistributeData =>
+ case p: RepartitionByExpression =>
failOnOuterReference(p)
p
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 60d9881ac9..053c8eb617 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -166,7 +166,7 @@ object UnsupportedOperationChecker {
case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
throwError("Limits are not supported on streaming DataFrames/Datasets")
- case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+ case Sort(_, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
"aggregated DataFrame/Dataset in Complete output mode")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 75d9997582..dfd66aac2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -796,7 +796,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
case _: Distinct => true
case _: Generate => true
case _: Pivot => true
- case _: RedistributeData => true
+ case _: RepartitionByExpression => true
case _: Repartition => true
case _: ScriptTransformation => true
case _: Sort => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 6958398e03..949ccdcb45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -489,7 +489,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
case _: AppendColumns => true
case _: AppendColumnsWithObject => true
case _: BroadcastHint => true
- case _: RedistributeData => true
+ case _: RepartitionByExpression => true
case _: Repartition => true
case _: Sort => true
case _: TypedFilter => true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0de5aa8a93..d583fa31b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -765,6 +765,28 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
}
/**
+ * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
+ * information about the number of partitions during execution. Used when a specific ordering or
+ * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
+ * `coalesce` and `repartition`.
+ * If `numPartitions` is not specified, the number of partitions will be the number set by
+ * `spark.sql.shuffle.partitions`.
+ */
+case class RepartitionByExpression(
+ partitionExpressions: Seq[Expression],
+ child: LogicalPlan,
+ numPartitions: Option[Int] = None) extends UnaryNode {
+
+ numPartitions match {
+ case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
+ case None => // Ok
+ }
+
+ override def maxRows: Option[Long] = child.maxRows
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
case object OneRowRelation extends LeafNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
deleted file mode 100644
index 28cbce8748..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
-
-/**
- * Performs a physical redistribution of the data. Used when the consumer of the query
- * result have expectations about the distribution and ordering of partitioned input data.
- */
-abstract class RedistributeData extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
- extends RedistributeData
-
-/**
- * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
- * information about the number of partitions during execution. Used when a specific ordering or
- * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
- * `coalesce` and `repartition`.
- * If `numPartitions` is not specified, the number of partitions will be the number set by
- * `spark.sql.shuffle.partitions`.
- */
-case class RepartitionByExpression(
- partitionExpressions: Seq[Expression],
- child: LogicalPlan,
- numPartitions: Option[Int] = None) extends RedistributeData {
- numPartitions match {
- case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
- case None => // Ok
- }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 94a008f4f6..d2c0f8cc9f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -213,7 +213,6 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
// Other unary operations
- testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
testUnaryOperatorInStreamingPlan(
"sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
testUnaryOperatorInStreamingPlan(