From 985705301e5e55de14b00ad8ce3143e91aae185d Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 27 Jul 2014 11:20:20 -0700 Subject: SPARK-2684: Update ExternalAppendOnlyMap to take an iterator as input This will decrease object allocation from the "update" closure used in map.changeValue. Author: Matei Zaharia Closes #1607 from mateiz/spark-2684 and squashes the following commits: b7d89e6 [Matei Zaharia] Add insertAll for Iterables too, and fix some code style 561fc97 [Matei Zaharia] Update ExternalAppendOnlyMap to take an iterator as input --- .../main/scala/org/apache/spark/Aggregator.scala | 5 +- .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 7 +- .../util/collection/ExternalAppendOnlyMap.scala | 77 +++++++++++++++------- .../collection/ExternalAppendOnlyMapSuite.scala | 17 ++--- 4 files changed, 64 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 1d640579ef..ff0ca11749 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -55,10 +55,7 @@ case class Aggregator[K, V, C] ( combiners.iterator } else { val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) - while (iter.hasNext) { - val pair = iter.next() - combiners.insert(pair._1, pair._2) - } + combiners.insertAll(iter) // TODO: Make this non optional in a future release Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 7d96089e52..6388ef82cc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -154,11 +154,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } else { val map = createExternalMap(numRdds) - rddIterators.foreach { case (it, depNum) => - while (it.hasNext) { - val kv = it.next() - map.insert(kv._1, new CoGroupValue(kv._2, depNum)) - } + for ((it, depNum) <- rddIterators) { + map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index c22bb8d9c6..6f263c39d1 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -110,42 +110,69 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Insert the given key and value into the map. + */ + def insert(key: K, value: V): Unit = { + insertAll(Iterator((key, value))) + } + + /** + * Insert the given iterator of keys and values into the map. * - * If the underlying map is about to grow, check if the global pool of shuffle memory has + * When the underlying map needs to grow, check if the global pool of shuffle memory has * enough room for this to happen. If so, allocate the memory required to grow the map; * otherwise, spill the in-memory map to disk. * * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. */ - def insert(key: K, value: V) { + def insertAll(entries: Iterator[Product2[K, V]]): Unit = { + // An update function for the map that we reuse across entries to avoid allocating + // a new closure each time + var curEntry: Product2[K, V] = null val update: (Boolean, C) => C = (hadVal, oldVal) => { - if (hadVal) mergeValue(oldVal, value) else createCombiner(value) + if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2) } - if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { - val mapSize = currentMap.estimateSize() - var shouldSpill = false - val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap - - // Atomically check whether there is sufficient memory in the global pool for - // this map to grow and, if possible, allocate the required amount - shuffleMemoryMap.synchronized { - val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) - val availableMemory = maxMemoryThreshold - - (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) - - // Assume map growth factor is 2x - shouldSpill = availableMemory < mapSize * 2 - if (!shouldSpill) { - shuffleMemoryMap(threadId) = mapSize * 2 + + while (entries.hasNext) { + curEntry = entries.next() + if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { + val mapSize = currentMap.estimateSize() + var shouldSpill = false + val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap + + // Atomically check whether there is sufficient memory in the global pool for + // this map to grow and, if possible, allocate the required amount + shuffleMemoryMap.synchronized { + val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) + val availableMemory = maxMemoryThreshold - + (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) + + // Assume map growth factor is 2x + shouldSpill = availableMemory < mapSize * 2 + if (!shouldSpill) { + shuffleMemoryMap(threadId) = mapSize * 2 + } + } + // Do not synchronize spills + if (shouldSpill) { + spill(mapSize) } } - // Do not synchronize spills - if (shouldSpill) { - spill(mapSize) - } + currentMap.changeValue(curEntry._1, update) + numPairsInMemory += 1 } - currentMap.changeValue(key, update) - numPairsInMemory += 1 + } + + /** + * Insert the given iterable of keys and values into the map. + * + * When the underlying map needs to grow, check if the global pool of shuffle memory has + * enough room for this to happen. If so, allocate the memory required to grow the map; + * otherwise, spill the in-memory map to disk. + * + * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. + */ + def insertAll(entries: Iterable[Product2[K, V]]): Unit = { + insertAll(entries.iterator) } /** diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 428822949c..0b7ad184a4 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -63,12 +63,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) - map.insert(1, 10) - map.insert(2, 20) - map.insert(3, 30) - map.insert(1, 100) - map.insert(2, 200) - map.insert(1, 1000) + map.insertAll(Seq( + (1, 10), + (2, 20), + (3, 30), + (1, 100), + (2, 200), + (1, 1000))) val it = map.iterator assert(it.hasNext) val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) @@ -282,7 +283,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { assert(w1.hashCode === w2.hashCode) } - (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) } + map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i))) collisionPairs.foreach { case (w1, w2) => map.insert(w1, w2) map.insert(w2, w1) @@ -355,7 +356,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]]( createCombiner, mergeValue, mergeCombiners) - (1 to 100000).foreach { i => map.insert(i, i) } + map.insertAll((1 to 100000).iterator.map(i => (i, i))) map.insert(null.asInstanceOf[Int], 1) map.insert(1, null.asInstanceOf[Int]) map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int]) -- cgit v1.2.3