author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2015-02-18 12:20:11 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-02-18 12:20:11 +0000 |
commit | 82197ed3bd4b8c29b0c4b183994753f0e02b6903 (patch) | |
tree | 02e46509b73a4d51b08331f2ae07895d4da678d1 | |
parent | e79a7a626d9ac2e2474b9d5008c6b5d07df5c6f1 (diff) | |
[SPARK-4949] shutdownCallback in SparkDeploySchedulerBackend should be enclosed in a synchronized block.
The variable `shutdownCallback` in SparkDeploySchedulerBackend can be accessed from multiple threads, so it should be enclosed in a synchronized block; a sketch of the resulting access pattern is shown below the commit list.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #3781 from sarutak/SPARK-4949 and squashes the following commits:
c146c93 [Kousuke Saruta] Removed "setShutdownCallback" method
c7265dc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949
42ca528 [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference
552df7c [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference
f556819 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949
1b60fd1 [Kousuke Saruta] Improved the locking logic
5942765 [Kousuke Saruta] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block
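
The race this guards against in `stop()` is that checking `shutdownCallback` for null and then invoking it are two separate reads of a field another thread may reassign in between. The final patch takes the volatile-reference route described in the commits above; the following is a minimal, self-contained sketch of that pattern (the stripped-down `Backend` class is an illustrative stand-in, not the real SparkDeploySchedulerBackend):

```scala
// Minimal sketch of the volatile-callback pattern adopted by the patch.
// `Backend` is an illustrative stand-in for SparkDeploySchedulerBackend.
class Backend {
  // @volatile makes writes from one thread visible to reads on other
  // threads without taking a lock.
  @volatile var shutdownCallback: Backend => Unit = _

  def stop(): Unit = {
    // Read the field once into a local so the null check and the call
    // operate on the same value, even if another thread reassigns the
    // field in between.
    val callback = shutdownCallback
    if (callback != null) {
      callback(this)
    }
  }
}
```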
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 35 |
1 file changed, 16 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 40fc6b59cd..a0aa555f62 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.Semaphore
+
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
@@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend(
   with AppClientListener
   with Logging {
 
-  var client: AppClient = null
-  var stopping = false
-  var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
-  @volatile var appId: String = _
+  private var client: AppClient = null
+  private var stopping = false
+
+  @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
+  @volatile private var appId: String = _
 
-  val registrationLock = new Object()
-  var registrationDone = false
+  private val registrationBarrier = new Semaphore(0)
 
-  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
-  val totalExpectedCores = maxCores.getOrElse(0)
+  private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  private val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend(
     stopping = true
     super.stop()
     client.stop()
-    if (shutdownCallback != null) {
-      shutdownCallback(this)
+
+    val callback = shutdownCallback
+    if (callback != null) {
+      callback(this)
     }
   }
 
@@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   private def waitForRegistration() = {
-    registrationLock.synchronized {
-      while (!registrationDone) {
-        registrationLock.wait()
-      }
-    }
+    registrationBarrier.acquire()
   }
 
   private def notifyContext() = {
-    registrationLock.synchronized {
-      registrationDone = true
-      registrationLock.notifyAll()
-    }
+    registrationBarrier.release()
   }
 
 }
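
The other change in the diff swaps the `registrationLock`/`registrationDone` wait/notify pair for a `java.util.concurrent.Semaphore` created with zero permits, which acts as a one-shot latch: `waitForRegistration()` blocks in `acquire()` until the AppClient listener callbacks invoke `notifyContext()`, which calls `release()`. Below is a self-contained sketch of that barrier; `registrationBarrier`, `waitForRegistration`, and `notifyContext` come from the diff, while the demo object, `main`, and thread scaffolding are illustrative only.

```scala
import java.util.concurrent.Semaphore

// Standalone sketch of the registration barrier from the diff: a Semaphore
// created with zero permits blocks the first acquire() until release().
object RegistrationBarrierDemo {
  private val registrationBarrier = new Semaphore(0)

  private def waitForRegistration(): Unit = {
    registrationBarrier.acquire() // blocks until a permit is released
  }

  private def notifyContext(): Unit = {
    registrationBarrier.release() // wakes the waiting thread
  }

  def main(args: Array[String]): Unit = {
    val waiter = new Thread(new Runnable {
      def run(): Unit = {
        waitForRegistration()
        println("registered, start() can continue")
      }
    })
    waiter.start()
    Thread.sleep(100) // stand-in for the time until registration resolves
    notifyContext()   // in Spark this happens in the AppClient listener callbacks
    waiter.join()
  }
}
```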