author    Andrew Or <andrewor14@gmail.com>  2014-01-01 11:42:33 -0800
committer Andrew Or <andrewor14@gmail.com>  2014-01-01 11:42:33 -0800
commit    92c304fd0321d77941f0b029dc7b7f61804d8bca (patch)
tree      b8094f0b987cd9fea2d0a6a83a62a2435e28b127
parent    3bc9e391a3eb1dd21bb93b15caf49627134c1917 (diff)
Simplify ExternalAppendOnlyMap on the assumption that the mergeCombiners function is specified
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                                |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala     | 162
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala |  23
3 files changed, 53 insertions, 135 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 784c09ec51..c408d5f145 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -71,8 +71,7 @@ case class Aggregator[K, V, C: ClassTag] (
}
combiners.iterator
} else {
- val combiners =
- new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
+ val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val kc = iter.next()
combiners.insert(kc._1, kc._2)
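Since this branch of Aggregator receives (K, C) pairs whose values are already combiners, `identity` is a valid createCombiner, and the same mergeCombiners function serves for both merge roles. A minimal standalone sketch of that pattern (the String/Int types are illustrative, not from the patch):

```scala
// K, C, C instantiation: values are already combiners, so the first
// occurrence of a key seeds the map unchanged via identity, and
// mergeCombiners is reused both to fold values and to merge combiners.
val mergeCombiners: (Int, Int) => Int = _ + _
val combiners = new ExternalAppendOnlyMap[String, Int, Int](
  identity, mergeCombiners, mergeCombiners)
Seq(("a", 1), ("b", 2), ("a", 3)).foreach { case (k, c) => combiners.insert(k, c) }
// combiners.iterator now yields ("a", 4) and ("b", 2), in no guaranteed order
```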
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 223fae128e..9e147feec4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -23,98 +23,40 @@ import java.util.Comparator
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import scala.reflect.ClassTag
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
/**
- * A wrapper for SpillableAppendOnlyMap that handles two cases:
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
*
- * (1) If a mergeCombiners function is specified, merge values into combiners before disk
- * spill, as it is possible to merge the resulting combiners later.
+ * This map takes two passes over the data:
+ * (1) Values are merged into combiners, which are sorted and spilled to disk as necessary.
+ * (2) Combiners are read from disk and merged together.
*
- * (2) Otherwise, group values of the same key together before disk spill, and merge them
- * into combiners only after reading them back from disk.
+ * Two parameters control the memory threshold: `spark.shuffle.buffer.mb` specifies the maximum
+ * size of the in-memory map before a spill, and `spark.shuffle.buffer.fraction` specifies an
+ * additional margin of safety. The second parameter is important for the following reason:
*
- * In the latter case, values occupy much more space because they are not collapsed as soon
- * as they are inserted. This in turn leads to more disk spills, degrading performance.
- * For this reason, a mergeCombiners function should be specified if possible.
+ * If the spill threshold is set too high, the in-memory map may occupy more memory than is
+ * available, resulting in OOM. However, if the spill threshold is set too low, we spill
+ * frequently and incur unnecessary disk writes. This may lead to a performance regression
+ * compared to the normal case of using the non-spilling AppendOnlyMap.
*/
-private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
+private[spark] class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
serializer: Serializer = SparkEnv.get.serializerManager.default,
diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
- extends Iterable[(K, C)] with Serializable {
-
- private val mergeBeforeSpill: Boolean = mergeCombiners != null
-
- private val map: SpillableAppendOnlyMap[K, V, _, C] = {
- if (mergeBeforeSpill) {
- new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, mergeCombiners,
- identity, serializer, diskBlockManager)
- } else {
- // Use ArrayBuffer[V] as the intermediate combiner
- val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
- val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
- group += value
- }
- val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
- group1 ++= group2
- }
- val combineGroup: (ArrayBuffer[V] => C) = group => {
- var combiner : Option[C] = None
- group.foreach { v =>
- combiner match {
- case None => combiner = Some(createCombiner(v))
- case Some(c) => combiner = Some(mergeValue(c, v))
- }
- }
- combiner.getOrElse(null.asInstanceOf[C])
- }
- new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
- mergeGroups, combineGroup, serializer, diskBlockManager)
- }
- }
-
- def insert(key: K, value: V): Unit = map.insert(key, value)
-
- override def iterator: Iterator[(K, C)] = map.iterator
-}
-
-/**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
- * A group is an intermediate combiner, with type G equal to either C or ArrayBuffer[V].
- *
- * This map takes two passes over the data:
- * (1) Values are merged into groups, which are spilled to disk as necessary.
- * (2) Groups are read from disk and merged into combiners, which are returned.
- *
- * If we never spill to disk, we avoid the second pass provided that groups G are already
- * combiners C.
- *
- * Note that OOM is still possible with the SpillableAppendOnlyMap. This may occur if the
- * collective G values do not fit into memory, or if the size estimation is not sufficiently
- * accurate. To account for the latter, `spark.shuffle.buffer.fraction` specifies an additional
- * margin of safety, while `spark.shuffle.buffer.mb` specifies the raw memory threshold.
- */
-private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
- createGroup: V => G,
- mergeValue: (G, V) => G,
- mergeGroups: (G, G) => G,
- createCombiner: G => C,
- serializer: Serializer,
- diskBlockManager: DiskBlockManager)
extends Iterable[(K, C)] with Serializable with Logging {
- import SpillableAppendOnlyMap._
+ import ExternalAppendOnlyMap._
- private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
+ private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskIterator]
-
private val memoryThresholdMB = {
// TODO: Turn this into a fraction of memory per reducer
val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
@@ -123,13 +65,13 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
}
private val fileBufferSize =
System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
- private val comparator = new KeyGroupComparator[K, G]
+ private val comparator = new KCComparator[K, C]
private val ser = serializer.newInstance()
private var spillCount = 0
def insert(key: K, value: V): Unit = {
- val update: (Boolean, G) => G = (hadVal, oldVal) => {
- if (hadVal) mergeValue(oldVal, value) else createGroup(value)
+ val update: (Boolean, C) => C = (hadVal, oldVal) => {
+ if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
}
currentMap.changeValue(key, update)
if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
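The spill check above compares the map's estimated size against the `spark.shuffle.buffer.mb` threshold; the `spark.shuffle.buffer.fraction` safety margin described in the scaladoc is applied in context this hunk elides. A rough sketch of how the two settings could combine into an effective threshold (the 0.8 default here is an assumed placeholder, not a value from the patch):

```scala
// Hypothetical illustration: a raw buffer size scaled down by a safety
// fraction, leaving headroom for size-estimation error.
val bufferMb = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
val fraction = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toDouble // assumed default
val spillThresholdBytes = (bufferMb * fraction * 1024 * 1024).toLong
```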
@@ -154,19 +96,19 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// Partial failures cannot be tolerated; do not revert partial writes
writer.close()
}
- currentMap = new SizeTrackingAppendOnlyMap[K, G]
+ currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskIterator(file))
}
override def iterator: Iterator[(K, C)] = {
- if (spilledMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
- currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
+ if (spilledMaps.isEmpty) {
+ currentMap.iterator
} else {
new ExternalIterator()
}
}
- /** An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs. */
+ /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
private class ExternalIterator extends Iterator[(K, C)] {
// A fixed-size queue that maintains a buffer for each stream we are currently merging
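ExternalIterator performs a k-way sort-merge over the in-memory map's sorted iterator and every spilled file, driven by a priority queue. The generic technique, stripped of the hash-collision handling and reduced to plain Ints (an illustrative sketch, not the patch's code):

```scala
import scala.collection.mutable

// Merge k sorted iterators into one sorted iterator using a min-heap.
def kWayMerge(streams: Seq[Iterator[Int]]): Iterator[Int] = {
  case class Head(value: Int, rest: Iterator[Int])
  // mutable.PriorityQueue dequeues the maximum, so reverse the ordering.
  val heap = mutable.PriorityQueue.empty[Head](Ordering.by[Head, Int](_.value).reverse)
  streams.foreach(s => if (s.hasNext) heap.enqueue(Head(s.next(), s)))
  new Iterator[Int] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): Int = {
      val Head(v, rest) = heap.dequeue()
      if (rest.hasNext) heap.enqueue(Head(rest.next(), rest))
      v
    }
  }
}
```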
@@ -177,43 +119,43 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
val inputStreams = Seq(currentMap.destructiveSortedIterator(comparator)) ++ spilledMaps
inputStreams.foreach { it =>
- val kgPairs = getMorePairs(it)
- mergeHeap.enqueue(StreamBuffer(it, kgPairs))
+ val kcPairs = getMorePairs(it)
+ mergeHeap.enqueue(StreamBuffer(it, kcPairs))
}
/**
* Fetch from the given iterator until a key of different hash is retrieved. In the
* event of key hash collisions, this ensures no pairs are hidden from being merged.
*/
- def getMorePairs(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
- val kgPairs = new ArrayBuffer[(K, G)]
+ def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+ val kcPairs = new ArrayBuffer[(K, C)]
if (it.hasNext) {
- var kg = it.next()
- kgPairs += kg
- val minHash = kg._1.hashCode()
- while (it.hasNext && kg._1.hashCode() == minHash) {
- kg = it.next()
- kgPairs += kg
+ var kc = it.next()
+ kcPairs += kc
+ val minHash = kc._1.hashCode()
+ while (it.hasNext && kc._1.hashCode() == minHash) {
+ kc = it.next()
+ kcPairs += kc
}
}
- kgPairs
+ kcPairs
}
/**
* If the given buffer contains a value for the given key, merge that value into
- * baseGroup and remove the corresponding (K, G) pair from the buffer
+ * baseCombiner and remove the corresponding (K, C) pair from the buffer
*/
- def mergeIfKeyExists(key: K, baseGroup: G, buffer: StreamBuffer): G = {
+ def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
var i = 0
while (i < buffer.pairs.size) {
- val (k, g) = buffer.pairs(i)
+ val (k, c) = buffer.pairs(i)
if (k == key) {
buffer.pairs.remove(i)
- return mergeGroups(baseGroup, g)
+ return mergeCombiners(baseCombiner, c)
}
i += 1
}
- baseGroup
+ baseCombiner
}
override def hasNext: Boolean = {
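A note on why getMorePairs drains an entire hash-equivalence class: streams are sorted by key hash, not by key, and distinct keys can share a hash code, so pairs belonging to one key may sit behind colliding pairs in any stream. A tiny self-contained illustration with a hypothetical key type (not from the patch):

```scala
// Distinct keys with equal hash codes: a hash-ordered stream cannot separate
// them, so the merge must buffer the whole equal-hash run and compare keys
// for true equality before combining.
case class ShortKey(s: String) { override def hashCode: Int = s.length }
val k1 = ShortKey("ab")
val k2 = ShortKey("cd")
assert(k1.hashCode == k2.hashCode) // both hash to 2
assert(k1 != k2)                   // yet they are different keys
```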
@@ -233,7 +175,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// Should only happen when no other stream buffers have any pairs left
throw new NoSuchElementException
}
- var (minKey, minGroup) = minPairs.remove(0)
+ var (minKey, minCombiner) = minPairs.remove(0)
assert(minKey.hashCode() == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
@@ -241,7 +183,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
while (!mergeHeap.isEmpty && mergeHeap.head.minKeyHash == minHash) {
val newBuffer = mergeHeap.dequeue()
- minGroup = mergeIfKeyExists(minKey, minGroup, newBuffer)
+ minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
mergedBuffers += newBuffer
}
@@ -253,7 +195,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
mergeHeap.enqueue(buffer)
}
- (minKey, createCombiner(minGroup))
+ (minKey, minCombiner)
}
/**
@@ -263,7 +205,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
*/
- case class StreamBuffer(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+ case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
extends Comparable[StreamBuffer] {
def minKeyHash: Int = {
@@ -282,18 +224,18 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
}
}
- // Iterate through (K, G) pairs in sorted order from an on-disk map
- private class DiskIterator(file: File) extends Iterator[(K, G)] {
+ // Iterate through (K, C) pairs in sorted order from an on-disk map
+ private class DiskIterator(file: File) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
val deserializeStream = ser.deserializeStream(bufferedStream)
- var nextItem: Option[(K, G)] = None
+ var nextItem: Option[(K, C)] = None
var eof = false
- def readNextItem(): Option[(K, G)] = {
+ def readNextItem(): Option[(K, C)] = {
if (!eof) {
try {
- return Some(deserializeStream.readObject().asInstanceOf[(K, G)])
+ return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
} catch {
case e: EOFException =>
eof = true
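DiskIterator's readNextItem/next pair is a one-element look-ahead: read eagerly, convert the EOFException thrown at end of file into an end-of-stream flag, and hand out the buffered item. The same pattern in isolation (a generic sketch, not the patch's code):

```scala
import java.io.EOFException

// Wrap a reader that signals end-of-input with EOFException as a clean Iterator.
class LookAheadIterator[A](read: () => A) extends Iterator[A] {
  private def tryRead(): Option[A] =
    try Some(read()) catch { case _: EOFException => None }
  private var nextItem: Option[A] = tryRead()
  def hasNext: Boolean = nextItem.isDefined
  def next(): A = {
    val item = nextItem.getOrElse(throw new NoSuchElementException)
    nextItem = tryRead()
    item
  }
}
```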
@@ -312,7 +254,7 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
}
}
- override def next(): (K, G) = {
+ override def next(): (K, C) = {
nextItem match {
case Some(item) =>
nextItem = None
@@ -331,10 +273,10 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
}
}
-private[spark] object SpillableAppendOnlyMap {
- private class KeyGroupComparator[K, G] extends Comparator[(K, G)] {
- def compare(kg1: (K, G), kg2: (K, G)): Int = {
- kg1._1.hashCode().compareTo(kg2._1.hashCode())
+private[spark] object ExternalAppendOnlyMap {
+ private class KCComparator[K, C] extends Comparator[(K, C)] {
+ def compare(kc1: (K, C), kc2: (K, C)): Int = {
+ kc1._1.hashCode().compareTo(kc2._1.hashCode())
}
}
}
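The comparator hosted by the renamed companion object orders pairs by key hash alone. That is weaker than full key order, but it is sufficient: after sorting, all pairs with equal keys fall inside the same contiguous equal-hash run, which is exactly what the merge above exploits. A quick sketch of sorting with such a comparator (illustrative, not the patch's code):

```scala
import java.util.{Arrays, Comparator}

// Order (key, combiner) pairs by key hash only, as KCComparator does.
val byKeyHash = new Comparator[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)): Int =
    java.lang.Integer.compare(a._1.hashCode(), b._1.hashCode())
}
val pairs = Array(("b", 2), ("a", 1), ("a", 3))
Arrays.sort(pairs, byKeyHash)
// Both ("a", _) pairs now sit in the same equal-hash run, ready to be merged.
```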
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index baf94b4728..a18d466baa 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -229,27 +229,4 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
}
}
-
- test("spilling with no mergeCombiners function") {
- System.setProperty("spark.shuffle.buffer.mb", "1")
- System.setProperty("spark.shuffle.buffer.fraction", "0.05")
-
- // combineByKey - should spill exactly 11 times
- val _createCombiner: Int => ArrayBuffer[Int] = i => ArrayBuffer[Int](i)
- val _mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buf, i) => buf += i
- val rdd = sc.parallelize(0 until 10000).map(i => (i/4, i))
- val result = rdd.combineByKey[ArrayBuffer[Int]](_createCombiner, _mergeValue, null,
- new HashPartitioner(1), mapSideCombine=false).collect()
-
- // result should be the same as groupByKey
- assert(result.length == 2500)
- result.foreach { case(i, seq) =>
- i match {
- case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
- case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
- case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
- case _ =>
- }
- }
- }
}
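The deleted test exercised the old null-mergeCombiners grouping path, which no longer exists. Under the simplified API, the same groupByKey-style behavior can still be expressed by making the buffer merge explicit; a sketch of the three functions under the new signature (illustrative usage, not code from the commit):

```scala
import scala.collection.mutable.ArrayBuffer

// Grouping as combining: the combiner is the buffer itself, and an explicit
// mergeCombiners concatenates buffers coming from different spills.
val createCombiner = (i: Int) => ArrayBuffer(i)
val mergeValue = (buf: ArrayBuffer[Int], i: Int) => buf += i
val mergeCombiners = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2
val groups = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
  createCombiner, mergeValue, mergeCombiners)
(0 until 16).foreach(i => groups.insert(i / 4, i))
// groups.iterator yields four keys (0 to 3), each with a buffer of four values
```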