From 015f7ef503d5544f79512b6333326749a1f0c48b Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 9 Oct 2015 15:28:09 -0500
Subject: [SPARK-8673] [LAUNCHER] API and infrastructure for communicating
 with child apps.

This change adds an API that encapsulates information about an app
launched using the library. It also creates a socket-based communication
layer for apps that are launched as child processes; the launching
application listens for connections from launched apps, and once
communication is established, the channel can be used to send updates
to the launching app, or to send commands to the child app.

The change also includes hooks for local, standalone/client and yarn
masters.

Author: Marcelo Vanzin

Closes #7052 from vanzin/SPARK-8673.
---
 .../apache/spark/launcher/LauncherBackend.scala    | 119 +++++++++++++++++++++
 .../cluster/SparkDeploySchedulerBackend.scala      |  35 ++++--
 .../spark/scheduler/local/LocalBackend.scala       |  19 +++-
 .../apache/spark/launcher/SparkLauncherSuite.java  |  39 ++-----
 core/src/test/resources/log4j.properties           |  11 +-
 .../spark/launcher/LauncherBackendSuite.scala      |  81 ++++++++++++++
 6 files changed, 265 insertions(+), 39 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
 create mode 100644 core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
new file mode 100644
index 0000000000..3ea984c501
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.net.{InetAddress, Socket}
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.launcher.LauncherProtocol._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A class that can be used to talk to a launcher server. Users should extend this class to
+ * provide implementations of the abstract methods.
+ *
+ * See `LauncherServer` for an explanation of how launcher communication works.
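+ *
+ * A minimal sketch of the expected usage from a scheduler backend (illustrative only;
+ * `stopApplication()` is a hypothetical stand-in for backend-specific shutdown logic):
+ *
+ * {{{
+ *   val launcherBackend = new LauncherBackend() {
+ *     override protected def onStopRequest(): Unit = stopApplication()
+ *   }
+ *   launcherBackend.connect()   // no-op unless launched through the launcher library
+ *   launcherBackend.setAppId("app-20151009152809-0000")
+ *   launcherBackend.setState(SparkAppHandle.State.RUNNING)
+ *   // ...and during shutdown:
+ *   launcherBackend.setState(SparkAppHandle.State.FINISHED)
+ *   launcherBackend.close()
+ * }}}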
+ */
+private[spark] abstract class LauncherBackend {
+
+  private var clientThread: Thread = _
+  private var connection: BackendConnection = _
+  private var lastState: SparkAppHandle.State = _
+  @volatile private var _isConnected = false
+
+  def connect(): Unit = {
+    val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
+    val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
+    if (port != None && secret != None) {
+      val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
+      connection = new BackendConnection(s)
+      connection.send(new Hello(secret.get, SPARK_VERSION))
+      clientThread = LauncherBackend.threadFactory.newThread(connection)
+      clientThread.start()
+      _isConnected = true
+    }
+  }
+
+  def close(): Unit = {
+    if (connection != null) {
+      try {
+        connection.close()
+      } finally {
+        if (clientThread != null) {
+          clientThread.join()
+        }
+      }
+    }
+  }
+
+  def setAppId(appId: String): Unit = {
+    if (connection != null) {
+      connection.send(new SetAppId(appId))
+    }
+  }
+
+  def setState(state: SparkAppHandle.State): Unit = {
+    if (connection != null && lastState != state) {
+      connection.send(new SetState(state))
+      lastState = state
+    }
+  }
+
+  /** Return whether the launcher handle is still connected to this backend. */
+  def isConnected(): Boolean = _isConnected
+
+  /**
+   * Implementations should provide this method, which should try to stop the application
+   * as gracefully as possible.
+   */
+  protected def onStopRequest(): Unit
+
+  /**
+   * Callback for when the launcher handle disconnects from this backend.
+   */
+  protected def onDisconnected(): Unit = { }
+
+  private class BackendConnection(s: Socket) extends LauncherConnection(s) {
+
+    override protected def handle(m: Message): Unit = m match {
+      case _: Stop =>
+        onStopRequest()
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
+    }
+
+    override def close(): Unit = {
+      try {
+        super.close()
+      } finally {
+        onDisconnected()
+        _isConnected = false
+      }
+    }
+
+  }
+
+}
+
+private object LauncherBackend {
+
+  val threadFactory = ThreadUtils.namedThreadFactory("LauncherBackend")
+
+}
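The parent-process half of this channel (LauncherServer, SparkAppHandle, OutputRedirector)
lives in the launcher module, which is outside this 'core'-limited view. A minimal sketch of
how a caller drives the public API added by this change; the jar path, main class and
listener bodies below are illustrative, not taken from this patch:

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object LaunchExample {
      def main(args: Array[String]): Unit = {
        val handle = new SparkLauncher()
          .setAppResource("/path/to/app.jar")   // hypothetical application jar
          .setMainClass("com.example.MyApp")    // hypothetical main class
          .setMaster("local[*]")
          .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(h: SparkAppHandle): Unit =
              println(s"state: ${h.getState}")
            override def infoChanged(h: SparkAppHandle): Unit =
              println(s"appId: ${h.getAppId}")
          })
        // Ask the child to shut down gracefully; this is what eventually reaches
        // BackendConnection.handle() above as a Stop message.
        handle.stop()
      }
    }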
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 27491ecf8b..2625c3e7ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -36,6 +37,9 @@ private[spark] class SparkDeploySchedulerBackend(
 
   private var client: AppClient = null
   private var stopping = false
+  private val launcherBackend = new LauncherBackend() {
+    override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+  }
 
   @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
   @volatile private var appId: String = _
@@ -47,6 +51,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def start() {
     super.start()
+    launcherBackend.connect()
 
     // The endpoint for executors to talk to us
     val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
@@ -87,24 +92,20 @@ private[spark] class SparkDeploySchedulerBackend(
       command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
     client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
     client.start()
+    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
     waitForRegistration()
+    launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
-  override def stop() {
-    stopping = true
-    super.stop()
-    client.stop()
-
-    val callback = shutdownCallback
-    if (callback != null) {
-      callback(this)
-    }
+  override def stop(): Unit = synchronized {
+    stop(SparkAppHandle.State.FINISHED)
   }
 
   override def connected(appId: String) {
     logInfo("Connected to Spark cluster with app ID " + appId)
     this.appId = appId
     notifyContext()
+    launcherBackend.setAppId(appId)
   }
 
   override def disconnected() {
@@ -117,6 +118,7 @@ private[spark] class SparkDeploySchedulerBackend(
   override def dead(reason: String) {
     notifyContext()
     if (!stopping) {
+      launcherBackend.setState(SparkAppHandle.State.KILLED)
       logError("Application has been killed. Reason: " + reason)
       try {
         scheduler.error(reason)
@@ -188,4 +190,19 @@ private[spark] class SparkDeploySchedulerBackend(
     registrationBarrier.release()
   }
 
+  private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
+    stopping = true
+
+    launcherBackend.setState(finalState)
+    launcherBackend.close()
+
+    super.stop()
+    client.stop()
+
+    val callback = shutdownCallback
+    if (callback != null) {
+      callback(this)
+    }
+  }
+
 }
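The backend above and LocalBackend below follow the same teardown contract: set the
terminal state on the LauncherBackend before closing it, so the parent's handle observes
FINISHED or KILLED rather than a bare disconnect. Condensed into one sketch, where
StoppableBackend and doTearDown() are hypothetical names, not part of this change:

    import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}

    private[spark] abstract class StoppableBackend extends LauncherBackend {
      @volatile protected var stopping = false

      /** Backend-specific cleanup: stop RPC endpoints, clients, etc. (hypothetical). */
      protected def doTearDown(): Unit

      override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)

      protected def stop(finalState: SparkAppHandle.State): Unit = synchronized {
        stopping = true
        try {
          setState(finalState)  // report the terminal state first...
        } finally {
          close()               // ...then close, so the handle sees it before disconnect
        }
        doTearDown()
      }
    }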
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 4d48fcfea4..c633d860ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -103,6 +104,9 @@ private[spark] class LocalBackend(
   private var localEndpoint: RpcEndpointRef = null
   private val userClassPath = getUserClasspath(conf)
   private val listenerBus = scheduler.sc.listenerBus
+  private val launcherBackend = new LauncherBackend() {
+    override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+  }
 
   /**
    * Returns a list of URLs representing the user classpath.
@@ -114,6 +118,8 @@ private[spark] class LocalBackend(
     userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
   }
 
+  launcherBackend.connect()
+
   override def start() {
     val rpcEnv = SparkEnv.get.rpcEnv
     val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
@@ -122,10 +128,12 @@ private[spark] class LocalBackend(
       System.currentTimeMillis, executorEndpoint.localExecutorId,
       new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
+    launcherBackend.setAppId(appId)
+    launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
   override def stop() {
-    localEndpoint.ask(StopExecutor)
+    stop(SparkAppHandle.State.FINISHED)
   }
 
   override def reviveOffers() {
@@ -145,4 +153,13 @@ private[spark] class LocalBackend(
 
   override def applicationId(): String = appId
 
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    localEndpoint.ask(StopExecutor)
+    try {
+      launcherBackend.setState(finalState)
+    } finally {
+      launcherBackend.close()
+    }
+  }
+
 }
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index d0c26dd056..aa15e792e2 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
 
 import static org.junit.Assert.*;
 
 /**
@@ -34,7 +35,13 @@ import static org.junit.Assert.*;
  */
 public class SparkLauncherSuite {
 
+  static {
+    SLF4JBridgeHandler.removeHandlersForRootLogger();
+    SLF4JBridgeHandler.install();
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+  private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
 
   @Test
   public void testSparkArgumentHandling() throws Exception {
@@ -94,14 +101,15 @@ public class SparkLauncherSuite {
       .addSparkArg(opts.CONF,
         String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
       .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
-        "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+        "-Dfoo=bar -Dtest.appender=childproc")
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
       .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
       .addAppArgs("proc");
     final Process app = launcher.launch();
-    new Redirector("stdout", app.getInputStream()).start();
-    new Redirector("stderr", app.getErrorStream()).start();
+
+    new OutputRedirector(app.getInputStream(), TF);
+    new OutputRedirector(app.getErrorStream(), TF);
     assertEquals(0, app.waitFor());
   }
 
@@ -116,29 +124,4 @@ public class SparkLauncherSuite {
 
   }
 
-  private static class Redirector extends Thread {
-
-    private final InputStream in;
-
-    Redirector(String name, InputStream in) {
-      this.in = in;
-      setName(name);
-      setDaemon(true);
-    }
-
-    @Override
-    public void run() {
-      try {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
-        String line;
-        while ((line = reader.readLine()) != null) {
-          LOG.warn(line);
-        }
-      } catch (Exception e) {
-        LOG.error("Error reading process output.", e);
-      }
-    }
-
-  }
-
 }
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index eb3b1999eb..a54d27de91 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,13 +16,22 @@
 #
 
 # Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
 
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.spark-project.jetty=WARN
 org.spark-project.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
new file mode 100644
index 0000000000..07e8869833
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.launcher._
+
+class LauncherBackendSuite extends SparkFunSuite with Matchers {
+
+  private val tests = Seq(
+    "local" -> "local",
+    "standalone/client" -> "local-cluster[1,1,1024]")
+
+  tests.foreach { case (name, master) =>
+    test(s"$name: launcher handle") {
+      testWithMaster(master)
+    }
+  }
+
+  private def testWithMaster(master: String): Unit = {
+    val env = new java.util.HashMap[String, String]()
+    env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
+    val handle = new SparkLauncher(env)
+      .setSparkHome(sys.props("spark.test.home"))
+      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+      .setConf("spark.ui.enabled", "false")
+      .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
+      .setMaster(master)
+      .setAppResource("spark-internal")
+      .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
+      .startApplication()
+
+    try {
+      eventually(timeout(10 seconds), interval(100 millis)) {
+        handle.getAppId() should not be (null)
+      }
+
+      handle.stop()
+
+      eventually(timeout(10 seconds), interval(100 millis)) {
+        handle.getState() should be (SparkAppHandle.State.KILLED)
+      }
+    } finally {
+      handle.kill()
+    }
+  }
+
+}
+
+object TestApp {
+
+  def main(args: Array[String]): Unit = {
+    new SparkContext(new SparkConf()).parallelize(Seq(1)).foreach { i =>
+      Thread.sleep(TimeUnit.SECONDS.toMillis(20))
+    }
+  }
+
+}
-- 
cgit v1.2.3