diff options
author | Felix Cheung <felixcheung_m@hotmail.com> | 2017-02-15 10:45:37 -0800 |
---|---|---|
committer | Felix Cheung <felixcheung@apache.org> | 2017-02-15 10:45:37 -0800 |
commit | 671bc08ed502815bfa2254c30d64149402acb0c7 (patch) | |
tree | 3edcf2548e8f58a6a27db9c16050a3ff1d8ae261 /sql/core/src/main/scala/org/apache | |
parent | c97f4e17de0ce39e8172a5a4ae81f1914816a358 (diff) | |
download | spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.gz spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.bz2 spark-671bc08ed502815bfa2254c30d64149402acb0c7.zip |
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request?
Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes #16739 from felixcheung/rcoalesce.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 10 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala | 10 |
2 files changed, 18 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ce6e8be8b0..6b80ff48bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2432,7 +2432,15 @@ class Dataset[T] private[sql]( * Returns a new Dataset that has exactly `numPartitions` partitions. * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g. * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. + * the 100 new partitions will claim 10 of the current partitions. If a larger number of + * partitions is requested, it will stay at the current number of partitions. + * + * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, + * this may result in your computation taking place on fewer nodes than + * you like (e.g. one node in the case of numPartitions = 1). To avoid this, + * you can call repartition. This will add a shuffle step, but means the + * current upstream partitions will be executed in parallel (per whatever + * the current partitioning is). * * @group typedrel * @since 1.6.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 649c21b294..c01f9c5e3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -541,7 +541,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { * Physical plan for returning a new RDD that has exactly `numPartitions` partitions. * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. 
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. + * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions + * is requested, it will stay at the current number of partitions. + * + * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, + * this may result in your computation taking place on fewer nodes than + * you like (e.g. one node in the case of numPartitions = 1). To avoid this, + * you can see ShuffleExchange. This will add a shuffle step, but means the + * current upstream partitions will be executed in parallel (per whatever + * the current partitioning is). */ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { override def output: Seq[Attribute] = child.output |