about summary refs log tree commit diff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-01-04 00:00:57 -0800
committerAndrew Or <andrewor14@gmail.com>2014-01-04 00:00:57 -0800
commit4296d96c82881cde5832bd8f8a3b48eb9817a218 (patch)
treeb2445be45520fac4d69210a9f93f2a2999d6ee28 /core/src/test/scala
parent333d58df8676b30adc86e479579e2659e24d01a3 (diff)
downloadspark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.gz
spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.bz2
spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.zip
Assign spill threshold as a fraction of maximum memory
Further, divide this threshold by the number of tasks running concurrently. Note that this does not guard against the following scenario: a new task quickly fills up its share of the memory before old tasks finish spilling their contents, in which case the total memory used by such maps may exceed what was specified. Currently, spark.shuffle.safetyFraction mitigates the effect of this.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala17
1 file changed, 9 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6c93b1f5a0..ef957bb0e5 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -5,15 +5,13 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark._
-import org.apache.spark.SparkContext.rddToPairRDDFunctions
+import org.apache.spark.SparkContext._
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
override def beforeEach() {
val conf = new SparkConf(false)
conf.set("spark.shuffle.externalSorting", "true")
- conf.set("spark.shuffle.buffer.mb", "1024")
- conf.set("spark.shuffle.buffer.fraction", "0.8")
sc = new SparkContext("local", "test", conf)
}
@@ -27,14 +25,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple insert") {
- var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
+ val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
// Single insert
map.insert(1, 10)
var it = map.iterator
assert(it.hasNext)
- var kv = it.next()
+ val kv = it.next()
assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
assert(!it.hasNext)
@@ -59,7 +57,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
map.insert(1, 100)
map.insert(2, 200)
map.insert(1, 1000)
- var it = map.iterator
+ val it = map.iterator
assert(it.hasNext)
val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
assert(result == Set[(Int, Set[Int])](
@@ -177,8 +175,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("spilling") {
- System.setProperty("spark.shuffle.buffer.mb", "1")
- System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+ // TODO: Figure out correct memory parameters to actually induce spilling
+ // System.setProperty("spark.shuffle.buffer.mb", "1")
+ // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
// reduceByKey - should spill exactly 6 times
val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
@@ -226,4 +225,6 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
}
}
+
+ // TODO: Test memory allocation for multiple concurrently running tasks
}