Diffstat (limited to 'launcher/src/main')
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java      38
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java         159
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java         110
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java            93
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java             341
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java          40
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java            78
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java             126
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java              106
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java   22
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/package-info.java                38
11 files changed, 1126 insertions, 25 deletions
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 610e8bdaaa..cf3729b7fe 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -47,7 +47,7 @@ abstract class AbstractCommandBuilder {
String javaHome;
String mainClass;
String master;
- String propertiesFile;
+ protected String propertiesFile;
final List<String> appArgs;
final List<String> jars;
final List<String> files;
@@ -55,6 +55,10 @@ abstract class AbstractCommandBuilder {
final Map<String, String> childEnv;
final Map<String, String> conf;
+ // The merged configuration for the application. Cached to avoid having to read / parse
+ // properties files multiple times.
+ private Map<String, String> effectiveConfig;
+
public AbstractCommandBuilder() {
this.appArgs = new ArrayList<String>();
this.childEnv = new HashMap<String, String>();
@@ -257,12 +261,38 @@ abstract class AbstractCommandBuilder {
return path;
}
+ String getenv(String key) {
+ return firstNonEmpty(childEnv.get(key), System.getenv(key));
+ }
+
+ void setPropertiesFile(String path) {
+ effectiveConfig = null;
+ this.propertiesFile = path;
+ }
+
+ Map<String, String> getEffectiveConfig() throws IOException {
+ if (effectiveConfig == null) {
+ if (propertiesFile == null) {
+ effectiveConfig = conf;
+ } else {
+ effectiveConfig = new HashMap<>(conf);
+ Properties p = loadPropertiesFile();
+ for (String key : p.stringPropertyNames()) {
+ if (!effectiveConfig.containsKey(key)) {
+ effectiveConfig.put(key, p.getProperty(key));
+ }
+ }
+ }
+ }
+ return effectiveConfig;
+ }
+
/**
* Loads the configuration file for the application, if it exists. This is either the
* user-specified properties file, or the spark-defaults.conf file under the Spark configuration
* directory.
*/
- Properties loadPropertiesFile() throws IOException {
+ private Properties loadPropertiesFile() throws IOException {
Properties props = new Properties();
File propsFile;
if (propertiesFile != null) {
@@ -294,10 +324,6 @@ abstract class AbstractCommandBuilder {
return props;
}
- String getenv(String key) {
- return firstNonEmpty(childEnv.get(key), System.getenv(key));
- }
-
private String findAssembly() {
String sparkHome = getSparkHome();
File libdir;
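
The merge rule implemented by getEffectiveConfig() above is worth spelling out: values set
programmatically in "conf" always win, and the properties file only fills in keys that are not
already present. A minimal standalone sketch of that precedence (the class and method names are
illustrative, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    class EffectiveConfigSketch {
      // Mirrors getEffectiveConfig(): explicit conf entries take precedence, and
      // properties-file entries only fill in missing keys.
      static Map<String, String> merge(Map<String, String> conf, Properties fromFile) {
        Map<String, String> effective = new HashMap<>(conf);
        for (String key : fromFile.stringPropertyNames()) {
          if (!effective.containsKey(key)) {
            effective.put(key, fromFile.getProperty(key));
          }
        }
        return effective;
      }

      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.driver.memory", "2g");
        Properties p = new Properties();
        p.setProperty("spark.driver.memory", "1g");  // loses to the explicit conf value
        p.setProperty("spark.master", "local[*]");   // fills a gap
        System.out.println(merge(conf, p));          // the explicit "2g" survives; spark.master is added
      }
    }
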
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
new file mode 100644
index 0000000000..de50f14fbd
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handle implementation for monitoring apps started as a child process.
+ */
+class ChildProcAppHandle implements SparkAppHandle {
+
+ private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
+ private static final ThreadFactory REDIRECTOR_FACTORY =
+ new NamedThreadFactory("launcher-proc-%d");
+
+ private final String secret;
+ private final LauncherServer server;
+
+ private Process childProc;
+ private boolean disposed;
+ private LauncherConnection connection;
+ private List<Listener> listeners;
+ private State state;
+ private String appId;
+ private OutputRedirector redirector;
+
+ ChildProcAppHandle(String secret, LauncherServer server) {
+ this.secret = secret;
+ this.server = server;
+ this.state = State.UNKNOWN;
+ }
+
+ @Override
+ public synchronized void addListener(Listener l) {
+ if (listeners == null) {
+ listeners = new ArrayList<>();
+ }
+ listeners.add(l);
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public String getAppId() {
+ return appId;
+ }
+
+ @Override
+ public void stop() {
+ CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
+ try {
+ connection.send(new LauncherProtocol.Stop());
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public synchronized void disconnect() {
+ if (!disposed) {
+ disposed = true;
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ioe) {
+ // no-op.
+ }
+ }
+ server.unregister(this);
+ if (redirector != null) {
+ redirector.stop();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void kill() {
+ if (!disposed) {
+ disconnect();
+ }
+ if (childProc != null) {
+ childProc.destroy();
+ childProc = null;
+ }
+ }
+
+ String getSecret() {
+ return secret;
+ }
+
+ void setChildProc(Process childProc, String loggerName) {
+ this.childProc = childProc;
+ this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
+ REDIRECTOR_FACTORY);
+ }
+
+ void setConnection(LauncherConnection connection) {
+ this.connection = connection;
+ }
+
+ LauncherServer getServer() {
+ return server;
+ }
+
+ LauncherConnection getConnection() {
+ return connection;
+ }
+
+ void setState(State s) {
+ if (!state.isFinal()) {
+ state = s;
+ fireEvent(false);
+ } else {
+ LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
+ new Object[] { state, s });
+ }
+ }
+
+ void setAppId(String appId) {
+ this.appId = appId;
+ fireEvent(true);
+ }
+
+ private synchronized void fireEvent(boolean isInfoChanged) {
+ if (listeners != null) {
+ for (Listener l : listeners) {
+ if (isInfoChanged) {
+ l.infoChanged(this);
+ } else {
+ l.stateChanged(this);
+ }
+ }
+ }
+ }
+
+}
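
The two shutdown paths above behave differently: stop() is a cooperative request sent over the
launcher connection (and throws if the app never connected), while kill() tears down the
connection and destroys the child process. A hypothetical caller-side sketch, with an
illustrative grace period:

    class ShutdownSketch {
      // Try the cooperative path first; fall back to kill(), which implies
      // disconnect() and destroys the child process. Assumes the app is connected.
      static void shutdown(SparkAppHandle handle) throws InterruptedException {
        handle.stop();          // best-effort stop request over the connection
        Thread.sleep(10000L);   // naive grace period, for illustration only
        if (!handle.getState().isFinal()) {
          handle.kill();
        }
      }
    }
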
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
new file mode 100644
index 0000000000..eec264909b
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.spark.launcher.LauncherProtocol.*;
+
+/**
+ * Encapsulates a connection between a launcher server and client. This takes care of the
+ * communication (sending and receiving messages), while processing of messages is left for
+ * the implementations.
+ */
+abstract class LauncherConnection implements Closeable, Runnable {
+
+ private static final Logger LOG = Logger.getLogger(LauncherConnection.class.getName());
+
+ private final Socket socket;
+ private final ObjectOutputStream out;
+
+ private volatile boolean closed;
+
+ LauncherConnection(Socket socket) throws IOException {
+ this.socket = socket;
+ this.out = new ObjectOutputStream(socket.getOutputStream());
+ this.closed = false;
+ }
+
+ protected abstract void handle(Message msg) throws IOException;
+
+ @Override
+ public void run() {
+ try {
+ ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
+ while (!closed) {
+ Message msg = (Message) in.readObject();
+ handle(msg);
+ }
+ } catch (EOFException eof) {
+ // Remote side has closed the connection, just cleanup.
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ } catch (Exception e) {
+ if (!closed) {
+ LOG.log(Level.WARNING, "Error in inbound message handling.", e);
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ }
+ }
+ }
+
+ protected synchronized void send(Message msg) throws IOException {
+ try {
+ CommandBuilderUtils.checkState(!closed, "Disconnected.");
+ out.writeObject(msg);
+ out.flush();
+ } catch (IOException ioe) {
+ if (!closed) {
+ LOG.log(Level.WARNING, "Error when sending message.", ioe);
+ try {
+ close();
+ } catch (Exception unused) {
+ // no-op.
+ }
+ }
+ throw ioe;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ synchronized (this) {
+ if (!closed) {
+ closed = true;
+ socket.close();
+ }
+ }
+ }
+ }
+
+}
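
The base class owns the socket read loop and the thread-safe writes, so implementations only
override handle(). A hypothetical subclass, assuming it lives in the same package (these classes
are package-private), just to show the extension point:

    import java.io.IOException;
    import java.net.Socket;

    class LoggingConnection extends LauncherConnection {
      LoggingConnection(Socket socket) throws IOException {
        super(socket);
      }

      @Override
      protected void handle(LauncherProtocol.Message msg) throws IOException {
        // run() deserializes each incoming message and hands it here, one at a time.
        System.out.println("received " + msg.getClass().getSimpleName());
      }
    }
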
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
new file mode 100644
index 0000000000..50f136497e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.Socket;
+import java.util.Map;
+
+/**
+ * Message definitions for the launcher communication protocol. These messages must remain
+ * backwards-compatible, so that the launcher can talk to older versions of Spark that support
+ * the protocol.
+ */
+final class LauncherProtocol {
+
+ /** Environment variable where the server port is stored. */
+ static final String ENV_LAUNCHER_PORT = "_SPARK_LAUNCHER_PORT";
+
+ /** Environment variable where the secret for connecting back to the server is stored. */
+ static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET";
+
+ static class Message implements Serializable {
+
+ }
+
+ /**
+ * Hello message, sent from client to server.
+ */
+ static class Hello extends Message {
+
+ final String secret;
+ final String sparkVersion;
+
+ Hello(String secret, String version) {
+ this.secret = secret;
+ this.sparkVersion = version;
+ }
+
+ }
+
+ /**
+ * SetAppId message, sent from client to server.
+ */
+ static class SetAppId extends Message {
+
+ final String appId;
+
+ SetAppId(String appId) {
+ this.appId = appId;
+ }
+
+ }
+
+ /**
+ * SetState message, sent from client to server.
+ */
+ static class SetState extends Message {
+
+ final SparkAppHandle.State state;
+
+ SetState(SparkAppHandle.State state) {
+ this.state = state;
+ }
+
+ }
+
+ /**
+ * Stop message, sent from server to client to stop the application.
+ */
+ static class Stop extends Message {
+
+ }
+
+}
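
The intended flow is: the child reads the port and secret from its environment, connects to
localhost, sends Hello, and then pushes SetAppId / SetState updates over the same stream; Stop
travels in the other direction. The Spark-side client is not part of this diff, but a rough
sketch of its half of the handshake (again assuming same-package access) looks like:

    import java.io.ObjectOutputStream;
    import java.net.InetAddress;
    import java.net.Socket;

    class BackendHandshakeSketch {
      public static void main(String[] args) throws Exception {
        int port = Integer.parseInt(System.getenv(LauncherProtocol.ENV_LAUNCHER_PORT));
        String secret = System.getenv(LauncherProtocol.ENV_LAUNCHER_SECRET);

        Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
        ObjectOutputStream out = new ObjectOutputStream(s.getOutputStream());
        out.writeObject(new LauncherProtocol.Hello(secret, "1.6.0"));  // identifies this client
        out.writeObject(new LauncherProtocol.SetState(SparkAppHandle.State.SUBMITTED));
        out.flush();
      }
    }
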
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
new file mode 100644
index 0000000000..c5fd40816d
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.spark.launcher.LauncherProtocol.*;
+
+/**
+ * A server that listens locally for connections from clients launched by the library. Each client
+ * has a secret that it needs to send to the server to identify itself and establish the session.
+ *
+ * I/O is currently blocking (one thread per client). Clients have a limited time to connect back
+ * to the server, otherwise the server will ignore the connection.
+ *
+ * === Architecture Overview ===
+ *
+ * The launcher server is used when Spark apps are launched as separate processes from the calling
+ * app. It looks more or less like the following:
+ *
+ *         -----------------------                       -----------------------
+ *         |      User App       |     spark-submit      |      Spark App      |
+ *         |                     |  -------------------> |                     |
+ *         |         ------------|                       |-------------        |
+ *         |         |           |        hello          |            |        |
+ *         |         | L. Server |<----------------------| L. Backend |        |
+ *         |         |           |                       |            |        |
+ *         |         -------------                       -----------------------
+ *         |               |                                        ^
+ *         |               v                                        |
+ *         |        -------------|                                  |
+ *         |        |            |      <per-app channel>           |
+ *         |        | App Handle |<------------------------------
+ *         |        |            |
+ *         -----------------------
+ *
+ * The server is started on demand and remains active while there are active or outstanding
+ * clients, to avoid opening too many ports when multiple clients are launched. Each client is
+ * given a unique secret, and has a limited amount of time to connect back
+ * ({@link SparkLauncher#CHILD_CONNECTION_TIMEOUT}), after which the server discards that
+ * client's state. A client is only allowed to connect back to the server once.
+ *
+ * The launcher server listens on localhost only, so it doesn't need access controls (aside from
+ * the per-app secret) or encryption. It thus requires the launched application to have a local
+ * process that communicates with the server. In cluster mode, this means that the client that
+ * launches the application must remain alive for the duration of the application (or until the
+ * app handle is disconnected).
+ */
+class LauncherServer implements Closeable {
+
+ private static final Logger LOG = Logger.getLogger(LauncherServer.class.getName());
+ private static final String THREAD_NAME_FMT = "LauncherServer-%d";
+ private static final long DEFAULT_CONNECT_TIMEOUT = 10000L;
+
+ /** For creating secrets used for communication with child processes. */
+ private static final SecureRandom RND = new SecureRandom();
+
+ private static volatile LauncherServer serverInstance;
+
+ /**
+ * Creates a handle for an app to be launched. This method will start a server if one hasn't been
+ * started yet. The server is shared for multiple handles, and once all handles are disposed of,
+ * the server is shut down.
+ */
+ static synchronized ChildProcAppHandle newAppHandle() throws IOException {
+ LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer();
+ server.ref();
+ serverInstance = server;
+
+ String secret = server.createSecret();
+ while (server.pending.containsKey(secret)) {
+ secret = server.createSecret();
+ }
+
+ return server.newAppHandle(secret);
+ }
+
+ static LauncherServer getServerInstance() {
+ return serverInstance;
+ }
+
+ private final AtomicLong refCount;
+ private final AtomicLong threadIds;
+ private final ConcurrentMap<String, ChildProcAppHandle> pending;
+ private final List<ServerConnection> clients;
+ private final ServerSocket server;
+ private final Thread serverThread;
+ private final ThreadFactory factory;
+ private final Timer timeoutTimer;
+
+ private volatile boolean running;
+
+ private LauncherServer() throws IOException {
+ this.refCount = new AtomicLong(0);
+
+ ServerSocket server = new ServerSocket();
+ try {
+ server.setReuseAddress(true);
+ server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+
+ this.clients = new ArrayList<ServerConnection>();
+ this.threadIds = new AtomicLong();
+ this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
+ this.pending = new ConcurrentHashMap<>();
+ this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
+ this.server = server;
+ this.running = true;
+
+ this.serverThread = factory.newThread(new Runnable() {
+ @Override
+ public void run() {
+ acceptConnections();
+ }
+ });
+ serverThread.start();
+ } catch (IOException ioe) {
+ close();
+ throw ioe;
+ } catch (Exception e) {
+ close();
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Creates a new app handle. The handle will wait for an incoming connection for a configurable
+ * amount of time, and if one doesn't arrive, it will transition to an error state.
+ */
+ ChildProcAppHandle newAppHandle(String secret) {
+ ChildProcAppHandle handle = new ChildProcAppHandle(secret, this);
+ ChildProcAppHandle existing = pending.putIfAbsent(secret, handle);
+ CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret.");
+ return handle;
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (this) {
+ if (running) {
+ running = false;
+ timeoutTimer.cancel();
+ server.close();
+ synchronized (clients) {
+ List<ServerConnection> copy = new ArrayList<>(clients);
+ clients.clear();
+ for (ServerConnection client : copy) {
+ client.close();
+ }
+ }
+ }
+ }
+ if (serverThread != null) {
+ try {
+ serverThread.join();
+ } catch (InterruptedException ie) {
+ // no-op
+ }
+ }
+ }
+
+ void ref() {
+ refCount.incrementAndGet();
+ }
+
+ void unref() {
+ synchronized(LauncherServer.class) {
+ if (refCount.decrementAndGet() == 0) {
+ try {
+ close();
+ } catch (IOException ioe) {
+ // no-op.
+ } finally {
+ serverInstance = null;
+ }
+ }
+ }
+ }
+
+ int getPort() {
+ return server.getLocalPort();
+ }
+
+ /**
+ * Removes the client handle from the pending list (in case it's still there), and unrefs
+ * the server.
+ */
+ void unregister(ChildProcAppHandle handle) {
+ pending.remove(handle.getSecret());
+ unref();
+ }
+
+ private void acceptConnections() {
+ try {
+ while (running) {
+ final Socket client = server.accept();
+ TimerTask timeout = new TimerTask() {
+ @Override
+ public void run() {
+ LOG.warning("Timed out waiting for hello message from client.");
+ try {
+ client.close();
+ } catch (IOException ioe) {
+ // no-op.
+ }
+ }
+ };
+ ServerConnection clientConnection = new ServerConnection(client, timeout);
+ Thread clientThread = factory.newThread(clientConnection);
+ synchronized (timeout) {
+ clientThread.start();
+ synchronized (clients) {
+ clients.add(clientConnection);
+ }
+ timeoutTimer.schedule(timeout, getConnectionTimeout());
+ }
+ }
+ } catch (IOException ioe) {
+ if (running) {
+ LOG.log(Level.SEVERE, "Error in accept loop.", ioe);
+ }
+ }
+ }
+
+ private long getConnectionTimeout() {
+ String value = SparkLauncher.launcherConfig.get(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
+ return (value != null) ? Long.parseLong(value) : DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ private String createSecret() {
+ byte[] secret = new byte[128];
+ RND.nextBytes(secret);
+
+ StringBuilder sb = new StringBuilder();
+ for (byte b : secret) {
+ int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
+ if (ival < 0x10) {
+ sb.append("0");
+ }
+ sb.append(Integer.toHexString(ival));
+ }
+ return sb.toString();
+ }
+
+ private class ServerConnection extends LauncherConnection {
+
+ private TimerTask timeout;
+ private ChildProcAppHandle handle;
+
+ ServerConnection(Socket socket, TimerTask timeout) throws IOException {
+ super(socket);
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void handle(Message msg) throws IOException {
+ try {
+ if (msg instanceof Hello) {
+ synchronized (timeout) {
+ timeout.cancel();
+ }
+ timeout = null;
+ Hello hello = (Hello) msg;
+ ChildProcAppHandle handle = pending.remove(hello.secret);
+ if (handle != null) {
+ handle.setState(SparkAppHandle.State.CONNECTED);
+ handle.setConnection(this);
+ this.handle = handle;
+ } else {
+ throw new IllegalArgumentException("Received Hello for unknown client.");
+ }
+ } else {
+ if (handle == null) {
+ throw new IllegalArgumentException("Expected hello, got: " +
+ msg != null ? msg.getClass().getName() : null);
+ }
+ if (msg instanceof SetAppId) {
+ SetAppId set = (SetAppId) msg;
+ handle.setAppId(set.appId);
+ } else if (msg instanceof SetState) {
+ handle.setState(((SetState)msg).state);
+ } else {
+ throw new IllegalArgumentException("Invalid message: " +
+ msg != null ? msg.getClass().getName() : null);
+ }
+ }
+ } catch (Exception e) {
+ LOG.log(Level.INFO, "Error handling message from client.", e);
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ close();
+ } finally {
+ timeoutTimer.purge();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (clients) {
+ clients.remove(this);
+ }
+ super.close();
+ if (handle != null) {
+ handle.disconnect();
+ }
+ }
+
+ }
+
+}
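
The per-connection TimerTask in acceptConnections() fires after getConnectionTimeout()
milliseconds: the value of SparkLauncher.CHILD_CONNECTION_TIMEOUT if set, otherwise the 10-second
default. For example, to give slow-starting children 30 seconds to send their Hello:

    class TimeoutConfigSketch {
      public static void main(String[] args) {
        // The value is in milliseconds and is parsed by LauncherServer.getConnectionTimeout().
        SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "30000");
      }
    }
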
diff --git a/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java b/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java
new file mode 100644
index 0000000000..995f4d73da
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/NamedThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+class NamedThreadFactory implements ThreadFactory {
+
+ private final String nameFormat;
+ private final AtomicLong threadIds;
+
+ NamedThreadFactory(String nameFormat) {
+ this.nameFormat = nameFormat;
+ this.threadIds = new AtomicLong();
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, String.format(nameFormat, threadIds.incrementAndGet()));
+ t.setDaemon(true);
+ return t;
+ }
+
+}
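
Usage sketch (same-package assumption again): the format string receives a monotonically
increasing id starting at 1, and every thread comes back daemonized.

    import java.util.concurrent.ThreadFactory;

    class FactoryDemo {
      public static void main(String[] args) throws InterruptedException {
        ThreadFactory tf = new NamedThreadFactory("demo-worker-%d");
        Thread t = tf.newThread(new Runnable() {
          @Override
          public void run() {
            // Prints "demo-worker-1".
            System.out.println(Thread.currentThread().getName());
          }
        });
        t.start();
        t.join();  // daemon threads won't keep the JVM alive on their own
      }
    }
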
diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
new file mode 100644
index 0000000000..6e7120167d
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Redirects lines read from a given input stream to a j.u.l.Logger (at INFO level).
+ */
+class OutputRedirector {
+
+ private final BufferedReader reader;
+ private final Logger sink;
+ private final Thread thread;
+
+ private volatile boolean active;
+
+ OutputRedirector(InputStream in, ThreadFactory tf) {
+ this(in, OutputRedirector.class.getName(), tf);
+ }
+
+ OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
+ this.active = true;
+ this.reader = new BufferedReader(new InputStreamReader(in));
+ this.thread = tf.newThread(new Runnable() {
+ @Override
+ public void run() {
+ redirect();
+ }
+ });
+ this.sink = Logger.getLogger(loggerName);
+ thread.start();
+ }
+
+ private void redirect() {
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (active) {
+ sink.info(line.replaceFirst("\\s*$", ""));
+ }
+ }
+ } catch (IOException e) {
+ sink.log(Level.FINE, "Error reading child process output.", e);
+ }
+ }
+
+ /**
+ * This method just stops the process's output from showing up in the local logs.
+ * The child's output will still be read (and, thus, the redirect thread will still be
+ * alive) to keep the child process from blocking on a full output buffer.
+ */
+ void stop() {
+ active = false;
+ }
+
+}
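
A usage sketch, again assuming same-package access to the package-private class: fold the
child's stderr into stdout and pump every line into a named j.u.l logger (the command and logger
name are illustrative).

    import java.util.concurrent.Executors;

    class RedirectDemo {
      public static void main(String[] args) throws Exception {
        Process child = new ProcessBuilder("echo", "hello")  // any command; Unix-style here
            .redirectErrorStream(true)                       // merge stderr into stdout
            .start();
        new OutputRedirector(child.getInputStream(), "demo.child",
            Executors.defaultThreadFactory());
        child.waitFor();  // each line is logged at INFO on the "demo.child" logger
      }
    }
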
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
new file mode 100644
index 0000000000..2896a91d5e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+/**
+ * A handle to a running Spark application.
+ * <p/>
+ * Provides runtime information about the underlying Spark application, and actions to control it.
+ *
+ * @since 1.6.0
+ */
+public interface SparkAppHandle {
+
+ /**
+ * Represents the application's state. A state can be "final", in which case it will not change
+ * after it's reached, and it means the application is no longer running.
+ *
+ * @since 1.6.0
+ */
+ public enum State {
+ /** The application has not reported back yet. */
+ UNKNOWN(false),
+ /** The application has connected to the handle. */
+ CONNECTED(false),
+ /** The application has been submitted to the cluster. */
+ SUBMITTED(false),
+ /** The application is running. */
+ RUNNING(false),
+ /** The application finished with a successful status. */
+ FINISHED(true),
+ /** The application finished with a failed status. */
+ FAILED(true),
+ /** The application was killed. */
+ KILLED(true);
+
+ private final boolean isFinal;
+
+ State(boolean isFinal) {
+ this.isFinal = isFinal;
+ }
+
+ /**
+ * Whether this state is a final state, meaning the application is not running anymore
+ * once it's reached.
+ */
+ public boolean isFinal() {
+ return isFinal;
+ }
+ }
+
+ /**
+ * Adds a listener to be notified of changes to the handle's information. Listeners will be called
+ * from the thread processing updates from the application, so they should avoid blocking or
+ * long-running operations.
+ *
+ * @param l Listener to add.
+ */
+ void addListener(Listener l);
+
+ /** Returns the current application state. */
+ State getState();
+
+ /** Returns the application ID, or <code>null</code> if not yet known. */
+ String getAppId();
+
+ /**
+ * Asks the application to stop. This is best-effort, since the application may fail to receive
+ * or act on the command. Callers should watch for a state transition that indicates the
+ * application has really stopped.
+ */
+ void stop();
+
+ /**
+ * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
+ * a {@link #stop()} message to the application, so it's recommended that users first try to
+ * stop the application cleanly and only resort to this method if that fails.
+ */
+ void kill();
+
+ /**
+ * Disconnects the handle from the application, without stopping it. After this method is called,
+ * the handle will not be able to communicate with the application anymore.
+ */
+ void disconnect();
+
+ /**
+ * Listener for updates to a handle's state. The callbacks do not receive information about
+ * what exactly has changed, just that an update has occurred.
+ *
+ * @since 1.6.0
+ */
+ public interface Listener {
+
+ /**
+ * Callback for changes in the handle's state.
+ *
+ * @param handle The updated handle.
+ * @see SparkAppHandle#getState()
+ */
+ void stateChanged(SparkAppHandle handle);
+
+ /**
+ * Callback for changes in any information that is not the handle's state.
+ *
+ * @param handle The updated handle.
+ */
+ void infoChanged(SparkAppHandle handle);
+
+ }
+
+}
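
A common way to consume this interface (a sketch, not part of the patch): latch on the first
final state, which is safe because final states never transition away.

    import java.util.concurrent.CountDownLatch;

    class FinalStateListener implements SparkAppHandle.Listener {
      final CountDownLatch done = new CountDownLatch(1);

      @Override
      public void stateChanged(SparkAppHandle handle) {
        if (handle.getState().isFinal()) {
          done.countDown();  // FINISHED, FAILED, or KILLED
        }
      }

      @Override
      public void infoChanged(SparkAppHandle handle) {
        // The application ID becomes available here via handle.getAppId().
      }
    }

Passing an instance to startApplication(...) and then calling done.await() blocks the caller
until the application ends.
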
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 57993405e4..5d74b37033 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -21,8 +21,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -58,6 +60,33 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
+ /** Logger name to use when launching a child process. */
+ public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
+
+ /**
+ * Maximum time (in ms) to wait for a child process to connect back to the launcher server
+ * when using {@link #startApplication(SparkAppHandle.Listener...)}.
+ */
+ public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
+
+ /** Used internally to create unique logger names. */
+ private static final AtomicInteger COUNTER = new AtomicInteger();
+
+ static final Map<String, String> launcherConfig = new HashMap<String, String>();
+
+ /**
+ * Set a configuration value for the launcher library. These config values do not affect the
+ * launched application, but rather the behavior of the launcher library itself when managing
+ * applications.
+ *
+ * @since 1.6.0
+ * @param name Config name.
+ * @param value Config value.
+ */
+ public static void setConfig(String name, String value) {
+ launcherConfig.put(name, value);
+ }
+
// Visible for testing.
final SparkSubmitCommandBuilder builder;
@@ -109,7 +138,7 @@ public class SparkLauncher {
*/
public SparkLauncher setPropertiesFile(String path) {
checkNotNull(path, "path");
- builder.propertiesFile = path;
+ builder.setPropertiesFile(path);
return this;
}
@@ -197,6 +226,7 @@ public class SparkLauncher {
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
+ * @since 1.5.0
* @param arg Argument to add.
* @return This launcher.
*/
@@ -218,6 +248,7 @@ public class SparkLauncher {
* Use this method with caution. It is possible to create an invalid Spark command by passing
* unknown arguments to this method, since those are allowed for forward compatibility.
*
+ * @since 1.5.0
* @param name Name of argument to add.
* @param value Value of the argument.
* @return This launcher.
@@ -319,10 +350,81 @@ public class SparkLauncher {
/**
* Launches a sub-process that will start the configured Spark application.
+ * <p/>
+ * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
+ * Spark, since it provides better control of the child application.
*
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
+ return createBuilder().start();
+ }
+
+ /**
+ * Starts a Spark application.
+ * <p/>
+ * This method returns a handle that provides information about the running application and can
+ * be used to do basic interaction with it.
+ * <p/>
+ * The returned handle assumes that the application will instantiate a single SparkContext
+ * during its lifetime. Once that context reports a final state (one that indicates the
+ * SparkContext has stopped), the handle will not perform new state transitions, so anything
+ * that happens after that cannot be monitored. If the underlying application is launched as
+ * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
+ * <p/>
+ * Currently, all applications are launched as child processes. The child's stdout and stderr
+ * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
+ * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
+ * that option is not set, the code will try to derive a name from the application's name or
+ * main class / script file. If those cannot be determined, an internal, unique name will be
+ * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
+ * more easily into the configuration of commonly-used logging systems.
+ *
+ * @since 1.6.0
+ * @param listeners Listeners to add to the handle before the app is launched.
+ * @return A handle for the launched application.
+ */
+ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
+ ChildProcAppHandle handle = LauncherServer.newAppHandle();
+ for (SparkAppHandle.Listener l : listeners) {
+ handle.addListener(l);
+ }
+
+ String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ if (appName == null) {
+ if (builder.appName != null) {
+ appName = builder.appName;
+ } else if (builder.mainClass != null) {
+ int dot = builder.mainClass.lastIndexOf(".");
+ if (dot >= 0 && dot < builder.mainClass.length() - 1) {
+ appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ } else {
+ appName = builder.mainClass;
+ }
+ } else if (builder.appResource != null) {
+ appName = new File(builder.appResource).getName();
+ } else {
+ appName = String.valueOf(COUNTER.incrementAndGet());
+ }
+ }
+
+ String loggerPrefix = getClass().getPackage().getName();
+ String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
+ ProcessBuilder pb = createBuilder().redirectErrorStream(true);
+ pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
+ String.valueOf(LauncherServer.getServerInstance().getPort()));
+ pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
+ try {
+ handle.setChildProc(pb.start(), loggerName);
+ } catch (IOException ioe) {
+ handle.kill();
+ throw ioe;
+ }
+
+ return handle;
+ }
+
+ private ProcessBuilder createBuilder() {
List<String> cmd = new ArrayList<String>();
String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
@@ -343,7 +445,7 @@ public class SparkLauncher {
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
- return pb.start();
+ return pb;
}
private static class ArgumentValidator extends SparkSubmitOptionParser {
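
Because every child-output logger created by startApplication() shares the
"org.apache.spark.launcher.app" prefix, callers can capture the output of all launched apps with
a single j.u.l configuration. A sketch:

    import java.util.logging.ConsoleHandler;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    class ChildLogSetup {
      // Keep a strong reference: j.u.l holds loggers weakly, so an otherwise
      // unreferenced logger (and its handlers) can be garbage collected.
      private static final Logger APP_LOGS = Logger.getLogger("org.apache.spark.launcher.app");

      static void captureChildOutput() {
        APP_LOGS.setLevel(Level.INFO);
        APP_LOGS.addHandler(new ConsoleHandler());  // child lines arrive at INFO
      }
    }
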
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index fc87814a59..39b46e0db8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -188,10 +188,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// Load the properties file and check whether spark-submit will be running the app's driver
// or just launching a cluster app. When running the driver, the JVM's argument will be
// modified to cover the driver's configuration.
- Properties props = loadPropertiesFile();
- boolean isClientMode = isClientMode(props);
- String extraClassPath = isClientMode ?
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+ Map<String, String> config = getEffectiveConfig();
+ boolean isClientMode = isClientMode(config);
+ String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
List<String> cmd = buildJavaCommand(extraClassPath);
// Take Thrift Server as daemon
@@ -212,14 +211,13 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// Take Thrift Server as daemon
String tsMemory =
isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
- String memory = firstNonEmpty(tsMemory,
- firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+ String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xms" + memory);
cmd.add("-Xmx" + memory);
- addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+ addOptionString(cmd, config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS));
mergeEnvPathList(env, getLibPathEnvName(),
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}
addPermGenSizeOpt(cmd);
@@ -281,9 +279,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
private void constructEnvVarArgs(
Map<String, String> env,
String submitArgsEnvVariable) throws IOException {
- Properties props = loadPropertiesFile();
mergeEnvPathList(env, getLibPathEnvName(),
- firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+ getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
StringBuilder submitArgs = new StringBuilder();
for (String arg : buildSparkSubmitArgs()) {
@@ -295,9 +292,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
env.put(submitArgsEnvVariable, submitArgs.toString());
}
-
- private boolean isClientMode(Properties userProps) {
- String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+ private boolean isClientMode(Map<String, String> userProps) {
+ String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
// Default master is "local[*]", so assume client mode in that case.
return userMaster == null ||
"client".equals(deployMode) ||
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
index 7c97dba511..d1ac39bdc7 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -17,17 +17,42 @@
/**
* Library for launching Spark applications.
- *
+ *
* <p>
* This library allows applications to launch Spark programmatically. There's only one entry
* point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
* </p>
*
* <p>
- * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
- * and configure the application to run. For example:
+ * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
+ * a handle to monitor and control the running application:
* </p>
- *
+ *
+ * <pre>
+ * {@code
+ * import org.apache.spark.launcher.SparkAppHandle;
+ * import org.apache.spark.launcher.SparkLauncher;
+ *
+ * public class MyLauncher {
+ * public static void main(String[] args) throws Exception {
+ * SparkAppHandle handle = new SparkLauncher()
+ * .setAppResource("/my/app.jar")
+ * .setMainClass("my.spark.app.Main")
+ * .setMaster("local")
+ * .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ * .startApplication();
+ * // Use handle API to monitor / control application.
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p>
+ * It's also possible to launch a raw child process, using the
+ * {@link org.apache.spark.launcher.SparkLauncher#launch()} method:
+ * </p>
+ *
* <pre>
* {@code
* import org.apache.spark.launcher.SparkLauncher;
@@ -45,5 +70,10 @@
* }
* }
* </pre>
+ *
+ * <p>This method requires the calling code to manually manage the child process, including its
+ * output streams (to avoid possible deadlocks). It's recommended that
+ * {@link org.apache.spark.launcher.SparkLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} be used instead.</p>
*/
package org.apache.spark.launcher;