author    Andrew Or <andrewor14@gmail.com>  2014-01-10 18:33:48 -0800
committer Andrew Or <andrewor14@gmail.com>  2014-01-10 18:33:48 -0800
commit    e6447152b323a8fdf71ae3a8c1086ba6948e7512 (patch)
tree      790e89fb45d76ba80df77b135a4b83e4fdd9adaa /core/src/test
parent    2e393cd5fdfbf3a85fced370b5c42315e86dad49 (diff)
Induce spilling in ExternalAppendOnlyMapSuite
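
The suite previously created a single "local" SparkContext in beforeEach, and
the old "spilling" test carried a TODO because no memory parameters had been
found that actually forced ExternalAppendOnlyMap to spill. Each test now builds
its own context, and the spilling test shrinks spark.shuffle.memoryFraction to
0.001 through a system property and runs against a local-cluster master, so the
map genuinely overflows to disk on larger inputs (100000 pairs instead of 10000).

Below is a minimal, self-contained sketch of the pattern the patch applies (an
editorial illustration, not code from the commit; the object name SpillRepro
and the main wrapper are invented for the example):

    import org.apache.spark.SparkContext

    object SpillRepro {
      def main(args: Array[String]) {
        // Shrink the shuffle memory budget so even a modest insert stream
        // overflows ExternalAppendOnlyMap and spills to disk. 0.001 is the
        // value the patched test uses.
        System.setProperty("spark.shuffle.memoryFraction", "0.001")
        try {
          // Same master string as the patched test: one worker, one core,
          // 512 MB.
          val sc = new SparkContext("local-cluster[1,1,512]", "spill-repro")
          try {
            // Same workload as the patched reduceByKey case: 100000 pairs,
            // two values per key, keep the max of each pair.
            val result = sc.parallelize(0 until 100000)
              .map(i => (i / 2, i))
              .reduceByKey(math.max(_, _))
              .collect()
            assert(result.length == 50000)
          } finally {
            sc.stop()
          }
        } finally {
          // Clear the property so later tests see the default fraction.
          System.clearProperty("spark.shuffle.memoryFraction")
        }
      }
    }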
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala  77
1 file changed, 44 insertions, 33 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index ef957bb0e5..c3391f3e53 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -9,22 +9,19 @@ import org.apache.spark.SparkContext._
class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
- override def beforeEach() {
- val conf = new SparkConf(false)
- conf.set("spark.shuffle.externalSorting", "true")
- sc = new SparkContext("local", "test", conf)
- }
-
- val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
- val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
+ private val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)
+ private val mergeValue: (ArrayBuffer[Int], Int) => ArrayBuffer[Int] = (buffer, i) => {
buffer += i
}
- val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
+ private val mergeCombiners: (ArrayBuffer[Int], ArrayBuffer[Int]) => ArrayBuffer[Int] =
(buf1, buf2) => {
buf1 ++= buf2
}
test("simple insert") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
@@ -48,6 +45,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("insert with collision") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
@@ -67,6 +67,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("ordering") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map1.insert(1, 10)
@@ -109,6 +112,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("null keys and values") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
mergeValue, mergeCombiners)
map.insert(1, 5)
@@ -147,6 +153,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple aggregator") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
+
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
val result1 = rdd.reduceByKey(_+_).collect()
@@ -159,6 +168,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("simple cogroup") {
+ val conf = new SparkConf(false)
+ sc = new SparkContext("local", "test", conf)
val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
val result = rdd1.cogroup(rdd2).collect()
@@ -175,56 +186,56 @@ class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with Local
}
test("spilling") {
- // TODO: Figure out correct memory parameters to actually induce spilling
- // System.setProperty("spark.shuffle.buffer.mb", "1")
- // System.setProperty("spark.shuffle.buffer.fraction", "0.05")
+ // TODO: Use SparkConf (which currently throws connection reset exception)
+ System.setProperty("spark.shuffle.memoryFraction", "0.001")
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
- // reduceByKey - should spill exactly 6 times
- val rddA = sc.parallelize(0 until 10000).map(i => (i/2, i))
+ // reduceByKey - should spill ~8 times
+ val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max(_, _)).collect()
- assert(resultA.length == 5000)
+ assert(resultA.length == 50000)
resultA.foreach { case(k, v) =>
k match {
case 0 => assert(v == 1)
- case 2500 => assert(v == 5001)
- case 4999 => assert(v == 9999)
+ case 25000 => assert(v == 50001)
+ case 49999 => assert(v == 99999)
case _ =>
}
}
- // groupByKey - should spill exactly 11 times
- val rddB = sc.parallelize(0 until 10000).map(i => (i/4, i))
+ // groupByKey - should spill ~17 times
+ val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
- assert(resultB.length == 2500)
+ assert(resultB.length == 25000)
resultB.foreach { case(i, seq) =>
i match {
case 0 => assert(seq.toSet == Set[Int](0, 1, 2, 3))
- case 1250 => assert(seq.toSet == Set[Int](5000, 5001, 5002, 5003))
- case 2499 => assert(seq.toSet == Set[Int](9996, 9997, 9998, 9999))
+ case 12500 => assert(seq.toSet == Set[Int](50000, 50001, 50002, 50003))
+ case 24999 => assert(seq.toSet == Set[Int](99996, 99997, 99998, 99999))
case _ =>
}
}
- // cogroup - should spill exactly 7 times
- val rddC1 = sc.parallelize(0 until 1000).map(i => (i, i))
- val rddC2 = sc.parallelize(0 until 1000).map(i => (i%100, i))
+ // cogroup - should spill ~7 times
+ val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
+ val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
val resultC = rddC1.cogroup(rddC2).collect()
- assert(resultC.length == 1000)
+ assert(resultC.length == 10000)
resultC.foreach { case(i, (seq1, seq2)) =>
i match {
case 0 =>
assert(seq1.toSet == Set[Int](0))
- assert(seq2.toSet == Set[Int](0, 100, 200, 300, 400, 500, 600, 700, 800, 900))
- case 500 =>
- assert(seq1.toSet == Set[Int](500))
+ assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+ case 5000 =>
+ assert(seq1.toSet == Set[Int](5000))
assert(seq2.toSet == Set[Int]())
- case 999 =>
- assert(seq1.toSet == Set[Int](999))
+ case 9999 =>
+ assert(seq1.toSet == Set[Int](9999))
assert(seq2.toSet == Set[Int]())
case _ =>
}
}
- }
- // TODO: Test memory allocation for multiple concurrently running tasks
+ System.clearProperty("spark.shuffle.memoryFraction")
+ }
}
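
One hygiene note on the new test (an editorial sketch, not part of the patch):
the System.clearProperty call at the end of the test body only runs when every
assertion passes. Wrapping the workload in try/finally would keep a failing
assertion from leaking the tiny memory fraction into later tests:

    System.setProperty("spark.shuffle.memoryFraction", "0.001")
    try {
      // ... run the reduceByKey / groupByKey / cogroup workloads ...
    } finally {
      System.clearProperty("spark.shuffle.memoryFraction")
    }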