diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-03-16 11:54:44 -0700 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-03-16 11:54:44 -0700 |
commit | 38454c4aedcc4b454d3470ed853d4741ca920db2 (patch) | |
tree | af81b3b052a5563d5613cba9cfdba728e5cdc8d0 /core/src/test/scala | |
parent | cd5b947cf64ce0c8abb4b4bf5f37550522eac8e1 (diff) | |
parent | c1e9cdc49f89222b366a14a20ffd937ca0fb9adc (diff) | |
download | spark-38454c4aedcc4b454d3470ed853d4741ca920db2.tar.gz spark-38454c4aedcc4b454d3470ed853d4741ca920db2.tar.bz2 spark-38454c4aedcc4b454d3470ed853d4741ca920db2.zip |
Merge branch 'master' of https://github.com/mesos/spark into WithThing
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 28 | ||||
-rw-r--r-- | core/src/test/scala/spark/scheduler/SparkListenerSuite.scala | 86 | ||||
-rw-r--r-- | core/src/test/scala/spark/util/NextIteratorSuite.scala | 68 |
3 files changed, 181 insertions, 1 deletions
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 8411291b2c..2b2a90defa 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -272,13 +272,39 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } // partitionBy so we have a narrow dependency val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) // more partitions/no partitioner so a shuffle dependency val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) val c = a.subtract(b) assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... + assert(c.partitioner === None) + } + + test("subtractByKey") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + test("subtractByKey with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) assert(c.partitioner.get === p) } + } object ShuffleSuite { diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala new file mode 100644 index 0000000000..2f5af10e69 --- /dev/null +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -0,0 +1,86 @@ +package spark.scheduler + +import org.scalatest.FunSuite +import spark.{SparkContext, LocalSparkContext} +import scala.collection.mutable +import org.scalatest.matchers.ShouldMatchers +import spark.SparkContext._ + +/** + * + */ + +class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + + test("local metrics") { + sc = new SparkContext("local[4]", "test") + val listener = new SaveStageInfo + sc.addSparkListener(listener) + sc.addSparkListener(new StatsReportListener) + //just to make sure some of the tasks take a noticeable amount of time + val w = {i:Int => + if (i == 0) + Thread.sleep(100) + i + } + + val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} + d.count + listener.stageInfos.size should be (1) + + val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") + + val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2") + + val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)} + d4.setName("A Cogroup") + + d4.collectAsMap + + listener.stageInfos.size should be (4) + listener.stageInfos.foreach {stageInfo => + //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms + checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration") + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime") + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime") + if (stageInfo.stage.rdd.name == d4.name) { + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime") + } + + stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) => + taskMetrics.resultSize should be > (0l) + if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) { + taskMetrics.shuffleWriteMetrics should be ('defined) + taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) + } + if (stageInfo.stage.rdd.name == d4.name) { + taskMetrics.shuffleReadMetrics should be ('defined) + val sm = taskMetrics.shuffleReadMetrics.get + sm.totalBlocksFetched should be > (0) + sm.shuffleReadMillis should be > (0l) + sm.localBlocksFetched should be > (0) + sm.remoteBlocksFetched should be (0) + sm.remoteBytesRead should be (0l) + sm.remoteFetchTime should be (0l) + } + } + } + } + + def checkNonZeroAvg(m: Traversable[Long], msg: String) { + assert(m.sum / m.size.toDouble > 0.0, msg) + } + + def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = { + val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name} + !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty + } + + class SaveStageInfo extends SparkListener { + val stageInfos = mutable.Buffer[StageInfo]() + def onStageCompleted(stage: StageCompleted) { + stageInfos += stage.stageInfo + } + } + +} diff --git a/core/src/test/scala/spark/util/NextIteratorSuite.scala b/core/src/test/scala/spark/util/NextIteratorSuite.scala new file mode 100644 index 0000000000..ed5b36da73 --- /dev/null +++ b/core/src/test/scala/spark/util/NextIteratorSuite.scala @@ -0,0 +1,68 @@ +package spark.util + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import scala.collection.mutable.Buffer +import java.util.NoSuchElementException + +class NextIteratorSuite extends FunSuite with ShouldMatchers { + test("one iteration") { + val i = new StubIterator(Buffer(1)) + i.hasNext should be === true + i.next should be === 1 + i.hasNext should be === false + intercept[NoSuchElementException] { i.next() } + } + + test("two iterations") { + val i = new StubIterator(Buffer(1, 2)) + i.hasNext should be === true + i.next should be === 1 + i.hasNext should be === true + i.next should be === 2 + i.hasNext should be === false + intercept[NoSuchElementException] { i.next() } + } + + test("empty iteration") { + val i = new StubIterator(Buffer()) + i.hasNext should be === false + intercept[NoSuchElementException] { i.next() } + } + + test("close is called once for empty iterations") { + val i = new StubIterator(Buffer()) + i.hasNext should be === false + i.hasNext should be === false + i.closeCalled should be === 1 + } + + test("close is called once for non-empty iterations") { + val i = new StubIterator(Buffer(1, 2)) + i.next should be === 1 + i.next should be === 2 + // close isn't called until we check for the next element + i.closeCalled should be === 0 + i.hasNext should be === false + i.closeCalled should be === 1 + i.hasNext should be === false + i.closeCalled should be === 1 + } + + class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] { + var closeCalled = 0 + + override def getNext() = { + if (ints.size == 0) { + finished = true + 0 + } else { + ints.remove(0) + } + } + + override def close() { + closeCalled += 1 + } + } +} |