author    Matei Zaharia <matei@eecs.berkeley.edu>    2012-06-09 14:44:18 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2012-06-09 14:44:18 -0700
commit    a96558caa3c0feb20bbf0f3ec367673886fc78c6 (patch)
tree      186307c3756a244a2a1b2df6a8140947eb967ca8
parent    048276799ae15ce5978733722e8ddde6a07302ff (diff)
Performance improvements to shuffle operations: in particular, preserve
RDD partitioning in more cases where possible, and use iterators instead
of materializing collections when doing joins.
-rw-r--r--  bagel/src/main/scala/spark/bagel/Bagel.scala                                 |   3
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala  |   1
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                             | 187
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala                                  |   7
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                          |   4
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala                            | 101
6 files changed, 208 insertions(+), 95 deletions(-)
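
For context, the user-visible effect of this patch is that shuffle operations now record and reuse partitioners, so a join between two RDDs that are already partitioned the same way no longer needs a fresh shuffle. A minimal sketch against the API as it stands after this patch (the object name PartitionerReuseExample is illustrative only; the calls mirror the new PartitioningSuite below):

    package spark.examples

    import spark.SparkContext
    import spark.SparkContext._

    object PartitionerReuseExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "partitioner-reuse")
        val pairs = sc.parallelize(1 to 10, 4).map(x => (x, x))

        // Both RDDs come out hash-partitioned into 2 splits.
        val grouped = pairs.groupByKey(2)
        val reduced = pairs.reduceByKey(_ + _, 2)

        // join now goes through cogroup with defaultPartitioner(self, other),
        // which reuses grouped's existing HashPartitioner rather than
        // reshuffling both sides.
        val joined = grouped.join(reduced)
        println(joined.partitioner) // Some(HashPartitioner(2)) after this patch

        sc.stop()
      }
    }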
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 2f57c9c0fd..996ca2a877 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -30,8 +30,7 @@ object Bagel extends Logging {
val aggregated = agg(verts, aggregator)
val combinedMsgs = msgs.combineByKey(
- combiner.createCombiner, combiner.mergeMsg, combiner.mergeCombiners,
- splits, partitioner)
+ combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 7084ff97d9..8ce7abd03f 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -105,7 +105,6 @@ object WikipediaPageRankStandalone {
ranks = (contribs.combineByKey((x: Double) => x,
(x: Double, y: Double) => x + y,
(x: Double, y: Double) => x + y,
- numSplits,
partitioner)
.mapValues(sum => a/n + (1-a)*sum))
}
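
The two call sites above show the core API change: combineByKey no longer takes a numSplits argument alongside a Partitioner, since the partitioner already determines the number of output splits. A hedged sketch of a call under the new signature (wordPairs is an assumed RDD[(String, Int)]):

    // Pass just the partitioner; its numPartitions fixes the output splits.
    val counts = wordPairs.combineByKey(
      (v: Int) => v,                  // createCombiner: start a partial sum
      (c: Int, v: Int) => c + v,      // mergeValue: fold a value into it
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners: merge partial sums
      new HashPartitioner(4))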
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 8b63d1aba1..e880f9872f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -60,7 +60,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- numSplits: Int,
partitioner: Partitioner): RDD[(K, C)] = {
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
new ShuffledRDD(self, aggregator, partitioner)
@@ -70,21 +69,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numSplits: Int): RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits,
- new HashPartitioner(numSplits))
+ combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
}
- def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
- combineByKey[V]((v: V) => v, func, func, numSplits)
+ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+ combineByKey[V]((v: V) => v, func, func, partitioner)
}
- def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, numSplits)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+ reduceByKey(new HashPartitioner(numSplits), func)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
@@ -92,100 +85,90 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
+ def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+ groupByKey(new HashPartitioner(numSplits))
+ }
+
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
}
- def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += w
- })
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
+ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
- def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[V]
- val wbuf = new ArrayBuffer[Option[W]]
- seq.foreach(_ match {
- case Left(v) => vbuf += v
- case Right(w) => wbuf += Some(w)
- })
- if (wbuf.isEmpty) {
- wbuf += None
+ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ if (ws.isEmpty) {
+ vs.iterator.map(v => (v, None))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
}
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
}
}
- def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
- val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
- val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
- (vs ++ ws).groupByKey(numSplits).flatMap {
- case (k, seq) => {
- val vbuf = new ArrayBuffer[Option[V]]
- val wbuf = new ArrayBuffer[W]
- seq.foreach(_ match {
- case Left(v) => vbuf += Some(v)
- case Right(w) => wbuf += w
- })
- if (vbuf.isEmpty) {
- vbuf += None
+ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Option[V], W))] = {
+ this.cogroup(other, partitioner).flatMapValues {
+ case (vs, ws) =>
+ if (vs.isEmpty) {
+ ws.iterator.map(w => (None, w))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
}
- for (v <- vbuf; w <- wbuf) yield (k, (v, w))
- }
}
}
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) : RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism)
+ combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
- reduceByKey(func, defaultParallelism)
+ reduceByKey(defaultPartitioner(self), func)
}
def groupByKey(): RDD[(K, Seq[V])] = {
- groupByKey(defaultParallelism)
+ groupByKey(defaultPartitioner(self))
}
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
- join(other, defaultParallelism)
+ join(other, defaultPartitioner(self, other))
+ }
+
+ def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+ join(other, new HashPartitioner(numSplits))
}
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, defaultParallelism)
+ leftOuterJoin(other, defaultPartitioner(self, other))
+ }
+
+ def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
+ leftOuterJoin(other, new HashPartitioner(numSplits))
}
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, defaultParallelism)
+ rightOuterJoin(other, defaultPartitioner(self, other))
}
- def defaultParallelism = self.context.defaultParallelism
+ def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
+ rightOuterJoin(other, new HashPartitioner(numSplits))
+ }
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
@@ -194,42 +177,72 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
new MappedValuesRDD(self, cleanF)
}
- def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+ def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(defaultParallelism)
- }
+ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
- part)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(
- classManifest[K],
- Manifests.seqSeqManifest)
+ partitioner)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
prfs.mapValues {
case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
- def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- val part = self.partitioner match {
- case Some(p) => p
- case None => new HashPartitioner(defaultParallelism)
- }
- new CoGroupedRDD[K](
+ val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]],
other1.asInstanceOf[RDD[(_, _)]],
other2.asInstanceOf[RDD[(_, _)]]),
- part).map {
- case (k, Seq(vs, w1s, w2s)) =>
- (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
+ partitioner)
+ val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+ prfs.mapValues {
+ case Seq(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ }
+ }
+
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(self, other))
+ }
+
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+ }
+
+ def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, new HashPartitioner(numSplits))
+ }
+
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int)
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, new HashPartitioner(numSplits))
+ }
+
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(self, other))
+ }
+
+ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+ : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+ }
+
+ /**
+ * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+ * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+ */
+ def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
+ for (r <- rdds if r.partitioner != None) {
+ return r.partitioner.get
}
+ return new HashPartitioner(self.context.defaultParallelism)
}
def lookup(key: K): Seq[V] = {
@@ -376,6 +389,7 @@ class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
override def splits = prev.splits
override val partitioner = prev.partitioner
override val dependencies = List(new OneToOneDependency(prev))
+
override def compute(split: Split) = {
prev.iterator(split).toArray
.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
@@ -389,16 +403,15 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
}
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U])
+class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
+
override def compute(split: Split) = {
- prev.iterator(split).toStream.flatMap {
- case (k, v) => f(v).map(x => (k, x))
- }.iterator
+ prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) }
}
}
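
The heart of the join rewrite is visible in the flatMapValues bodies above: the old implementations tagged every record as Left or Right, grouped them, and accumulated ArrayBuffers per key, while the new ones go through cogroup and emit the per-key cross product lazily. A self-contained plain-Scala sketch of that per-key logic (joinValues and leftJoinValues are illustrative helpers, not part of the patch):

    // Lazy per-key cross product, as in the new join: nothing is materialized;
    // pairs are produced on demand from the two grouped Seqs.
    def joinValues[V, W](vs: Seq[V], ws: Seq[W]): Iterator[(V, W)] =
      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

    // leftOuterJoin's per-key logic: emit (v, None) when the right side is empty.
    def leftJoinValues[V, W](vs: Seq[V], ws: Seq[W]): Iterator[(V, Option[W])] =
      if (ws.isEmpty) vs.iterator.map(v => (v, None))
      else for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))

    joinValues(Seq(1, 2), Seq("a", "b")).foreach(println)
    // (1,a), (1,b), (2,a), (2,b)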
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index ac61fe3b54..024a4580ac 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -26,8 +26,9 @@ class HashPartitioner(partitions: Int) extends Partitioner {
}
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
- partitions: Int, rdd: RDD[(K,V)],
- ascending: Boolean = true)
+ partitions: Int,
+ @transient rdd: RDD[(K,V)],
+ private val ascending: Boolean = true)
extends Partitioner {
private val rangeBounds: Array[K] = {
@@ -65,7 +66,7 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_,_] =>
- r.rangeBounds.sameElements(rangeBounds)
+ r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
}
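
The equals change matters because partitioner equality is what lets downstream operations skip a shuffle: an ascending and a descending RangePartitioner route keys to different splits even when their rangeBounds coincide, so treating them as equal could silently misplace data. A minimal sketch mirroring the new PartitioningSuite test below (rdd stands for any RDD[(Int, Int)]):

    val ascendingP  = new RangePartitioner(2, rdd)         // ascending = true (default)
    val descendingP = new RangePartitioner(2, rdd, false)  // ascending = false

    // Before this patch these compared equal whenever their rangeBounds
    // matched; now the ascending flag participates in equality.
    assert(ascendingP != descendingP)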
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fa53d9be2c..4c4b2ee30d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -83,7 +83,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
- def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+ def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
@@ -275,7 +275,7 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
- f: T => Traversable[U])
+ f: T => TraversableOnce[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
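
Widening flatMap's argument type from Traversable[U] to TraversableOnce[U] is what allows the iterator-based implementations above: a function body may now return an Iterator, so results stream through compute instead of being collected first (FlatMappedValuesRDD drops its toStream workaround for the same reason). A small sketch of a call that only compiles after this change (lines is an assumed RDD[String]):

    // split returns an Array; .iterator turns the per-element result into a
    // TraversableOnce that flatMap consumes lazily.
    val words = lines.flatMap(line => line.split(" ").iterator)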
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
new file mode 100644
index 0000000000..7f7f9493dc
--- /dev/null
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -0,0 +1,101 @@
+package spark
+
+import org.scalatest.FunSuite
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+
+class PartitioningSuite extends FunSuite {
+ test("HashPartitioner equality") {
+ val p2 = new HashPartitioner(2)
+ val p4 = new HashPartitioner(4)
+ val anotherP4 = new HashPartitioner(4)
+ assert(p2 === p2)
+ assert(p4 === p4)
+ assert(p2 != p4)
+ assert(p4 != p2)
+ assert(p4 === anotherP4)
+ assert(anotherP4 === p4)
+ }
+
+ test("RangePartitioner equality") {
+ val sc = new SparkContext("local", "test")
+
+ // Make an RDD where all the elements are the same so that the partition range bounds
+ // are deterministically all the same.
+ val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
+
+ val p2 = new RangePartitioner(2, rdd)
+ val p4 = new RangePartitioner(4, rdd)
+ val anotherP4 = new RangePartitioner(4, rdd)
+ val descendingP2 = new RangePartitioner(2, rdd, false)
+ val descendingP4 = new RangePartitioner(4, rdd, false)
+
+ assert(p2 === p2)
+ assert(p4 === p4)
+ assert(p2 != p4)
+ assert(p4 != p2)
+ assert(p4 === anotherP4)
+ assert(anotherP4 === p4)
+ assert(descendingP2 === descendingP2)
+ assert(descendingP4 === descendingP4)
+ assert(descendingP2 != descendingP4)
+ assert(descendingP4 != descendingP2)
+ assert(p2 != descendingP2)
+ assert(p4 != descendingP4)
+ assert(descendingP2 != p2)
+ assert(descendingP4 != p4)
+
+ sc.stop()
+ }
+
+ test("HashPartitioner not equal to RangePartitioner") {
+ val sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+ val rangeP2 = new RangePartitioner(2, rdd)
+ val hashP2 = new HashPartitioner(2)
+ assert(rangeP2 === rangeP2)
+ assert(hashP2 === hashP2)
+ assert(hashP2 != rangeP2)
+ assert(rangeP2 != hashP2)
+ sc.stop()
+ }
+
+ test("partitioner preservation") {
+ val sc = new SparkContext("local", "test")
+
+ val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
+
+ val grouped2 = rdd.groupByKey(2)
+ val grouped4 = rdd.groupByKey(4)
+ val reduced2 = rdd.reduceByKey(_ + _, 2)
+ val reduced4 = rdd.reduceByKey(_ + _, 4)
+
+ assert(rdd.partitioner === None)
+
+ assert(grouped2.partitioner === Some(new HashPartitioner(2)))
+ assert(grouped4.partitioner === Some(new HashPartitioner(4)))
+ assert(reduced2.partitioner === Some(new HashPartitioner(2)))
+ assert(reduced4.partitioner === Some(new HashPartitioner(4)))
+
+ assert(grouped2.groupByKey().partitioner === grouped2.partitioner)
+ assert(grouped2.groupByKey(3).partitioner != grouped2.partitioner)
+ assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner)
+ assert(grouped4.groupByKey().partitioner === grouped4.partitioner)
+ assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
+ assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
+
+ assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+
+ assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+ assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
+
+ sc.stop()
+ }
+}