aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2016-06-03 11:49:33 -0500
committerImran Rashid <irashid@cloudera.com>2016-06-03 11:49:33 -0500
commitc2f0cb4f6380c500f9ba37b2429503b762204973 (patch)
treed7fbc6945dea32a2e71e8aee927d3cf7658506b7
parent190ff274fd71662023a804cf98400c71f9f7da4f (diff)
downloadspark-c2f0cb4f6380c500f9ba37b2429503b762204973.tar.gz
spark-c2f0cb4f6380c500f9ba37b2429503b762204973.tar.bz2
spark-c2f0cb4f6380c500f9ba37b2429503b762204973.zip
[SPARK-15714][CORE] Fix flaky o.a.s.scheduler.BlacklistIntegrationSuite
## What changes were proposed in this pull request? BlacklistIntegrationSuite (introduced by SPARK-10372) is a bit flaky because of some race conditions: 1. Failed jobs might have non-empty results, because the resultHandler will be invoked for successful tasks (if there are task successes before failures) 2. taskScheduler.taskIdToTaskSetManager must be protected by a lock on taskScheduler (1) has failed a handful of jenkins builds recently. I don't think I've seen (2) in jenkins, but I've run into with some uncommitted tests I'm working on where there are lots more tasks. While I was in there, I also made an unrelated fix to `runningTasks`in the test framework -- there was a pointless `O(n)` operation to remove completed tasks, could be `O(1)`. ## How was this patch tested? I modified the o.a.s.scheduler.BlacklistIntegrationSuite to have it run the tests 1k times on my laptop. It failed 11 times before this change, and none with it. (Pretty sure all the failures were problem (1), though I didn't check all of them). Also the full suite of tests via jenkins. Author: Imran Rashid <irashid@cloudera.com> Closes #13454 from squito/SPARK-15714.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala68
3 files changed, 54 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5cb1af9db0..c3adc28685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -83,6 +83,7 @@ private[spark] class TaskSchedulerImpl(
// on this class.
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
+ // Protected by `this`
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 6c9d4fb6f3..3a4b7af71b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -30,12 +30,12 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
* all tasks.
*/
def badHostBackend(): Unit = {
- val task = backend.beginTask()
- val host = backend.executorIdToExecutor(task.executorId).host
+ val (taskDescription, _) = backend.beginTask()
+ val host = backend.executorIdToExecutor(taskDescription.executorId).host
if (host == badHost) {
- backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
+ backend.taskFailed(taskDescription, new RuntimeException("I'm a bad host!"))
} else {
- backend.taskSuccess(task, 42)
+ backend.taskSuccess(taskDescription, 42)
}
}
@@ -48,7 +48,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
@@ -68,7 +67,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 02aa5caa73..92bd76548e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -89,7 +89,26 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
+ /**
+ * A map from partition -> results for all tasks of a job when you call this test framework's
+ * [[submit]] method. Two important considerations:
+ *
+ * 1. If there is a job failure, results may or may not be empty. If any tasks succeed before
+ * the job has failed, they will get included in `results`. Instead, check for job failure by
+ * checking [[failure]]. (Also see [[assertDataStructuresEmpty()]])
+ *
+ * 2. This only gets cleared between tests. So you'll need to do special handling if you submit
+ * more than one job in one test.
+ */
val results = new HashMap[Int, Any]()
+
+ /**
+ * If a call to [[submit]] results in a job failure, this will hold the exception, else it will
+ * be null.
+ *
+ * As with [[results]], this only gets cleared between tests, so care must be taken if you are
+ * submitting more than one job in one test.
+ */
var failure: Throwable = _
/**
@@ -113,6 +132,11 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
+ /**
+ * Helper to run a few common asserts after a job has completed, in particular some internal
+ * datastructures for bookkeeping. This only does a very minimal check for whether the job
+ * failed or succeeded -- often you will want extra asserts on [[results]] or [[failure]].
+ */
protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = {
if (noFailure) {
if (failure != null) {
@@ -133,6 +157,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
// when the job succeeds
assert(taskScheduler.runningTaskSets.isEmpty)
assert(!backend.hasTasks)
+ } else {
+ assert(failure != null)
}
assert(scheduler.activeJobs.isEmpty)
}
@@ -217,10 +243,10 @@ private[spark] abstract class MockBackend(
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
*/
- def beginTask(): TaskDescription = {
+ def beginTask(): (TaskDescription, Task[_]) = {
synchronized {
val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
- runningTasks += toRun
+ runningTasks += toRun._1.taskId
toRun
}
}
@@ -255,7 +281,7 @@ private[spark] abstract class MockBackend(
taskScheduler.statusUpdate(task.taskId, state, resultBytes)
if (TaskState.isFinished(state)) {
synchronized {
- runningTasks -= task
+ runningTasks -= task.taskId
executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK
freeCores += taskScheduler.CPUS_PER_TASK
}
@@ -264,9 +290,9 @@ private[spark] abstract class MockBackend(
}
// protected by this
- private val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000)
+ private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, Task[_])](10000)
// protected by this
- private val runningTasks = ArrayBuffer[TaskDescription]()
+ private val runningTasks = HashSet[Long]()
def hasTasks: Boolean = synchronized {
assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty
@@ -307,10 +333,19 @@ private[spark] abstract class MockBackend(
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
- val newTasks = taskScheduler.resourceOffers(offers).flatten
+ val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+ // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
+ // tests from introducing a race if they need it
+ val newTasks = taskScheduler.synchronized {
+ newTaskDescriptions.map { taskDescription =>
+ val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+ val task = taskSet.tasks(taskDescription.index)
+ (taskDescription, task)
+ }
+ }
synchronized {
- newTasks.foreach { task =>
- executorIdToExecutor(task.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
+ newTasks.foreach { case (taskDescription, _) =>
+ executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
}
freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
assignedTasksWaitingToRun ++= newTasks
@@ -437,8 +472,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
*/
testScheduler("super simple job") {
def runBackend(): Unit = {
- val task = backend.beginTask()
- backend.taskSuccess(task, 42)
+ val (taskDescripition, _) = backend.beginTask()
+ backend.taskSuccess(taskDescripition, 42)
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -473,9 +508,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val d = join(30, b, c)
def runBackend(): Unit = {
- val taskDescription = backend.beginTask()
- val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
- val task = taskSet.tasks(taskDescription.index)
+ val (taskDescription, task) = backend.beginTask()
// make sure the required map output is available
task.stageId match {
@@ -515,9 +548,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val stageToAttempts = new HashMap[Int, HashSet[Int]]()
def runBackend(): Unit = {
- val taskDescription = backend.beginTask()
- val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
- val task = taskSet.tasks(taskDescription.index)
+ val (taskDescription, task) = backend.beginTask()
stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId
// make sure the required map output is available
@@ -549,8 +580,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
testScheduler("job failure after 4 attempts") {
def runBackend(): Unit = {
- val task = backend.beginTask()
- backend.taskFailed(task, new RuntimeException("test task failure"))
+ val (taskDescription, _) = backend.beginTask()
+ backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -558,7 +589,6 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
Await.ready(jobFuture, duration)
failure.getMessage.contains("test task failure")
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
}