Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala         |  1
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 68
3 files changed, 54 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5cb1af9db0..c3adc28685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -83,6 +83,7 @@ private[spark] class TaskSchedulerImpl(
// on this class.
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
+ // Protected by `this`
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]
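The new "Protected by `this`" comment documents an existing locking convention: `taskIdToTaskSetManager` is only read or written while holding the `TaskSchedulerImpl` monitor, which is why the mock backend later in this diff wraps its lookups in `taskScheduler.synchronized`. A minimal, self-contained sketch of that convention, using a hypothetical `ToyScheduler` rather than Spark's real class:

import scala.collection.mutable.HashMap

// Hypothetical, simplified stand-in for TaskSchedulerImpl (not Spark's class),
// illustrating the "Protected by `this`" convention: the map is only touched
// while holding the owning object's monitor.
class ToyScheduler {
  private val taskIdToTaskSetName = new HashMap[Long, String]

  def registerTask(taskId: Long, taskSetName: String): Unit = synchronized {
    taskIdToTaskSetName(taskId) = taskSetName
  }

  // External readers (such as a test backend) take the same lock for a consistent view.
  def taskSetNameFor(taskId: Long): Option[String] = synchronized {
    taskIdToTaskSetName.get(taskId)
  }
}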
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 6c9d4fb6f3..3a4b7af71b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -30,12 +30,12 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
* all tasks.
*/
def badHostBackend(): Unit = {
- val task = backend.beginTask()
- val host = backend.executorIdToExecutor(task.executorId).host
+ val (taskDescription, _) = backend.beginTask()
+ val host = backend.executorIdToExecutor(taskDescription.executorId).host
if (host == badHost) {
- backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
+ backend.taskFailed(taskDescription, new RuntimeException("I'm a bad host!"))
} else {
- backend.taskSuccess(task, 42)
+ backend.taskSuccess(taskDescription, 42)
}
}
@@ -48,7 +48,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
@@ -68,7 +67,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
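The removed `assert(results.isEmpty)` lines follow from the `results` semantics documented in the next file: tasks that succeed on good hosts before the job fails still end up in `results`, so a failed-job test should assert on `failure` instead. An illustrative sketch of such a check inside one of the tests above (the exact assertions are an assumption, not part of this commit):

// Illustrative post-failure checks: the job must have failed, but `results` may
// legitimately contain entries for tasks that succeeded before the bad host
// caused the failure.
assert(failure != null)
assertDataStructuresEmpty(noFailure = false)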
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 02aa5caa73..92bd76548e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -89,7 +89,26 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
+ /**
+ * A map from partition -> results for all tasks of a job when you call this test framework's
+ * [[submit]] method. Two important considerations:
+ *
+ * 1. If there is a job failure, results may or may not be empty. If any tasks succeed before
+ * the job fails, they will still be included in `results`. So don't use `results` to detect a
+ * job failure; check [[failure]] instead. (Also see [[assertDataStructuresEmpty()]])
+ *
+ * 2. This only gets cleared between tests. So you'll need to do special handling if you submit
+ * more than one job in one test.
+ */
val results = new HashMap[Int, Any]()
+
+ /**
+ * If a call to [[submit]] results in a job failure, this will hold the exception, else it will
+ * be null.
+ *
+ * As with [[results]], this only gets cleared between tests, so care must be taken if you are
+ * submitting more than one job in one test.
+ */
var failure: Throwable = _
/**
@@ -113,6 +132,11 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}
+ /**
+ * Helper to run a few common asserts after a job has completed, in particular that some internal
+ * bookkeeping data structures are empty. This only does a very minimal check for whether the job
+ * failed or succeeded -- often you will want extra asserts on [[results]] or [[failure]].
+ */
protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = {
if (noFailure) {
if (failure != null) {
@@ -133,6 +157,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
// when the job succeeds
assert(taskScheduler.runningTaskSets.isEmpty)
assert(!backend.hasTasks)
+ } else {
+ assert(failure != null)
}
assert(scheduler.activeJobs.isEmpty)
}
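Because `results` and `failure` are only reset between tests, a test that submits two jobs must reset them itself between submissions. A minimal sketch of that handling (this bookkeeping is an assumption; the framework in this commit does not provide a helper for it):

// Assumed handling for two submissions in a single test: snapshot and reset the
// shared state between jobs so the second job's outcome can be checked in isolation.
val firstJobResults = results.toMap
results.clear()
failure = null
// ... submit and await the second job here, then assert on `results` / `failure` again ...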
@@ -217,10 +243,10 @@ private[spark] abstract class MockBackend(
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
*/
- def beginTask(): TaskDescription = {
+ def beginTask(): (TaskDescription, Task[_]) = {
synchronized {
val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
- runningTasks += toRun
+ runningTasks += toRun._1.taskId
toRun
}
}
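With `beginTask()` now returning both the `TaskDescription` (needed to reply to the scheduler) and the `Task[_]` (handy for inspecting `stageId` or `stageAttemptId`), a typical test backend loop looks like the ones later in this diff. An illustrative combination of those patterns (the branch condition here is made up):

def runBackend(): Unit = {
  val (taskDescription, task) = backend.beginTask()
  // Reply using the TaskDescription; the Task is only consulted to decide what to do.
  if (task.stageId == 0) {
    backend.taskSuccess(taskDescription, 42)
  } else {
    backend.taskFailed(taskDescription, new RuntimeException("synthetic failure"))
  }
}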
@@ -255,7 +281,7 @@ private[spark] abstract class MockBackend(
taskScheduler.statusUpdate(task.taskId, state, resultBytes)
if (TaskState.isFinished(state)) {
synchronized {
- runningTasks -= task
+ runningTasks -= task.taskId
executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK
freeCores += taskScheduler.CPUS_PER_TASK
}
@@ -264,9 +290,9 @@ private[spark] abstract class MockBackend(
}
// protected by this
- private val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000)
+ private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, Task[_])](10000)
// protected by this
- private val runningTasks = ArrayBuffer[TaskDescription]()
+ private val runningTasks = HashSet[Long]()
def hasTasks: Boolean = synchronized {
assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty
@@ -307,10 +333,19 @@ private[spark] abstract class MockBackend(
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
- val newTasks = taskScheduler.resourceOffers(offers).flatten
+ val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+ // Look up the Task for each TaskDescription now, under the TaskSchedulerImpl lock, so that
+ // individual tests that need the Task don't have to take that lock (and can't race on it)
+ val newTasks = taskScheduler.synchronized {
+ newTaskDescriptions.map { taskDescription =>
+ val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
+ val task = taskSet.tasks(taskDescription.index)
+ (taskDescription, task)
+ }
+ }
synchronized {
- newTasks.foreach { task =>
- executorIdToExecutor(task.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
+ newTasks.foreach { case (taskDescription, _) =>
+ executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
}
freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
assignedTasksWaitingToRun ++= newTasks
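Resolving the `Task` objects inside `taskScheduler.synchronized`, before the backend's own `synchronized` block, keeps the TaskSchedulerImpl lock out of individual tests: the two locks are taken one after the other, never nested by test code. A self-contained toy sketch of that shape (hypothetical classes, not Spark's):

import scala.collection.mutable.HashMap

// Hypothetical classes sketching the pattern in reviveOffers above: resolve
// whatever needs the scheduler's lock first, release it, then update
// backend-local state under the backend's own lock.
object LockOrderSketch {
  class Scheduler {
    private val taskNames = new HashMap[Long, String]
    def register(id: Long, name: String): Unit = synchronized { taskNames(id) = name }
    def nameOf(id: Long): String = synchronized { taskNames(id) }
  }

  class Backend(scheduler: Scheduler) {
    private var freeCores = 4
    def accept(taskIds: Seq[Long]): Unit = {
      val resolved = scheduler.synchronized { taskIds.map(id => (id, scheduler.nameOf(id))) }
      // The backend lock is taken only after the scheduler lock has been released.
      synchronized { freeCores -= resolved.size }
    }
  }
}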
@@ -437,8 +472,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
*/
testScheduler("super simple job") {
def runBackend(): Unit = {
- val task = backend.beginTask()
- backend.taskSuccess(task, 42)
+ val (taskDescription, _) = backend.beginTask()
+ backend.taskSuccess(taskDescription, 42)
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -473,9 +508,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val d = join(30, b, c)
def runBackend(): Unit = {
- val taskDescription = backend.beginTask()
- val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
- val task = taskSet.tasks(taskDescription.index)
+ val (taskDescription, task) = backend.beginTask()
// make sure the required map output is available
task.stageId match {
@@ -515,9 +548,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
val stageToAttempts = new HashMap[Int, HashSet[Int]]()
def runBackend(): Unit = {
- val taskDescription = backend.beginTask()
- val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
- val task = taskSet.tasks(taskDescription.index)
+ val (taskDescription, task) = backend.beginTask()
stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId
// make sure the required map output is available
@@ -549,8 +580,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
testScheduler("job failure after 4 attempts") {
def runBackend(): Unit = {
- val task = backend.beginTask()
- backend.taskFailed(task, new RuntimeException("test task failure"))
+ val (taskDescription, _) = backend.beginTask()
+ backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
}
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
@@ -558,7 +589,6 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
Await.ready(jobFuture, duration)
failure.getMessage.contains("test task failure")
}
- assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}
}
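Putting the pieces together, a successful-job test in this framework defines a backend function, submits a mock RDD inside `withBackend`, waits on the returned future, and then checks `results` and the scheduler's bookkeeping. A sketch assembled from the fragments above, as the body of a `testScheduler(...)` block (the expected values are illustrative):

def runBackend(): Unit = {
  val (taskDescription, _) = backend.beginTask()
  backend.taskSuccess(taskDescription, 42)
}
withBackend(runBackend _) {
  val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
  val duration = Duration(1, SECONDS)
  Await.ready(jobFuture, duration)
}
// One entry per partition on success; the value 42 mirrors the stub backend above.
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty()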