From c306555f491e45ef870f58938af397f9ec5f166a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 2 Feb 2015 21:42:18 -0800 Subject: [SPARK-5219][Core] Add locks to avoid scheduling race conditions Author: zsxwing Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits: 36a8b4e [zsxwing] Add locks to avoid race conditions --- .../src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 33a7aae5d3..79f84e70df 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl( dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) } - def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { + def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { taskSetManager.handleTaskGettingResult(tid) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c94c6bbcb..97c22fe724 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -542,7 +542,7 @@ private[spark] class TaskSetManager( /** * Check whether has enough quota to fetch the result with `size` bytes */ - def canFetchMoreResults(size: Long): Boolean = synchronized { + def canFetchMoreResults(size: Long): Boolean = sched.synchronized { totalResultSize += size calculatedTasks += 1 if (maxResultSize > 0 && totalResultSize > maxResultSize) { @@ -671,7 +671,7 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } - def abort(message: String) { + def abort(message: String): Unit = sched.synchronized { // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.dagScheduler.taskSetFailed(taskSet, message) isZombie = true -- cgit v1.2.3