aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
Diffstat (limited to 'launcher')
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java4
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java4
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java31
3 files changed, 38 insertions, 1 deletions
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd665..28e9420b28 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
}
super.close();
if (handle != null) {
+ if (!handle.getState().isFinal()) {
+ LOG.log(Level.WARNING, "Lost connection to spark application.");
+ handle.setState(SparkAppHandle.State.LOST);
+ }
handle.disconnect();
}
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d026321..0aa7bd197d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
/** The application finished with a failed status. */
FAILED(true),
/** The application was killed. */
- KILLED(true);
+ KILLED(true),
+ /** The Spark Submit JVM exited with a unknown status. */
+ LOST(true);
private final boolean isFinal;
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc87f..12f1a0ce2d 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
}
}
+ @Test
+ public void testSparkSubmitVmShutsDown() throws Exception {
+ ChildProcAppHandle handle = LauncherServer.newAppHandle();
+ TestClient client = null;
+ final Semaphore semaphore = new Semaphore(0);
+ try {
+ Socket s = new Socket(InetAddress.getLoopbackAddress(),
+ LauncherServer.getServerInstance().getPort());
+ handle.addListener(new SparkAppHandle.Listener() {
+ public void stateChanged(SparkAppHandle handle) {
+ semaphore.release();
+ }
+ public void infoChanged(SparkAppHandle handle) {
+ semaphore.release();
+ }
+ });
+ client = new TestClient(s);
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+ // Make sure the server matched the client to the handle.
+ assertNotNull(handle.getConnection());
+ close(client);
+ assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+ assertEquals(SparkAppHandle.State.LOST, handle.getState());
+ } finally {
+ kill(handle);
+ close(client);
+ client.clientThread.join();
+ }
+ }
+
private void kill(SparkAppHandle handle) {
if (handle != null) {
handle.kill();