aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-02-02 21:42:18 -0800
committerReynold Xin <rxin@databricks.com>2015-02-02 21:42:18 -0800
commitc306555f491e45ef870f58938af397f9ec5f166a (patch)
treec611105fb375bb006c2e8661b8de1d3ea515625a /core
parent60f67e7a142f831f91f60676f94affa8add9944f (diff)
downloadspark-c306555f491e45ef870f58938af397f9ec5f166a.tar.gz
spark-c306555f491e45ef870f58938af397f9ec5f166a.tar.bz2
spark-c306555f491e45ef870f58938af397f9ec5f166a.zip
[SPARK-5219][Core] Add locks to avoid scheduling race conditions
Author: zsxwing <zsxwing@gmail.com> Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits: 36a8b4e [zsxwing] Add locks to avoid race conditions
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala4
2 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 33a7aae5d3..79f84e70df 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
}
- def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
+ def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
taskSetManager.handleTaskGettingResult(tid)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5c94c6bbcb..97c22fe724 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -542,7 +542,7 @@ private[spark] class TaskSetManager(
/**
* Check whether has enough quota to fetch the result with `size` bytes
*/
- def canFetchMoreResults(size: Long): Boolean = synchronized {
+ def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
totalResultSize += size
calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
@@ -671,7 +671,7 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}
- def abort(message: String) {
+ def abort(message: String): Unit = sched.synchronized {
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message)
isZombie = true