diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-13 20:46:22 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:13:36 -0700 |
commit | 1ede102ba5863f6cee27437b0adbc4d54cedffb3 (patch) | |
tree | d4d575072e64103382ed248ff855898de93ac3c9 /core/src/test | |
parent | aa2b89d98d6d195a38e36c1947d437ab7346e5c9 (diff) | |
download | spark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.tar.gz spark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.tar.bz2 spark-1ede102ba5863f6cee27437b0adbc4d54cedffb3.zip |
load balancing coalescer
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 75778de1cc..881bdedfe5 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -173,6 +173,28 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] != null) } + test("cogrouped RDDs with locality") { + // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 + val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} ))) + val coalesced1 = data.coalesce(3) + assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost (NB: order might reshuffle) + + val splits = coalesced1.glom().collect().map(_.toList).toList + assert(splits.length === 3) // ensure it indeed created 3 partitions + + assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // descent balance (2+ per bin) + + val prefs = List(List("m1","m2","m3"), List("m4","m5","m6")) + val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs + val coalesced2 = data2.coalesce(10) + val splits2 = coalesced2.glom().collect().map(_.toList).toList + + // this gives a list of pairs, each pair is of the form (even,odd), where even is the number of even elements... + val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) ) + val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the maxs, this represents the locality + maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition + + } test("zipped RDDs") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) |