diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-24 16:15:02 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-26 23:40:07 -0800 |
commit | 17def8cc1132a5c94d895dacba4217ef9a0e5bd0 (patch) | |
tree | 7e2cbfbe67a31f50322793c2b929dc22db987391 /core | |
parent | 6a45ec1972d6fc053a10fbd2373f43e0c7562aa5 (diff) | |
download | spark-17def8cc1132a5c94d895dacba4217ef9a0e5bd0.tar.gz spark-17def8cc1132a5c94d895dacba4217ef9a0e5bd0.tar.bz2 spark-17def8cc1132a5c94d895dacba4217ef9a0e5bd0.zip |
Refactor ExternalAppendOnlyMap to take in KVC instead of just KV — i.e. parameterize it as [K, V, C] with createCombiner/mergeValue/mergeCombiners functions (aggregator-style), instead of [K, V] with a single combine function
Diffstat (limited to 'core')
3 files changed, 78 insertions, 76 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 77a24733aa..c51fb1d630 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -33,28 +33,20 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { //val combiners = new AppendOnlyMap[K, C] - val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners) - var kv: Product2[K, V] = null - val update = (hadValue: Boolean, oldValue: C) => { - if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) - } + val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) { - kv = iter.next() - combiners.changeValue(kv._1, update) + val kv = iter.next() + combiners.insert(kv._1, kv._2) } combiners.iterator } def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { //val combiners = new AppendOnlyMap[K, C] - val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners) - var kc: (K, C) = null - val update = (hadValue: Boolean, oldValue: C) => { - if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2 - } + val combiners = new ExternalAppendOnlyMap[K, C, C]((c:C) => c, mergeCombiners, mergeCombiners) while (iter.hasNext) { - kc = iter.next() - combiners.changeValue(kc._1, update) + val kc = iter.next() + combiners.insert(kc._1, kc._2) } combiners.iterator } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 4c45a94af9..b93c60cd67 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -100,52 +100,56 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) - override def compute(s: Partition, 
context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { + override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size - def combine(x: Seq[ArrayBuffer[Any]], y: Seq[ArrayBuffer[Any]]) = { - x.zipAll(y, ArrayBuffer[Any](), ArrayBuffer[Any]()).map { - case (a, b) => a ++ b - } - } - //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] - val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combine) + //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] + val combiners = createExternalMap(numRdds) val ser = SparkEnv.get.serializerManager.get(serializerClass) for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { - kv => addToMap(kv._1, kv._2, depNum) + kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum)) } } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach { - kv => addToMap(kv._1, kv._2, depNum) + kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum)) } } } + new InterruptibleIterator(context, combiners.iterator) + } - def addToMap(key: K, value: Any, depNum: Int) { - def update(hadVal: Boolean, oldVal: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] = { - var newVal = oldVal - if (!hadVal){ - newVal = Array.fill(numRdds)(new ArrayBuffer[Any]) - } - newVal(depNum) += value - newVal + def createExternalMap(numRdds:Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { + def createCombiner(v: CoGroupValue): CoGroupCombiner = { + val newCombiner = Array.fill(numRdds)(new CoGroup) + mergeValue(newCombiner, v) + } + def mergeValue(c: 
CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = { + v match { case (value, depNum) => c(depNum) += value } + c + } + def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = { + c1.zipAll(c2, new CoGroup, new CoGroup).map { + case (v1, v2) => v1 ++ v2 } - map.changeValue(key, update) } - - new InterruptibleIterator(context, map.iterator) + new ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] ( + createCombiner,mergeValue, mergeCombiners) } override def clearDependencies() { super.clearDependencies() rdds = null } + + type CoGroup = ArrayBuffer[Any] + type CoGroupValue = (Any, Int) // Int is dependency number + type CoGroupCombiner = Seq[CoGroup] } diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala index 857f8e3439..1a5753603e 100644 --- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala @@ -22,80 +22,86 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable /** - * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner - * function must be specified to merge values back into memory during read. + * An append-only map that spills sorted content to disk when the memory threshold is exceeded. 
*/ -class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V, - memoryThresholdMB: Int = 1024) - extends Iterable[(K, V)] with Serializable { +class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + memoryThresholdMB: Int = 1024) + extends Iterable[(K, C)] with Serializable { - var currentMap = new AppendOnlyMap[K, V] - var oldMaps = new ArrayBuffer[DiskKVIterator] + var currentMap = new AppendOnlyMap[K, C] + var oldMaps = new ArrayBuffer[DiskKCIterator] - def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = { - currentMap.changeValue(key, updateFunc) + def insert(key: K, value: V): Unit = { + currentMap.changeValue(key, updateFunction(value)) val mapSize = SizeEstimator.estimate(currentMap) if (mapSize > memoryThresholdMB * math.pow(1024, 2)) { spill() } } + def updateFunction(value: V) : (Boolean, C) => C = { + (hadVal: Boolean, oldVal: C) => + if (hadVal) mergeValue(oldVal, value) else createCombiner(value) + } + def spill(): Unit = { val file = File.createTempFile("external_append_only_map", "") // Add spill location val out = new ObjectOutputStream(new FileOutputStream(file)) val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode()) sortedMap.foreach { out.writeObject( _ ) } out.close() - currentMap = new AppendOnlyMap[K, V] - oldMaps.append(new DiskKVIterator(file)) + currentMap = new AppendOnlyMap[K, C] + oldMaps.append(new DiskKCIterator(file)) } - override def iterator: Iterator[(K, V)] = new ExternalIterator() + override def iterator: Iterator[(K, C)] = new ExternalIterator() /** * An iterator that merges KV pairs from memory and disk in sorted order */ - class ExternalIterator extends Iterator[(K, V)] { + class ExternalIterator extends Iterator[(K, C)] { // Order by increasing key hash value - implicit object KVOrdering extends Ordering[KVITuple] { - def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode()) + implicit object 
KVOrdering extends Ordering[KCITuple] { + def compare(a:KCITuple, b:KCITuple) = -a.key.hashCode().compareTo(b.key.hashCode()) } - val pq = mutable.PriorityQueue[KVITuple]() - val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps + val pq = mutable.PriorityQueue[KCITuple]() + val inputStreams = Seq(new MemoryKCIterator(currentMap)) ++ oldMaps inputStreams.foreach { readFromIterator( _ ) } override def hasNext: Boolean = !pq.isEmpty // Combine all values from all input streams corresponding to the same key - override def next(): (K,V) = { - val minKVI = pq.dequeue() - var (minKey, minValue) = (minKVI.key, minKVI.value) + override def next(): (K, C) = { + val minKCI = pq.dequeue() + var (minKey, minCombiner) = (minKCI.key, minKCI.combiner) val minHash = minKey.hashCode() - readFromIterator(minKVI.iter) + readFromIterator(minKCI.iter) - var collidedKVI = ArrayBuffer[KVITuple]() + var collidedKCI = ArrayBuffer[KCITuple]() while (!pq.isEmpty && pq.head.key.hashCode() == minHash) { - val newKVI: KVITuple = pq.dequeue() - if (newKVI.key == minKey){ - minValue = combineFunction(minValue, newKVI.value) - readFromIterator(newKVI.iter) + val newKCI: KCITuple = pq.dequeue() + if (newKCI.key == minKey){ + minCombiner = mergeCombiners(minCombiner, newKCI.combiner) + readFromIterator(newKCI.iter) } else { // Collision - collidedKVI += newKVI + collidedKCI += newKCI } } - collidedKVI.foreach { pq.enqueue( _ ) } - (minKey, minValue) + collidedKCI.foreach { pq.enqueue( _ ) } + (minKey, minCombiner) } // Read from the given iterator until a key of different hash is retrieved, - // Add each KV pair read from this iterator to the heap - def readFromIterator(iter: Iterator[(K, V)]): Unit = { + // Add each KC pair read from this iterator to the heap + def readFromIterator(iter: Iterator[(K, C)]): Unit = { var minHash : Option[Int] = None while (iter.hasNext) { - val (k, v) = iter.next() - pq.enqueue(KVITuple(k, v, iter)) + val (k, c) = iter.next() + pq.enqueue(KCITuple(k, c, 
iter)) minHash match { case None => minHash = Some(k.hashCode()) case Some(expectedHash) => @@ -106,19 +112,19 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V, } } - case class KVITuple(key:K, value:V, iter:Iterator[(K, V)]) + case class KCITuple(key:K, combiner:C, iter:Iterator[(K, C)]) } - class MemoryKVIterator(map: AppendOnlyMap[K, V]) extends Iterator[(K, V)] { - val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode()) + class MemoryKCIterator(map: AppendOnlyMap[K, C]) extends Iterator[(K, C)] { + val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode()) val it = sortedMap.iterator override def hasNext: Boolean = it.hasNext - override def next(): (K, V) = it.next() + override def next(): (K, C) = it.next() } - class DiskKVIterator(file: File) extends Iterator[(K, V)] { + class DiskKCIterator(file: File) extends Iterator[(K, C)] { val in = new ObjectInputStream(new FileInputStream(file)) - var nextItem:(K, V) = _ + var nextItem:(K, C) = _ var eof = false override def hasNext: Boolean = { @@ -126,7 +132,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V, return false } try { - nextItem = in.readObject().asInstanceOf[(K, V)] + nextItem = in.readObject().asInstanceOf[(K, C)] } catch { case e: EOFException => eof = true @@ -135,7 +141,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V, true } - override def next(): (K, V) = { + override def next(): (K, C) = { if (eof) { throw new NoSuchElementException } |