diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-01-04 00:00:57 -0800 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-01-04 00:00:57 -0800 |
commit | 4296d96c82881cde5832bd8f8a3b48eb9817a218 (patch) | |
tree | b2445be45520fac4d69210a9f93f2a2999d6ee28 /core/src/test/scala | |
parent | 333d58df8676b30adc86e479579e2659e24d01a3 (diff) | |
download | spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.gz spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.tar.bz2 spark-4296d96c82881cde5832bd8f8a3b48eb9817a218.zip |
Assign spill threshold as a fraction of maximum memory
Further, divide this threshold by the number of tasks running concurrently.
Note that this does not guard against the following scenario: a new task
quickly fills up its share of the memory before old tasks finish spilling
their contents, in which case the total memory used by such maps may exceed
what was specified. Currently, spark.shuffle.safetyFraction mitigates the
effect of this.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 17 |
1 file changed, 9 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 6c93b1f5a0..ef957bb0e5 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -5,15 +5,13 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark._ -import org.apache.spark.SparkContext.rddToPairRDDFunctions +import org.apache.spark.SparkContext._ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { override def beforeEach() { val conf = new SparkConf(false) conf.set("spark.shuffle.externalSorting", "true") - conf.set("spark.shuffle.buffer.mb", "1024") - conf.set("spark.shuffle.buffer.fraction", "0.8") sc = new SparkContext("local", "test", conf) } @@ -27,14 +25,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("simple insert") { - var map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, + val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) // Single insert map.insert(1, 10) var it = map.iterator assert(it.hasNext) - var kv = it.next() + val kv = it.next() assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10)) assert(!it.hasNext) @@ -59,7 +57,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local map.insert(1, 100) map.insert(2, 200) map.insert(1, 1000) - var it = map.iterator + val it = map.iterator assert(it.hasNext) val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet)) assert(result == Set[(Int, Set[Int])]( @@ -177,8 +175,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } test("spilling") { - 
System.setProperty("spark.shuffle.buffer.mb", "1") - System.setProperty("spark.shuffle.buffer.fraction", "0.05") + // TODO: Figure out correct memory parameters to actually induce spilling + // System.setProperty("spark.shuffle.buffer.mb", "1") + // System.setProperty("spark.shuffle.buffer.fraction", "0.05") // reduceByKey - should spill exactly 6 times val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i)) @@ -226,4 +225,6 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local } } } + + // TODO: Test memory allocation for multiple concurrently running tasks } |