diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 21:23:36 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 21:23:36 -0600 |
commit | f2bc7480131c7468eb6d3bc6089a4deadf0a2a88 (patch) | |
tree | 76b62b37ba42e0f562b87524e658e150b99fe16a | |
parent | 67df7f2fa2e09487fe8dcf39ab80606d95383ea5 (diff) | |
download | spark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.tar.gz spark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.tar.bz2 spark-f2bc7480131c7468eb6d3bc6089a4deadf0a2a88.zip |
Add RDD.coalesce.
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDDLike.scala | 10 | ||||
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 4 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 8 |
4 files changed, 23 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 9d6ea782bd..f0bc85865c 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,6 +20,7 @@ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator import spark.partial.PartialResult +import spark.rdd.CoalescedRDD import spark.rdd.CartesianRDD import spark.rdd.FilteredRDD import spark.rdd.FlatMappedRDD @@ -232,6 +233,12 @@ abstract class RDD[T: ClassManifest]( def distinct(): RDD[T] = distinct(splits.size) /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] = + new CoalescedRDD(this, numSplits) + + /** * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 60025b459c..295eaa57c0 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -131,6 +131,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround other.classManifest) /** + * Return a new RDD that is reduced into the default number of partitions. + */ + def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism) + + /** + * Return a new RDD that is reduced into `numSplits` partitions. + */ + def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits) + + /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0b74607fb8..0d08fd2396 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("CoalescedRDD") { - testCheckpointing(new CoalescedRDD(_, 2)) + testCheckpointing(_.coalesce(2)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, // so only the RDD will reduce in serialized size, not the splits. - testParentCheckpointing(new CoalescedRDD(_, 2), true, false) + testParentCheckpointing(_.coalesce(2), true, false) // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index fe7deb10d6..ffa866de75 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) - val coalesced1 = new CoalescedRDD(data, 2) + val coalesced1 = data.coalesce(2) assert(coalesced1.collect().toList === (1 to 10).toList) assert(coalesced1.glom().collect().map(_.toList).toList === List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) @@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList === List(5, 6, 7, 8, 9)) - val coalesced2 = new CoalescedRDD(data, 3) + val coalesced2 = data.coalesce(3) assert(coalesced2.collect().toList === (1 to 10).toList) assert(coalesced2.glom().collect().map(_.toList).toList === List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) - val coalesced3 = new CoalescedRDD(data, 10) + val coalesced3 = data.coalesce(10) assert(coalesced3.collect().toList === (1 to 10).toList) assert(coalesced3.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. - val coalesced4 = new CoalescedRDD(data, 20) + val coalesced4 = data.coalesce(20) assert(coalesced4.collect().toList === (1 to 10).toList) assert(coalesced4.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) |