aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-04-28 22:48:04 -0700
committerReynold Xin <rxin@databricks.com>2015-04-28 22:48:04 -0700
commit271c4c621d91d3f610ae89e5d2e5dab1a2009ca6 (patch)
tree47b00efa3b4d3daf1710e0495fc494bdb31661c1 /sql
parent5ef006fc4d010905e02cb905c9115b95ba55282b (diff)
downloadspark-271c4c621d91d3f610ae89e5d2e5dab1a2009ca6.tar.gz
spark-271c4c621d91d3f610ae89e5d2e5dab1a2009ca6.tar.bz2
spark-271c4c621d91d3f610ae89e5d2e5dab1a2009ca6.zip
[SPARK-7215] made coalesce and repartition a part of the query plan
Coalesce and repartition now show up as part of the query plan, rather than resulting in a new `DataFrame`. cc rxin Author: Burak Yavuz <brkyvz@gmail.com> Closes #5762 from brkyvz/df-repartition and squashes the following commits: b1e76dd [Burak Yavuz] added documentation on repartitions 5807e35 [Burak Yavuz] renamed coalescepartitions fa4509f [Burak Yavuz] rename coalesce 2c349b5 [Burak Yavuz] address comments f2e6af1 [Burak Yavuz] add ticks 686c90b [Burak Yavuz] made coalesce and repartition a part of the query plan
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala14
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala6
6 files changed, 40 insertions, 13 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index bbc94a7ab3..608e272da7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -311,6 +311,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
}
/**
+ * Return a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
+ * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
+ * of the output requires some specific ordering or distribution of the data.
+ */
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+ extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
case object OneRowRelation extends LeafNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index e737418d9c..63df2c1ee7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
extends RedistributeData
-case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+/**
+ * This method repartitions data using [[Expression]]s, and receives information about the
+ * number of partitions during execution. Used when a specific ordering or distribution is
+ * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
+ * `coalesce` and `repartition`.
+ */
+case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
extends RedistributeData
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca6ae482eb..2affba7d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -961,9 +961,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def repartition(numPartitions: Int): DataFrame = {
- sqlContext.createDataFrame(
- queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
- schema, needsConversion = false)
+ Repartition(numPartitions, shuffle = true, logicalPlan)
}
/**
@@ -974,10 +972,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def coalesce(numPartitions: Int): DataFrame = {
- sqlContext.createDataFrame(
- queryExecution.toRdd.coalesce(numPartitions),
- schema,
- needsConversion = false)
+ Repartition(numPartitions, shuffle = false, logicalPlan)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 030ef118f7..3a0a6c8670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil
-
+ case logical.Repartition(numPartitions, shuffle, child) =>
+ execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
@@ -317,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
- case logical.Repartition(expressions, child) =>
+ case logical.RepartitionByExpression(expressions, child) =>
execution.Exchange(
HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d286fe81be..1afdb40941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
}
}
+/**
+ * :: DeveloperApi ::
+ * Return a new RDD that has exactly `numPartitions` partitions.
+ */
+@DeveloperApi
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
+ extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+
+ override def execute(): RDD[Row] = {
+ child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+ }
+}
+
/**
* :: DeveloperApi ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0ea6d57b81..2dc6463aba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case (None, Some(perPartitionOrdering), None, None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
case (None, None, Some(partitionExprs), None) =>
- Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
+ RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
- Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
+ RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, Some(clusterExprs)) =>
Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
- Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
+ RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, None) => withHaving
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
}