aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjinxing <jinxing@meituan.com>2017-02-09 16:05:44 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-09 16:05:44 -0800
commitfd6c3a0b10ce43a56df845ba66d160b77f02e576 (patch)
tree2f2dc33b6eebb5278211093970c7205659e05a6c
parent303f00a4bf6660dd83c8bd9e3a107bb3438a421b (diff)
downloadspark-fd6c3a0b10ce43a56df845ba66d160b77f02e576.tar.gz
spark-fd6c3a0b10ce43a56df845ba66d160b77f02e576.tar.bz2
spark-fd6c3a0b10ce43a56df845ba66d160b77f02e576.zip
[SPARK-19263] Fix race in SchedulerIntegrationSuite.
## What changes were proposed in this pull request? All the process of offering resource and generating `TaskDescription` should be guarded by taskScheduler.synchronized in `reviveOffers`, otherwise there is race condition. ## How was this patch tested? Existing unit tests. Author: jinxing <jinxing@meituan.com> Closes #16831 from jinxing64/SPARK-19263-FixRaceInTest.
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala14
1 files changed, 7 insertions, 7 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 2ba63da881..398ac3d620 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -391,17 +391,17 @@ private[spark] abstract class MockBackend(
* scheduling.
*/
override def reviveOffers(): Unit = {
- val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
- // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
- // tests from introducing a race if they need it
- val newTasks = taskScheduler.synchronized {
- newTaskDescriptions.map { taskDescription =>
+ // Need a lock on the entire scheduler to protect freeCores -- otherwise, multiple threads
+ // may make offers at the same time, though they are using the same set of freeCores.
+ taskScheduler.synchronized {
+ val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
+ // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
+ // tests from introducing a race if they need it.
+ val newTasks = newTaskDescriptions.map { taskDescription =>
val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
val task = taskSet.tasks(taskDescription.index)
(taskDescription, task)
}
- }
- synchronized {
newTasks.foreach { case (taskDescription, _) =>
executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
}