aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-02-18 12:20:11 +0000
committerSean Owen <sowen@cloudera.com>2015-02-18 12:20:11 +0000
commit82197ed3bd4b8c29b0c4b183994753f0e02b6903 (patch)
tree02e46509b73a4d51b08331f2ae07895d4da678d1 /core
parente79a7a626d9ac2e2474b9d5008c6b5d07df5c6f1 (diff)
downloadspark-82197ed3bd4b8c29b0c4b183994753f0e02b6903.tar.gz
spark-82197ed3bd4b8c29b0c4b183994753f0e02b6903.tar.bz2
spark-82197ed3bd4b8c29b0c4b183994753f0e02b6903.zip
[SPARK-4949]shutdownCallback in SparkDeploySchedulerBackend should be enclosed by synchronized block.
A variable `shutdownCallback` in SparkDeploySchedulerBackend can be accessed from multiple threads so it should be enclosed by synchronized block. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #3781 from sarutak/SPARK-4949 and squashes the following commits: c146c93 [Kousuke Saruta] Removed "setShutdownCallback" method c7265dc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 42ca528 [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference 552df7c [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference f556819 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 1b60fd1 [Kousuke Saruta] Improved the locking logics 5942765 [Kousuke Saruta] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala35
1 files changed, 16 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 40fc6b59cd..a0aa555f62 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.cluster
+import java.util.concurrent.Semaphore
+
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
@@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend(
with AppClientListener
with Logging {
- var client: AppClient = null
- var stopping = false
- var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
- @volatile var appId: String = _
+ private var client: AppClient = null
+ private var stopping = false
+
+ @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
+ @volatile private var appId: String = _
- val registrationLock = new Object()
- var registrationDone = false
+ private val registrationBarrier = new Semaphore(0)
- val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
- val totalExpectedCores = maxCores.getOrElse(0)
+ private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+ private val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
super.start()
@@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend(
stopping = true
super.stop()
client.stop()
- if (shutdownCallback != null) {
- shutdownCallback(this)
+
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
}
}
@@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend(
}
private def waitForRegistration() = {
- registrationLock.synchronized {
- while (!registrationDone) {
- registrationLock.wait()
- }
- }
+ registrationBarrier.acquire()
}
private def notifyContext() = {
- registrationLock.synchronized {
- registrationDone = true
- registrationLock.notifyAll()
- }
+ registrationBarrier.release()
}
}