diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-29 22:14:07 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-29 22:14:07 -0700 |
commit | 3cc8ab6e298b2dcdd236a38ab3d43aa617285a35 (patch) | |
tree | ca58fd97575e3f0da9656427cc5000fc3724afbe /core/src/main | |
parent | cad507adaf22f27393067b4d3da4f718cd294196 (diff) | |
parent | dd854d5b9fdc1fe60af4dc649af4202a8ddac0d8 (diff) | |
download | spark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.tar.gz spark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.tar.bz2 spark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.zip |
Merge pull request #541 from stephenh/shufflecoalesce
Add a shuffle parameter to coalesce.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 10 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaDoubleRDD.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaPairRDD.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDD.scala | 6 |
4 files changed, 28 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index ed39732f13..33dc7627a3 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD import spark.rdd.MapPartitionsWithIndexRDD import spark.rdd.PipedRDD import spark.rdd.SampledRDD +import spark.rdd.ShuffledRDD import spark.rdd.SubtractedRDD import spark.rdd.UnionRDD import spark.rdd.ZippedRDD @@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions) + def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = { + if (shuffle) { + // include a shuffle step so that our upstream tasks are still distributed + new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys + } else { + new CoalescedRDD(this, numPartitions) + } + } /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index ba00b6a844..16692c0440 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions)) /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD = + fromRDD(srdd.coalesce(numPartitions, shuffle)) + + /** * Return an RDD with the elements from `this` that are not in `other`. * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 49aaabf835..30084df4e2 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Return a new RDD that is reduced into `numPartitions` partitions. */ - def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions)) + def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions)) + + /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] = + fromRDD(rdd.coalesce(numPartitions, shuffle)) /** * Return a sampled subset of this RDD. diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index 3016888898..e29f1e5899 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] { def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions) /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] = + rdd.coalesce(numPartitions, shuffle) + + /** * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = |