diff options
author | gatorsmile <gatorsmile@gmail.com> | 2015-11-24 15:54:10 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-24 15:54:10 -0800 |
commit | 238ae51b66ac12d15fba6aff061804004c5ca6cb (patch) | |
tree | db647d8e9a9527b02e125673eff0848f86001c8e /sql | |
parent | c7f95df5c6d8eb2e6f11cf58b704fea34326a5f2 (diff) | |
download | spark-238ae51b66ac12d15fba6aff061804004c5ca6cb.tar.gz spark-238ae51b66ac12d15fba6aff061804004c5ca6cb.tar.bz2 spark-238ae51b66ac12d15fba6aff061804004c5ca6cb.zip |
[SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs
This PR is to provide two common `coalesce` and `repartition` in Dataset APIs.
After reading the comments of SPARK-9999, I am unclear about the plan for supporting re-partitioning in Dataset APIs. Currently, both RDD APIs and Dataframe APIs provide users such a flexibility to control the number of partitions.
In most traditional RDBMS, they expose the number of partitions, the partitioning columns, the table partitioning methods to DBAs for performance tuning and storage planning. Normally, these parameters could largely affect the query performance. Since the actual performance depends on the workload types, I think it is almost impossible to automate the discovery of the best partitioning strategy for all the scenarios.
I am wondering if Dataset APIs are planning to hide these APIs from users? Feel free to reject my PR if it does not match the plan.
Thank you for your answers. marmbrus rxin cloud-fan
Author: gatorsmile <gatorsmile@gmail.com>
Closes #9899 from gatorsmile/coalesce.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 19 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 15 |
2 files changed, 34 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0764750842..17e2611790 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -152,6 +152,25 @@ class Dataset[T] private[sql]( */ def count(): Long = toDF().count() + /** + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * @since 1.6.0 + */ + def repartition(numPartitions: Int): Dataset[T] = withPlan { + Repartition(numPartitions, shuffle = true, _) + } + + /** + * Returns a new [[Dataset]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * @since 1.6.0 + */ + def coalesce(numPartitions: Int): Dataset[T] = withPlan { + Repartition(numPartitions, shuffle = false, _) + } + /* *********************** * * Functional Operations * * *********************** */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 13eede1b17..c253fdbb8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -52,6 +52,21 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assert(ds.takeAsList(1).get(0) == item) } + test("coalesce, repartition") { + val data = (1 to 100).map(i => ClassData(i.toString, i)) + val ds = data.toDS() + + assert(ds.repartition(10).rdd.partitions.length == 10) + checkAnswer( + ds.repartition(10), + data: _*) + + assert(ds.coalesce(1).rdd.partitions.length == 1) + checkAnswer( + ds.coalesce(1), + data: _*) + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkAnswer( |