aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-11-26 13:30:17 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-11-26 13:30:17 -0800
commited7ecb93ce6ce259eae1f5aeb28e9e336fafa30f (patch)
tree7cd3fc95b030ef114cdd46bac857c4af9aa8e30c /core
parentcb976dfb50d62b8a686b6249ff3b24c0837bd613 (diff)
downloadspark-ed7ecb93ce6ce259eae1f5aeb28e9e336fafa30f.tar.gz
spark-ed7ecb93ce6ce259eae1f5aeb28e9e336fafa30f.tar.bz2
spark-ed7ecb93ce6ce259eae1f5aeb28e9e336fafa30f.zip
[SPARK-963] Wait for SparkListenerBus eventQueue to be empty before checking jobLogger state
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 984881861c..002368ff55 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+ val WAIT_TIMEOUT_MILLIS = 10000
test("inner method") {
sc = new SparkContext("local", "joblogger")
@@ -92,6 +93,8 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
rdd.reduceByKey(_+_).collect()
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
@@ -120,7 +123,9 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
sc.addSparkListener(joblogger)
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
rdd.reduceByKey(_+_).collect()
-
+
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+
joblogger.onJobStartCount should be (1)
joblogger.onJobEndCount should be (1)
joblogger.onTaskEndCount should be (8)