aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala33
1 files changed, 17 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 7befdb0c1f..0529fe9eed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster
import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
with Logging {
private var client: StandaloneAppClient = null
- private var stopping = false
+ private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}
@@ -112,7 +113,7 @@ private[spark] class StandaloneSchedulerBackend(
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
- override def stop(): Unit = synchronized {
+ override def stop(): Unit = {
stop(SparkAppHandle.State.FINISHED)
}
@@ -125,14 +126,14 @@ private[spark] class StandaloneSchedulerBackend(
override def disconnected() {
notifyContext()
- if (!stopping) {
+ if (!stopping.get) {
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
}
}
override def dead(reason: String) {
notifyContext()
- if (!stopping) {
+ if (!stopping.get) {
launcherBackend.setState(SparkAppHandle.State.KILLED)
logError("Application has been killed. Reason: " + reason)
try {
@@ -206,20 +207,20 @@ private[spark] class StandaloneSchedulerBackend(
registrationBarrier.release()
}
- private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
- try {
- stopping = true
-
- super.stop()
- client.stop()
+ private def stop(finalState: SparkAppHandle.State): Unit = {
+ if (stopping.compareAndSet(false, true)) {
+ try {
+ super.stop()
+ client.stop()
- val callback = shutdownCallback
- if (callback != null) {
- callback(this)
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
+ }
+ } finally {
+ launcherBackend.setState(finalState)
+ launcherBackend.close()
}
- } finally {
- launcherBackend.setState(finalState)
- launcherBackend.close()
}
}