author     Patrick Wendell <pwendell@gmail.com>    2013-10-23 22:13:49 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2013-10-24 14:31:33 -0700
commit     08c1a42d7d9edef02a24a3bc5045b2dce035a93b (patch)
tree       19c5f8e3be71eac820320aa84b7e89df27ef26a7 /core
parent     1dc776b863663af713920d18cecaf57762c2fd77 (diff)
Add a `repartition` operator.
This patch adds an operator called `repartition` with more straightforward semantics than the current `coalesce` operator. There are a few use cases where this operator is useful:

1. A user wants to increase the number of partitions in an RDD. This is more common now with streaming, e.g. a user is ingesting data on one node but wants to add more partitions to ensure parallelism of subsequent operations across threads or the cluster. Right now they have to call rdd.coalesce(numSplits, shuffle = true) - that's super confusing.

2. A user has input data where the number of partitions is not known, e.g.

   > sc.textFile("some file").coalesce(50)....

   This is semantically vague (am I growing or shrinking this RDD?) and also may not work correctly if the base RDD has fewer than 50 partitions.

The new operator forces a shuffle every time, so it will always produce exactly the requested number of partitions. It also throws an exception, rather than silently not working, if a bad input is passed.

I am currently adding streaming tests (this requires refactoring some of the test suite to allow testing at partition granularity), so this is not ready for merge yet. But feedback is welcome.
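For illustration, a minimal sketch of how the new operator is meant to be used, based on the semantics described above and the API added in this patch (the SparkContext `sc`, the file name, and the partition counts are placeholders, not part of the patch):

    // Previously, growing parallelism required the confusing form:
    //   rdd.coalesce(numSplits, shuffle = true)
    // With this patch the intent is explicit and a shuffle is the default:
    val lines = sc.textFile("some file")                  // partition count unknown up front
    val wide = lines.repartition(50)                      // always yields exactly 50 partitions
    val narrow = wide.repartition(2, skipShuffle = true)  // shrink without a shuffle
    // Growing while skipping the shuffle is rejected:
    // narrow.repartition(100, skipShuffle = true)        // throws IllegalArgumentException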
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala       21
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala  33
2 files changed, 54 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0355618e43..e2652f13c4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -266,6 +266,27 @@ abstract class RDD[T: ClassManifest](
def distinct(): RDD[T] = distinct(partitions.size)
/**
+ * Return a new RDD that has exactly numPartitions partitions.
+ *
+ * Used to increase or decrease the level of parallelism in this RDD. By default, this will use
+ * a shuffle to redistribute data. If you are shrinking the RDD into fewer partitions, you can
+ * set skipShuffle = true to avoid a shuffle. Skipping shuffles is not supported when
+ * increasing the number of partitions.
+ *
+ * Similar to `coalesce`, but shuffles by default, allowing you to call this safely even
+ * if you don't know the number of partitions.
+ */
+ def repartition(numPartitions: Int, skipShuffle: Boolean = false): RDD[T] = {
+ if (skipShuffle && numPartitions > this.partitions.size) {
+ val msg = "repartition must grow %s from %s to %s partitions, cannot skip shuffle.".format(
+ this.name, this.partitions.size, numPartitions
+ )
+ throw new IllegalArgumentException(msg)
+ }
+ coalesce(numPartitions, !skipShuffle)
+ }
+
+ /**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d1bc5e296..fd00183668 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -139,6 +139,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.union(emptyKv).collect().size === 2)
}
+ test("repartitioned RDDs") {
+ val data = sc.parallelize(1 to 1000, 10)
+
+ // Coalesce partitions
+ val repartitioned1 = data.repartition(2)
+ assert(repartitioned1.partitions.size == 2)
+ val partitions1 = repartitioned1.glom().collect()
+ assert(partitions1(0).length > 0)
+ assert(partitions1(1).length > 0)
+ assert(repartitioned1.collect().toSet === (1 to 1000).toSet)
+
+ // Split partitions
+ val repartitioned2 = data.repartition(20)
+ assert(repartitioned2.partitions.size == 20)
+ val partitions2 = repartitioned2.glom().collect()
+ assert(partitions2(0).length > 0)
+ assert(partitions2(19).length > 0)
+ assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
+
+ // Coalesce partitions - no shuffle
+ val repartitioned3 = data.repartition(2, skipShuffle = true)
+ assert(repartitioned3.partitions.size == 2)
+ val partitions3 = repartitioned3.glom().collect()
+ assert(partitions3(0).toList === (1 to 500).toList)
+ assert(partitions3(1).toList === (501 to 1000).toList)
+ assert(repartitioned3.collect().toSet === (1 to 1000).toSet)
+
+ // Split partitions - no shuffle (should throw exn)
+ intercept[IllegalArgumentException] {
+ data.repartition(20, skipShuffle = true)
+ }
+ }
+
test("coalesced RDDs") {
val data = sc.parallelize(1 to 10, 10)