aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-10-09 15:28:09 -0500
committerImran Rashid <irashid@cloudera.com>2015-10-09 15:28:09 -0500
commit015f7ef503d5544f79512b6333326749a1f0c48b (patch)
tree990942b40bd374f632c3954cd4aab3741dd17f63 /launcher
parent70f44ad2d836236c74e1336a7368982d5fe3abff (diff)
downloadspark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.gz
spark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.bz2
spark-015f7ef503d5544f79512b6333326749a1f0c48b.zip
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app launched using the library. It also creates a socket-based communication layer for apps that are launched as child processes; the launching application listens for connections from launched apps, and once communication is established, the channel can be used to send updates to the launching app, or to send commands to the child app. The change also includes hooks for local, standalone/client and yarn masters. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #7052 from vanzin/SPARK-8673.
Diffstat (limited to 'launcher')
-rw-r--r--launcher/pom.xml5
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java38
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java159
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java110
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java93
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java341
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java40
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java78
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java126
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java106
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java22
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/package-info.java38
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java32
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java188
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java4
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java2
-rw-r--r--launcher/src/test/resources/log4j.properties13
17 files changed, 1362 insertions, 33 deletions
diff --git a/launcher/pom.xml b/launcher/pom.xml
index d595d74642..5739bfc169 100644
--- a/launcher/pom.xml
+++ b/launcher/pom.xml
@@ -49,6 +49,11 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 610e8bdaaa..cf3729b7fe 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -47,7 +47,7 @@ abstract class AbstractCommandBuilder {
String javaHome;
String mainClass;
String master;
- String propertiesFile;
+ protected String propertiesFile;
final List<String> appArgs;
final List<String> jars;
final List<String> files;
@@ -55,6 +55,10 @@ abstract class AbstractCommandBuilder {
final Map<String, String> childEnv;
final Map<String, String> conf;
+ // The merged configuration for the application. Cached to avoid having to read / parse
+ // properties files multiple times.
+ private Map<String, String> effectiveConfig;
+
public AbstractCommandBuilder() {
this.appArgs = new ArrayList<String>();
this.childEnv = new HashMap<String, String>();
@@ -257,12 +261,38 @@ abstract class AbstractCommandBuilder {
return path;
}
+ String getenv(String key) {
+ return firstNonEmpty(childEnv.get(key), System.getenv(key));
+ }
+
+ void setPropertiesFile(String path) {
+ effectiveConfig = null;
+ this.propertiesFile = path;
+ }
+
+ Map<String, String> getEffectiveConfig() throws IOException {
+ if (effectiveConfig == null) {
+ if (propertiesFile == null) {
+ effectiveConfig = conf;
+ } else {
+ effectiveConfig = new HashMap<>(conf);
+ Properties p = loadPropertiesFile();
+ for (String key : p.stringPropertyNames()) {
+ if (!effectiveConfig.containsKey(key)) {
+ effectiveConfig.put(key, p.getProperty(key));
+ }
+ }
+ }
+ }
+ return effectiveConfig;
+ }
+
/**
* Loads the configuration file for the application, if it exists. This is either the
* user-specified properties file, or the spark-defaults.conf file under the Spark configuration
* directory.
*/
- Properties loadPropertiesFile() throws IOException {
+ private Properties loadPropertiesFile() throws IOException {
Properties props = new Properties();
File propsFile;
if (propertiesFile != null) {
@@ -294,10 +324,6 @@ abstract class AbstractCommandBuilder {
return props;
}
- String getenv(String key) {
- return firstNonEmpty(childEnv.get(key), System.getenv(key));
- }
-
private String findAssembly() {
String sparkHome = getSparkHome();
File libdir;
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
new file mode 100644
index 0000000000..de50f14fbd
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handle implementation for monitoring apps started as a child process.
+ */
+class ChildProcAppHandle implements SparkAppHandle {
+
+ private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
+ private static final ThreadFactory REDIRECTOR_FACTORY =
+ new NamedThreadFactory("launcher-proc-%d");
+
+ private final String secret;
+ private final LauncherServer server;
+
+ private Process childProc;
+ private boolean disposed;
+ private LauncherConnection connection;
+ private List<Listener> listeners;
+ private State state;
+ private String appId;
+ private OutputRedirector redirector;
+
+ ChildProcAppHandle(String secret, LauncherServer server) {
+ this.secret = secret;
+ this.server = server;
+ this.state = State.UNKNOWN;
+ }
+
+ @Override
+ public synchronized void addListener(Listener l) {
+ if (listeners == null) {
+ listeners = new ArrayList<>();
+ }
+ listeners.add(l);
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public String getAppId() {
+ return appId;
+ }
+
+ @Override
+ public void stop() {
+ CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
+ try {
+ connection.send(new LauncherProtocol.Stop());
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public synchronized void disconnect() {
+ if (!disposed) {
+ disposed = true;
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ioe) {
+ // no-op.
+ }
+ }
+ server.unregister(this);
+ if (redirector != null) {
+ redirector.stop();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void kill() {
+ if (!disposed) {
+ disconnect();
+ }
+ if (childProc != null) {
+ childProc.destroy();
+ childProc = null;
+ }
+ }
+
+ String getSecret() {
+ return secret;
+ }
+
+ void setChildProc(Process childProc, String loggerName) {
+ this.childProc = childProc;
+ this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
+ REDIRECTOR_FACTORY);
+ }
+
+ void setConnection(LauncherConnection connection) {
+ this.connection = connection;
+ }
+
+ LauncherServer getServer() {
+ return server;
+ }
+
+ LauncherConnection getConnection() {
+ return connection;
+ }
+
+ void setState(State s) {
+ if (!state.isFinal()) {
+ state = s;
+ fireEvent(false);
+ } else {
+ LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
+ new Object[] { state, s });
+ }
+ }
+
+ void setAppId(String appId) {
+ this.appId = appId;
+ fireEvent(true);
+ }
+
+ private synchronized void fireEvent(boolean isInfoChanged) {
+ if (listeners != null) {
+ for (Listener l : listeners) {
+ if (isInfoChanged) {
+ l.infoChanged(this);
+ } else {
+ l.stateChanged(this);
+ }
+ }
+ }
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
new file mode 100644
index 0000000000..eec264909b
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.spark.launcher.LauncherProtocol.*;
+
+/**
+ * Encapsulates a connection between a launcher server and client. This takes care of the
+ * communication (sending and receiving messages), while processing of messages is left for
+ * the implementations.
+ */
+abstract class LauncherConnection implements Closeable, Runnable {
+
+ private static final Logger LOG = Logger.getLogger(LauncherConnection.class.getName());
+
+ private final Socket socket;
+ private final ObjectOutputStream out;
+
+ private volatile boolean closed;
+
+ LauncherConnection(Socket socket) throws IOException {
+ this.socket = socket;
+ this.out = new ObjectOutputStream(socket.getOutputStream());
+ this.closed = false;
+ }
+
+ protected abstract void handle(Message msg) throws IOException;
+
+ @Override
+ public void run() {
+ try {
+ ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
+ while (!closed) {
+ Message msg = (Message) in.readObject();
+ handle(msg);
+ }
+ } catch (EOFException eof) {
+ // Remote side has closed the connection, just cleanup.
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ } catch (Exception e) {
+ if (!closed) {
+ LOG.log(Level.WARNING, "Error in inbound message handling.", e);
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ }
+ }
+ }
+
+ protected synchronized void send(Message msg) throws IOException {
+ try {
+ CommandBuilderUtils.checkState(!closed, "Disconnected.");
+ out.writeObject(msg);
+ out.flush();
+ } catch (IOException ioe) {
+ if (!closed) {
+ LOG.log(Level.WARNING, "Error when sending message.", ioe);
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ }
+ throw ioe;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ synchronized (this) {
+ if (!closed) {
+ closed = true;
+ socket.close();
+ }
+ }
+ }
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
new file mode 100644
index 0000000000..50f136497e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.Socket;
+import java.util.Map;
+
+/**
+ * Message definitions for the launcher communication protocol. These messages must remain
+ * backwards-compatible, so that the launcher can talk to older versions of Spark that support
+ * the protocol.
+ */
+final class LauncherProtocol {
+
+ /** Environment variable where the server port is stored. */
+ static final String ENV_LAUNCHER_PORT = "_SPARK_LAUNCHER_PORT";
+
+ /** Environment variable where the secret for connecting back to the server is stored. */
+ static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET";
+
+ static class Message implements Serializable {
+
+ }
+
+ /**
+ * Hello message, sent from client to server.
+ */
+ static class Hello extends Message {
+
+ final String secret;
+ final String sparkVersion;
+
+ Hello(String secret, String version) {
+ this.secret = secret;
+ this.sparkVersion = version;
+ }
+
+ }
+
+ /**
+ * SetAppId message, sent from client to server.
+ */
+ static class SetAppId extends Message {
+
+ final String appId;
+
+ SetAppId(String appId) {
+ this.appId = appId;
+ }
+
+ }
+
+ /**
+ * SetState message, sent from client to server.
+ */
+ static class SetState extends Message {
+
+ final SparkAppHandle.State state;
+
+ SetState(SparkAppHandle.State state) {
+ this.state = state;
+ }
+
+ }
+
+ /**
+ * Stop message, send from server to client to stop the application.
+ */
+ static class Stop extends Message {
+
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
new file mode 100644
index 0000000000..c5fd40816d
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.spark.launcher.LauncherProtocol.*;
+
+/**
+ * A server that listens locally for connections from client launched by the library. Each client
+ * has a secret that it needs to send to the server to identify itself and establish the session.
+ *
+ * I/O is currently blocking (one thread per client). Clients have a limited time to connect back
+ * to the server, otherwise the server will ignore the connection.
+ *
+ * === Architecture Overview ===
+ *
+ * The launcher server is used when Spark apps are launched as separate processes than the calling
+ * app. It looks more or less like the following:
+ *
+ * ----------------------- -----------------------
+ * | User App | spark-submit | Spark App |
+ * | | -------------------> | |
+ * | ------------| |------------- |
+ * | | | hello | | |
+ * | | L. Server |<----------------------| L. Backend | |
+ * | | | | | |
+ * | ------------- -----------------------
+ * | | | ^
+ * | v | |
+ * | -------------| |
+ * | | | <per-app channel> |
+ * | | App Handle |<------------------------------
+ * | | |
+ * -----------------------
+ *
+ * The server is started on demand and remains active while there are active or outstanding clients,
+ * to avoid opening too many ports when multiple clients are launched. Each client is given a unique
+ * secret, and have a limited amount of time to connect back
+ * ({@link SparkLauncher#CHILD_CONNECTION_TIMEOUT}), at which point the server will throw away
+ * that client's state. A client is only allowed to connect back to the server once.
+ *
+ * The launcher server listens on the localhost only, so it doesn't need access controls (aside from
+ * the per-app secret) nor encryption. It thus requires that the launched app has a local process
+ * that communicates with the server. In cluster mode, this means that the client that launches the
+ * application must remain alive for the duration of the application (or until the app handle is
+ * disconnected).
+ */
+class LauncherServer implements Closeable {
+
+ private static final Logger LOG = Logger.getLogger(LauncherServer.class.getName());
+ private static final String THREAD_NAME_FMT = "LauncherServer-%d";
+ private static final long DEFAULT_CONNECT_TIMEOUT = 10000L;
+
+ /** For creating secrets used for communication with child processes. */
+ private static final SecureRandom RND = new SecureRandom();
+
+ private static volatile LauncherServer serverInstance;
+
+ /**
+ * Creates a handle for an app to be launched. This method will start a server if one hasn't been
+ * started yet. The server is shared for multiple handles, and once all handles are disposed of,
+ * the server is shut down.
+ */
+ static synchronized ChildProcAppHandle newAppHandle() throws IOException {
+ LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer();
+ server.ref();
+ serverInstance = server;
+
+ String secret = server.createSecret();
+ while (server.pending.containsKey(secret)) {
+ secret = server.createSecret();
+ }
+
+ return server.newAppHandle(secret);
+ }
+
+ static LauncherServer getServerInstance() {
+ return serverInstance;
+ }
+
+ private final AtomicLong refCount;
+ private final AtomicLong threadIds;
+ private final ConcurrentMap<String, ChildProcAppHandle> pending;
+ private final List<ServerConnection> clients;
+ private final ServerSocket server;
+ private final Thread serverThread;
+ private final ThreadFactory factory;
+ private final Timer timeoutTimer;
+
+ private volatile boolean running;
+
+ private LauncherServer() throws IOException {
+ this.refCount = new AtomicLong(0);
+
+ ServerSocket server = new ServerSocket();
+ try {
+ server.setReuseAddress(true);
+ server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+
+ this.clients = new ArrayList<ServerConnection>();
+ this.threadIds = new AtomicLong();
+ this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
+ this.pending = new ConcurrentHashMap<>();
+ this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
+ this.server = server;
+ this.running = true;
+
+ this.serverThread = factory.newThread(new Runnable() {
+ @Override
+ public void run() {
+ acceptConnections();
+ }
+ });
+ serverThread.start();
+ } catch (IOException ioe) {
+ close();
+ throw ioe;
+ } catch (Exception e) {
+ close();
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Creates a new app handle. The handle will wait for an incoming connection for a configurable
+ * amount of time, and if one doesn't arrive, it will transition to an error state.
+ */
+ ChildProcAppHandle newAppHandle(String secret) {
+ ChildProcAppHandle handle = new ChildProcAppHandle(secret, this);
+ ChildProcAppHandle existing = pending.putIfAbsent(secret, handle);
+ CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret.");
+ return handle;
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (this) {
+ if (running) {
+ running = false;
+ timeoutTimer.cancel();
+ server.close();
+ synchronized (clients) {
+ List<ServerConnection> copy = new ArrayList<>(clients);
+ clients.clear();
+ for (ServerConnection client : copy) {
+ client.close();
+ }
+ }
+ }
+ }
+ if (serverThread != null) {
+ try {
+ serverThread.join();
+ } catch (InterruptedException ie) {
+ // no-op
+ }
+ }
+ }
+
+ void ref() {
+ refCount.incrementAndGet();
+ }
+
+ void unref() {
+ synchronized(LauncherServer.class) {
+ if (refCount.decrementAndGet() == 0) {
+ try {
+ close();
+ } catch (IOException ioe) {
+ // no-op.
+ } finally {
+ serverInstance = null;
+ }
+ }
+ }
+ }
+
+ int getPort() {
+ return server.getLocalPort();
+ }
+
+ /**
+ * Removes the client handle from the pending list (in case it's still there), and unrefs
+ * the server.
+ */
+ void unregister(ChildProcAppHandle handle) {
+ pending.remove(handle.getSecret());
+ unref();
+ }
+
+ private void acceptConnections() {
+ try {
+ while (running) {
+ final Socket client = server.accept();
+ TimerTask timeout = new TimerTask() {
+ @Override
+ public void run() {
+ LOG.warning("Timed out waiting for hello message from client.");
+ try {
+ client.close();
+ } catch (IOException ioe) {
+ // no-op.
+ }
+ }
+ };
+ ServerConnection clientConnection = new ServerConnection(client, timeout);
+ Thread clientThread = factory.newThread(clientConnection);
+ synchronized (timeout) {
+ clientThread.start();
+ synchronized (clients) {
+ clients.add(clientConnection);
+ }
+ timeoutTimer.schedule(timeout, getConnectionTimeout());
+ }
+ }
+ } catch (IOException ioe) {
+ if (running) {
+ LOG.log(Level.SEVERE, "Error in accept loop.", ioe);
+ }
+ }
+ }
+
+ private long getConnectionTimeout() {
+ String value = SparkLauncher.launcherConfig.get(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
+ return (value != null) ? Long.parseLong(value) : DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ private String createSecret() {
+ byte[] secret = new byte[128];
+ RND.nextBytes(secret);
+
+ StringBuilder sb = new StringBuilder();
+ for (byte b : secret) {
+ int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
+ if (ival < 0x10) {
+ sb.append("0");
+ }
+ sb.append(Integer.toHexString(ival));
+ }
+ return sb.toString();
+ }
+
+ private class ServerConnection extends LauncherConnection {
+
+ private TimerTask timeout;
+ private ChildProcAppHandle handle;
+
+ ServerConnection(Socket socket, TimerTask timeout) throws IOException {
+ super(socket);
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void handle(Message msg) throws IOException {
+ try {
+ if (msg instanceof Hello) {
+ synchronized (timeout) {
+ timeout.cancel();
+ }
+ timeout = null;
+ Hello hello = (Hello) msg;
+ ChildProcAppHandle handle = pending.remove(hello.secret);
+ if (handle != null) {
+ handle.setState(SparkAppHandle.State.CONNECTED);
+ handle.setConnection(this);
+ this.handle = handle;
+ } else {
+ throw new IllegalArgumentException("Received Hello for unknown client.");
+ }
+ } else {
+ if (handle == null) {
+ throw new IllegalArgumentException("Expected hello, got: " +
+ msg != null ? msg.getClass().getName() : null);
+ }
+ if (msg instanceof SetAppId) {
+ SetAppId set = (SetAppId) msg;
+ handle.setAppId(set.appId);
+ } else if (msg instanceof SetState) {
+ handle.setState(((SetState)msg).state);
+ } else {
+ throw new IllegalArgumentException("Invalid message: " +
+ msg != null ? msg.getClass().getName() : null);
+ }
+ }
+ } catch (Exception e) {
+ LOG.log(Level.INFO, "Error handling message from client.", e);
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ close();
+ } finally {
+ timeoutTimer.purge();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (clients) {
+ clients.remove(this);
+ }
+ super.close();
+ if (handle != null) {
+ handle.disconnect();
+ }
+ }
+
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java b/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java
new file mode 100644
index 0000000000..995f4d73da
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+class NamedThreadFactory implements ThreadFactory {
+
+ private final String nameFormat;
+ private final AtomicLong threadIds;
+
+ NamedThreadFactory(String nameFormat) {
+ this.nameFormat = nameFormat;
+ this.threadIds = new AtomicLong();
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, String.format(nameFormat, threadIds.incrementAndGet()));
+ t.setDaemon(true);
+ return t;
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
new file mode 100644
index 0000000000..6e7120167d
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Redirects lines read from a given input stream to a j.u.l.Logger (at INFO level).
+ */
+class OutputRedirector {
+
+ private final BufferedReader reader;
+ private final Logger sink;
+ private final Thread thread;
+
+ private volatile boolean active;
+
+ OutputRedirector(InputStream in, ThreadFactory tf) {
+ this(in, OutputRedirector.class.getName(), tf);
+ }
+
+ OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
+ this.active = true;
+ this.reader = new BufferedReader(new InputStreamReader(in));
+ this.thread = tf.newThread(new Runnable() {
+ @Override
+ public void run() {
+ redirect();
+ }
+ });
+ this.sink = Logger.getLogger(loggerName);
+ thread.start();
+ }
+
+ private void redirect() {
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (active) {
+ sink.info(line.replaceFirst("\\s*$", ""));
+ }
+ }
+ } catch (IOException e) {
+ sink.log(Level.FINE, "Error reading child process output.", e);
+ }
+ }
+
+ /**
+ * This method just stops the output of the process from showing up in the local logs.
+ * The child's output will still be read (and, thus, the redirect thread will still be
+ * alive) to avoid the child process hanging because of lack of output buffer.
+ */
+ void stop() {
+ active = false;
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
new file mode 100644
index 0000000000..2896a91d5e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+/**
+ * A handle to a running Spark application.
+ * <p/>
+ * Provides runtime information about the underlying Spark application, and actions to control it.
+ *
+ * @since 1.6.0
+ */
+public interface SparkAppHandle {
+
+ /**
+ * Represents the application's state. A state can be "final", in which case it will not change
+ * after it's reached, and means the application is not running anymore.
+ *
+ * @since 1.6.0
+ */
+ public enum State {
+ /** The application has not reported back yet. */
+ UNKNOWN(false),
+ /** The application has connected to the handle. */
+ CONNECTED(false),
+ /** The application has been submitted to the cluster. */
+ SUBMITTED(false),
+ /** The application is running. */
+ RUNNING(false),
+ /** The application finished with a successful status. */
+ FINISHED(true),
+ /** The application finished with a failed status. */
+ FAILED(true),
+ /** The application was killed. */
+ KILLED(true);
+
+ private final boolean isFinal;
+
+ State(boolean isFinal) {
+ this.isFinal = isFinal;
+ }
+
+ /**
+ * Whether this state is a final state, meaning the application is not running anymore
+ * once it's reached.
+ */
+ public boolean isFinal() {
+ return isFinal;
+ }
+ }
+
+ /**
+ * Adds a listener to be notified of changes to the handle's information. Listeners will be called
+ * from the thread processing updates from the application, so they should avoid blocking or
+ * long-running operations.
+ *
+ * @param l Listener to add.
+ */
+ void addListener(Listener l);
+
+ /** Returns the current application state. */
+ State getState();
+
+ /** Returns the application ID, or <code>null</code> if not yet known. */
+ String getAppId();
+
+ /**
+ * Asks the application to stop. This is best-effort, since the application may fail to receive
+ * or act on the command. Callers should watch for a state transition that indicates the
+ * application has really stopped.
+ */
+ void stop();
+
+ /**
+ * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
+ * a {@link #stop()} message to the application, so it's recommended that users first try to
+ * stop the application cleanly and only resort to this method if that fails.
+ */
+ void kill();
+
+ /**
+ * Disconnects the handle from the application, without stopping it. After this method is called,
+ * the handle will not be able to communicate with the application anymore.
+ */
+ void disconnect();
+
+ /**
+ * Listener for updates to a handle's state. The callbacks do not receive information about
+ * what exactly has changed, just that an update has occurred.
+ *
+ * @since 1.6.0
+ */
+ public interface Listener {
+
+ /**
+ * Callback for changes in the handle's state.
+ *
+ * @param handle The updated handle.
+ * @see {@link SparkAppHandle#getState()}
+ */
+ void stateChanged(SparkAppHandle handle);
+
+ /**
+ * Callback for changes in any information that is not the handle's state.
+ *
+ * @param handle The updated handle.
+ */
+ void infoChanged(SparkAppHandle handle);
+
+ }
+
+}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 57993405e4..5d74b37033 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -21,8 +21,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -58,6 +60,33 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
+ /** Logger name to use when launching a child process. */
+ public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
+
+ /**
+ * Maximum time (in ms) to wait for a child process to connect back to the launcher server
+ * when using @link{#start()}.
+ */
+ public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
+
+ /** Used internally to create unique logger names. */
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ static final Map<String, String> launcherConfig = new HashMap<String, String>();
+
+ /**
+ * Set a configuration value for the launcher library. These config values do not affect the
+ * launched application, but rather the behavior of the launcher library itself when managing
+ * applications.
+ *
+ * @since 1.6.0
+ * @param name Config name.
+ * @param value Config value.
+ */
+ public static void setConfig(String name, String value) {
+ launcherConfig.put(name, value);
+ }
+
// Visible for testing.
final SparkSubmitCommandBuilder builder;
@@ -109,7 +138,7 @@ public class SparkLauncher {
*/
public SparkLauncher setPropertiesFile(String path) {
checkNotNull(path, "path");
- builder.propertiesFile = path;
+ builder.setPropertiesFile(path);
return this;
}
@@ -197,6 +226,7 @@ public class SparkLauncher {
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
+ * @since 1.5.0
* @param arg Argument to add.
* @return This launcher.
*/
@@ -218,6 +248,7 @@ public class SparkLauncher {
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
+ * @since 1.5.0
* @param name Name of argument to add.
* @param value Value of the argument.
* @return This launcher.
@@ -319,10 +350,81 @@ public class SparkLauncher {
/**
* Launches a sub-process that will start the configured Spark application.
+ * <p/>
+ * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
+ * Spark, since it provides better control of the child application.
*
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
+ return createBuilder().start();
+ }
+
+ /**
+ * Starts a Spark application.
+ * <p/>
+ * This method returns a handle that provides information about the running application and can
+ * be used to do basic interaction with it.
+ * <p/>
+ * The returned handle assumes that the application will instantiate a single SparkContext
+ * during its lifetime. Once that context reports a final state (one that indicates the
+ * SparkContext has stopped), the handle will not perform new state transitions, so anything
+ * that happens after that cannot be monitored. If the underlying application is launched as
+ * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
+ * <p/>
+ * Currently, all applications are launched as child processes. The child's stdout and stderr
+ * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
+ * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
+ * that option is not set, the code will try to derive a name from the application's name or
+ * main class / script file. If those cannot be determined, an internal, unique name will be
+ * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
+ * more easily into the configuration of commonly-used logging systems.
+ *
+ * @since 1.6.0
+ * @param listeners Listeners to add to the handle before the app is launched.
+ * @return A handle for the launched application.
+ */
+ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
+ ChildProcAppHandle handle = LauncherServer.newAppHandle();
+ for (SparkAppHandle.Listener l : listeners) {
+ handle.addListener(l);
+ }
+
+ String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ if (appName == null) {
+ if (builder.appName != null) {
+ appName = builder.appName;
+ } else if (builder.mainClass != null) {
+ int dot = builder.mainClass.lastIndexOf(".");
+ if (dot >= 0 && dot < builder.mainClass.length() - 1) {
+ appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ } else {
+ appName = builder.mainClass;
+ }
+ } else if (builder.appResource != null) {
+ appName = new File(builder.appResource).getName();
+ } else {
+ appName = String.valueOf(COUNTER.incrementAndGet());
+ }
+ }
+
+ String loggerPrefix = getClass().getPackage().getName();
+ String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
+ ProcessBuilder pb = createBuilder().redirectErrorStream(true);
+ pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
+ String.valueOf(LauncherServer.getServerInstance().getPort()));
+ pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
+ try {
+ handle.setChildProc(pb.start(), loggerName);
+ } catch (IOException ioe) {
+ handle.kill();
+ throw ioe;
+ }
+
+ return handle;
+ }
+
+ private ProcessBuilder createBuilder() {
List<String> cmd = new ArrayList<String>();
String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
@@ -343,7 +445,7 @@ public class SparkLauncher {
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
- return pb.start();
+ return pb;
}
private static class ArgumentValidator extends SparkSubmitOptionParser {
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index fc87814a59..39b46e0db8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -188,10 +188,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// Load the properties file and check whether spark-submit will be running the app's driver
// or just launching a cluster app. When running the driver, the JVM's argument will be
// modified to cover the driver's configuration.
- Properties props = loadPropertiesFile();
- boolean isClientMode = isClientMode(props);
- String extraClassPath = isClientMode ?
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+ Map<String, String> config = getEffectiveConfig();
+ boolean isClientMode = isClientMode(config);
+ String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
List<String> cmd = buildJavaCommand(extraClassPath);
// Take Thrift Server as daemon
@@ -212,14 +211,13 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// Take Thrift Server as daemon
String tsMemory =
isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
- String memory = firstNonEmpty(tsMemory,
- firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+ String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xms" + memory);
cmd.add("-Xmx" + memory);
- addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+ addOptionString(cmd, config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS));
mergeEnvPathList(env, getLibPathEnvName(),
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}
addPermGenSizeOpt(cmd);
@@ -281,9 +279,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
private void constructEnvVarArgs(
Map<String, String> env,
String submitArgsEnvVariable) throws IOException {
- Properties props = loadPropertiesFile();
mergeEnvPathList(env, getLibPathEnvName(),
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
StringBuilder submitArgs = new StringBuilder();
for (String arg : buildSparkSubmitArgs()) {
@@ -295,9 +292,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
env.put(submitArgsEnvVariable, submitArgs.toString());
}
-
- private boolean isClientMode(Properties userProps) {
- String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+ private boolean isClientMode(Map<String, String> userProps) {
+ String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
// Default master is "local[*]", so assume client mode in that case.
return userMaster == null ||
"client".equals(deployMode) ||
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
index 7c97dba511..d1ac39bdc7 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -17,17 +17,42 @@
/**
* Library for launching Spark applications.
- *
+ *
* <p>
* This library allows applications to launch Spark programmatically. There's only one entry
* point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
* </p>
*
* <p>
- * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
- * and configure the application to run. For example:
+ * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
+ * a handle to monitor and control the running application:
* </p>
- *
+ *
+ * <pre>
+ * {@code
+ * import org.apache.spark.launcher.SparkAppHandle;
+ * import org.apache.spark.launcher.SparkLauncher;
+ *
+ * public class MyLauncher {
+ * public static void main(String[] args) throws Exception {
+ * SparkAppHandle handle = new SparkLauncher()
+ * .setAppResource("/my/app.jar")
+ * .setMainClass("my.spark.app.Main")
+ * .setMaster("local")
+ * .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ * .startApplication();
+ * // Use handle API to monitor / control application.
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p>
+ * It's also possible to launch a raw child process, using the
+ * {@link org.apache.spark.launcher.SparkLauncher#launch()} method:
+ * </p>
+ *
* <pre>
* {@code
* import org.apache.spark.launcher.SparkLauncher;
@@ -45,5 +70,10 @@
* }
* }
* </pre>
+ *
+ * <p>This method requires the calling code to manually manage the child process, including its
+ * output streams (to avoid possible deadlocks). It's recommended that
+ * {@link org.apache.spark.launcher.SparkLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} be used instead.</p>
*/
package org.apache.spark.launcher;
diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
new file mode 100644
index 0000000000..23e2c64d6d
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+/**
+ * Handles configuring the JUL -> SLF4J bridge.
+ */
+class BaseSuite {
+
+ static {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
new file mode 100644
index 0000000000..27cd1061a1
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import static org.apache.spark.launcher.LauncherProtocol.*;
+
+public class LauncherServerSuite extends BaseSuite {
+
+ @Test
+ public void testLauncherServerReuse() throws Exception {
+ ChildProcAppHandle handle1 = null;
+ ChildProcAppHandle handle2 = null;
+ ChildProcAppHandle handle3 = null;
+
+ try {
+ handle1 = LauncherServer.newAppHandle();
+ handle2 = LauncherServer.newAppHandle();
+ LauncherServer server1 = handle1.getServer();
+ assertSame(server1, handle2.getServer());
+
+ handle1.kill();
+ handle2.kill();
+
+ handle3 = LauncherServer.newAppHandle();
+ assertNotSame(server1, handle3.getServer());
+
+ handle3.kill();
+
+ assertNull(LauncherServer.getServerInstance());
+ } finally {
+ kill(handle1);
+ kill(handle2);
+ kill(handle3);
+ }
+ }
+
+ @Test
+ public void testCommunication() throws Exception {
+ ChildProcAppHandle handle = LauncherServer.newAppHandle();
+ TestClient client = null;
+ try {
+ Socket s = new Socket(InetAddress.getLoopbackAddress(),
+ LauncherServer.getServerInstance().getPort());
+
+ final Object waitLock = new Object();
+ handle.addListener(new SparkAppHandle.Listener() {
+ @Override
+ public void stateChanged(SparkAppHandle handle) {
+ wakeUp();
+ }
+
+ @Override
+ public void infoChanged(SparkAppHandle handle) {
+ wakeUp();
+ }
+
+ private void wakeUp() {
+ synchronized (waitLock) {
+ waitLock.notifyAll();
+ }
+ }
+ });
+
+ client = new TestClient(s);
+ synchronized (waitLock) {
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ waitLock.wait(TimeUnit.SECONDS.toMillis(10));
+ }
+
+ // Make sure the server matched the client to the handle.
+ assertNotNull(handle.getConnection());
+
+ synchronized (waitLock) {
+ client.send(new SetAppId("app-id"));
+ waitLock.wait(TimeUnit.SECONDS.toMillis(10));
+ }
+ assertEquals("app-id", handle.getAppId());
+
+ synchronized (waitLock) {
+ client.send(new SetState(SparkAppHandle.State.RUNNING));
+ waitLock.wait(TimeUnit.SECONDS.toMillis(10));
+ }
+ assertEquals(SparkAppHandle.State.RUNNING, handle.getState());
+
+ handle.stop();
+ Message stopMsg = client.inbound.poll(10, TimeUnit.SECONDS);
+ assertTrue(stopMsg instanceof Stop);
+ } finally {
+ kill(handle);
+ close(client);
+ client.clientThread.join();
+ }
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+ final long TEST_TIMEOUT = 10L;
+
+ ChildProcAppHandle handle = null;
+ TestClient client = null;
+ try {
+ SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, String.valueOf(TEST_TIMEOUT));
+
+ handle = LauncherServer.newAppHandle();
+
+ Socket s = new Socket(InetAddress.getLoopbackAddress(),
+ LauncherServer.getServerInstance().getPort());
+ client = new TestClient(s);
+
+ Thread.sleep(TEST_TIMEOUT * 10);
+ try {
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ fail("Expected exception caused by connection timeout.");
+ } catch (IllegalStateException e) {
+ // Expected.
+ }
+ } finally {
+ SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
+ kill(handle);
+ close(client);
+ }
+ }
+
+ private void kill(SparkAppHandle handle) {
+ if (handle != null) {
+ handle.kill();
+ }
+ }
+
+ private void close(Closeable c) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Exception e) {
+ // no-op.
+ }
+ }
+ }
+
+ private static class TestClient extends LauncherConnection {
+
+ final BlockingQueue<Message> inbound;
+ final Thread clientThread;
+
+ TestClient(Socket s) throws IOException {
+ super(s);
+ this.inbound = new LinkedBlockingQueue<Message>();
+ this.clientThread = new Thread(this);
+ clientThread.setName("TestClient");
+ clientThread.setDaemon(true);
+ clientThread.start();
+ }
+
+ @Override
+ protected void handle(Message msg) throws IOException {
+ inbound.offer(msg);
+ }
+
+ }
+
+}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index 7329ac9f7f..d5397b0685 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -30,7 +30,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
-public class SparkSubmitCommandBuilderSuite {
+public class SparkSubmitCommandBuilderSuite extends BaseSuite {
private static File dummyPropsFile;
private static SparkSubmitOptionParser parser;
@@ -161,7 +161,7 @@ public class SparkSubmitCommandBuilderSuite {
launcher.appResource = "/foo";
launcher.appName = "MyApp";
launcher.mainClass = "my.Class";
- launcher.propertiesFile = dummyPropsFile.getAbsolutePath();
+ launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath());
launcher.appArgs.add("foo");
launcher.appArgs.add("bar");
launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
index f3d2109917..3ee5b8cf96 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java
@@ -28,7 +28,7 @@ import static org.mockito.Mockito.*;
import static org.apache.spark.launcher.SparkSubmitOptionParser.*;
-public class SparkSubmitOptionParserSuite {
+public class SparkSubmitOptionParserSuite extends BaseSuite {
private SparkSubmitOptionParser parser;
diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties
index 67a6a98217..c64b1565e1 100644
--- a/launcher/src/test/resources/log4j.properties
+++ b/launcher/src/test/resources/log4j.properties
@@ -16,16 +16,19 @@
#
# Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootCategory=INFO, file
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-
-# Some tests will set "test.name" to avoid overwriting the main log file.
-log4j.appender.file.file=target/unit-tests${test.name}.log
-
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+log4j.appender.childproc=org.apache.log4j.ConsoleAppender
+log4j.appender.childproc.target=System.err
+log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
+log4j.appender.childproc.layout.ConversionPattern=%t: %m%n
+
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
org.spark-project.jetty.LEVEL=WARN