aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Duffy <root@aduffy.org>2016-07-19 17:08:38 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-07-19 17:08:38 -0700
commit004e29cba518684d239d2d1661dce7c894a79f14 (patch)
treec6ae39115d5ff888ad3a2bdb73919ac601a9e14f
parent2ae7b88a07140e012b6c60db3c4a2a8ca360c684 (diff)
downloadspark-004e29cba518684d239d2d1661dce7c894a79f14.tar.gz
spark-004e29cba518684d239d2d1661dce7c894a79f14.tar.bz2
spark-004e29cba518684d239d2d1661dce7c894a79f14.zip
[SPARK-14702] Make environment of SparkLauncher launched process more configurable
## What changes were proposed in this pull request? Adds a few public methods to `SparkLauncher` to allow configuring some extra features of the `ProcessBuilder`, including the working directory, output and error stream redirection. ## How was this patch tested? Unit testing + simple Spark driver programs Author: Andrew Duffy <root@aduffy.org> Closes #14201 from andreweduffy/feature/launcher.
-rw-r--r--core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java67
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java5
-rw-r--r--launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java167
3 files changed, 208 insertions, 31 deletions
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 8ca54b24d8..e393db06a0 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +41,15 @@ public class SparkLauncherSuite {
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
+ private SparkLauncher launcher;
+
+ @Before
+ public void configureLauncher() {
+ launcher = new SparkLauncher().setSparkHome(System.getProperty("spark.test.home"));
+ }
+
@Test
public void testSparkArgumentHandling() throws Exception {
- SparkLauncher launcher = new SparkLauncher()
- .setSparkHome(System.getProperty("spark.test.home"));
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
launcher.addSparkArg(opts.HELP);
@@ -85,14 +91,67 @@ public class SparkLauncherSuite {
assertEquals("bar", launcher.builder.conf.get("spark.foo"));
}
+ @Test(expected=IllegalStateException.class)
+ public void testRedirectTwiceFails() throws Exception {
+ launcher.setAppResource("fake-resource.jar")
+ .setMainClass("my.fake.class.Fake")
+ .redirectError()
+ .redirectError(ProcessBuilder.Redirect.PIPE)
+ .launch();
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testRedirectToLogWithOthersFails() throws Exception {
+ launcher.setAppResource("fake-resource.jar")
+ .setMainClass("my.fake.class.Fake")
+ .redirectToLog("fakeLog")
+ .redirectError(ProcessBuilder.Redirect.PIPE)
+ .launch();
+ }
+
+ @Test
+ public void testRedirectErrorToOutput() throws Exception {
+ launcher.redirectError();
+ assertTrue(launcher.redirectErrorStream);
+ }
+
+ @Test
+ public void testRedirectsSimple() throws Exception {
+ launcher.redirectError(ProcessBuilder.Redirect.PIPE);
+ assertNotNull(launcher.errorStream);
+ assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+
+ launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
+ assertNotNull(launcher.outputStream);
+ assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+ }
+
+ @Test
+ public void testRedirectLastWins() throws Exception {
+ launcher.redirectError(ProcessBuilder.Redirect.PIPE)
+ .redirectError(ProcessBuilder.Redirect.INHERIT);
+ assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+
+ launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
+ .redirectOutput(ProcessBuilder.Redirect.INHERIT);
+ assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+ }
+
+ @Test
+ public void testRedirectToLog() throws Exception {
+ launcher.redirectToLog("fakeLogger");
+ assertTrue(launcher.redirectToLog);
+ assertTrue(launcher.builder.getEffectiveConfig()
+ .containsKey(SparkLauncher.CHILD_PROCESS_LOGGER_NAME));
+ }
+
@Test
public void testChildProcLauncher() throws Exception {
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
Map<String, String> env = new HashMap<>();
env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
- SparkLauncher launcher = new SparkLauncher(env)
- .setSparkHome(System.getProperty("spark.test.home"))
+ launcher
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.addSparkArg(opts.CONF,
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 1bfda289de..c0779e1c4e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,8 +30,6 @@ import java.util.logging.Logger;
class ChildProcAppHandle implements SparkAppHandle {
private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
- private static final ThreadFactory REDIRECTOR_FACTORY =
- new NamedThreadFactory("launcher-proc-%d");
private final String secret;
private final LauncherServer server;
@@ -127,7 +124,7 @@ class ChildProcAppHandle implements SparkAppHandle {
void setChildProc(Process childProc, String loggerName) {
this.childProc = childProc;
this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
- REDIRECTOR_FACTORY);
+ SparkLauncher.REDIRECTOR_FACTORY);
}
void setConnection(LauncherConnection connection) {
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 08873f5811..41f7f1f3ed 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -82,6 +83,9 @@ public class SparkLauncher {
/** Used internally to create unique logger names. */
private static final AtomicInteger COUNTER = new AtomicInteger();
+ /** Factory for creating OutputRedirector threads. **/
+ static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
+
static final Map<String, String> launcherConfig = new HashMap<>();
/**
@@ -99,6 +103,11 @@ public class SparkLauncher {
// Visible for testing.
final SparkSubmitCommandBuilder builder;
+ File workingDir;
+ boolean redirectToLog;
+ boolean redirectErrorStream;
+ ProcessBuilder.Redirect errorStream;
+ ProcessBuilder.Redirect outputStream;
public SparkLauncher() {
this(null);
@@ -359,6 +368,83 @@ public class SparkLauncher {
}
/**
+ * Sets the working directory of spark-submit.
+ *
+ * @param dir The directory to set as spark-submit's working directory.
+ * @return This launcher.
+ */
+ public SparkLauncher directory(File dir) {
+ workingDir = dir;
+ return this;
+ }
+
+ /**
+ * Specifies that stderr in spark-submit should be redirected to stdout.
+ *
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError() {
+ redirectErrorStream = true;
+ return this;
+ }
+
+ /**
+ * Redirects error output to the specified Redirect.
+ *
+ * @param to The method of redirection.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
+ errorStream = to;
+ return this;
+ }
+
+ /**
+ * Redirects standard output to the specified Redirect.
+ *
+ * @param to The method of redirection.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
+ outputStream = to;
+ return this;
+ }
+
+ /**
+ * Redirects error output to the specified File.
+ *
+ * @param errFile The file to which stderr is written.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectError(File errFile) {
+ errorStream = ProcessBuilder.Redirect.to(errFile);
+ return this;
+ }
+
+ /**
+ * Redirects error output to the specified File.
+ *
+ * @param outFile The file to which stdout is written.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectOutput(File outFile) {
+ outputStream = ProcessBuilder.Redirect.to(outFile);
+ return this;
+ }
+
+ /**
+ * Sets all output to be logged and redirected to a logger with the specified name.
+ *
+ * @param loggerName The name of the logger to log stdout and stderr.
+ * @return This launcher.
+ */
+ public SparkLauncher redirectToLog(String loggerName) {
+ setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
+ redirectToLog = true;
+ return this;
+ }
+
+ /**
* Launches a sub-process that will start the configured Spark application.
* <p>
* The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
@@ -367,7 +453,12 @@ public class SparkLauncher {
* @return A process handle for the Spark app.
*/
public Process launch() throws IOException {
- return createBuilder().start();
+ Process childProc = createBuilder().start();
+ if (redirectToLog) {
+ String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
+ }
+ return childProc;
}
/**
@@ -383,12 +474,13 @@ public class SparkLauncher {
* a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
* <p>
* Currently, all applications are launched as child processes. The child's stdout and stderr
- * are merged and written to a logger (see <code>java.util.logging</code>). The logger's name
- * can be defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If
- * that option is not set, the code will try to derive a name from the application's name or
- * main class / script file. If those cannot be determined, an internal, unique name will be
- * used. In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit
- * more easily into the configuration of commonly-used logging systems.
+ * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
+ * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
+ * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
+ * option is not set, the code will try to derive a name from the application's name or main
+ * class / script file. If those cannot be determined, an internal, unique name will be used.
+ * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
+ * easily into the configuration of commonly-used logging systems.
*
* @since 1.6.0
* @param listeners Listeners to add to the handle before the app is launched.
@@ -400,27 +492,33 @@ public class SparkLauncher {
handle.addListener(l);
}
- String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
- if (appName == null) {
- if (builder.appName != null) {
- appName = builder.appName;
- } else if (builder.mainClass != null) {
- int dot = builder.mainClass.lastIndexOf(".");
- if (dot >= 0 && dot < builder.mainClass.length() - 1) {
- appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ ProcessBuilder pb = createBuilder();
+ // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
+ // redirection.
+ if (loggerName == null) {
+ String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
+ if (appName == null) {
+ if (builder.appName != null) {
+ appName = builder.appName;
+ } else if (builder.mainClass != null) {
+ int dot = builder.mainClass.lastIndexOf(".");
+ if (dot >= 0 && dot < builder.mainClass.length() - 1) {
+ appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
+ } else {
+ appName = builder.mainClass;
+ }
+ } else if (builder.appResource != null) {
+ appName = new File(builder.appResource).getName();
} else {
- appName = builder.mainClass;
+ appName = String.valueOf(COUNTER.incrementAndGet());
}
- } else if (builder.appResource != null) {
- appName = new File(builder.appResource).getName();
- } else {
- appName = String.valueOf(COUNTER.incrementAndGet());
}
+ String loggerPrefix = getClass().getPackage().getName();
+ loggerName = String.format("%s.app.%s", loggerPrefix, appName);
+ pb.redirectErrorStream(true);
}
- String loggerPrefix = getClass().getPackage().getName();
- String loggerName = String.format("%s.app.%s", loggerPrefix, appName);
- ProcessBuilder pb = createBuilder().redirectErrorStream(true);
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
String.valueOf(LauncherServer.getServerInstance().getPort()));
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
@@ -455,6 +553,29 @@ public class SparkLauncher {
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
+
+ if (workingDir != null) {
+ pb.directory(workingDir);
+ }
+
+ // Only one of redirectError and redirectError(...) can be specified.
+ // Similarly, if redirectToLog is specified, no other redirections should be specified.
+ checkState(!redirectErrorStream || errorStream == null,
+ "Cannot specify both redirectError() and redirectError(...) ");
+ checkState(!redirectToLog ||
+ (!redirectErrorStream && errorStream == null && outputStream == null),
+ "Cannot used redirectToLog() in conjunction with other redirection methods.");
+
+ if (redirectErrorStream || redirectToLog) {
+ pb.redirectErrorStream(true);
+ }
+ if (errorStream != null) {
+ pb.redirectError(errorStream);
+ }
+ if (outputStream != null) {
+ pb.redirectOutput(outputStream);
+ }
+
return pb;
}