author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-06-09 14:44:18 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-06-09 14:44:18 -0700
commit     a96558caa3c0feb20bbf0f3ec367673886fc78c6 (patch)
tree       186307c3756a244a2a1b2df6a8140947eb967ca8 /core
parent     048276799ae15ce5978733722e8ddde6a07302ff (diff)
Performance improvements to shuffle operations: in particular, preserve
RDD partitioning in more cases where possible, and use iterators
instead of materializing collections when doing joins.
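
To make the first point concrete, here is an illustrative sketch (not part of the commit; assumes a running SparkContext sc) of what preserved partitioning buys: once both sides of a join carry the same HashPartitioner, the join can adopt that layout instead of hashing both inputs again.

    // Illustrative only; assumes a SparkContext `sc`.
    val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))

    // reduceByKey with 4 splits now records Some(HashPartitioner(4)) on its result.
    val sums   = pairs.reduceByKey(_ + _, 4)
    val counts = pairs.map { case (k, v) => (k, 1) }.reduceByKey(_ + _, 4)

    // A join between the two picks up the existing partitioner (via the new
    // defaultPartitioner helper in the diff below) instead of reshuffling.
    val joined = sums.join(counts)
    assert(joined.partitioner == sums.partitioner)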
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala   | 187
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala        |   7
-rw-r--r--  core/src/main/scala/spark/RDD.scala                |   4
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala  | 101
4 files changed, 207 insertions, 92 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 8b63d1aba1..e880f9872f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -60,7 +60,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def combineByKey[C](createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
-      numSplits: Int,
       partitioner: Partitioner): RDD[(K, C)] = {
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     new ShuffledRDD(self, aggregator, partitioner)
@@ -70,21 +69,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
       numSplits: Int): RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits,
-        new HashPartitioner(numSplits))
+    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
   }
 
-  def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
-    combineByKey[V]((v: V) => v, func, func, numSplits)
+  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+    combineByKey[V]((v: V) => v, func, func, partitioner)
   }
 
-  def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
-    val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, numSplits)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
+    reduceByKey(new HashPartitioner(numSplits), func)
   }
 
   def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
@@ -92,100 +85,90 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
     def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
 
+  def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
+    groupByKey(new HashPartitioner(numSplits))
+  }
+
   def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
     def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner.numPartitions, partitioner)
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
     bufs.flatMapValues(buf => buf)
   }
 
-  def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
-    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
-    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
-    (vs ++ ws).groupByKey(numSplits).flatMap {
-      case (k, seq) => {
-        val vbuf = new ArrayBuffer[V]
-        val wbuf = new ArrayBuffer[W]
-        seq.foreach(_ match {
-          case Left(v) => vbuf += v
-          case Right(w) => wbuf += w
-        })
-        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
-      }
+  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
     }
   }
 
-  def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
-    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
-    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
-    (vs ++ ws).groupByKey(numSplits).flatMap {
-      case (k, seq) => {
-        val vbuf = new ArrayBuffer[V]
-        val wbuf = new ArrayBuffer[Option[W]]
-        seq.foreach(_ match {
-          case Left(v) => vbuf += v
-          case Right(w) => wbuf += Some(w)
-        })
-        if (wbuf.isEmpty) {
-          wbuf += None
+  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        if (ws.isEmpty) {
+          vs.iterator.map(v => (v, None))
+        } else {
+          for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
         }
-        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
-      }
     }
   }
 
-  def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
-    val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
-    val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
-    (vs ++ ws).groupByKey(numSplits).flatMap {
-      case (k, seq) => {
-        val vbuf = new ArrayBuffer[Option[V]]
-        val wbuf = new ArrayBuffer[W]
-        seq.foreach(_ match {
-          case Left(v) => vbuf += Some(v)
-          case Right(w) => wbuf += w
-        })
-        if (vbuf.isEmpty) {
-          vbuf += None
+  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+    : RDD[(K, (Option[V], W))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, ws) =>
+        if (vs.isEmpty) {
+          ws.iterator.map(w => (None, w))
+        } else {
+          for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
         }
-        for (v <- vbuf; w <- wbuf) yield (k, (v, w))
-      }
     }
   }
 
   def combineByKey[C](createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C)
     : RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism)
+    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
   }
 
   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
-    reduceByKey(func, defaultParallelism)
+    reduceByKey(defaultPartitioner(self), func)
   }
 
   def groupByKey(): RDD[(K, Seq[V])] = {
-    groupByKey(defaultParallelism)
+    groupByKey(defaultPartitioner(self))
   }
 
   def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
-    join(other, defaultParallelism)
+    join(other, defaultPartitioner(self, other))
+  }
+
+  def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
+    join(other, new HashPartitioner(numSplits))
   }
 
   def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
-    leftOuterJoin(other, defaultParallelism)
+    leftOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
+    leftOuterJoin(other, new HashPartitioner(numSplits))
   }
 
   def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
-    rightOuterJoin(other, defaultParallelism)
+    rightOuterJoin(other, defaultPartitioner(self, other))
   }
 
-  def defaultParallelism = self.context.defaultParallelism
+  def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
+    rightOuterJoin(other, new HashPartitioner(numSplits))
+  }
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
 
@@ -194,42 +177,72 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     new MappedValuesRDD(self, cleanF)
   }
 
-  def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new FlatMappedValuesRDD(self, cleanF)
   }
 
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
-    val part = self.partitioner match {
-      case Some(p) => p
-      case None => new HashPartitioner(defaultParallelism)
-    }
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
     val cg = new CoGroupedRDD[K](
       Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
-      part)
-    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(
-      classManifest[K],
-      Manifests.seqSeqManifest)
+      partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
     prfs.mapValues {
       case Seq(vs, ws) =>
         (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
     }
   }
 
-  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
     : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    val part = self.partitioner match {
-      case Some(p) => p
-      case None => new HashPartitioner(defaultParallelism)
-    }
-    new CoGroupedRDD[K](
+    val cg = new CoGroupedRDD[K](
       Seq(self.asInstanceOf[RDD[(_, _)]],
           other1.asInstanceOf[RDD[(_, _)]],
          other2.asInstanceOf[RDD[(_, _)]]),
-      part).map {
-      case (k, Seq(vs, w1s, w2s)) =>
-        (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
+      partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+    prfs.mapValues {
+      case Seq(vs, w1s, w2s) =>
+        (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+    }
+  }
+
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+    : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, new HashPartitioner(numSplits))
+  }
+
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int)
+    : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, new HashPartitioner(numSplits))
+  }
+
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+    : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  /**
+   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of
+   * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner.
+   */
+  def defaultPartitioner(rdds: RDD[_]*): Partitioner = {
+    for (r <- rdds if r.partitioner != None) {
+      return r.partitioner.get
     }
+    return new HashPartitioner(self.context.defaultParallelism)
   }
 
   def lookup(key: K): Seq[V] = {
@@ -376,6 +389,7 @@ class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
   override def splits = prev.splits
   override val partitioner = prev.partitioner
   override val dependencies = List(new OneToOneDependency(prev))
+
   override def compute(split: Split) = {
     prev.iterator(split).toArray
       .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
@@ -389,16 +403,15 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
   override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
 }
 
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U])
+class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
   extends RDD[(K, U)](prev.context) {
 
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
   override val partitioner = prev.partitioner
+
   override def compute(split: Split) = {
-    prev.iterator(split).toStream.flatMap {
-      case (k, v) => f(v).map(x => (k, x))
-    }.iterator
+    prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) }
   }
 }
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index ac61fe3b54..024a4580ac 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -26,8 +26,9 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 }
 
 class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
-    partitions: Int, rdd: RDD[(K,V)],
-    ascending: Boolean = true)
+    partitions: Int,
+    @transient rdd: RDD[(K,V)],
+    private val ascending: Boolean = true)
   extends Partitioner {
 
   private val rangeBounds: Array[K] = {
@@ -65,7 +66,7 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
 
   override def equals(other: Any): Boolean = other match {
     case r: RangePartitioner[_,_] =>
-      r.rangeBounds.sameElements(rangeBounds)
+      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
     case _ =>
       false
   }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fa53d9be2c..4c4b2ee30d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -83,7 +83,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
 
-  def flatMap[U: ClassManifest](f: T => Traversable[U]): RDD[U] =
+  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
     new FlatMappedRDD(this, sc.clean(f))
 
   def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
@@ -275,7 +275,7 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
 
 class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
     prev: RDD[T],
-    f: T => Traversable[U])
+    f: T => TraversableOnce[U])
   extends RDD[U](prev.context) {
 
   override def splits = prev.splits
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
new file mode 100644
index 0000000000..7f7f9493dc
--- /dev/null
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -0,0 +1,101 @@
+package spark
+
+import org.scalatest.FunSuite
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+
+class PartitioningSuite extends FunSuite {
+  test("HashPartitioner equality") {
+    val p2 = new HashPartitioner(2)
+    val p4 = new HashPartitioner(4)
+    val anotherP4 = new HashPartitioner(4)
+    assert(p2 === p2)
+    assert(p4 === p4)
+    assert(p2 != p4)
+    assert(p4 != p2)
+    assert(p4 === anotherP4)
+    assert(anotherP4 === p4)
+  }
+
+  test("RangePartitioner equality") {
+    val sc = new SparkContext("local", "test")
+
+    // Make an RDD where all the elements are the same so that the partition range bounds
+    // are deterministically all the same.
+    val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
+
+    val p2 = new RangePartitioner(2, rdd)
+    val p4 = new RangePartitioner(4, rdd)
+    val anotherP4 = new RangePartitioner(4, rdd)
+    val descendingP2 = new RangePartitioner(2, rdd, false)
+    val descendingP4 = new RangePartitioner(4, rdd, false)
+
+    assert(p2 === p2)
+    assert(p4 === p4)
+    assert(p2 != p4)
+    assert(p4 != p2)
+    assert(p4 === anotherP4)
+    assert(anotherP4 === p4)
+    assert(descendingP2 === descendingP2)
+    assert(descendingP4 === descendingP4)
+    assert(descendingP2 != descendingP4)
+    assert(descendingP4 != descendingP2)
+    assert(p2 != descendingP2)
+    assert(p4 != descendingP4)
+    assert(descendingP2 != p2)
+    assert(descendingP4 != p4)
+
+    sc.stop()
+  }
+
+  test("HashPartitioner not equal to RangePartitioner") {
+    val sc = new SparkContext("local", "test")
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+    val rangeP2 = new RangePartitioner(2, rdd)
+    val hashP2 = new HashPartitioner(2)
+    assert(rangeP2 === rangeP2)
+    assert(hashP2 === hashP2)
+    assert(hashP2 != rangeP2)
+    assert(rangeP2 != hashP2)
+    sc.stop()
+  }
+
+  test("partitioner preservation") {
+    val sc = new SparkContext("local", "test")
+
+    val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
+
+    val grouped2 = rdd.groupByKey(2)
+    val grouped4 = rdd.groupByKey(4)
+    val reduced2 = rdd.reduceByKey(_ + _, 2)
+    val reduced4 = rdd.reduceByKey(_ + _, 4)
+
+    assert(rdd.partitioner === None)
+
+    assert(grouped2.partitioner === Some(new HashPartitioner(2)))
+    assert(grouped4.partitioner === Some(new HashPartitioner(4)))
+    assert(reduced2.partitioner === Some(new HashPartitioner(2)))
+    assert(reduced4.partitioner === Some(new HashPartitioner(4)))
+
+    assert(grouped2.groupByKey().partitioner === grouped2.partitioner)
+    assert(grouped2.groupByKey(3).partitioner != grouped2.partitioner)
+    assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner)
+    assert(grouped4.groupByKey().partitioner === grouped4.partitioner)
+    assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
+    assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
+
+    assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
+    assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
+    assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
+    assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+
+    assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
+
+    sc.stop()
+  }
+}
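
For readers skimming the diff: join, leftOuterJoin and rightOuterJoin are now thin layers over cogroup, and they walk the cogrouped values with iterators instead of building per-key ArrayBuffers. The following self-contained sketch (plain Scala collections with illustrative names, not the Spark API) shows the semantics those flatMapValues bodies implement:

    // Standalone sketch of join-on-top-of-cogroup; plain Scala, not the Spark API.
    object JoinSemanticsSketch {
      // cogroup: for every key, collect the values from each side.
      def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Seq[V], Seq[W]))] = {
        val keys = (left.map(_._1) ++ right.map(_._1)).distinct
        keys.map { k =>
          (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
        }
      }

      def main(args: Array[String]) {
        val l = Seq(1 -> "a", 1 -> "b", 2 -> "c")
        val r = Seq(1 -> "x", 3 -> "y")
        val cg = cogroup(l, r)

        // Inner join: iterate the per-key cross product; keys missing on either
        // side simply yield nothing, and no intermediate buffers are built.
        val inner = cg.flatMap { case (k, (vs, ws)) =>
          for (v <- vs.iterator; w <- ws.iterator) yield (k, (v, w))
        }

        // Left outer join: pad an empty right side with None.
        val leftOuter = cg.flatMap { case (k, (vs, ws)) =>
          if (ws.isEmpty) vs.iterator.map(v => (k, (v, None)))
          else for (v <- vs.iterator; w <- ws.iterator) yield (k, (v, Some(w)))
        }

        println(inner)     // List((1,(a,x)), (1,(b,x)))
        println(leftOuter) // List((1,(a,Some(x))), (1,(b,Some(x))), (2,(c,None)))
      }
    }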
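
The partitioner-preservation logic hinges on the new defaultPartitioner helper. Here is a self-contained restatement of its rule, with stub types standing in for the Spark classes and an Option standing in for each RDD's partitioner field:

    object DefaultPartitionerSketch {
      trait Partitioner { def numPartitions: Int }
      case class HashPartitioner(numPartitions: Int) extends Partitioner

      // The rule: the first input that already has a partitioner wins; otherwise
      // fall back to hash-partitioning at the context's default parallelism.
      def defaultPartitioner(fallback: Int, inputs: Option[Partitioner]*): Partitioner =
        inputs.flatten.headOption.getOrElse(HashPartitioner(fallback))

      def main(args: Array[String]) {
        // A pre-partitioned input dictates the layout of the whole operation:
        assert(defaultPartitioner(8, Some(HashPartitioner(4)), None) == HashPartitioner(4))
        // With no pre-partitioned inputs, hash at the fallback width:
        assert(defaultPartitioner(8, None, None) == HashPartitioner(8))
      }
    }

This is why the "partitioner preservation" test above can assert that grouped2.join(reduced2) keeps grouped2's partitioner: both inputs carry HashPartitioner(2), so the cogroup reuses it.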
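
Finally, the one-line change to RangePartitioner.equals is easy to miss: bounds alone are not enough, because ascending and descending partitioners over identical bounds place keys differently, and treating them as equal would let the machinery above wrongly consider such RDDs co-partitioned. A toy restatement (stub case class, not the Spark one):

    // A case class folds both fields into equals, mirroring the fixed
    // RangePartitioner.equals (bounds AND sort direction must match).
    case class RangeLike(bounds: List[Int], ascending: Boolean)

    object RangeEqualitySketch {
      def main(args: Array[String]) {
        val asc  = RangeLike(List(3, 6), ascending = true)
        val desc = RangeLike(List(3, 6), ascending = false)
        assert(asc != desc)  // before the fix, the real class compared these equal
      }
    }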