aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala14
1 files changed, 12 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index c28aa06623..2ba63da881 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -28,6 +28,8 @@ import scala.reflect.ClassTag
import org.scalactic.TripleEquals
import org.scalatest.Assertions.AssertionsHelper
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.TaskState._
@@ -157,8 +159,16 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
// When a job fails, we terminate before waiting for all the task end events to come in,
// so there might still be a running task set. So we only check these conditions
- // when the job succeeds
- assert(taskScheduler.runningTaskSets.isEmpty)
+ // when the job succeeds.
+ // When the final task of a taskset completes, we post
+ // the event to the DAGScheduler event loop before we finish processing in the taskscheduler
+ // thread. It's possible the DAGScheduler thread processes the event, finishes the job,
+ // and notifies the job waiter before our original thread in the task scheduler finishes
+ // handling the event and marks the taskset as complete. So its ok if we need to wait a
+ // *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race.
+ eventually(timeout(1 second), interval(10 millis)) {
+ assert(taskScheduler.runningTaskSets.isEmpty)
+ }
assert(!backend.hasTasks)
} else {
assert(failure != null)