diff options
author | Andrew Or <andrewor14@gmail.com> | 2013-12-26 14:26:22 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2013-12-26 23:40:07 -0800 |
commit | 64b2d54a02ba885c29bdfeaf36701721352de8e6 (patch) | |
tree | 4bccc1d5a5ed1e06647699ccc5a008da01c171b7 | |
parent | 804beb43bebe50e88814c0ca702a51571cd044e7 (diff) | |
download | spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.tar.gz spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.tar.bz2 spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.zip |
Move maps to util.collection, and refactor more
-rw-r--r-- | core/src/main/scala/org/apache/spark/Aggregator.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala (renamed from core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala) | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala (renamed from core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala) | 31 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala | 4 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala) | 2 |
7 files changed, 22 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 8863c3175b..59e5102d3e 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -17,7 +17,7 @@ package org.apache.spark -import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} /** * A set of functions used to aggregate data. diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 113a912f16..3a549b7b4a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} -import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap} private[spark] sealed trait CoGroupSplitDep extends Serializable diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 899cd6ac14..cb0ca8f8c1 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.util +package org.apache.spark.util.collection /** * A simple open hash table optimized for the append-only use case, where keys diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index b97b28282a..4bda763ffe 100644 --- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -15,14 +15,11 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.util.collection import java.io._ - import scala.collection.mutable.{ArrayBuffer, PriorityQueue} -import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap - /** * A wrapper for SpillableAppendOnlyMap that handles two cases: * @@ -35,16 +32,15 @@ import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap class ExternalAppendOnlyMap[K, V, C]( createCombiner: V => C, mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - memoryThresholdMB: Long = 1024) + mergeCombiners: (C, C) => C) extends Iterable[(K, C)] with Serializable { private val mergeBeforeSpill: Boolean = mergeCombiners != null private val map: SpillableAppendOnlyMap[K, V, _, C] = { if (mergeBeforeSpill) { - new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, - mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB) + new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue, + mergeCombiners, Predef.identity) } else { // Use ArrayBuffer[V] as the intermediate combiner val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value) @@ -64,8 +60,8 @@ class ExternalAppendOnlyMap[K, V, C]( } combiner.getOrElse(null.asInstanceOf[C]) } - new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, - mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB) + new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, 
mergeValueIntoGroup, + mergeGroups, combineGroup) } } @@ -82,26 +78,29 @@ class SpillableAppendOnlyMap[K, V, M, C]( createGroup: V => M, mergeValue: (M, V) => M, mergeGroups: (M, M) => M, - createCombiner: M => C, - memoryThresholdMB: Long = 1024) + createCombiner: M => C) extends Iterable[(K, C)] with Serializable { var currentMap = new SizeTrackingAppendOnlyMap[K, M] - var oldMaps = new ArrayBuffer[DiskIterator] + val oldMaps = new ArrayBuffer[DiskIterator] + val memoryThreshold = { + val bufferSize = System.getProperty("spark.shuffle.buffer", "1024").toLong * 1024 * 1024 + val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat + bufferSize * bufferPercent + } def insert(key: K, value: V): Unit = { val update: (Boolean, M) => M = (hadVal, oldVal) => { if (hadVal) mergeValue(oldVal, value) else createGroup(value) } currentMap.changeValue(key, update) - // TODO: Make sure we're only using some % of the actual threshold due to error - if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) { + if (currentMap.estimateSize() > memoryThreshold) { spill() } } def spill(): Unit = { - val file = File.createTempFile("external_append_only_map", "") // Add spill location + val file = File.createTempFile("external_append_only_map", "") val out = new ObjectOutputStream(new FileOutputStream(file)) val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode()) sortedMap.foreach(out.writeObject) diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala index 2b2417efd9..738908a660 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala @@ -1,6 +1,6 @@ package org.apache.spark.util.collection -import org.apache.spark.util.{AppendOnlyMap, SamplingSizeTracker} +import 
org.apache.spark.util.SamplingSizeTracker /** Append-only map that keeps track of its estimated size in bytes. */ class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] { diff --git a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala index bd3ff5ff41..6b772131a7 100644 --- a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala @@ -5,7 +5,7 @@ import scala.util.Random import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass -import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap +import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap} class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll { val NORMAL_ERROR = 0.20 @@ -100,4 +100,4 @@ object SamplingSizeTrackerSuite { private class LargeDummyClass { val arr = new Array[Int](100) } -}
\ No newline at end of file +} diff --git a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala index 7177919a58..7e7aa7800d 100644 --- a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.util.collection import scala.collection.mutable.HashSet |