author     Marcelo Vanzin <vanzin@cloudera.com>    2015-10-09 15:28:09 -0500
committer  Imran Rashid <irashid@cloudera.com>     2015-10-09 15:28:09 -0500
commit     015f7ef503d5544f79512b6333326749a1f0c48b (patch)
tree       990942b40bd374f632c3954cd4aab3741dd17f63 /core
parent     70f44ad2d836236c74e1336a7368982d5fe3abff (diff)
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app launched using the library. It also creates a socket-based communication layer for apps that are launched as child processes; the launching application listens for connections from launched apps, and once communication is established, the channel can be used to send updates to the launching app, or to send commands to the child app.

The change also includes hooks for local, standalone/client and yarn masters.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7052 from vanzin/SPARK-8673.
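For orientation, here is a minimal, hypothetical sketch of the handle API from the launching side, modeled on the usage in LauncherBackendSuite further down. The jar path, main class, and printed messages are illustrative, and SparkLauncher/SparkAppHandle live in the launcher module rather than in this 'core' diff:

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherHandleExample {
  def main(args: Array[String]): Unit = {
    // Launch the app as a child process and obtain a handle to it.
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")   // illustrative jar path
      .setMainClass("com.example.MyApp")    // illustrative main class
      .setMaster("local")
      .startApplication(new SparkAppHandle.Listener {
        // Fired when the child reports a new state over the launcher socket.
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state = ${h.getState}")
        // Fired when other info, such as the app ID, becomes available.
        override def infoChanged(h: SparkAppHandle): Unit =
          println(s"appId = ${h.getAppId}")
      })

    // The same channel can send commands back to the child:
    // handle.stop()  // ask the app to shut down gracefully (a Stop message to its LauncherBackend)
    // handle.kill()  // terminate the child process outright
  }
}
```

A stop() request is delivered to the child as a Stop message on the launcher socket, which the new LauncherBackend class below turns into an onStopRequest() callback.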
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala                       | 119
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala                   |  19
-rw-r--r--  core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java                      |  39
-rw-r--r--  core/src/test/resources/log4j.properties                                                  |  11
-rw-r--r--  core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala                  |  81
6 files changed, 265 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
new file mode 100644
index 0000000000..3ea984c501
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.net.{InetAddress, Socket}
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.launcher.LauncherProtocol._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A class that can be used to talk to a launcher server. Users should extend this class to
+ * provide implementation for the abstract methods.
+ *
+ * See `LauncherServer` for an explanation of how launcher communication works.
+ */
+private[spark] abstract class LauncherBackend {
+
+ private var clientThread: Thread = _
+ private var connection: BackendConnection = _
+ private var lastState: SparkAppHandle.State = _
+ @volatile private var _isConnected = false
+
+ def connect(): Unit = {
+ val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
+ val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
+ if (port != None && secret != None) {
+ val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
+ connection = new BackendConnection(s)
+ connection.send(new Hello(secret.get, SPARK_VERSION))
+ clientThread = LauncherBackend.threadFactory.newThread(connection)
+ clientThread.start()
+ _isConnected = true
+ }
+ }
+
+ def close(): Unit = {
+ if (connection != null) {
+ try {
+ connection.close()
+ } finally {
+ if (clientThread != null) {
+ clientThread.join()
+ }
+ }
+ }
+ }
+
+ def setAppId(appId: String): Unit = {
+ if (connection != null) {
+ connection.send(new SetAppId(appId))
+ }
+ }
+
+ def setState(state: SparkAppHandle.State): Unit = {
+ if (connection != null && lastState != state) {
+ connection.send(new SetState(state))
+ lastState = state
+ }
+ }
+
+ /** Return whether the launcher handle is still connected to this backend. */
+ def isConnected(): Boolean = _isConnected
+
+ /**
+ * Implementations should provide this method, which should try to stop the application
+ * as gracefully as possible.
+ */
+ protected def onStopRequest(): Unit
+
+ /**
+ * Callback for when the launcher handle disconnects from this backend.
+ */
+ protected def onDisconnected() : Unit = { }
+
+
+ private class BackendConnection(s: Socket) extends LauncherConnection(s) {
+
+ override protected def handle(m: Message): Unit = m match {
+ case _: Stop =>
+ onStopRequest()
+
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
+ }
+
+ override def close(): Unit = {
+ try {
+ super.close()
+ } finally {
+ onDisconnected()
+ _isConnected = false
+ }
+ }
+
+ }
+
+}
+
+private object LauncherBackend {
+
+ val threadFactory = ThreadUtils.namedThreadFactory("LauncherBackend")
+
+}
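Before the scheduler-backend diffs below, a condensed, hypothetical sketch of how an implementation is expected to drive this class; ExampleBackend and the app ID are illustrative, and the real wiring is in SparkDeploySchedulerBackend and LocalBackend:

```scala
package org.apache.spark.scheduler

import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}

// LauncherBackend is private[spark], so this sketch has to live inside the
// org.apache.spark package tree.
private[spark] class ExampleBackend {

  private val launcherBackend = new LauncherBackend() {
    // Called when the launching process sends a Stop message over the socket.
    override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
  }

  def start(): Unit = {
    launcherBackend.connect()                               // no-op if the launcher env vars are not set
    // ... submit the application to the cluster manager ...
    launcherBackend.setAppId("app-12345")                   // illustrative app ID
    launcherBackend.setState(SparkAppHandle.State.RUNNING)  // report progress to the launcher
  }

  def stop(): Unit = stop(SparkAppHandle.State.FINISHED)

  private def stop(finalState: SparkAppHandle.State): Unit = {
    try {
      launcherBackend.setState(finalState)
    } finally {
      launcherBackend.close()  // closes the socket and joins the connection thread
    }
  }
}
```

The try/finally mirrors LocalBackend below: the final state is reported to the launcher before the connection is closed.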
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 27491ecf8b..2625c3e7ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rpc.RpcAddress
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -36,6 +37,9 @@ private[spark] class SparkDeploySchedulerBackend(
private var client: AppClient = null
private var stopping = false
+ private val launcherBackend = new LauncherBackend() {
+ override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+ }
@volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
@volatile private var appId: String = _
@@ -47,6 +51,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def start() {
super.start()
+ launcherBackend.connect()
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
@@ -87,24 +92,20 @@ private[spark] class SparkDeploySchedulerBackend(
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
+ launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
- override def stop() {
- stopping = true
- super.stop()
- client.stop()
-
- val callback = shutdownCallback
- if (callback != null) {
- callback(this)
- }
+ override def stop(): Unit = synchronized {
+ stop(SparkAppHandle.State.FINISHED)
}
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
this.appId = appId
notifyContext()
+ launcherBackend.setAppId(appId)
}
override def disconnected() {
@@ -117,6 +118,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def dead(reason: String) {
notifyContext()
if (!stopping) {
+ launcherBackend.setState(SparkAppHandle.State.KILLED)
logError("Application has been killed. Reason: " + reason)
try {
scheduler.error(reason)
@@ -188,4 +190,19 @@ private[spark] class SparkDeploySchedulerBackend(
registrationBarrier.release()
}
+ private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
+ stopping = true
+
+ launcherBackend.setState(finalState)
+ launcherBackend.close()
+
+ super.stop()
+ client.stop()
+
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 4d48fcfea4..c633d860ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -103,6 +104,9 @@ private[spark] class LocalBackend(
private var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)
private val listenerBus = scheduler.sc.listenerBus
+ private val launcherBackend = new LauncherBackend() {
+ override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+ }
/**
* Returns a list of URLs representing the user classpath.
@@ -114,6 +118,8 @@ private[spark] class LocalBackend(
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
}
+ launcherBackend.connect()
+
override def start() {
val rpcEnv = SparkEnv.get.rpcEnv
val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
@@ -122,10 +128,12 @@ private[spark] class LocalBackend(
System.currentTimeMillis,
executorEndpoint.localExecutorId,
new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
+ launcherBackend.setAppId(appId)
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
override def stop() {
- localEndpoint.ask(StopExecutor)
+ stop(SparkAppHandle.State.FINISHED)
}
override def reviveOffers() {
@@ -145,4 +153,13 @@ private[spark] class LocalBackend(
override def applicationId(): String = appId
+ private def stop(finalState: SparkAppHandle.State): Unit = {
+ localEndpoint.ask(StopExecutor)
+ try {
+ launcherBackend.setState(finalState)
+ } finally {
+ launcherBackend.close()
+ }
+ }
+
}
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index d0c26dd056..aa15e792e2 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
/**
@@ -34,7 +35,13 @@ import static org.junit.Assert.*;
*/
public class SparkLauncherSuite {
+ static {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+ private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
@Test
public void testSparkArgumentHandling() throws Exception {
@@ -94,14 +101,15 @@ public class SparkLauncherSuite {
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
- "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ "-Dfoo=bar -Dtest.appender=childproc")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();
- new Redirector("stdout", app.getInputStream()).start();
- new Redirector("stderr", app.getErrorStream()).start();
+
+ new OutputRedirector(app.getInputStream(), TF);
+ new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
}
@@ -116,29 +124,4 @@ public class SparkLauncherSuite {
}
- private static class Redirector extends Thread {
-
- private final InputStream in;
-
- Redirector(String name, InputStream in) {
- this.in = in;
- setName(name);
- setDaemon(true);
- }
-
- @Override
- public void run() {
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
- String line;
- while ((line = reader.readLine()) != null) {
- LOG.warn(line);
- }
- } catch (Exception e) {
- LOG.error("Error reading process output.", e);
- }
- }
-
- }
-
}
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index eb3b1999eb..a54d27de91 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,13 +16,22 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
org.spark-project.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
new file mode 100644
index 0000000000..07e8869833
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.launcher._
+
+class LauncherBackendSuite extends SparkFunSuite with Matchers {
+
+ private val tests = Seq(
+ "local" -> "local",
+ "standalone/client" -> "local-cluster[1,1,1024]")
+
+ tests.foreach { case (name, master) =>
+ test(s"$name: launcher handle") {
+ testWithMaster(master)
+ }
+ }
+
+ private def testWithMaster(master: String): Unit = {
+ val env = new java.util.HashMap[String, String]()
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
+ val handle = new SparkLauncher(env)
+ .setSparkHome(sys.props("spark.test.home"))
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setConf("spark.ui.enabled", "false")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
+ .setMaster(master)
+ .setAppResource("spark-internal")
+ .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
+ .startApplication()
+
+ try {
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getAppId() should not be (null)
+ }
+
+ handle.stop()
+
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.KILLED)
+ }
+ } finally {
+ handle.kill()
+ }
+ }
+
+}
+
+object TestApp {
+
+ def main(args: Array[String]): Unit = {
+ new SparkContext(new SparkConf()).parallelize(Seq(1)).foreach { i =>
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20))
+ }
+ }
+
+}