aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-10-09 15:28:09 -0500
committerImran Rashid <irashid@cloudera.com>2015-10-09 15:28:09 -0500
commit015f7ef503d5544f79512b6333326749a1f0c48b (patch)
tree990942b40bd374f632c3954cd4aab3741dd17f63 /core/src/test
parent70f44ad2d836236c74e1336a7368982d5fe3abff (diff)
downloadspark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.gz
spark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.bz2
spark-015f7ef503d5544f79512b6333326749a1f0c48b.zip
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app launched using the library. It also creates a socket-based communication layer for apps that are launched as child processes; the launching application listens for connections from launched apps, and once communication is established, the channel can be used to send updates to the launching app, or to send commands to the child app. The change also includes hooks for local, standalone/client and yarn masters. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #7052 from vanzin/SPARK-8673.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java39
-rw-r--r--core/src/test/resources/log4j.properties11
-rw-r--r--core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala81
3 files changed, 102 insertions, 29 deletions
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index d0c26dd056..aa15e792e2 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
/**
@@ -34,7 +35,13 @@ import static org.junit.Assert.*;
*/
public class SparkLauncherSuite {
+ static {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+ private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
@Test
public void testSparkArgumentHandling() throws Exception {
@@ -94,14 +101,15 @@ public class SparkLauncherSuite {
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
- "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ "-Dfoo=bar -Dtest.appender=childproc")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();
- new Redirector("stdout", app.getInputStream()).start();
- new Redirector("stderr", app.getErrorStream()).start();
+
+ new OutputRedirector(app.getInputStream(), TF);
+ new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
}
@@ -116,29 +124,4 @@ public class SparkLauncherSuite {
}
- private static class Redirector extends Thread {
-
- private final InputStream in;
-
- Redirector(String name, InputStream in) {
- this.in = in;
- setName(name);
- setDaemon(true);
- }
-
- @Override
- public void run() {
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
- String line;
- while ((line = reader.readLine()) != null) {
- LOG.warn(line);
- }
- } catch (Exception e) {
- LOG.error("Error reading process output.", e);
- }
- }
-
- }
-
}
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index eb3b1999eb..a54d27de91 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,13 +16,22 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
org.spark-project.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
new file mode 100644
index 0000000000..07e8869833
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.launcher._
+
+class LauncherBackendSuite extends SparkFunSuite with Matchers {
+
+ private val tests = Seq(
+ "local" -> "local",
+ "standalone/client" -> "local-cluster[1,1,1024]")
+
+ tests.foreach { case (name, master) =>
+ test(s"$name: launcher handle") {
+ testWithMaster(master)
+ }
+ }
+
+ private def testWithMaster(master: String): Unit = {
+ val env = new java.util.HashMap[String, String]()
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
+ val handle = new SparkLauncher(env)
+ .setSparkHome(sys.props("spark.test.home"))
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setConf("spark.ui.enabled", "false")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
+ .setMaster(master)
+ .setAppResource("spark-internal")
+ .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
+ .startApplication()
+
+ try {
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getAppId() should not be (null)
+ }
+
+ handle.stop()
+
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.KILLED)
+ }
+ } finally {
+ handle.kill()
+ }
+ }
+
+}
+
+object TestApp {
+
+ def main(args: Array[String]): Unit = {
+ new SparkContext(new SparkConf()).parallelize(Seq(1)).foreach { i =>
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20))
+ }
+ }
+
+}