about summary refs log tree commit diff
diff options
context:
space:
mode:
author    Andrew Or <andrewor14@gmail.com>  2013-12-26 14:26:22 -0800
committer Andrew Or <andrewor14@gmail.com>  2013-12-26 23:40:07 -0800
commit    64b2d54a02ba885c29bdfeaf36701721352de8e6 (patch)
tree      4bccc1d5a5ed1e06647699ccc5a008da01c171b7
parent    804beb43bebe50e88814c0ca702a51571cd044e7 (diff)
download  spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.tar.gz
          spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.tar.bz2
          spark-64b2d54a02ba885c29bdfeaf36701721352de8e6.zip
Move maps to util, and refactor more
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                                     |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                               |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala (renamed from core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala)  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala (renamed from core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala)  | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala                  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala)  |  2
7 files changed, 22 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 8863c3175b..59e5102d3e 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
/**
* A set of functions used to aggregate data.
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 113a912f16..3a549b7b4a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 899cd6ac14..cb0ca8f8c1 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
/**
* A simple open hash table optimized for the append-only use case, where keys
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index b97b28282a..4bda763ffe 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
import java.io._
-
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
-
/**
* A wrapper for SpillableAppendOnlyMap that handles two cases:
*
@@ -35,16 +32,15 @@ import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C,
- memoryThresholdMB: Long = 1024)
+ mergeCombiners: (C, C) => C)
extends Iterable[(K, C)] with Serializable {
private val mergeBeforeSpill: Boolean = mergeCombiners != null
private val map: SpillableAppendOnlyMap[K, V, _, C] = {
if (mergeBeforeSpill) {
- new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
- mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
+ new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue,
+ mergeCombiners, Predef.identity)
} else {
// Use ArrayBuffer[V] as the intermediate combiner
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
@@ -64,8 +60,8 @@ class ExternalAppendOnlyMap[K, V, C](
}
combiner.getOrElse(null.asInstanceOf[C])
}
- new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
- mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
+ new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
+ mergeGroups, combineGroup)
}
}
@@ -82,26 +78,29 @@ class SpillableAppendOnlyMap[K, V, M, C](
createGroup: V => M,
mergeValue: (M, V) => M,
mergeGroups: (M, M) => M,
- createCombiner: M => C,
- memoryThresholdMB: Long = 1024)
+ createCombiner: M => C)
extends Iterable[(K, C)] with Serializable {
var currentMap = new SizeTrackingAppendOnlyMap[K, M]
- var oldMaps = new ArrayBuffer[DiskIterator]
+ val oldMaps = new ArrayBuffer[DiskIterator]
+ val memoryThreshold = {
+ val bufferSize = System.getProperty("spark.shuffle.buffer", "1024").toLong * 1024 * 1024
+ val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
+ bufferSize * bufferPercent
+ }
def insert(key: K, value: V): Unit = {
val update: (Boolean, M) => M = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, value) else createGroup(value)
}
currentMap.changeValue(key, update)
- // TODO: Make sure we're only using some % of the actual threshold due to error
- if (currentMap.estimateSize() > memoryThresholdMB * 1024 * 1024) {
+ if (currentMap.estimateSize() > memoryThreshold) {
spill()
}
}
def spill(): Unit = {
- val file = File.createTempFile("external_append_only_map", "") // Add spill location
+ val file = File.createTempFile("external_append_only_map", "")
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
sortedMap.foreach(out.writeObject)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index 2b2417efd9..738908a660 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -1,6 +1,6 @@
package org.apache.spark.util.collection
-import org.apache.spark.util.{AppendOnlyMap, SamplingSizeTracker}
+import org.apache.spark.util.SamplingSizeTracker
/** Append-only map that keeps track of its estimated size in bytes. */
class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
diff --git a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
index bd3ff5ff41..6b772131a7 100644
--- a/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SamplingSizeTrackerSuite.scala
@@ -5,7 +5,7 @@ import scala.util.Random
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.util.SamplingSizeTrackerSuite.LargeDummyClass
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap
+import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
class SamplingSizeTrackerSuite extends FunSuite with BeforeAndAfterAll {
val NORMAL_ERROR = 0.20
@@ -100,4 +100,4 @@ object SamplingSizeTrackerSuite {
private class LargeDummyClass {
val arr = new Array[Int](100)
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 7177919a58..7e7aa7800d 100644
--- a/core/src/test/scala/org/apache/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet