author    Andrew Or <andrewor14@gmail.com>    2013-12-24 16:15:02 -0800
committer Andrew Or <andrewor14@gmail.com>    2013-12-26 23:40:07 -0800
commit    17def8cc1132a5c94d895dacba4217ef9a0e5bd0 (patch)
tree      7e2cbfbe67a31f50322793c2b929dc22db987391 /core
parent    6a45ec1972d6fc053a10fbd2373f43e0c7562aa5 (diff)
Refactor ExternalAppendOnlyMap to take in KVC instead of just KV
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                 | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala           | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala | 88
3 files changed, 78 insertions, 76 deletions
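
The refactor replaces the old single combine function (V, V) => V and the changeValue-based update with a createCombiner / mergeValue / mergeCombiners triple and a plain insert(key, value) call. As a rough usage sketch of the new signature (not code from this commit; a hypothetical word-count where K = String, V = Int, C = Long):

    import org.apache.spark.util.ExternalAppendOnlyMap

    val counts = new ExternalAppendOnlyMap[String, Int, Long](
      (v: Int) => v.toLong,                // createCombiner: first value seen for a key
      (c: Long, v: Int) => c + v,          // mergeValue: fold another value into the combiner
      (c1: Long, c2: Long) => c1 + c2)     // mergeCombiners: merge combiners across spill files

    Seq(("a", 1), ("b", 1), ("a", 1)).foreach { case (k, v) => counts.insert(k, v) }
    counts.iterator.foreach(println)       // ("a", 2) and ("b", 1), ordered by key hash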
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 77a24733aa..c51fb1d630 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,28 +33,20 @@ case class Aggregator[K, V, C] (
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
//val combiners = new AppendOnlyMap[K, C]
- val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
- kv = iter.next()
- combiners.changeValue(kv._1, update)
+ val kv = iter.next()
+ combiners.insert(kv._1, kv._2)
}
combiners.iterator
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
//val combiners = new AppendOnlyMap[K, C]
- val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
- var kc: (K, C) = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
- }
+ val combiners = new ExternalAppendOnlyMap[K, C, C]((c: C) => c, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
- kc = iter.next()
- combiners.changeValue(kc._1, update)
+ val kc = iter.next()
+ combiners.insert(kc._1, kc._2)
}
combiners.iterator
}
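
With this change, combineValuesByKey simply streams each (K, V) pair into the external map; the per-key update logic that previously lived in the Aggregator's update closure now lives inside ExternalAppendOnlyMap. A toy trace of the new path (illustrative only, a sum aggregator; not from this commit):

    val agg = Aggregator[String, Int, Int](
      (v: Int) => v,                       // createCombiner
      (c: Int, v: Int) => c + v,           // mergeValue
      (c1: Int, c2: Int) => c1 + c2)       // mergeCombiners

    val combined = agg.combineValuesByKey(Iterator(("x", 1), ("y", 2), ("x", 3)))
    // yields ("x", 4) and ("y", 2); combineCombinersByKey merges already-combined
    // values the same way, using the identity createCombiner (c: C) => c from the diff.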
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 4c45a94af9..b93c60cd67 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -100,52 +100,56 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
- override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+ override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
- def combine(x: Seq[ArrayBuffer[Any]], y: Seq[ArrayBuffer[Any]]) = {
- x.zipAll(y, ArrayBuffer[Any](), ArrayBuffer[Any]()).map {
- case (a, b) => a ++ b
- }
- }
- //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
- val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combine)
+ //val combiners = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
+ val combiners = createExternalMap(numRdds)
val ser = SparkEnv.get.serializerManager.get(serializerClass)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
- kv => addToMap(kv._1, kv._2, depNum)
+ kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
- kv => addToMap(kv._1, kv._2, depNum)
+ kv => combiners.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
}
+ new InterruptibleIterator(context, combiners.iterator)
+ }
- def addToMap(key: K, value: Any, depNum: Int) {
- def update(hadVal: Boolean, oldVal: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] = {
- var newVal = oldVal
- if (!hadVal){
- newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
- }
- newVal(depNum) += value
- newVal
+ def createExternalMap(numRdds: Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+ def createCombiner(v: CoGroupValue): CoGroupCombiner = {
+ val newCombiner = Array.fill(numRdds)(new CoGroup)
+ mergeValue(newCombiner, v)
+ }
+ def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = {
+ v match { case (value, depNum) => c(depNum) += value }
+ c
+ }
+ def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = {
+ c1.zipAll(c2, new CoGroup, new CoGroup).map {
+ case (v1, v2) => v1 ++ v2
}
- map.changeValue(key, update)
}
-
- new InterruptibleIterator(context, map.iterator)
+ new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
+ createCombiner, mergeValue, mergeCombiners)
}
override def clearDependencies() {
super.clearDependencies()
rdds = null
}
+
+ type CoGroup = ArrayBuffer[Any]
+ type CoGroupValue = (Any, Int) // Int is dependency number
+ type CoGroupCombiner = Seq[CoGroup]
}
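
The new type aliases make the cogroup combiner explicit: each key maps to one ArrayBuffer per parent RDD, each value is tagged with the dependency index that produced it, and mergeCombiners concatenates the buffers position by position. A standalone trace for numRdds = 2 (illustration only; it mirrors, but does not copy, the functions defined inside createExternalMap):

    import scala.collection.mutable.ArrayBuffer

    type CoGroup = ArrayBuffer[Any]
    type CoGroupCombiner = Seq[CoGroup]

    val numRdds = 2
    def createCombiner(v: (Any, Int)): CoGroupCombiner = {
      val c = Array.fill(numRdds)(new CoGroup)   // one buffer per parent RDD
      c(v._2) += v._1                            // place the value under its dependency index
      c
    }

    val left   = createCombiner(("a", 0))        // Seq(ArrayBuffer(a), ArrayBuffer())
    val right  = createCombiner(("b", 1))        // Seq(ArrayBuffer(), ArrayBuffer(b))
    val merged = left.zipAll(right, new CoGroup, new CoGroup).map { case (x, y) => x ++ y }
    // merged == Seq(ArrayBuffer(a), ArrayBuffer(b)): the cogrouped result for this key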
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 857f8e3439..1a5753603e 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -22,80 +22,86 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
/**
- * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
- * function must be specified to merge values back into memory during read.
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
*/
-class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
- memoryThresholdMB: Int = 1024)
- extends Iterable[(K, V)] with Serializable {
+class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ memoryThresholdMB: Int = 1024)
+ extends Iterable[(K, C)] with Serializable {
- var currentMap = new AppendOnlyMap[K, V]
- var oldMaps = new ArrayBuffer[DiskKVIterator]
+ var currentMap = new AppendOnlyMap[K, C]
+ var oldMaps = new ArrayBuffer[DiskKCIterator]
- def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
- currentMap.changeValue(key, updateFunc)
+ def insert(key: K, value: V): Unit = {
+ currentMap.changeValue(key, updateFunction(value))
val mapSize = SizeEstimator.estimate(currentMap)
if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
spill()
}
}
+ def updateFunction(value: V) : (Boolean, C) => C = {
+ (hadVal: Boolean, oldVal: C) =>
+ if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+ }
+
def spill(): Unit = {
val file = File.createTempFile("external_append_only_map", "") // Add spill location
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
sortedMap.foreach { out.writeObject( _ ) }
out.close()
- currentMap = new AppendOnlyMap[K, V]
- oldMaps.append(new DiskKVIterator(file))
+ currentMap = new AppendOnlyMap[K, C]
+ oldMaps.append(new DiskKCIterator(file))
}
- override def iterator: Iterator[(K, V)] = new ExternalIterator()
+ override def iterator: Iterator[(K, C)] = new ExternalIterator()
/**
* An iterator that merges KV pairs from memory and disk in sorted order
*/
- class ExternalIterator extends Iterator[(K, V)] {
+ class ExternalIterator extends Iterator[(K, C)] {
// Order by increasing key hash value
- implicit object KVOrdering extends Ordering[KVITuple] {
- def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+ implicit object KVOrdering extends Ordering[KCITuple] {
+ def compare(a:KCITuple, b:KCITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
}
- val pq = mutable.PriorityQueue[KVITuple]()
- val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
+ val pq = mutable.PriorityQueue[KCITuple]()
+ val inputStreams = Seq(new MemoryKCIterator(currentMap)) ++ oldMaps
inputStreams.foreach { readFromIterator( _ ) }
override def hasNext: Boolean = !pq.isEmpty
// Combine all values from all input streams corresponding to the same key
- override def next(): (K,V) = {
- val minKVI = pq.dequeue()
- var (minKey, minValue) = (minKVI.key, minKVI.value)
+ override def next(): (K, C) = {
+ val minKCI = pq.dequeue()
+ var (minKey, minCombiner) = (minKCI.key, minKCI.combiner)
val minHash = minKey.hashCode()
- readFromIterator(minKVI.iter)
+ readFromIterator(minKCI.iter)
- var collidedKVI = ArrayBuffer[KVITuple]()
+ var collidedKCI = ArrayBuffer[KCITuple]()
while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
- val newKVI: KVITuple = pq.dequeue()
- if (newKVI.key == minKey){
- minValue = combineFunction(minValue, newKVI.value)
- readFromIterator(newKVI.iter)
+ val newKCI: KCITuple = pq.dequeue()
+ if (newKCI.key == minKey){
+ minCombiner = mergeCombiners(minCombiner, newKCI.combiner)
+ readFromIterator(newKCI.iter)
} else {
// Collision
- collidedKVI += newKVI
+ collidedKCI += newKCI
}
}
- collidedKVI.foreach { pq.enqueue( _ ) }
- (minKey, minValue)
+ collidedKCI.foreach { pq.enqueue( _ ) }
+ (minKey, minCombiner)
}
// Read from the given iterator until a key of different hash is retrieved,
- // Add each KV pair read from this iterator to the heap
- def readFromIterator(iter: Iterator[(K, V)]): Unit = {
+ // Add each KC pair read from this iterator to the heap
+ def readFromIterator(iter: Iterator[(K, C)]): Unit = {
var minHash : Option[Int] = None
while (iter.hasNext) {
- val (k, v) = iter.next()
- pq.enqueue(KVITuple(k, v, iter))
+ val (k, c) = iter.next()
+ pq.enqueue(KCITuple(k, c, iter))
minHash match {
case None => minHash = Some(k.hashCode())
case Some(expectedHash) =>
@@ -106,19 +112,19 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
}
}
- case class KVITuple(key:K, value:V, iter:Iterator[(K, V)])
+ case class KCITuple(key:K, combiner:C, iter:Iterator[(K, C)])
}
- class MemoryKVIterator(map: AppendOnlyMap[K, V]) extends Iterator[(K, V)] {
- val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+ class MemoryKCIterator(map: AppendOnlyMap[K, C]) extends Iterator[(K, C)] {
+ val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
val it = sortedMap.iterator
override def hasNext: Boolean = it.hasNext
- override def next(): (K, V) = it.next()
+ override def next(): (K, C) = it.next()
}
- class DiskKVIterator(file: File) extends Iterator[(K, V)] {
+ class DiskKCIterator(file: File) extends Iterator[(K, C)] {
val in = new ObjectInputStream(new FileInputStream(file))
- var nextItem:(K, V) = _
+ var nextItem:(K, C) = _
var eof = false
override def hasNext: Boolean = {
@@ -126,7 +132,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
return false
}
try {
- nextItem = in.readObject().asInstanceOf[(K, V)]
+ nextItem = in.readObject().asInstanceOf[(K, C)]
} catch {
case e: EOFException =>
eof = true
@@ -135,7 +141,7 @@ class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
true
}
- override def next(): (K, V) = {
+ override def next(): (K, C) = {
if (eof) {
throw new NoSuchElementException
}
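
Spilling is driven entirely by insert: once SizeEstimator reports the in-memory map above memoryThresholdMB, the map's contents are sorted by key hash, written to a temp file, and the map is reset. Iteration then merges the in-memory map with every spill file through a priority queue ordered by key hash, applying mergeCombiners whenever the same key appears in more than one stream. One way to exercise that path directly (illustrative only; a threshold of 0 makes every insert spill):

    // Every insert spills the single-entry map, so iteration must merge
    // one spill file per insert via mergeCombiners.
    val map = new ExternalAppendOnlyMap[Int, Int, Int](
      (v: Int) => v, _ + _, _ + _, memoryThresholdMB = 0)

    Seq(1 -> 1, 2 -> 1, 1 -> 1).foreach { case (k, v) => map.insert(k, v) }
    map.iterator.foreach(println)   // (1, 2) then (2, 1), in key-hash order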