aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-03-29 22:14:07 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-03-29 22:14:07 -0700
commit3cc8ab6e298b2dcdd236a38ab3d43aa617285a35 (patch)
treeca58fd97575e3f0da9656427cc5000fc3724afbe /core
parentcad507adaf22f27393067b4d3da4f718cd294196 (diff)
parentdd854d5b9fdc1fe60af4dc649af4202a8ddac0d8 (diff)
downloadspark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.tar.gz
spark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.tar.bz2
spark-3cc8ab6e298b2dcdd236a38ab3d43aa617285a35.zip
Merge pull request #541 from stephenh/shufflecoalesce
Add a shuffle parameter to coalesce.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/RDD.scala10
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala6
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala8
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala6
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala7
5 files changed, 34 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ed39732f13..33dc7627a3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
+import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
+ def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+ if (shuffle) {
+ // include a shuffle step so that our upstream tasks are still distributed
+ new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ } else {
+ new CoalescedRDD(this, numPartitions)
+ }
+ }
/**
* Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index ba00b6a844..16692c0440 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -58,6 +58,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+ fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+ /**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 49aaabf835..30084df4e2 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
- def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+ def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
+ fromRDD(rdd.coalesce(numPartitions, shuffle))
/**
* Return a sampled subset of this RDD.
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3016888898..e29f1e5899 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -44,6 +44,12 @@ JavaRDDLike[T, JavaRDD[T]] {
def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
+ rdd.coalesce(numPartitions, shuffle)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 53635b1de6..7fbdd44340 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
+
+ // we can optionally shuffle to keep the upstream parallel
+ val coalesced5 = data.coalesce(1, shuffle = true)
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ null)
}
test("zipped RDDs") {