diff options
author | Michael Armbrust <michael@databricks.com> | 2015-04-16 21:49:26 -0500 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-16 21:49:26 -0500 |
commit | 8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3 (patch) | |
tree | e580dcd6be4715df0462b944493811df77171801 /sql/core | |
parent | e5949c287ed19e78b6eecc61c3e88a07ad452eb9 (diff) | |
download | spark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.tar.gz spark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.tar.bz2 spark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.zip |
[SPARK-6972][SQL] Add Coalesce to DataFrame
Author: Michael Armbrust <michael@databricks.com>
Closes #5545 from marmbrus/addCoalesce and squashes the following commits:
9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame
Diffstat (limited to 'sql/core')
3 files changed, 24 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3235f85d5b..17c21f6e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -909,6 +909,20 @@ class DataFrame private[sql]( } /** + * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * @group rdd + */ + override def coalesce(numPartitions: Int): DataFrame = { + sqlContext.createDataFrame( + queryExecution.toRdd.coalesce(numPartitions), + schema, + needsConversion = false) + } + + /** * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. * @group dfops */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala index ba4373f012..63dbab1994 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] { def repartition(numPartitions: Int): DataFrame + def coalesce(numPartitions: Int): DataFrame + def distinct: DataFrame } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 44a7d1e7bb..3250ab476a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest { testData.select('key).collect().toSeq) } + test("coalesce") { + assert(testData.select('key).coalesce(1).rdd.partitions.size === 1) + + checkAnswer( + testData.select('key).coalesce(1).select('key), + testData.select('key).collect().toSeq) + } + test("groupBy") { checkAnswer( testData2.groupBy("a").agg($"a", sum($"b")), |