aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-05 21:23:36 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-05 21:23:36 -0600
commitf2bc7480131c7468eb6d3bc6089a4deadf0a2a88 (patch)
tree76b62b37ba42e0f562b87524e658e150b99fe16a
parent67df7f2fa2e09487fe8dcf39ab80606d95383ea5 (diff)
downloadspark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.tar.gz
spark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.tar.bz2
spark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.zip
Add RDD.coalesce.
-rw-r--r--core/src/main/scala/spark/RDD.scala7
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala10
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala4
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala8
4 files changed, 23 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9d6ea782bd..f0bc85865c 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -20,6 +20,7 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
+import spark.rdd.CoalescedRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
@@ -232,6 +233,12 @@ abstract class RDD[T: ClassManifest](
def distinct(): RDD[T] = distinct(splits.size)
/**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ *
+ * The result is a `CoalescedRDD` constructed over this RDD with the requested split count.
+ *
+ * @param numSplits number of partitions requested; when omitted, `sc.defaultParallelism`
+ *                  is used
+ * @return a `CoalescedRDD` wrapping this RDD
+ *
+ * NOTE(review): the RDDSuite coalesce test states that asking for more partitions than
+ * this RDD has should keep the original number of partitions -- confirm against
+ * CoalescedRDD, which is not visible here.
+ */
+ def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] =
+ new CoalescedRDD(this, numSplits)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 60025b459c..295eaa57c0 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -131,6 +131,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
other.classManifest)
/**
+ * Return a new RDD that is reduced into the default number of partitions.
+ *
+ * Delegates to `coalesce(numSplits)` with `rdd.context.defaultParallelism` as the
+ * partition count.
+ *
+ * NOTE(review): the declared return type is the Scala `RDD[T]`, not the Java wrapper
+ * type `This` of this trait -- confirm this is intended for the Java API.
+ */
+ def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism)
+
+ /**
+ * Return a new RDD that is reduced into `numSplits` partitions.
+ *
+ * Forwards directly to `coalesce` on the wrapped Scala `rdd`.
+ *
+ * @param numSplits number of partitions requested
+ *
+ * NOTE(review): the declared return type is the Scala `RDD[T]`, not the Java wrapper
+ * type `This` of this trait -- confirm this is intended for the Java API.
+ */
+ def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits)
+
+ /**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0b74607fb8..0d08fd2396 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("CoalescedRDD") {
- testCheckpointing(new CoalescedRDD(_, 2))
+ testCheckpointing(_.coalesce(2))
// Test whether the serialized size of the CoalescedRDD shrinks after its parent RDD is checkpointed.
// Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
// so only the RDD will reduce in serialized size, not the splits.
- testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
+ testParentCheckpointing(_.coalesce(2), true, false)
// Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
// the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index fe7deb10d6..ffa866de75 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
- val coalesced1 = new CoalescedRDD(data, 2)
+ val coalesced1 = data.coalesce(2)
assert(coalesced1.collect().toList === (1 to 10).toList)
assert(coalesced1.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
@@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
List(5, 6, 7, 8, 9))
- val coalesced2 = new CoalescedRDD(data, 3)
+ val coalesced2 = data.coalesce(3)
assert(coalesced2.collect().toList === (1 to 10).toList)
assert(coalesced2.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
- val coalesced3 = new CoalescedRDD(data, 10)
+ val coalesced3 = data.coalesce(10)
assert(coalesced3.collect().toList === (1 to 10).toList)
assert(coalesced3.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
- val coalesced4 = new CoalescedRDD(data, 20)
+ val coalesced4 = data.coalesce(20)
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)