author    Andrew Or <andrewor14@gmail.com>  2013-12-25 23:10:53 -0800
committer Andrew Or <andrewor14@gmail.com>  2013-12-26 23:40:07 -0800
commit    7ad4408255e37f95e545d9c21a4460cbf98c05dd (patch)
tree      8ca7f291d96429dfc5da03e7fb22632c337449a3
parent    fcc443b3db3664987a6f863b59c06be7169175d5 (diff)
New minor edits
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                 |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala           | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala | 61
3 files changed, 49 insertions(+), 54 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 582625577f..8863c3175b 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -45,8 +45,8 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
- // Spilling
- val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val combiners =
+ new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
iter.foreach { case(k, v) => combiners.insert(k, v) }
combiners.iterator
}
@@ -66,7 +66,6 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
- // Spilling
val combiners =
new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
iter.foreach { case(k, c) => combiners.insert(k, c) }
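For reference, the spilling branches above delegate all merging to ExternalAppendOnlyMap. A minimal standalone sketch of that path, using made-up word-count combine functions (not part of this patch; only the constructor shape matches the diff):

    // Sketch only: hypothetical String/Int combine functions.
    val createCombiner: Int => Int = v => v
    val mergeValue: (Int, Int) => Int = (c, v) => c + v
    val mergeCombiners: (Int, Int) => Int = (c1, c2) => c1 + c2

    val combiners =
      new ExternalAppendOnlyMap[String, Int, Int](createCombiner, mergeValue, mergeCombiners)
    Seq(("a", 1), ("b", 1), ("a", 1)).foreach { case (k, v) => combiners.insert(k, v) }
    // Yields ("a", 2) and ("b", 1); iteration order is not guaranteed.
    combiners.iterator.foreach { case (k, c) => println(s"$k -> $c") }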
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3af0376a4d..113a912f16 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -50,7 +50,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
override def hashCode(): Int = idx
}
-
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
@@ -108,7 +107,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
- val ser = SparkEnv.get.serializerManager.get(serializerClass)
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
@@ -121,6 +119,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
+ val ser = SparkEnv.get.serializerManager.get(serializerClass)
val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
rddIterators += v
}
@@ -131,39 +130,39 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
}
- val getSeq = (k: K) => map.changeValue(k, update)
- rddIterators.foreach { case(iter, depNum) =>
- iter.foreach {
- case(k, v) => getSeq(k)(depNum) += v
+ rddIterators.foreach { case(it, depNum) =>
+ it.foreach { case(k, v) =>
+ map.changeValue(k, update)(depNum) += v
}
}
new InterruptibleIterator(context, map.iterator)
} else {
- // Spilling
val map = createExternalMap(numRdds)
- rddIterators.foreach { case(iter, depNum) =>
- iter.foreach {
- case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
+ rddIterators.foreach { case(it, depNum) =>
+ it.foreach { case(k, v) =>
+ map.insert(k, new CoGroupValue(v, depNum))
}
}
new InterruptibleIterator(context, map.iterator)
}
}
- private def createExternalMap(numRdds: Int)
- : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+ private def createExternalMap(numRdds: Int):
+ ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
- val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
+ val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
val newCombiner = Array.fill(numRdds)(new CoGroup)
- v match { case (value, depNum) => newCombiner(depNum) += value }
+ value match { case(v, depNum) => newCombiner(depNum) += v }
newCombiner
}
- val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
- v match { case (value, depNum) => c(depNum) += value }
- c
+ val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
+ (combiner, value) => {
+ value match { case(v, depNum) => combiner(depNum) += v }
+ combiner
}
- val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
- c1.zipAll(c2, new CoGroup, new CoGroup).map {
+ val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
+ (combiner1, combiner2) => {
+ combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
case (v1, v2) => v1 ++ v2
}
}
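To see what the mergeCombiners rewrite computes: zipAll pads the shorter combiner with empty groups before concatenating element-wise. A small sketch, assuming CoGroup is an alias for ArrayBuffer[Any] as used in this file:

    import scala.collection.mutable.ArrayBuffer
    // Two combiners of unequal length; zipAll pads the shorter with empty buffers.
    val combiner1 = Array(ArrayBuffer[Any](1), ArrayBuffer[Any]("x"))
    val combiner2 = Array(ArrayBuffer[Any](2))
    val merged = combiner1.zipAll(combiner2, new ArrayBuffer[Any], new ArrayBuffer[Any])
      .map { case (v1, v2) => v1 ++ v2 }
    // merged(0) == ArrayBuffer(1, 2); merged(1) == ArrayBuffer("x")

In practice both combiners hold numRdds groups, so the padding is defensive rather than load-bearing.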
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index c8c053460c..413f83862d 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -28,11 +28,11 @@ import scala.util.Random
/**
* A wrapper for SpillableAppendOnlyMap that handles two cases:
*
- * (1) If a mergeCombiners function is specified, merge values into combiners before
- * disk spill, as it is possible to merge the resulting combiners later.
+ * (1) If a mergeCombiners function is specified, merge values into combiners before disk
+ * spill, as it is possible to merge the resulting combiners later.
*
- * (2) Otherwise, group values of the same key together before disk spill, and merge
- * them into combiners only after reading them back from disk.
+ * (2) Otherwise, group values of the same key together before disk spill, and merge them
+ * into combiners only after reading them back from disk.
*/
class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
@@ -48,8 +48,25 @@ class ExternalAppendOnlyMap[K, V, C](
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
} else {
+ // Use ArrayBuffer[V] as the intermediate combiner
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
- new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
+ val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
+ group += value
+ }
+ val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
+ group1 ++= group2
+ }
+ val combineGroup: (ArrayBuffer[V] => C) = group => {
+ var combiner : Option[C] = None
+ group.foreach { v =>
+ combiner match {
+ case None => combiner = Some(createCombiner(v))
+ case Some(c) => combiner = Some(mergeValue(c, v))
+ }
+ }
+ combiner.getOrElse(null.asInstanceOf[C])
+ }
+ new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
}
}
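When no mergeCombiners function is given, values cannot be merged across spills, so each key's values are buffered verbatim (mergeGroups simply concatenates) and combined only at read time. The combineGroup loop above is equivalent to a left fold; a self-contained sketch with hypothetical V = Int, C = Int functions, assuming a non-empty group:

    import scala.collection.mutable.ArrayBuffer
    // combineGroup(group) == group.tail.foldLeft(createCombiner(group.head))(mergeValue)
    val createCombiner: Int => Int = v => v
    val mergeValue: (Int, Int) => Int = (c, v) => c + v
    val group = ArrayBuffer(1, 2, 3)
    val combined = group.tail.foldLeft(createCombiner(group.head))(mergeValue)  // 6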
@@ -57,31 +74,11 @@ class ExternalAppendOnlyMap[K, V, C](
def insert(key: K, value: V): Unit = map.insert(key, value)
override def iterator: Iterator[(K, C)] = map.iterator
-
- private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
- group += value
- group
- }
- private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
- group1 ++= group2
- group1
- }
- private def combineGroup(group: ArrayBuffer[V]): C = {
- var combiner : Option[C] = None
- group.foreach { v =>
- combiner match {
- case None => combiner = Some(createCombiner(v))
- case Some(c) => combiner = Some(mergeValue(c, v))
- }
- }
- combiner.get
- }
}
/**
- * An append-only map that spills sorted content to disk when the memory threshold
- * is exceeded. A group with type M is an intermediate combiner, and shares the same
- * type as either C or ArrayBuffer[V].
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
*/
class SpillableAppendOnlyMap[K, V, M, C](
createGroup: V => M,
@@ -96,7 +93,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
var oldMaps = new ArrayBuffer[DiskIterator]
def insert(key: K, value: V): Unit = {
- def update(hadVal: Boolean, oldVal: M): M = {
+ val update: (Boolean, M) => M = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
}
currentMap.changeValue(key, update)
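The update closure relies on the changeValue contract: the callback receives whether the key already had a value and, if so, that old value. A minimal stand-in using a plain mutable.HashMap in place of AppendOnlyMap (illustrative only, not the actual class):

    import scala.collection.mutable
    // Hypothetical stand-in showing the (hadVal, oldVal) => newVal contract.
    def changeValue[K, M](map: mutable.HashMap[K, M], key: K, update: (Boolean, M) => M): M = {
      val newValue = map.get(key) match {
        case Some(old) => update(true, old)
        case None      => update(false, null.asInstanceOf[M])
      }
      map(key) = newValue
      newValue
    }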
@@ -128,11 +125,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
inputStreams.foreach(readFromIterator)
// Read from the given iterator until a key of different hash is retrieved
- def readFromIterator(iter: Iterator[(K, M)]): Unit = {
+ def readFromIterator(it: Iterator[(K, M)]): Unit = {
var minHash : Option[Int] = None
- while (iter.hasNext) {
- val (k, m) = iter.next()
- pq.enqueue(KMITuple(k, m, iter))
+ while (it.hasNext) {
+ val (k, m) = it.next()
+ pq.enqueue(KMITuple(k, m, it))
minHash match {
case None => minHash = Some(k.hashCode())
case Some(expectedHash) if k.hashCode() != expectedHash => return
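The loop above drives a k-way merge over hash-sorted spill streams: a priority queue ordered by key hash yields every entry of one hash bucket before advancing to the next. A compact sketch of the same pattern, with Int keys (whose hashCode equals the value) so the streams are visibly hash-sorted; all names here are illustrative, not this file's:

    import scala.collection.mutable
    // Illustrative k-way merge of hash-sorted streams.
    case class Head(k: Int, v: String, it: Iterator[(Int, String)])
    val pq = mutable.PriorityQueue.empty[Head](Ordering.by((h: Head) => -h.k.hashCode))
    def feed(it: Iterator[(Int, String)]): Unit =
      if (it.hasNext) { val (k, v) = it.next(); pq.enqueue(Head(k, v, it)) }
    Seq(Iterator((1, "x"), (3, "y")), Iterator((1, "z"), (2, "w"))).foreach(feed)
    while (pq.nonEmpty) {
      val h = pq.dequeue()          // smallest hash first
      println(s"${h.k} -> ${h.v}")  // both 1's (either order), then 2, then 3
      feed(h.it)                    // refill from the stream just consumed
    }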