aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-04-16 21:49:26 -0500
committerReynold Xin <rxin@databricks.com>2015-04-16 21:49:26 -0500
commit8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3 (patch)
treee580dcd6be4715df0462b944493811df77171801
parente5949c287ed19e78b6eecc61c3e88a07ad452eb9 (diff)
downloadspark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.tar.gz
spark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.tar.bz2
spark-8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3.zip
[SPARK-6972][SQL] Add Coalesce to DataFrame
Author: Michael Armbrust <michael@databricks.com> Closes #5545 from marmbrus/addCoalesce and squashes the following commits: 9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala8
3 files changed, 24 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3235f85d5b..17c21f6e3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -909,6 +909,20 @@ class DataFrame private[sql](
}
/**
+ * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+ * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+ * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+ * the 100 new partitions will claim 10 of the current partitions.
+ * @group rdd
+ */
+ override def coalesce(numPartitions: Int): DataFrame = {
+ sqlContext.createDataFrame(
+ queryExecution.toRdd.coalesce(numPartitions),
+ schema,
+ needsConversion = false)
+ }
+
+ /**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
* @group dfops
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index ba4373f012..63dbab1994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] {
def repartition(numPartitions: Int): DataFrame
+ def coalesce(numPartitions: Int): DataFrame
+
def distinct: DataFrame
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 44a7d1e7bb..3250ab476a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest {
testData.select('key).collect().toSeq)
}
+ test("coalesce") {
+ assert(testData.select('key).coalesce(1).rdd.partitions.size === 1)
+
+ checkAnswer(
+ testData.select('key).coalesce(1).select('key),
+ testData.select('key).collect().toSeq)
+ }
+
test("groupBy") {
checkAnswer(
testData2.groupBy("a").agg($"a", sum($"b")),