author     Andrew Or <andrewor14@gmail.com>    2014-01-01 11:42:33 -0800
committer  Andrew Or <andrewor14@gmail.com>    2014-01-01 11:42:33 -0800
commit     92c304fd0321d77941f0b029dc7b7f61804d8bca (patch)
tree       b8094f0b987cd9fea2d0a6a83a62a2435e28b127
parent     3bc9e391a3eb1dd21bb93b15caf49627134c1917 (diff)
Simplify ExternalAppendOnlyMap on the assumption that the mergeCombiners function is specified
3 files changed, 53 insertions, 135 deletions
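
The change collapses the two code paths described in the old doc comment into one: values are always merged into combiners of type C on insert, and combiners from different spills are merged with each other afterwards. The sketch below is a minimal, self-contained illustration of that three-function contract (createCombiner, mergeValue, mergeCombiners); the object name and the word-count example are invented for illustration and do not appear in the patch.

import scala.collection.mutable

// Hypothetical sketch, not Spark code: the three-function combiner contract
// that the simplified ExternalAppendOnlyMap now always assumes is specified.
object CombinerContractSketch {

  // Fold a stream of (key, value) pairs into per-key combiners.
  def combine[K, V, C](
      pairs: Iterator[(K, V)],
      createCombiner: V => C,    // the first value for a key becomes a combiner
      mergeValue: (C, V) => C): Map[K, C] = {
    val partial = mutable.Map[K, C]()
    for ((k, v) <- pairs) {
      partial(k) = partial.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    partial.toMap
  }

  // Reconcile two partial results, e.g. the in-memory map and one read back from disk.
  def merge[K, C](a: Map[K, C], b: Map[K, C], mergeCombiners: (C, C) => C): Map[K, C] =
    b.foldLeft(a) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }

  def main(args: Array[String]): Unit = {
    // Word count: combiners are running Int sums, so identity / _+_ / _+_ suffice,
    // mirroring the Aggregator change in this commit.
    val left  = combine[String, Int, Int](Iterator("a" -> 1, "b" -> 1, "a" -> 1), identity, _ + _)
    val right = combine[String, Int, Int](Iterator("b" -> 5, "c" -> 2), identity, _ + _)
    println(merge(left, right, (x: Int, y: Int) => x + y)) // contains a -> 2, b -> 6, c -> 2
  }
}
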
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 784c09ec51..c408d5f145 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -71,8 +71,7 @@ case class Aggregator[K, V, C: ClassTag] (
       }
       combiners.iterator
     } else {
-      val combiners =
-        new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
+      val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
       while (iter.hasNext) {
         val kc = iter.next()
         combiners.insert(kc._1, kc._2)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 223fae128e..9e147feec4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -23,98 +23,40 @@ import java.util.Comparator
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import scala.reflect.ClassTag
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
 /**
- * A wrapper for SpillableAppendOnlyMap that handles two cases:
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
  *
- * (1) If a mergeCombiners function is specified, merge values into combiners before disk
- *     spill, as it is possible to merge the resulting combiners later.
+ * This map takes two passes over the data:
+ *   (1) Values are merged into combiners, which are sorted and spilled to disk as necessary.
+ *   (2) Combiners are read from disk and merged together.
  *
- * (2) Otherwise, group values of the same key together before disk spill, and merge them
- *     into combiners only after reading them back from disk.
+ * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum
+ * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an
+ * additional margin of safety. The second parameter is important for the following reason:
  *
- * In the latter case, values occupy much more space because they are not collapsed as soon
- * as they are inserted. This in turn leads to more disk spills, degrading performance.
- * For this reason, a mergeCombiners function should be specified if possible.
+ * If the spill threshold is set too high, the in-memory map may occupy more memory than is
+ * available, resulting in OOM. However, if the spill threshold is set too low, we spill
+ * frequently and incur unnecessary disk writes. This may lead to a performance regression
+ * compared to the normal case of using the non-spilling AppendOnlyMap.
  */
-private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
+
+private[spark] class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C,
     serializer: Serializer = SparkEnv.get.serializerManager.default,
     diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
-  extends Iterable[(K, C)] with Serializable {
-
-  private val mergeBeforeSpill: Boolean = mergeCombiners != null
-
-  private val map: SpillableAppendOnlyMap[K, V, _, C] = {
-    if (mergeBeforeSpill) {
-      new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
-        identity, serializer, diskBlockManager)
-    } else {
-      // Use ArrayBuffer[V] as the intermediate combiner
-      val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
-      val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
-        group += value
-      }
-      val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
-        group1 ++= group2
-      }
-      val combineGroup: (ArrayBuffer[V] => C) = group => {
-        var combiner : Option[C] = None
-        group.foreach { v =>
-          combiner match {
-            case None => combiner = Some(createCombiner(v))
-            case Some(c) => combiner = Some(mergeValue(c, v))
-          }
-        }
-        combiner.getOrElse(null.asInstanceOf[C])
-      }
-      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
-        mergeGroups, combineGroup, serializer, diskBlockManager)
-    }
-  }
-
-  def insert(key: K, value: V): Unit = map.insert(key, value)
-
-  override def iterator: Iterator[(K, C)] = map.iterator
-}
-
-/**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
- * A group is an intermediate combiner, with type G equal to either C or ArrayBuffer[V].
- *
- * This map takes two passes over the data:
- *   (1) Values are merged into groups, which are spilled to disk as necessary.
- *   (2) Groups are read from disk and merged into combiners, which are returned.
- *
- * If we never spill to disk, we avoid the second pass provided that groups G are already
- * combiners C.
- *
- * Note that OOM is still possible with the SpillableAppendOnlyMap. This may occur if the
- * collective G values do not fit into memory, or if the size estimation is not sufficiently
- * accurate. To account for the latter, `spark.shuffle.buffer.fraction` specifies an additional
- * margin of safety, while `spark.shuffle.buffer.mb` specifies the raw memory threshold.
- */
-private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
-    createGroup: V => G,
-    mergeValue: (G, V) => G,
-    mergeGroups: (G, G) => G,
-    createCombiner: G => C,
-    serializer: Serializer,
-    diskBlockManager: DiskBlockManager)
   extends Iterable[(K, C)] with Serializable with Logging {
 
-  import SpillableAppendOnlyMap._
+  import ExternalAppendOnlyMap._
 
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
+  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskIterator]
-
   private val memoryThresholdMB = {
     // TODO: Turn this into a fraction of memory per reducer
     val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
@@ -123,13 +65,13 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   }
   private val fileBufferSize =
     System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-  private val comparator = new KeyGroupComparator[K, G]
+  private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
   private var spillCount = 0
 
   def insert(key: K, value: V): Unit = {
-    val update: (Boolean, G) => G = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createGroup(value)
+    val update: (Boolean, C) => C = (hadVal, oldVal) => {
+      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
     currentMap.changeValue(key, update)
     if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
@@ -154,19 +96,19 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       // Partial failures cannot be tolerated; do not revert partial writes
       writer.close()
     }
-    currentMap = new SizeTrackingAppendOnlyMap[K, G]
+    currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskIterator(file))
   }
 
   override def iterator: Iterator[(K, C)] = {
-    if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
-      currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
+    if (spilledMaps.isEmpty) {
+      currentMap.iterator
     } else {
       new ExternalIterator()
     }
   }
 
-  /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */
+  /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -177,43 +119,43 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
 
     inputStreams.foreach{ it =>
-      val kgPairs = getMorePairs(it)
-      mergeHeap.enqueue(StreamBuffer(it, kgPairs))
+      val kcPairs = getMorePairs(it)
+      mergeHeap.enqueue(StreamBuffer(it, kcPairs))
     }
 
     /**
     * Fetch from the given iterator until a key of different hash is retrieved. In the
     * event of key hash collisions, this ensures no pairs are hidden from being merged.
    */
-    def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
-      val kgPairs = new ArrayBuffer[(K, G)]
+    def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+      val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
-        var kg = it.next()
-        kgPairs += kg
-        val minHash = kg._1.hashCode()
-        while (it.hasNext && kg._1.hashCode() == minHash) {
-          kg = it.next()
-          kgPairs += kg
+        var kc = it.next()
+        kcPairs += kc
+        val minHash = kc._1.hashCode()
+        while (it.hasNext && kc._1.hashCode() == minHash) {
+          kc = it.next()
+          kcPairs += kc
         }
       }
-      kgPairs
+      kcPairs
     }
 
    /**
     * If the given buffer contains a value for the given key, merge that value into
-    * baseGroup and remove the corresponding (K, G) pair from the buffer
+    * baseCombiner and remove the corresponding (K, C) pair from the buffer
    */
-    def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = {
+    def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
       var i = 0
       while (i < buffer.pairs.size) {
-        val (k, g) = buffer.pairs(i)
+        val (k, c) = buffer.pairs(i)
         if (k == key) {
           buffer.pairs.remove(i)
-          return mergeGroups(baseGroup, g)
+          return mergeCombiners(baseCombiner, c)
         }
         i += 1
       }
-      baseGroup
+      baseCombiner
     }
 
     override def hasNext: Boolean = {
@@ -233,7 +175,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         // Should only happen when no other stream buffers have any pairs left
        throw new NoSuchElementException
       }
-      var (minKey, minGroup) = minPairs.remove(0)
+      var (minKey, minCombiner) = minPairs.remove(0)
       assert(minKey.hashCode() == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
@@ -241,7 +183,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
       while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
         val newBuffer = mergeHeap.dequeue()
-        minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer)
+        minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
         mergedBuffers += newBuffer
       }
 
@@ -253,7 +195,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         mergeHeap.enqueue(buffer)
       }
 
-      (minKey, createCombiner(minGroup))
+      (minKey, minCombiner)
     }
 
     /**
@@ -263,7 +205,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     *
     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
    */
-    case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+    case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def minKeyHash: Int = {
@@ -282,18 +224,18 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
     }
   }
 
-  // Iterate through (K, G) pairs in sorted order from an on-disk map
-  private class DiskIterator(file: File) extends Iterator[(K, G)] {
+  // Iterate through (K, C) pairs in sorted order from an on-disk map
+  private class DiskIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)
-    var nextItem: Option[(K, G)] = None
+    var nextItem: Option[(K, C)] = None
    var eof = false
 
-    def readNextItem(): Option[(K, G)] = {
+    def readNextItem(): Option[(K, C)] = {
      if (!eof) {
        try {
-          return Some(deserializeStream.readObject().asInstanceOf[(K, G)])
+          return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
        } catch {
          case e: EOFException =>
            eof = true
@@ -312,7 +254,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       }
     }
 
-    override def next(): (K, G) = {
+    override def next(): (K, C) = {
       nextItem match {
         case Some(item) =>
           nextItem = None
@@ -331,10 +273,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
   }
 }
 
-private[spark] object SpillableAppendOnlyMap {
-  private class KeyGroupComparator[K, G] extends Comparator[(K, G)] {
-    def compare(kg1: (K, G), kg2: (K, G)): Int = {
-      kg1._1.hashCode().compareTo(kg2._1.hashCode())
+private[spark] object ExternalAppendOnlyMap {
+  private class KCComparator[K, C] extends Comparator[(K, C)] {
+    def compare(kc1: (K, C), kc2: (K, C)): Int = {
+      kc1._1.hashCode().compareTo(kc2._1.hashCode())
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index baf94b4728..a18d466baa 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -229,27 +229,4 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
       }
     }
   }
-
-  test("spilling with no mergeCombiners function") {
-    System.setProperty("spark.shuffle.buffer.mb", "1")
-    System.setProperty("spark.shuffle.buffer.fraction", "0.05")
-
-    // combineByKey - should spill exactly 11 times
-    val _createCombiner: Int => ArrayBuffer[Int] = i => ArrayBuffer[Int](i)
-    val _mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buf, i) => buf += i
-    val rdd = sc.parallelize(0 until 10000).map(i => (i/4, i))
-    val result = rdd.combineByKey[ArrayBuffer[Int]](_createCombiner, _mergeValue, null,
-      new HashPartitioner(1), mapSideCombine=false).collect()
-
-    // result should be the same as groupByKey
-    assert(result.length == 2500)
-    result.foreach { case(i, seq) =>
-      i match {
-        case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
-        case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
-        case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
-        case _ =>
-      }
-    }
-  }
 }
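
The merge strategy that ExternalIterator performs can be illustrated standalone. The sketch below is an invented, self-contained approximation of the getMorePairs / mergeIfKeyExists logic, not the Spark implementation: streams are sorted only by key hashCode, so every pair sharing the minimum hash must be examined before a key's combiner can be emitted, otherwise a colliding key could hide pending values for the key being merged.

import scala.collection.mutable

// Invented sketch: merge streams of (key, combiner) pairs, each already sorted
// by key hashCode, where distinct keys may share a hash.
object HashMergeSketch {

  def mergeSortedByHash[K, C](
      streams: Seq[Iterator[(K, C)]],
      mergeCombiners: (C, C) => C): List[(K, C)] = {

    // One buffer per stream: all pending pairs sharing that stream's current
    // minimum key hash (cf. getMorePairs in the patch).
    case class Buf(it: BufferedIterator[(K, C)], pairs: mutable.ArrayBuffer[(K, C)])

    def fill(b: Buf): Unit = if (b.pairs.isEmpty && b.it.hasNext) {
      val h = b.it.head._1.hashCode()
      while (b.it.hasNext && b.it.head._1.hashCode() == h) b.pairs += b.it.next()
    }

    // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest hash first.
    implicit val ord: Ordering[Buf] = Ordering.by((b: Buf) => b.pairs.head._1.hashCode()).reverse
    val heap = mutable.PriorityQueue.empty[Buf]
    streams.foreach { s =>
      val b = Buf(s.buffered, mutable.ArrayBuffer.empty)
      fill(b)
      if (b.pairs.nonEmpty) heap.enqueue(b)
    }

    val out = mutable.ListBuffer[(K, C)]()
    while (heap.nonEmpty) {
      val first = heap.dequeue()
      val minHash = first.pairs.head._1.hashCode()
      var (minKey, minCombiner) = first.pairs.remove(0)

      // Pull every buffer whose minimum hash matches; merge values for minKey and
      // leave colliding-but-distinct keys buffered (cf. mergeIfKeyExists).
      val touched = mutable.ArrayBuffer(first)
      while (heap.nonEmpty && heap.head.pairs.head._1.hashCode() == minHash) {
        touched += heap.dequeue()
      }
      for (b <- touched) {
        var i = 0
        while (i < b.pairs.length) {
          if (b.pairs(i)._1 == minKey) minCombiner = mergeCombiners(minCombiner, b.pairs.remove(i)._2)
          else i += 1
        }
      }
      out += ((minKey, minCombiner))
      // Refill exhausted buffers and return non-empty ones to the heap.
      for (b <- touched) { fill(b); if (b.pairs.nonEmpty) heap.enqueue(b) }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    def sorted(pairs: (String, Int)*): Iterator[(String, Int)] =
      pairs.sortBy(_._1.hashCode()).iterator
    val a = sorted("x" -> 1, "y" -> 2)
    val b = sorted("y" -> 3, "z" -> 4)
    println(mergeSortedByHash(Seq(a, b), (l: Int, r: Int) => l + r)) // y merges to 5
  }
}
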
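On the configuration side, the hunks above truncate the body of memoryThresholdMB, so the exact arithmetic is not visible in this diff. The sketch below is an assumed illustration of how the two properties named in the new doc comment could combine into a spill threshold; the 0.8 default and the multiplication are guesses, not taken from the patch.

// Assumed illustration only: the diff truncates the real computation of
// memoryThresholdMB, so the fraction default and arithmetic here are guesses.
object SpillThresholdSketch {
  def thresholdBytes(): Long = {
    val bufferMb = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong       // property from the patch
    val fraction = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat // assumed default
    (bufferMb * fraction).toLong * 1024 * 1024
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.shuffle.buffer.mb", "64")
    System.setProperty("spark.shuffle.buffer.fraction", "0.5")
    // 64 MB * 0.5 = 32 MB: spill once the map's estimated size exceeds this.
    println(thresholdBytes()) // 33554432
  }
}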