author    Imran Rashid <imran@quantifind.com>    2013-03-09 21:17:31 -0800
committer Imran Rashid <imran@quantifind.com>    2013-03-09 21:17:31 -0800
commit    20f01a0a1be1349990bb86426f99f4f446432f0c (patch)
tree      8e34fdaf2d9cd06131d51ea4ad22e882445eb496 /core/src/test
parent    ec30188a2ad6325356a5ccfe85cefee6872e8646 (diff)
download  spark-20f01a0a1be1349990bb86426f99f4f446432f0c.tar.gz
          spark-20f01a0a1be1349990bb86426f99f4f446432f0c.tar.bz2
          spark-20f01a0a1be1349990bb86426f99f4f446432f0c.zip
enable task metrics in local mode, add tests
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/spark/scheduler/SparkListenerSuite.scala | 80
1 file changed, 80 insertions, 0 deletions
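
For context, a minimal sketch of what this change enables; it is not part of the commit. The listener APIs shown (addSparkListener, StatsReportListener, SparkListener.onStageCompleted, StageInfo.taskInfos) are the ones exercised by the test below, while the object name and the sample job are illustrative assumptions.

import spark.SparkContext
import spark.SparkContext._
import spark.scheduler.{SparkListener, StageCompleted, StatsReportListener}

object LocalMetricsExample {
  def main(args: Array[String]) {
    // as of this commit, task metrics are populated even with a local master
    val sc = new SparkContext("local[4]", "metrics-example")
    // built-in listener that logs summary statistics of task metrics per stage
    sc.addSparkListener(new StatsReportListener)
    // a custom listener sees the raw StageInfo, including (TaskInfo, TaskMetrics) pairs
    sc.addSparkListener(new SparkListener {
      def onStageCompleted(stage: StageCompleted) {
        println(stage.stageInfo.stage + ": " + stage.stageInfo.taskInfos.size + " tasks")
      }
    })
    // a job with a shuffle exercises both the read- and write-side metrics
    sc.parallelize(1 to 10000, 16).map(i => (i % 10, i)).reduceByKey(_ + _).count
    sc.stop()
  }
}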
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
new file mode 100644
index 0000000000..dd9f2d7e91
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -0,0 +1,80 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import spark.{SparkContext, LocalSparkContext}
+import scala.collection.mutable
+import org.scalatest.matchers.ShouldMatchers
+import spark.SparkContext._
+
+/**
+ * Verifies that task metrics are collected and reported to registered
+ * SparkListeners when a job runs in local mode.
+ */
+
+class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+  test("local metrics") {
+    sc = new SparkContext("local[4]", "test")
+    val listener = new SaveStageInfo
+    sc.addSparkListener(listener)
+    sc.addSparkListener(new StatsReportListener)
+
+    val d = sc.parallelize(1 to 1e4.toInt, 64)
+    d.count
+    listener.stageInfos.size should be (1)
+
+    val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
+
+    val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
+
+    val d4 = d2.cogroup(d3, 64).map { case (k, (v1, v2)) => k -> (v1.size, v2.size) }
+    d4.setName("A Cogroup")
+
+    d4.collectAsMap
+
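+    // 1 stage from the count above, plus 3 from the cogroup job: one shuffle-map stage per input and the result stage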
+    listener.stageInfos.size should be (4)
+    listener.stageInfos.foreach { stageInfo =>
+      // small test, so some tasks might take less than a millisecond, but the average should be positive
+      checkNonZeroAvg(stageInfo.taskInfos.map { _._1.duration }, stageInfo + " duration")
+      checkNonZeroAvg(stageInfo.taskInfos.map { _._2.executorRunTime.toLong }, stageInfo + " executorRunTime")
+      checkNonZeroAvg(stageInfo.taskInfos.map { _._2.executorDeserializeTime.toLong }, stageInfo + " executorDeserializeTime")
+      if (stageInfo.stage.rdd.name == d4.name) {
+        checkNonZeroAvg(stageInfo.taskInfos.map { _._2.shuffleReadMetrics.get.fetchWaitTime }, stageInfo + " fetchWaitTime")
+      }
+
+      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
+        taskMetrics.resultSize should be > (0L)
+        if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
+          taskMetrics.shuffleWriteMetrics should be ('defined)
+          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
+        }
+        if (stageInfo.stage.rdd.name == d4.name) {
+          taskMetrics.shuffleReadMetrics should be ('defined)
+          val sm = taskMetrics.shuffleReadMetrics.get
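+          // with a local master every block is fetched locally, so the remote counters should all be zero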
+          sm.totalBlocksFetched should be > (0)
+          sm.shuffleReadMillis should be > (0L)
+          sm.localBlocksFetched should be > (0)
+          sm.remoteBlocksFetched should be (0)
+          sm.remoteBytesRead should be (0L)
+          sm.remoteFetchTime should be (0L)
+        }
+      }
+    }
+  }
+
+  def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+    assert(m.sum / m.size.toDouble > 0.0, msg)
+  }
+
+  def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
+    val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map { _.rdd.name }
+    names.intersect(rddNames).nonEmpty && names.intersect(excludedNames).isEmpty
+  }
+
+  class SaveStageInfo extends SparkListener {
+    val stageInfos = mutable.Buffer[StageInfo]()
+    def onStageCompleted(stage: StageCompleted) {
+      stageInfos += stage.stageInfo
+    }
+  }
+
+}