aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
authorAndrew Duffy <root@aduffy.org>2016-07-19 17:08:38 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-07-19 17:08:38 -0700
commit004e29cba518684d239d2d1661dce7c894a79f14 (patch)
treec6ae39115d5ff888ad3a2bdb73919ac601a9e14f /launcher
parent2ae7b88a07140e012b6c60db3c4a2a8ca360c684 (diff)
downloadspark-004e29cba518684d239d2d1661dce7c894a79f14.tar.gz
spark-004e29cba518684d239d2d1661dce7c894a79f14.tar.bz2
spark-004e29cba518684d239d2d1661dce7c894a79f14.zip
[SPARK-14702] Make environment of SparkLauncher launched process more configurable
## What changes were proposed in this pull request?

Adds a few public methods to `SparkLauncher` to allow configuring some extra features of the `ProcessBuilder`, including the working directory and the redirection of the output and error streams.

## How was this patch tested?

Unit testing + simple Spark driver programs

Author: Andrew Duffy <root@aduffy.org>

Closes #14201 from andreweduffy/feature/launcher.
Diffstat (limited to 'launcher')
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java5
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java167
2 files changed, 145 insertions, 27 deletions
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 1bfda289de..c0779e1c4e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,8 +30,6 @@ import java.util.logging.Logger;
class ChildProcAppHandle implements SparkAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
- private static final ThreadFactory REDIRECTOR_FACTORY =
- new NamedThreadFactory("launcher-proc-%d");
private final String secret;
private final LauncherServer server;
@@ -127,7 +124,7 @@ class ChildProcAppHandle implements SparkAppHandle {
void setChildProc(Process childProc, String loggerName) {
this.childProc = childProc;
this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
- REDIRECTOR_FACTORY);
+ SparkLauncher.REDIRECTOR_FACTORY);
}
void setConnection(LauncherConnection connection) {
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 08873f5811..41f7f1f3ed 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -82,6 +83,9 @@ public class SparkLauncher {
/** Used internally to create unique logger names. */
private static final AtomicInteger COUNTER = new AtomicInteger();
+ /** Factory for creating OutputRedirector threads. **/
+ static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
+
static final Map<String, String> launcherConfig = new HashMap<>();
/**
@@ -99,6 +103,11 @@ public class SparkLauncher {
// Visible for testing.
final SparkSubmitCommandBuilder builder;
+ File workingDir;
+ boolean redirectToLog;
+ boolean redirectErrorStream;
+ ProcessBuilder.Redirect errorStream;
+ ProcessBuilder.Redirect outputStream;
public SparkLauncher() {
this(null);
@@ -359,6 +368,83 @@ public class SparkLauncher {
}
/**
+ * Sets the working directory of spark-submit.
+ *
+ * @param dir The directory to set as spark-submit's working directory.
+ * @return This launcher.
+ */
+ public SparkLauncher directory(File dir) {
+ workingDir = dir;
+ return this;
+ }
+
+ /**
+ * Specifies that stderr in spark-submit should be redirected to stdout.
+ *
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError() {
+ redirectErrorStream = true;
+ return this;
+ }
+
+ /**
+ * Redirects error output to the specified Redirect.
+ *
+ * @param to The method of redirection.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
+ errorStream = to;
+ return this;
+ }
+
+ /**
+ * Redirects standard output to the specified Redirect.
+ *
+ * @param to The method of redirection.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
+ outputStream = to;
+ return this;
+ }
+
+ /**
+ * Redirects error output to the specified File.
+ *
+ * @param errFile The file to which stderr is written.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError(File errFile) {
+ errorStream = ProcessBuilder.Redirect.to(errFile);
+ return this;
+ }
+
+ /**
+ * Redirects standard output to the specified File.
+ *
+ * @param outFile The file to which stdout is written.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectOutput(File outFile) {
+ outputStream = ProcessBuilder.Redirect.to(outFile);
+ return this;
+ }
+
+ /**
+ * Sets all output to be logged and redirected to a logger with the specified name.
+ *
+ * @param loggerName The name of the logger to log stdout and stderr.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectToLog(String loggerName) {
+ setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
+ redirectToLog = true;
+ return this;
+ }
+
+ /**
* Launches a sub-process that will start the configured Spark application.
* <p>
* The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
@@ -367,7 +453,12 @@ public class SparkLauncher {
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
- return createBuilder().start();
+ Process childProc = createBuilder().start();
+ if (redirectToLog) {
+ String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
+ }
+ return childProc;
}
/**
@@ -383,12 +474,13 @@ public class SparkLauncher {
* a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
* <p>
* Currently, all applications are launched as child processes. The child's stdout and stderr
- * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
- * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
- * that option is not set, the code will try to derive a name from the application's name or
- * main class / script file. If those cannot be determined, an internal, unique name will be
- * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
- * more easily into the configuration of commonly-used logging systems.
+ * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
+ * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
+ * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
+ * option is not set, the code will try to derive a name from the application's name or main
+ * class / script file. If those cannot be determined, an internal, unique name will be used.
+ * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
+ * easily into the configuration of commonly-used logging systems.
*
* @since 1.6.0
* @param listeners Listeners to add to the handle before the app is launched.
@@ -400,27 +492,33 @@ public class SparkLauncher {
handle.addListener(l);
}
- String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
- if (appName == null) {
- if (builder.appName != null) {
- appName = builder.appName;
- } else if (builder.mainClass != null) {
- int dot = builder.mainClass.lastIndexOf(".");
- if (dot >= 0 && dot < builder.mainClass.length() - 1) {
- appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ ProcessBuilder pb = createBuilder();
+ // Only set up stderr + stdout to logger redirection if user has not otherwise configured output
+ // redirection.
+ if (loggerName == null) {
+ String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ if (appName == null) {
+ if (builder.appName != null) {
+ appName = builder.appName;
+ } else if (builder.mainClass != null) {
+ int dot = builder.mainClass.lastIndexOf(".");
+ if (dot >= 0 && dot < builder.mainClass.length() - 1) {
+ appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ } else {
+ appName = builder.mainClass;
+ }
+ } else if (builder.appResource != null) {
+ appName = new File(builder.appResource).getName();
} else {
- appName = builder.mainClass;
+ appName = String.valueOf(COUNTER.incrementAndGet());
}
- } else if (builder.appResource != null) {
- appName = new File(builder.appResource).getName();
- } else {
- appName = String.valueOf(COUNTER.incrementAndGet());
}
+ String loggerPrefix = getClass().getPackage().getName();
+ loggerName = String.format("%s.app.%s", loggerPrefix, appName);
+ pb.redirectErrorStream(true);
}
- String loggerPrefix = getClass().getPackage().getName();
- String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
- ProcessBuilder pb = createBuilder().redirectErrorStream(true);
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
String.valueOf(LauncherServer.getServerInstance().getPort()));
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
@@ -455,6 +553,29 @@ public class SparkLauncher {
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
+
+ if (workingDir != null) {
+ pb.directory(workingDir);
+ }
+
+ // Only one of redirectError and redirectError(...) can be specified.
+ // Similarly, if redirectToLog is specified, no other redirections should be specified.
+ checkState(!redirectErrorStream || errorStream == null,
+ "Cannot specify both redirectError() and redirectError(...) ");
+ checkState(!redirectToLog ||
+ (!redirectErrorStream && errorStream == null && outputStream == null),
+ "Cannot used redirectToLog() in conjunction with other redirection methods.");
+
+ if (redirectErrorStream || redirectToLog) {
+ pb.redirectErrorStream(true);
+ }
+ if (errorStream != null) {
+ pb.redirectError(errorStream);
+ }
+ if (outputStream != null) {
+ pb.redirectOutput(outputStream);
+ }
+
return pb;
}