aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorAli Ghodsi <alig@cs.berkeley.edu>2013-08-13 20:46:22 -0700
committerAli Ghodsi <alig@cs.berkeley.edu>2013-08-20 16:13:36 -0700
commit1ede102ba5863f6cee27437b0adbc4d54cedffb3 (patch)
treed4d575072e64103382ed248ff855898de93ac3c9 /core/src/test
parentaa2b89d98d6d195a38e36c1947d437ab7346e5c9 (diff)
downloadspark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.tar.gz
spark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.tar.bz2
spark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.zip
load balancing coalescer
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala22
1 files changed, 22 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 75778de1cc..881bdedfe5 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -173,6 +173,28 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
+ test("cogrouped RDDs with locality") {
+ // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
+ val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
+ val coalesced1 = data.coalesce(3)
+ assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost (NB: order might reshuffle)
+
+ val splits = coalesced1.glom().collect().map(_.toList).toList
+ assert(splits.length === 3) // ensure it indeed created 3 partitions
+
+ assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // descent balance (2+ per bin)
+
+ val prefs = List(List("m1","m2","m3"), List("m4","m5","m6"))
+ val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs
+ val coalesced2 = data2.coalesce(10)
+ val splits2 = coalesced2.glom().collect().map(_.toList).toList
+
+ // this gives a list of pairs, each pair is of the form (even,odd), where even is the number of even elements...
+ val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) )
+ val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the maxs, this represents the locality
+ maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition
+
+ }
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)