author    Matei Zaharia <matei@databricks.com>   2014-07-27 11:20:20 -0700
committer Aaron Davidson <aaron@databricks.com>  2014-07-27 11:20:20 -0700
commit    985705301e5e55de14b00ad8ce3143e91aae185d (patch)
tree      be918629d7111d634700a715697f56862222e452
parent    3a69c72e5cbe270b76f6ab6a84a2e334e87cce8c (diff)
SPARK-2684: Update ExternalAppendOnlyMap to take an iterator as input
This will decrease object allocation from the "update" closure used in map.changeValue.

Author: Matei Zaharia <matei@databricks.com>

Closes #1607 from mateiz/spark-2684 and squashes the following commits:

b7d89e6 [Matei Zaharia] Add insertAll for Iterables too, and fix some code style
561fc97 [Matei Zaharia] Update ExternalAppendOnlyMap to take an iterator as input
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                                 |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                           |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala      | 77
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 17
4 files changed, 64 insertions, 42 deletions
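The heart of the change: previously each call to insert(key, value) allocated a fresh
"update" closure, because the closure captured the call's own value parameter. insertAll
hoists a single closure out of the loop and feeds it the current element through a mutable
variable instead. Below is a minimal, self-contained sketch of the two patterns in plain
Scala; it is not Spark code, and changeValue here is a hypothetical stand-in for
AppendOnlyMap.changeValue, with mergeValue/createCombiner inlined as Int operations.

    object ClosureReuseSketch {
      def main(args: Array[String]): Unit = {
        val map = scala.collection.mutable.Map.empty[String, Int]

        // Stand-in for AppendOnlyMap.changeValue(key, updateFunc)
        def changeValue(key: String, update: (Boolean, Int) => Int): Unit =
          map(key) = update(map.contains(key), map.getOrElse(key, 0))

        // Before: one `update` closure is allocated per element, since it
        // captures the call-local `value`.
        def insert(key: String, value: Int): Unit = {
          val update: (Boolean, Int) => Int =
            (hadVal, oldVal) => if (hadVal) oldVal + value else value
          changeValue(key, update)
        }

        // After: a single closure per insertAll call; it reads the current
        // entry through a mutable variable instead of capturing it.
        def insertAll(entries: Iterator[(String, Int)]): Unit = {
          var curValue = 0
          val update: (Boolean, Int) => Int =
            (hadVal, oldVal) => if (hadVal) oldVal + curValue else curValue
          for ((key, value) <- entries) {
            curValue = value
            changeValue(key, update)
          }
        }

        insertAll(Iterator("a" -> 1, "b" -> 2, "a" -> 3))
        println(map) // Map(a -> 4, b -> 2)
      }
    }

For a shuffle inserting millions of pairs, this trades one short-lived object per element
for one per insertAll call, which is exactly the allocation the commit message describes.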
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1d640579ef..ff0ca11749 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
- while (iter.hasNext) {
- val pair = iter.next()
- combiners.insert(pair._1, pair._2)
- }
+ combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
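The Aggregator change is mechanical: the manual while loop becomes a single
insertAll(iter) call. This type-checks because Scala's Iterator is covariant and both
Tuple2 and custom pair classes extend Product2, so combineValuesByKey's
Iterator[_ <: Product2[K, V]] flows straight into insertAll(entries: Iterator[Product2[K, V]]).
A small sketch of that bound, where Pair is a hypothetical custom pair class, not Spark code:

    // Both Tuple2 and a hand-rolled Product2 satisfy the same parameter type.
    case class Pair[K, V](_1: K, _2: V) extends Product2[K, V]

    def keys[K, V](entries: Iterator[Product2[K, V]]): List[K] =
      entries.map(_._1).toList

    val mixed: Iterator[Product2[String, Int]] =
      Iterator(("a", 1), Pair("b", 2)) // Tuple2 and Pair both conform
    println(keys(mixed)) // List(a, b)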
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7d96089e52..6388ef82cc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -154,11 +154,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
} else {
val map = createExternalMap(numRdds)
- rddIterators.foreach { case (it, depNum) =>
- while (it.hasNext) {
- val kv = it.next()
- map.insert(kv._1, new CoGroupValue(kv._2, depNum))
- }
+ for ((it, depNum) <- rddIterators) {
+ map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
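In CoGroupedRDD, wrapping each value in a CoGroupValue moves from the loop body into an
it.map(...) applied before insertAll. Because Iterator.map is lazy, no intermediate
collection is materialized: each pair is transformed on demand as insertAll pulls it, so
memory behavior is unchanged while the per-element insert call (and its closure
allocation) disappears. A short plain-Scala sketch of that laziness, with wrap standing in
for the CoGroupValue construction:

    // Iterator.map defers work: wrap() runs only when the consumer pulls an element.
    def wrap(p: (String, Int)): (String, String) = {
      println(s"wrapping $p")
      (p._1, s"value=${p._2}")
    }

    val source  = Iterator("a" -> 1, "b" -> 2)
    val wrapped = source.map(wrap)                // prints nothing yet
    wrapped.foreach(e => println(s"inserted $e")) // wrap/insert interleave per element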
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c22bb8d9c6..6f263c39d1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -110,42 +110,69 @@ class ExternalAppendOnlyMap[K, V, C](
/**
* Insert the given key and value into the map.
+ */
+ def insert(key: K, value: V): Unit = {
+ insertAll(Iterator((key, value)))
+ }
+
+ /**
+ * Insert the given iterator of keys and values into the map.
*
- * If the underlying map is about to grow, check if the global pool of shuffle memory has
+ * When the underlying map needs to grow, check if the global pool of shuffle memory has
* enough room for this to happen. If so, allocate the memory required to grow the map;
* otherwise, spill the in-memory map to disk.
*
* The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
*/
- def insert(key: K, value: V) {
+ def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
+ // An update function for the map that we reuse across entries to avoid allocating
+ // a new closure each time
+ var curEntry: Product2[K, V] = null
val update: (Boolean, C) => C = (hadVal, oldVal) => {
- if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+ if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
}
- if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
- val mapSize = currentMap.estimateSize()
- var shouldSpill = false
- val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
- // Atomically check whether there is sufficient memory in the global pool for
- // this map to grow and, if possible, allocate the required amount
- shuffleMemoryMap.synchronized {
- val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
- val availableMemory = maxMemoryThreshold -
- (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
-
- // Assume map growth factor is 2x
- shouldSpill = availableMemory < mapSize * 2
- if (!shouldSpill) {
- shuffleMemoryMap(threadId) = mapSize * 2
+
+ while (entries.hasNext) {
+ curEntry = entries.next()
+ if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+ val mapSize = currentMap.estimateSize()
+ var shouldSpill = false
+ val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+ // Atomically check whether there is sufficient memory in the global pool for
+ // this map to grow and, if possible, allocate the required amount
+ shuffleMemoryMap.synchronized {
+ val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+ val availableMemory = maxMemoryThreshold -
+ (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+ // Assume map growth factor is 2x
+ shouldSpill = availableMemory < mapSize * 2
+ if (!shouldSpill) {
+ shuffleMemoryMap(threadId) = mapSize * 2
+ }
+ }
+ // Do not synchronize spills
+ if (shouldSpill) {
+ spill(mapSize)
}
}
- // Do not synchronize spills
- if (shouldSpill) {
- spill(mapSize)
- }
+ currentMap.changeValue(curEntry._1, update)
+ numPairsInMemory += 1
}
- currentMap.changeValue(key, update)
- numPairsInMemory += 1
+ }
+
+ /**
+ * Insert the given iterable of keys and values into the map.
+ *
+ * When the underlying map needs to grow, check if the global pool of shuffle memory has
+ * enough room for this to happen. If so, allocate the memory required to grow the map;
+ * otherwise, spill the in-memory map to disk.
+ *
+ * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
+ */
+ def insertAll(entries: Iterable[Product2[K, V]]): Unit = {
+ insertAll(entries.iterator)
}
/**
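The memory-tracking logic itself is unchanged by this patch; it simply moves inside the
per-entry loop of insertAll. Before letting the map grow, the code estimates the map's
size, assumes 2x growth, and atomically tries to claim that much from the global shuffle
memory pool; if the pool cannot cover it, the map spills, and the spill deliberately
happens outside the lock. Here is a hedged, self-contained sketch of that decision, with
SparkEnv, the thread-id bookkeeping, and spill() replaced by hypothetical stand-ins:

    import scala.collection.mutable

    val maxMemoryThreshold = 1L << 30                // hypothetical global pool: 1 GB
    val shuffleMemoryMap = mutable.Map[Long, Long]() // threadId -> bytes claimed

    def shouldSpillBeforeGrowing(threadId: Long, mapSize: Long): Boolean = {
      var shouldSpill = false
      // Atomically check whether the pool can absorb a 2x-grown map, and
      // claim the memory if it can (mirrors the synchronized block above).
      shuffleMemoryMap.synchronized {
        val previouslyOccupied = shuffleMemoryMap.get(threadId)
        val availableMemory = maxMemoryThreshold -
          (shuffleMemoryMap.values.sum - previouslyOccupied.getOrElse(0L))
        shouldSpill = availableMemory < mapSize * 2  // assume 2x growth factor
        if (!shouldSpill) {
          shuffleMemoryMap(threadId) = mapSize * 2
        }
      }
      shouldSpill // the caller spills outside the lock ("Do not synchronize spills")
    }

Keeping spill() outside the synchronized block matters because spilling does disk I/O;
holding the global lock through a spill would stall every other shuffle thread's
grow-or-spill decision.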
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 428822949c..0b7ad184a4 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -63,12 +63,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
- map.insert(1, 10)
- map.insert(2, 20)
- map.insert(3, 30)
- map.insert(1, 100)
- map.insert(2, 200)
- map.insert(1, 1000)
+ map.insertAll(Seq(
+ (1, 10),
+ (2, 20),
+ (3, 30),
+ (1, 100),
+ (2, 200),
+ (1, 1000)))
val it = map.iterator
assert(it.hasNext)
val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
@@ -282,7 +283,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(w1.hashCode === w2.hashCode)
}
- (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
+ map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
map.insert(w2, w1)
@@ -355,7 +356,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
createCombiner, mergeValue, mergeCombiners)
- (1 to 100000).foreach { i => map.insert(i, i) }
+ map.insertAll((1 to 100000).iterator.map(i => (i, i)))
map.insert(null.asInstanceOf[Int], 1)
map.insert(1, null.asInstanceOf[Int])
map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])