aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-12 14:29:16 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-12 14:29:16 -0800
commit767d288b6b33a79d99324b70c2ac079fcf484a50 (patch)
tree405830e799677dda7a8edaa469077fca9977e091
parentd292f74831de7e69c852ed26d9c15df85b4fb568 (diff)
downloadspark-767d288b6b33a79d99324b70c2ac079fcf484a50.tar.gz
spark-767d288b6b33a79d99324b70c2ac079fcf484a50.tar.bz2
spark-767d288b6b33a79d99324b70c2ac079fcf484a50.zip
[SPARK-11655][CORE] Fix deadlock in handling of launcher stop().
The stop() callback was trying to close the launcher connection in the same thread that handles connection data, which ended up causing a deadlock. So avoid that by dispatching the stop() request in its own thread. On top of that, add some exception safety to a few parts of the code, and use "destroyForcibly" from Java 8 if it's available, to force kill the child process. The flip side is that "kill()" may not actually work if running Java 7. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9633 from vanzin/SPARK-11655.
-rw-r--r--core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala20
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java17
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java3
4 files changed, 39 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index 3ea984c501..a5d41a1eeb 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.launcher.LauncherProtocol._
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* A class that can be used to talk to a launcher server. Users should extend this class to
@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
*/
protected def onDisconnected() : Unit = { }
+ private def fireStopRequest(): Unit = {
+ val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ onStopRequest()
+ }
+ })
+ thread.start()
+ }
private class BackendConnection(s: Socket) extends LauncherConnection(s) {
override protected def handle(m: Message): Unit = m match {
case _: Stop =>
- onStopRequest()
+ fireStopRequest()
case _ =>
throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 05d9bc92f2..5105475c76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
}
private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
- stopping = true
+ try {
+ stopping = true
- launcherBackend.setState(finalState)
- launcherBackend.close()
+ super.stop()
+ client.stop()
- super.stop()
- client.stop()
-
- val callback = shutdownCallback
- if (callback != null) {
- callback(this)
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
+ }
+ } finally {
+ launcherBackend.setState(finalState)
+ launcherBackend.close()
}
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index de50f14fbd..1bfda289de 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -18,6 +18,7 @@
package org.apache.spark.launcher;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
@@ -102,8 +103,20 @@ class ChildProcAppHandle implements SparkAppHandle {
disconnect();
}
if (childProc != null) {
- childProc.destroy();
- childProc = null;
+ try {
+ childProc.exitValue();
+ } catch (IllegalThreadStateException e) {
+ // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
+ // fall back to the old API if it's not there.
+ try {
+ Method destroy = childProc.getClass().getMethod("destroyForcibly");
+ destroy.invoke(childProc);
+ } catch (Exception inner) {
+ childProc.destroy();
+ }
+ } finally {
+ childProc = null;
+ }
}
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 13dd9f1739..e9caf0b3cb 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -89,6 +89,9 @@ public interface SparkAppHandle {
* Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
* a {@link #stop()} message to the application, so it's recommended that users first try to
* stop the application cleanly and only resort to this method if that fails.
+ * <p>
+ * Note that if the application is running as a child process, this method fail to kill the
+ * process when using Java 7. This may happen if, for example, the application is deadlocked.
*/
void kill();