author     Andrew Or <andrewor14@gmail.com>    2013-12-29 16:22:44 -0800
committer  Andrew Or <andrewor14@gmail.com>    2013-12-29 16:22:44 -0800
commit     8fbff9f5d04064b870e372db0e3885e3fbf28222 (patch)
tree       56cf01c7921065d631ddb14708ff32e546305fbf /core
parent     2a7b3511f45dcaceaa099e99b7a561b4a266d647 (diff)
Address Aaron's comments
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                      |  31
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala         |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 221
4 files changed, 184 insertions(+), 84 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 470694ed35..784c09ec51 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -74,7 +74,7 @@ case class Aggregator[K, V, C: ClassTag] (
val combiners =
new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
- var kc = iter.next()
+ val kc = iter.next()
combiners.insert(kc._1, kc._2)
}
combiners.iterator
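
For reference, not part of this commit: a minimal REPL-style sketch of the insert-then-iterate pattern used above, assuming access to the private[spark] ExternalAppendOnlyMap (e.g. from code in the org.apache.spark package) and a SparkEnv for the default serializer. Here V and C are both Int and combiners are running sums.

    import org.apache.spark.util.collection.ExternalAppendOnlyMap

    // Combine pre-combined values: createCombiner is identity, merging is addition.
    val sum: (Int, Int) => Int = _ + _
    val counts = new ExternalAppendOnlyMap[String, Int, Int](Predef.identity, sum, sum)
    Seq(("a", 1), ("b", 2), ("a", 3)).foreach { case (k, v) => counts.insert(k, v) }
    // The iterator merges back any spilled content; yields ("a", 4) and ("b", 2).
    counts.iterator.foreach(println)
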
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index dd02c8a3fe..77a594a3e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -43,8 +43,7 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-private[spark]
-class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
@@ -60,6 +59,9 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+ // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
+ // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
+ // CoGroupValue is the intermediate state of each value before being merged in compute.
private type CoGroup = ArrayBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
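
For reference, not part of this commit: a small REPL-style sketch, using only the standard library, of what these aliases mean for the `(k, a) cogroup (k, b)` example in the new comment, with two parent RDDs.

    import scala.collection.mutable.ArrayBuffer

    type CoGroup = ArrayBuffer[Any]
    type CoGroupValue = (Any, Int)       // the Int is the parent RDD (dependency) index
    type CoGroupCombiner = Seq[CoGroup]

    val numRdds = 2
    val combiner: CoGroupCombiner = Array.fill(numRdds)(new CoGroup)
    val values = Seq[CoGroupValue](("a1", 0), ("b1", 1), ("a2", 0))
    values.foreach { case (v, depNum) => combiner(depNum) += v }
    // combiner is now Seq(ArrayBuffer(a1, a2), ArrayBuffer(b1)): one buffer per parent RDD
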
@@ -103,7 +105,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
- // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
@@ -113,27 +115,30 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
- val v = (rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]], depNum)
- rddIterators += v
+ val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
+ rddIterators += ((it, depNum))
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
val ser = SparkEnv.get.serializerManager.get(serializerClass)
- val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
- rddIterators += v
+ val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
+ rddIterators += ((it, depNum))
}
}
if (!externalSorting) {
val map = new AppendOnlyMap[K, CoGroupCombiner]
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
- if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
+ if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
+ }
+ val getCombiner: K => CoGroupCombiner = key => {
+ map.changeValue(key, update)
}
rddIterators.foreach { case (it, depNum) =>
while (it.hasNext) {
val kv = it.next()
- map.changeValue(kv._1, update)(depNum) += kv._2
+ getCombiner(kv._1)(depNum) += kv._2
}
}
new InterruptibleIterator(context, map.iterator)
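
For reference, not part of this commit: a REPL-style sketch of the non-external path above. AppendOnlyMap.changeValue either creates the per-key array of buffers or returns the existing one, and the caller appends each value into the buffer for its dependency. Assumes access to the private[spark] AppendOnlyMap.

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.util.collection.AppendOnlyMap

    val numRdds = 2
    val map = new AppendOnlyMap[String, Seq[ArrayBuffer[Any]]]
    val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
      (hadVal, oldVal) => if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])

    map.changeValue("k", update)(0) += "a"   // a value from dependency 0
    map.changeValue("k", update)(1) += "b"   // a value from dependency 1
    // map.iterator now yields ("k", Seq(ArrayBuffer("a"), ArrayBuffer("b")))
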
@@ -149,17 +154,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
}
- private def createExternalMap(numRdds: Int):
- ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+ private def createExternalMap(numRdds: Int)
+ : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
- value match { case(v, depNum) => newCombiner(depNum) += v }
+ value match { case (v, depNum) => newCombiner(depNum) += v }
newCombiner
}
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
(combiner, value) => {
- value match { case(v, depNum) => combiner(depNum) += v }
+ value match { case (v, depNum) => combiner(depNum) += v }
combiner
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
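
For reference, not part of this commit: a REPL-style check of the createCombiner and mergeValue shapes defined above, written against the same type aliases. The mergeCombiners body is cut off by this hunk, so the element-wise buffer merge below is only a plausible sketch, not the actual implementation.

    import scala.collection.mutable.ArrayBuffer

    val numRdds = 2
    val createCombiner: ((Any, Int)) => Seq[ArrayBuffer[Any]] = value => {
      val newCombiner = Array.fill(numRdds)(new ArrayBuffer[Any])
      value match { case (v, depNum) => newCombiner(depNum) += v }
      newCombiner
    }
    val mergeValue: (Seq[ArrayBuffer[Any]], (Any, Int)) => Seq[ArrayBuffer[Any]] =
      (combiner, value) => {
        value match { case (v, depNum) => combiner(depNum) += v }
        combiner
      }
    // Hypothetical: merge two combiners buffer-by-buffer (the real body is not shown here).
    val mergeCombiners: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
      (c1, c2) => c1.zip(c2).map { case (b1, b2) => b1 ++= b2 }

    val merged = mergeValue(createCombiner(("a1", 0)), ("b1", 1))
    // merged == Seq(ArrayBuffer("a1"), ArrayBuffer("b1"))
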
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 38f3c556ae..7810119847 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -237,10 +237,12 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
if (highBit == n) n else highBit << 1
}
- // Return an iterator of the map in sorted order.
- // Note that the validity of the map is no longer preserved.
- def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = {
+ /** Return an iterator of the map in sorted order. This provides a way to sort the map without
+ * using additional memory, at the expense of destroying the validity of the map.
+ */
+ def destructiveSortedIterator(ordering: Ordering[(K, V)]): Iterator[(K, V)] = {
var keyIndex, newIndex = 0
+ // Pack KV pairs into the front of the underlying array
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
@@ -248,11 +250,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
keyIndex += 1
}
- // sort
assert(newIndex == curSize)
+ // Sort by the given ordering
val rawOrdering = new Ordering[AnyRef] {
- def compare(x: AnyRef, y: AnyRef): Int ={
- ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+ def compare(x: AnyRef, y: AnyRef): Int = {
+ ordering.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
}
}
util.Arrays.sort(data, 0, curSize, rawOrdering)
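
For reference, not part of this commit: a REPL-style sketch of the same pack-then-sort-in-place idea on a toy backing array (AppendOnlyMap itself is private[spark]). Keys and values occupy alternating slots; packing reuses the array, so sorting allocates no extra memory but destroys the map layout.

    import java.util

    val data = new Array[AnyRef](8)                 // [k0, v0, k1, v1, ...], with null gaps
    data(0) = "b"; data(1) = Integer.valueOf(2)
    data(4) = "a"; data(5) = Integer.valueOf(1)
    val capacity = data.length / 2

    // Pack KV pairs into the front of the underlying array
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
        newIndex += 1
      }
      keyIndex += 1
    }

    // Sort the packed prefix with the caller-supplied ordering
    val ordering = Ordering.by[(String, Int), String](_._1)
    val rawOrdering = new Ordering[AnyRef] {
      def compare(x: AnyRef, y: AnyRef): Int =
        ordering.compare(x.asInstanceOf[(String, Int)], y.asInstanceOf[(String, Int)])
    }
    util.Arrays.sort(data, 0, newIndex, rawOrdering)
    // data now begins with ("a", 1), ("b", 2); the original map contents are no longer valid
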
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f5a2e8db59..1de545c05b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -33,6 +33,10 @@ import org.apache.spark.serializer.Serializer
*
* (2) Otherwise, group values of the same key together before disk spill, and merge them
* into combiners only after reading them back from disk.
+ *
+ * In the latter case, values occupy much more space because they are not collapsed as soon
+ * as they are inserted. This in turn leads to more disk spills, degrading performance.
+ * For this reason, a mergeCombiners function should be specified if possible.
*/
private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
createCombiner: V => C,
@@ -78,28 +82,42 @@ private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
/**
* An append-only map that spills sorted content to disk when the memory threshold is exceeded.
- * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
+ * A group is an intermediate combiner, with type G equal to either C or ArrayBuffer[V].
+ *
+ * This map takes two passes over the data:
+ * (1) Values are merged into groups, which are spilled to disk as necessary.
+ * (2) Groups are read from disk and merged into combiners, which are returned.
+ *
+ * If we never spill to disk, we avoid the second pass provided that groups G are already
+ * combiners C.
+ *
+ * Note that OOM is still possible with the SpillableAppendOnlyMap. This may occur if the
+ * collective G values do not fit into memory, or if the size estimation is not sufficiently
+ * accurate. To account for the latter, `spark.shuffle.buffer.fraction` specifies an additional
+ * margin of safety, while `spark.shuffle.buffer.mb` specifies the raw memory threshold.
*/
-private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
- createGroup: V => M,
- mergeValue: (M, V) => M,
- mergeGroups: (M, M) => M,
- createCombiner: M => C,
+private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
+ createGroup: V => G,
+ mergeValue: (G, V) => G,
+ mergeGroups: (G, G) => G,
+ createCombiner: G => C,
serializer: Serializer)
extends Iterable[(K, C)] with Serializable {
- private var currentMap = new SizeTrackingAppendOnlyMap[K, M]
- private val oldMaps = new ArrayBuffer[DiskIterator]
+ import SpillableAppendOnlyMap._
+
+ private var currentMap = new SizeTrackingAppendOnlyMap[K, G]
+ private val oldMaps = new ArrayBuffer[DiskKGIterator]
private val memoryThreshold = {
val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong * 1024 * 1024
val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
bufferSize * bufferPercent
}
- private val ordering = new SpillableAppendOnlyMap.KeyHashOrdering[K, M]()
+ private val ordering = new KeyGroupOrdering[K, G]
private val ser = serializer.newInstance()
def insert(key: K, value: V): Unit = {
- val update: (Boolean, M) => M = (hadVal, oldVal) => {
+ val update: (Boolean, G) => G = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
}
currentMap.changeValue(key, update)
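
For reference, not part of this commit: the spill threshold above is simply bufferSize * bufferPercent. With the defaults (spark.shuffle.buffer.mb = 1024, spark.shuffle.buffer.fraction = 0.8) the map spills once its estimated size exceeds roughly 820 MiB. A hypothetical tuning that spills earlier, noting that the properties are read when the map is constructed:

    // Set before constructing the map, since memoryThreshold is computed at construction time.
    System.setProperty("spark.shuffle.buffer.mb", "512")
    System.setProperty("spark.shuffle.buffer.fraction", "0.7")
    val threshold = 512L * 1024 * 1024 * 0.7f       // 512 MiB * 0.7 ≈ 358 MiB
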
@@ -117,98 +135,173 @@ private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
out.writeObject(kv)
}
out.close()
- currentMap = new SizeTrackingAppendOnlyMap[K, M]
- oldMaps.append(new DiskIterator(file))
+ currentMap = new SizeTrackingAppendOnlyMap[K, G]
+ oldMaps.append(new DiskKGIterator(file))
}
override def iterator: Iterator[(K, C)] = {
- if (oldMaps.isEmpty && implicitly[ClassTag[M]] == implicitly[ClassTag[C]]) {
+ if (oldMaps.isEmpty && implicitly[ClassTag[G]] == implicitly[ClassTag[C]]) {
currentMap.iterator.asInstanceOf[Iterator[(K, C)]]
} else {
new ExternalIterator()
}
}
- // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
+ // An iterator that sort-merges (K, G) pairs from memory and disk into (K, C) pairs
private class ExternalIterator extends Iterator[(K, C)] {
- val pq = new PriorityQueue[KMITuple]
- val inputStreams = Seq(currentMap.destructiveSortedIterator(ordering)) ++ oldMaps
- inputStreams.foreach(readFromIterator)
-
- // Read from the given iterator until a key of different hash is retrieved
- def readFromIterator(it: Iterator[(K, M)]): Unit = {
- var minHash : Option[Int] = None
- while (it.hasNext) {
- val (k, m) = it.next()
- pq.enqueue(KMITuple(k, m, it))
- minHash match {
- case None => minHash = Some(k.hashCode())
- case Some(expectedHash) =>
- if (k.hashCode() != expectedHash) {
- return
- }
+ val mergeHeap = new PriorityQueue[KGITuple]
+ val inputStreams = oldMaps ++ Seq(currentMap.destructiveSortedIterator(ordering))
+
+ // Invariant: size of mergeHeap == number of input streams
+ inputStreams.foreach{ it =>
+ val kgPairs = readFromIterator(it)
+ mergeHeap.enqueue(KGITuple(it, kgPairs))
+ }
+
+ // Read from the given iterator until a key of different hash is retrieved.
+ // The resulting ArrayBuffer includes this key, and is ordered by key hash.
+ def readFromIterator(it: Iterator[(K, G)]): ArrayBuffer[(K, G)] = {
+ val kgPairs = new ArrayBuffer[(K, G)]
+ if (it.hasNext) {
+ var kg = it.next()
+ kgPairs += kg
+ val minHash = kg._1.hashCode()
+ while (it.hasNext && kg._1.hashCode() == minHash) {
+ kg = it.next()
+ kgPairs += kg
+ }
+ }
+ kgPairs
+ }
+
+ // Drop and return all (K, G) pairs with K = the given key from the given KGITuple
+ def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = {
+ val dropped = new ArrayBuffer[(K, G)]
+ var i = 0
+ while (i < kgi.pairs.length) {
+ if (kgi.pairs(i)._1 == key) {
+ dropped += kgi.pairs.remove(i)
+ } else {
+ i += 1
}
}
+ dropped
}
- override def hasNext: Boolean = !pq.isEmpty
+ // Merge all (K, G) pairs with K = the given key into baseGroup
+ def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = {
+ var mergedGroup = baseGroup
+ kgPairs.foreach { case (k, g) =>
+ if (k == key){
+ mergedGroup = mergeGroups(mergedGroup, g)
+ }
+ }
+ mergedGroup
+ }
+
+ override def hasNext: Boolean = {
+ mergeHeap.foreach{ kgi =>
+ if (!kgi.pairs.isEmpty) {
+ return true
+ }
+ }
+ false
+ }
override def next(): (K, C) = {
- val minKMI = pq.dequeue()
- var (minKey, minGroup) = (minKMI.key, minKMI.group)
- val minHash = minKey.hashCode()
- readFromIterator(minKMI.iterator)
-
- // Merge groups with the same key into minGroup
- var collidedKMI = ArrayBuffer[KMITuple]()
- while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
- val newKMI = pq.dequeue()
- if (newKMI.key == minKey) {
- minGroup = mergeGroups(minGroup, newKMI.group)
- readFromIterator(newKMI.iterator)
- } else {
- // Collision
- collidedKMI += newKMI
+ var minKGI = mergeHeap.dequeue()
+ val (minPairs, minHash) = (minKGI.pairs, minKGI.minHash)
+ if (minPairs.length == 0) {
+ // Should only happen when hasNext is false
+ throw new NoSuchElementException
+ }
+ var (minKey, minGroup) = minPairs(0)
+ assert(minKey.hashCode() == minHash)
+
+ // Merge the rest of minPairs into minGroup
+ val minPairsWithKey = dropKey(minKGI, minKey).tail
+ minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey)
+ if (minPairs.length == 0) {
+ minPairs ++= readFromIterator(minKGI.iterator)
+ }
+
+ // Do the same for all other KGITuples with the same minHash
+ val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI)
+ while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
+ val newKGI = mergeHeap.dequeue()
+ val pairsWithKey = dropKey(newKGI, minKey)
+ minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey)
+ if (newKGI.pairs.length == 0) {
+ newKGI.pairs ++= readFromIterator(newKGI.iterator)
}
+ tuplesToAddBack += newKGI
}
- collidedKMI.foreach(pq.enqueue(_))
+ tuplesToAddBack.foreach(mergeHeap.enqueue(_))
(minKey, createCombiner(minGroup))
}
- case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] {
- def compare(other: KMITuple): Int = {
- other.key.hashCode().compareTo(key.hashCode())
+ case class KGITuple(iterator: Iterator[(K, G)], pairs: ArrayBuffer[(K, G)])
+ extends Ordered[KGITuple] {
+
+ // Invariant: pairs are ordered by key hash
+ def minHash: Int = {
+ if (pairs.length > 0){
+ pairs(0)._1.hashCode()
+ } else {
+ Int.MaxValue
+ }
+ }
+
+ def compare(other: KGITuple): Int = {
+ // mutable.PriorityQueue dequeues the max, not the min
+ -minHash.compareTo(other.minHash)
}
}
}
- // Iterate through (K, M) pairs in sorted order from an on-disk map
- private class DiskIterator(file: File) extends Iterator[(K, M)] {
+ // Iterate through (K, G) pairs in sorted order from an on-disk map
+ private class DiskKGIterator(file: File) extends Iterator[(K, G)] {
val in = ser.deserializeStream(new FileInputStream(file))
- var nextItem: Option[(K, M)] = None
+ var nextItem: Option[(K, G)] = None
+ var eof = false
+
+ def readNextItem(): Option[(K, G)] = {
+ if (!eof) {
+ try {
+ return Some(in.readObject().asInstanceOf[(K, G)])
+ } catch {
+ case e: EOFException => eof = true
+ }
+ }
+ None
+ }
override def hasNext: Boolean = {
- nextItem = try {
- Some(in.readObject().asInstanceOf[(K, M)])
- } catch {
- case e: EOFException => None
+ nextItem match {
+ case Some(item) => true
+ case None =>
+ nextItem = readNextItem()
+ nextItem.isDefined
}
- nextItem.isDefined
}
- override def next(): (K, M) = {
+ override def next(): (K, G) = {
nextItem match {
- case Some(item) => item
- case None => throw new NoSuchElementException
+ case Some(item) =>
+ nextItem = None
+ item
+ case None =>
+ val item = readNextItem()
+ item.getOrElse(throw new NoSuchElementException)
}
}
}
}
private[spark] object SpillableAppendOnlyMap {
- private class KeyHashOrdering[K, M] extends Ordering[(K, M)] {
- def compare(x: (K, M), y: (K, M)): Int = {
- x._1.hashCode().compareTo(y._1.hashCode())
+ private class KeyGroupOrdering[K, G] extends Ordering[(K, G)] {
+ def compare(kg1: (K, G), kg2: (K, G)): Int = {
+ kg1._1.hashCode().compareTo(kg2._1.hashCode())
}
}
}
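
For reference, not part of this commit: a self-contained sketch of the hash-ordered k-way merge that ExternalIterator performs. Each input stream is already sorted by key hash, and a priority queue always exposes the stream whose next key has the smallest hash. The toy below only interleaves pairs in hash order with simplified types; the real iterator additionally buffers a block of pairs per stream, merges all groups sharing the minimum key, and handles hash collisions.

    import scala.collection.mutable.PriorityQueue

    // One entry per input stream; ranked by the hash of its next key.
    case class HashStream(it: BufferedIterator[(String, Int)]) extends Ordered[HashStream] {
      def minHash: Int = if (it.hasNext) it.head._1.hashCode() else Int.MaxValue
      // mutable.PriorityQueue dequeues the max, not the min, so invert the comparison
      def compare(other: HashStream): Int = -minHash.compareTo(other.minHash)
    }

    def mergeByKeyHash(streams: Seq[Iterator[(String, Int)]]): Iterator[(String, Int)] = {
      val mergeHeap = new PriorityQueue[HashStream]
      streams.foreach(s => mergeHeap.enqueue(HashStream(s.buffered)))
      new Iterator[(String, Int)] {
        def hasNext: Boolean = mergeHeap.exists(_.it.hasNext)
        def next(): (String, Int) = {
          val stream = mergeHeap.dequeue()       // the stream holding the smallest hash
          val kv = stream.it.next()
          mergeHeap.enqueue(stream)              // re-rank it under its new head
          kv
        }
      }
    }

    // Usage: each stream must already be sorted by key hash, as the spilled files are.
    val byHash = Ordering.by[(String, Int), Int](_._1.hashCode())
    val a = Seq(("k1", 1), ("k2", 2)).sorted(byHash).iterator
    val b = Seq(("k1", 3), ("k3", 4)).sorted(byHash).iterator
    mergeByKeyHash(Seq(a, b)).foreach(println)    // pairs emerge grouped by key hash
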