Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala                       | 119
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala                   |  19
-rw-r--r--  core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java                      |  39
-rw-r--r--  core/src/test/resources/log4j.properties                                                  |  11
-rw-r--r--  core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala                  |  81
6 files changed, 265 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
new file mode 100644
index 0000000000..3ea984c501
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.net.{InetAddress, Socket}
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.launcher.LauncherProtocol._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A class that can be used to talk to a launcher server. Users should extend this class to
+ * provide implementation for the abstract methods.
+ *
+ * See `LauncherServer` for an explanation of how launcher communication works.
+ */
+private[spark] abstract class LauncherBackend {
+
+ private var clientThread: Thread = _
+ private var connection: BackendConnection = _
+ private var lastState: SparkAppHandle.State = _
+ @volatile private var _isConnected = false
+
+ def connect(): Unit = {
+ val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
+ val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
+ if (port != None && secret != None) {
+ val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
+ connection = new BackendConnection(s)
+ connection.send(new Hello(secret.get, SPARK_VERSION))
+ clientThread = LauncherBackend.threadFactory.newThread(connection)
+ clientThread.start()
+ _isConnected = true
+ }
+ }
+
+ def close(): Unit = {
+ if (connection != null) {
+ try {
+ connection.close()
+ } finally {
+ if (clientThread != null) {
+ clientThread.join()
+ }
+ }
+ }
+ }
+
+ def setAppId(appId: String): Unit = {
+ if (connection != null) {
+ connection.send(new SetAppId(appId))
+ }
+ }
+
+ def setState(state: SparkAppHandle.State): Unit = {
+ if (connection != null && lastState != state) {
+ connection.send(new SetState(state))
+ lastState = state
+ }
+ }
+
+ /** Return whether the launcher handle is still connected to this backend. */
+ def isConnected(): Boolean = _isConnected
+
+ /**
+ * Implementations should provide this method, which should try to stop the application
+ * as gracefully as possible.
+ */
+ protected def onStopRequest(): Unit
+
+ /**
+ * Callback for when the launcher handle disconnects from this backend.
+ */
+ protected def onDisconnected() : Unit = { }
+
+
+ private class BackendConnection(s: Socket) extends LauncherConnection(s) {
+
+ override protected def handle(m: Message): Unit = m match {
+ case _: Stop =>
+ onStopRequest()
+
+ case _ =>
+ throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
+ }
+
+ override def close(): Unit = {
+ try {
+ super.close()
+ } finally {
+ onDisconnected()
+ _isConnected = false
+ }
+ }
+
+ }
+
+}
+
+private object LauncherBackend {
+
+ val threadFactory = ThreadUtils.namedThreadFactory("LauncherBackend")
+
+}
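
The two scheduler-backend diffs that follow wire this class in the same way: instantiate an anonymous LauncherBackend that implements onStopRequest, call connect() during startup, report the application id and RUNNING state once registered, and set a final state before close() during shutdown. Below is a minimal sketch of that lifecycle, not part of the patch; MySchedulerBackend and the app id are hypothetical, and because LauncherBackend is private[spark] the sketch assumes a package under org.apache.spark.

package org.apache.spark.example  // hypothetical; must live under org.apache.spark to see private[spark]

import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}

// Hypothetical backend showing the lifecycle adopted by SparkDeploySchedulerBackend
// and LocalBackend in the diffs below.
private[spark] class MySchedulerBackend {

  private val launcherBackend = new LauncherBackend() {
    // Invoked when the launcher handle asks the application to stop.
    override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
  }

  def start(): Unit = {
    launcherBackend.connect()                                // no-op unless launched via SparkLauncher
    launcherBackend.setAppId("app-example")                  // hypothetical application id
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

  def stop(): Unit = stop(SparkAppHandle.State.FINISHED)

  private def stop(finalState: SparkAppHandle.State): Unit = {
    try {
      launcherBackend.setState(finalState)
    } finally {
      launcherBackend.close()
    }
  }
}
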
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 27491ecf8b..2625c3e7ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rpc.RpcAddress
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -36,6 +37,9 @@ private[spark] class SparkDeploySchedulerBackend(
private var client: AppClient = null
private var stopping = false
+ private val launcherBackend = new LauncherBackend() {
+ override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+ }
@volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
@volatile private var appId: String = _
@@ -47,6 +51,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def start() {
super.start()
+ launcherBackend.connect()
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
@@ -87,24 +92,20 @@ private[spark] class SparkDeploySchedulerBackend(
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
+ launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
- override def stop() {
- stopping = true
- super.stop()
- client.stop()
-
- val callback = shutdownCallback
- if (callback != null) {
- callback(this)
- }
+ override def stop(): Unit = synchronized {
+ stop(SparkAppHandle.State.FINISHED)
}
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
this.appId = appId
notifyContext()
+ launcherBackend.setAppId(appId)
}
override def disconnected() {
@@ -117,6 +118,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def dead(reason: String) {
notifyContext()
if (!stopping) {
+ launcherBackend.setState(SparkAppHandle.State.KILLED)
logError("Application has been killed. Reason: " + reason)
try {
scheduler.error(reason)
@@ -188,4 +190,19 @@ private[spark] class SparkDeploySchedulerBackend(
registrationBarrier.release()
}
+ private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
+ stopping = true
+
+ launcherBackend.setState(finalState)
+ launcherBackend.close()
+
+ super.stop()
+ client.stop()
+
+ val callback = shutdownCallback
+ if (callback != null) {
+ callback(this)
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 4d48fcfea4..c633d860ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -103,6 +104,9 @@ private[spark] class LocalBackend(
private var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)
private val listenerBus = scheduler.sc.listenerBus
+ private val launcherBackend = new LauncherBackend() {
+ override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+ }
/**
* Returns a list of URLs representing the user classpath.
@@ -114,6 +118,8 @@ private[spark] class LocalBackend(
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
}
+ launcherBackend.connect()
+
override def start() {
val rpcEnv = SparkEnv.get.rpcEnv
val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
@@ -122,10 +128,12 @@ private[spark] class LocalBackend(
System.currentTimeMillis,
executorEndpoint.localExecutorId,
new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
+ launcherBackend.setAppId(appId)
+ launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
override def stop() {
- localEndpoint.ask(StopExecutor)
+ stop(SparkAppHandle.State.FINISHED)
}
override def reviveOffers() {
@@ -145,4 +153,13 @@ private[spark] class LocalBackend(
override def applicationId(): String = appId
+ private def stop(finalState: SparkAppHandle.State): Unit = {
+ localEndpoint.ask(StopExecutor)
+ try {
+ launcherBackend.setState(finalState)
+ } finally {
+ launcherBackend.close()
+ }
+ }
+
}
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index d0c26dd056..aa15e792e2 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
/**
@@ -34,7 +35,13 @@ import static org.junit.Assert.*;
*/
public class SparkLauncherSuite {
+ static {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+ private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
@Test
public void testSparkArgumentHandling() throws Exception {
@@ -94,14 +101,15 @@ public class SparkLauncherSuite {
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
- "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+ "-Dfoo=bar -Dtest.appender=childproc")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.addAppArgs("proc");
final Process app = launcher.launch();
- new Redirector("stdout", app.getInputStream()).start();
- new Redirector("stderr", app.getErrorStream()).start();
+
+ new OutputRedirector(app.getInputStream(), TF);
+ new OutputRedirector(app.getErrorStream(), TF);
assertEquals(0, app.waitFor());
}
@@ -116,29 +124,4 @@ public class SparkLauncherSuite {
}
- private static class Redirector extends Thread {
-
- private final InputStream in;
-
- Redirector(String name, InputStream in) {
- this.in = in;
- setName(name);
- setDaemon(true);
- }
-
- @Override
- public void run() {
- try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
- String line;
- while ((line = reader.readLine()) != null) {
- LOG.warn(line);
- }
- } catch (Exception e) {
- LOG.error("Error reading process output.", e);
- }
- }
-
- }
-
}
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index eb3b1999eb..a54d27de91 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -16,13 +16,22 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
org.spark-project.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
new file mode 100644
index 0000000000..07e8869833
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.launcher._
+
+class LauncherBackendSuite extends SparkFunSuite with Matchers {
+
+ private val tests = Seq(
+ "local" -> "local",
+ "standalone/client" -> "local-cluster[1,1,1024]")
+
+ tests.foreach { case (name, master) =>
+ test(s"$name: launcher handle") {
+ testWithMaster(master)
+ }
+ }
+
+ private def testWithMaster(master: String): Unit = {
+ val env = new java.util.HashMap[String, String]()
+ env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")
+ val handle = new SparkLauncher(env)
+ .setSparkHome(sys.props("spark.test.home"))
+ .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+ .setConf("spark.ui.enabled", "false")
+ .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, s"-Dtest.appender=console")
+ .setMaster(master)
+ .setAppResource("spark-internal")
+ .setMainClass(TestApp.getClass.getName().stripSuffix("$"))
+ .startApplication()
+
+ try {
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getAppId() should not be (null)
+ }
+
+ handle.stop()
+
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.KILLED)
+ }
+ } finally {
+ handle.kill()
+ }
+ }
+
+}
+
+object TestApp {
+
+ def main(args: Array[String]): Unit = {
+ new SparkContext(new SparkConf()).parallelize(Seq(1)).foreach { i =>
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20))
+ }
+ }
+
+}
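
The suite above drives the handle by polling with eventually; on the launcher side the same handshake can also be consumed through callbacks, assuming the SparkAppHandle.Listener API from the launcher package. A rough sketch, not part of the patch, with an illustrative Spark home path and a hypothetical main class:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherSideExample {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setSparkHome("/path/to/spark")                // illustrative path
      .setMaster("local")
      .setAppResource("spark-internal")
      .setMainClass("org.example.MyApp")             // hypothetical main class
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state changed: ${h.getState}")   // e.g. SUBMITTED -> RUNNING -> FINISHED
        override def infoChanged(h: SparkAppHandle): Unit =
          println(s"app id: ${h.getAppId}")
      })

    // handle.stop() asks the backend to shut down gracefully (onStopRequest -> KILLED);
    // handle.kill() forcibly terminates the child process.
  }
}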