diff options
author | zsxwing <zsxwing@gmail.com> | 2015-02-02 21:42:18 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-02 21:42:18 -0800 |
commit | c306555f491e45ef870f58938af397f9ec5f166a (patch) | |
tree | c611105fb375bb006c2e8661b8de1d3ea515625a /core | |
parent | 60f67e7a142f831f91f60676f94affa8add9944f (diff) | |
download | spark-c306555f491e45ef870f58938af397f9ec5f166a.tar.gz spark-c306555f491e45ef870f58938af397f9ec5f166a.tar.bz2 spark-c306555f491e45ef870f58938af397f9ec5f166a.zip |
[SPARK-5219][Core] Add locks to avoid scheduling race conditions
Author: zsxwing <zsxwing@gmail.com>
Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits:
36a8b4e [zsxwing] Add locks to avoid race conditions
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 |
2 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 33a7aae5d3..79f84e70df 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl( dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) } - def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { + def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { taskSetManager.handleTaskGettingResult(tid) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c94c6bbcb..97c22fe724 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -542,7 +542,7 @@ private[spark] class TaskSetManager( /** * Check whether has enough quota to fetch the result with `size` bytes */ - def canFetchMoreResults(size: Long): Boolean = synchronized { + def canFetchMoreResults(size: Long): Boolean = sched.synchronized { totalResultSize += size calculatedTasks += 1 if (maxResultSize > 0 && totalResultSize > maxResultSize) { @@ -671,7 +671,7 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } - def abort(message: String) { + def abort(message: String): Unit = sched.synchronized { // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.dagScheduler.taskSetFailed(taskSet, message) isZombie = true |