| author | Shixiong Zhu <shixiong@databricks.com> | 2017-04-12 17:44:18 -0700 |
|---|---|---|
| committer | Shixiong Zhu <shixiong@databricks.com> | 2017-04-12 17:44:18 -0700 |
| commit | c5f1cc370f0aa1f0151fd34251607a8de861395e (patch) | |
| tree | 4f0c270e84225cb5e87852db51e2205bc37f5e6f /core/src | |
| parent | a7b430b5717e263c1fbb55114deca6028ea9c3b3 (diff) | |
[SPARK-20131][CORE] Don't use `this` lock in StandaloneSchedulerBackend.stop
## What changes were proposed in this pull request?
`o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky because there is a potential deadlock in StandaloneSchedulerBackend which causes the `await` to time out. Here is the related stack trace:
```
"Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)
"dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
- waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
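The two traces form a cycle: `Thread-31` holds the `StandaloneSchedulerBackend` monitor while blocking in `askSync`, and the reply it waits for can only be produced by `dispatcher-event-loop-3`, which is itself blocked waiting for that same monitor. Below is a minimal, self-contained Scala sketch of this pattern (illustration only, not Spark code; the names `backend`, `stopper`, and `dispatcher` are made up):

```scala
import java.util.concurrent.CountDownLatch

// Illustration only: two threads reproduce the lock cycle from the traces above.
object DeadlockSketch {
  def main(args: Array[String]): Unit = {
    val backend = new Object          // stands in for the StandaloneSchedulerBackend monitor
    val reply = new CountDownLatch(1) // stands in for the askSync reply

    // Like Thread-31: takes the monitor, then blocks waiting for the reply,
    // still holding the monitor.
    val stopper = new Thread(() => backend.synchronized { reply.await() })

    // Like dispatcher-event-loop-3: must take the same monitor before it can
    // process messages and produce the reply.
    val dispatcher = new Thread(() => backend.synchronized { reply.countDown() })

    stopper.start(); dispatcher.start()
    stopper.join() // never returns: stopper holds the lock dispatcher needs
  }
}
```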
This PR removes `synchronized` and changes `stopping` to an `AtomicBoolean`, making `stop` idempotent, to fix the deadlock.
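In isolation, the pattern the fix relies on looks like this (a minimal sketch using a hypothetical `Service` class, not the actual Spark change, which is in the diff below): `compareAndSet(false, true)` succeeds for exactly one caller, so the shutdown body runs at most once and no monitor is held while it blocks.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical class illustrating an idempotent stop() without synchronized.
class Service {
  private val stopping = new AtomicBoolean(false)

  def stop(): Unit = {
    // Exactly one thread wins the compareAndSet; every later (or concurrent)
    // call sees `true` and returns immediately, so stop() is idempotent and
    // never blocks other threads on this object's monitor.
    if (stopping.compareAndSet(false, true)) {
      // ... perform the real shutdown here ...
    }
  }
}
```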
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #17610 from zsxwing/SPARK-20131.
Diffstat (limited to 'core/src')
| -rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 33 |
1 file changed, 17 insertions, 16 deletions
```
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 7befdb0c1f..0529fe9eed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.Future
 
@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
   with Logging {
 
   private var client: StandaloneAppClient = null
-  private var stopping = false
+  private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
     override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
@@ -112,7 +113,7 @@ private[spark] class StandaloneSchedulerBackend(
     launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
-  override def stop(): Unit = synchronized {
+  override def stop(): Unit = {
     stop(SparkAppHandle.State.FINISHED)
   }
 
@@ -125,14 +126,14 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def disconnected() {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       launcherBackend.setState(SparkAppHandle.State.KILLED)
       logError("Application has been killed. Reason: " + reason)
       try {
@@ -206,20 +207,20 @@ private[spark] class StandaloneSchedulerBackend(
     registrationBarrier.release()
   }
 
-  private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
-    try {
-      stopping = true
-
-      super.stop()
-      client.stop()
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    if (stopping.compareAndSet(false, true)) {
+      try {
+        super.stop()
+        client.stop()
 
-      val callback = shutdownCallback
-      if (callback != null) {
-        callback(this)
+        val callback = shutdownCallback
+        if (callback != null) {
+          callback(this)
+        }
+      } finally {
+        launcherBackend.setState(finalState)
+        launcherBackend.close()
       }
-    } finally {
-      launcherBackend.setState(finalState)
-      launcherBackend.close()
     }
   }
 
```