aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorFelix Cheung <felixcheung_m@hotmail.com>2017-02-15 10:45:37 -0800
committerFelix Cheung <felixcheung@apache.org>2017-02-15 10:45:37 -0800
commit671bc08ed502815bfa2254c30d64149402acb0c7 (patch)
tree3edcf2548e8f58a6a27db9c16050a3ff1d8ae261 /sql/core
parentc97f4e17de0ce39e8172a5a4ae81f1914816a358 (diff)
downloadspark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.gz
spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.bz2
spark-671bc08ed502815bfa2254c30d64149402acb0c7.zip
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request? Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column ## How was this patch tested? manual, unit tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16739 from felixcheung/rcoalesce.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala10
2 files changed, 18 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ce6e8be8b0..6b80ff48bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2432,7 +2432,15 @@ class Dataset[T] private[sql](
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
- * the 100 new partitions will claim 10 of the current partitions.
+ * the 100 new partitions will claim 10 of the current partitions. If a larger number of
+ * partitions is requested, it will stay at the current number of partitions.
+ *
+ * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+ * this may result in your computation taking place on fewer nodes than
+ * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+ * you can call repartition. This will add a shuffle step, but means the
+ * current upstream partitions will be executed in parallel (per whatever
+ * the current partitioning is).
*
* @group typedrel
* @since 1.6.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 649c21b294..c01f9c5e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -541,7 +541,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
* Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
- * the 100 new partitions will claim 10 of the current partitions.
+ * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
+ * is requested, it will stay at the current number of partitions.
+ *
+ * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+ * this may result in your computation taking place on fewer nodes than
+ * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+ * you see ShuffleExchange. This will add a shuffle step, but means the
+ * current upstream partitions will be executed in parallel (per whatever
+ * the current partitioning is).
*/
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output