aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Marcelo Vanzin <vanzin@cloudera.com> 2015-11-12 14:29:16 -0800
committer Marcelo Vanzin <vanzin@cloudera.com> 2015-11-12 14:29:16 -0800
commit 767d288b6b33a79d99324b70c2ac079fcf484a50 (patch)
tree 405830e799677dda7a8edaa469077fca9977e091 /core
parent d292f74831de7e69c852ed26d9c15df85b4fb568 (diff)
download spark-767d288b6b33a79d99324b70c2ac079fcf484a50.tar.gz
spark-767d288b6b33a79d99324b70c2ac079fcf484a50.tar.bz2
spark-767d288b6b33a79d99324b70c2ac079fcf484a50.zip
[SPARK-11655][CORE] Fix deadlock in handling of launcher stop().
The stop() callback was trying to close the launcher connection in the same thread that handles connection data, which ended up causing a deadlock. So avoid that by dispatching the stop() request in its own thread.

On top of that, add some exception safety to a few parts of the code, and use "destroyForcibly" from Java 8 if it's available, to force kill the child process. The flip side is that "kill()" may not actually work if running Java 7.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9633 from vanzin/SPARK-11655.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala 12
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala 20
2 files changed, 21 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index 3ea984c501..a5d41a1eeb 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.launcher.LauncherProtocol._
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* A class that can be used to talk to a launcher server. Users should extend this class to
@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
*/
protected def onDisconnected() : Unit = { }
+ private def fireStopRequest(): Unit = {
+ val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ onStopRequest()
+ }
+ })
+ thread.start()
+ }
private class BackendConnection(s: Socket) extends LauncherConnection(s) {
override protected def handle(m: Message): Unit = m match {
case _: Stop =>
- onStopRequest()
+ fireStopRequest()
case _ =>
throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 05d9bc92f2..5105475c76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
}
private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
- stopping = true
+ try {
+ stopping = true
- launcherBackend.setState(finalState)
- launcherBackend.close()
+ super.stop()
+ client.stop()
- super.stop()
- client.stop()
-
- val callback = shutdownCallback
- if (callback != null) {
- callback(this)
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
+ }
+ } finally {
+ launcherBackend.setState(finalState)
+ launcherBackend.close()
}
}