aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
authorSubroto Sanyal <ssanyal@datameer.com>2016-06-06 16:05:40 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-06-06 16:05:40 -0700
commitc409e23abd128dad33557025f1e824ef47e6222f (patch)
tree4f016f241e0ae2f81a39275e7c357e8059293bb9 /launcher
parent36d3dfa59a1ec0af6118e0667b80e9b7628e2cb6 (diff)
downloadspark-c409e23abd128dad33557025f1e824ef47e6222f.tar.gz
spark-c409e23abd128dad33557025f1e824ef47e6222f.tar.bz2
spark-c409e23abd128dad33557025f1e824ef47e6222f.zip
[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of SparkLauncher
## What changes were proposed in this pull request? This situation can happen when the LauncherConnection gets an exception while reading through the socket and terminates silently without notifying the client/listener, making it think that the job is still in its previous state. The fix force-sends a notification to the client that the job finished with an unknown status and lets the client handle it accordingly. ## How was this patch tested? Added a unit test. Author: Subroto Sanyal <ssanyal@datameer.com> Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.
Diffstat (limited to 'launcher')
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java4
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java4
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java31
3 files changed, 38 insertions, 1 deletion
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd665..28e9420b28 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
}
super.close();
if (handle != null) {
+ if (!handle.getState().isFinal()) {
+ LOG.log(Level.WARNING, "Lost connection to spark application.");
+ handle.setState(SparkAppHandle.State.LOST);
+ }
handle.disconnect();
}
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d026321..0aa7bd197d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
/** The application finished with a failed status. */
FAILED(true),
/** The application was killed. */
- KILLED(true);
+ KILLED(true),
+ /** The Spark Submit JVM exited with a unknown status. */
+ LOST(true);
private final boolean isFinal;
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc87f..12f1a0ce2d 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
}
}
+ @Test
+ public void testSparkSubmitVmShutsDown() throws Exception {
+ ChildProcAppHandle handle = LauncherServer.newAppHandle();
+ TestClient client = null;
+ final Semaphore semaphore = new Semaphore(0);
+ try {
+ Socket s = new Socket(InetAddress.getLoopbackAddress(),
+ LauncherServer.getServerInstance().getPort());
+ handle.addListener(new SparkAppHandle.Listener() {
+ public void stateChanged(SparkAppHandle handle) {
+ semaphore.release();
+ }
+ public void infoChanged(SparkAppHandle handle) {
+ semaphore.release();
+ }
+ });
+ client = new TestClient(s);
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+ // Make sure the server matched the client to the handle.
+ assertNotNull(handle.getConnection());
+ close(client);
+ assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+ assertEquals(SparkAppHandle.State.LOST, handle.getState());
+ } finally {
+ kill(handle);
+ close(client);
+ client.clientThread.join();
+ }
+ }
+
private void kill(SparkAppHandle handle) {
if (handle != null) {
handle.kill();