diff options
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 78dcfcc689..9ae47a3385 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -77,6 +77,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex bufs.asInstanceOf[RDD[(K, Seq[V])]] } + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, defaultParallelism, partitioner) + bufs.flatMapValues(buf => buf) + } + def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } |