Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala  200
1 files changed, 200 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
new file mode 100644
index 0000000000..e3dc30eefc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import java.io._
+import java.lang.System.getenv
+
+import akka.actor.ActorRef
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
+
+/**
+ * Manages the execution of one executor process.
+ */
+private[spark] class ExecutorRunner(
+ val appId: String,
+ val execId: Int,
+ val appDesc: ApplicationDescription,
+ val cores: Int,
+ val memory: Int,
+ val worker: ActorRef,
+ val workerId: String,
+ val host: String,
+ val sparkHome: File,
+ val workDir: File)
+ extends Logging {
+
+ val fullId = appId + "/" + execId
+ var workerThread: Thread = null
+ var process: Process = null
+ var shutdownHook: Thread = null
+
+ private def getAppEnv(key: String): Option[String] =
+ appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+
+ def start() {
+ workerThread = new Thread("ExecutorRunner for " + fullId) {
+ override def run() { fetchAndRunExecutor() }
+ }
+ workerThread.start()
+
+    // Shutdown hook that kills the child process on shutdown.
+ shutdownHook = new Thread() {
+ override def run() {
+ if (process != null) {
+ logInfo("Shutdown hook killing child process.")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+ }
+ Runtime.getRuntime.addShutdownHook(shutdownHook)
+ }
+
+ /** Stop this executor runner, including killing the process it launched */
+ def kill() {
+ if (workerThread != null) {
+ workerThread.interrupt()
+ workerThread = null
+ if (process != null) {
+ logInfo("Killing process!")
+ process.destroy()
+ process.waitFor()
+ }
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+ Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ }
+ }
+
+ /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
+ def substituteVariables(argument: String): String = argument match {
+ case "{{EXECUTOR_ID}}" => execId.toString
+ case "{{HOSTNAME}}" => host
+ case "{{CORES}}" => cores.toString
+ case other => other
+ }
+
+ def buildCommandSeq(): Seq[String] = {
+ val command = appDesc.command
+ val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
+ // SPARK-698: do not call the run.cmd script, as process.destroy()
+ // fails to kill a process tree on Windows
+ Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
+ command.arguments.map(substituteVariables)
+ }
+
+ /**
+   * Attention: this must be kept in sync with the environment variables set in the run scripts
+   * and with the way JAVA_OPTS is assembled there.
+ */
+ def buildJavaOpts(): Seq[String] = {
+ val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
+ .map(p => List("-Djava.library.path=" + p))
+ .getOrElse(Nil)
+ val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
+ val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
+
+ // Figure out our classpath with the external compute-classpath script
+ val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+ val classPath = Utils.executeAndGetOutput(
+ Seq(sparkHome + "/bin/compute-classpath" + ext),
+      extraEnvironment = appDesc.command.environment)
+
+ Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+ }
+
+ /** Spawn a thread that will redirect a given stream to a file */
+ def redirectStream(in: InputStream, file: File) {
+ val out = new FileOutputStream(file, true)
+ new Thread("redirect output to " + file) {
+ override def run() {
+ try {
+ Utils.copyStream(in, out, true)
+ } catch {
+ case e: IOException =>
+ logInfo("Redirection to " + file + " closed: " + e.getMessage)
+ }
+ }
+ }.start()
+ }
+
+ /**
+ * Download and run the executor described in our ApplicationDescription
+ */
+ def fetchAndRunExecutor() {
+ try {
+ // Create the executor's working directory
+ val executorDir = new File(workDir, appId + "/" + execId)
+ if (!executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+
+ // Launch the process
+ val command = buildCommandSeq()
+ logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+ val builder = new ProcessBuilder(command: _*).directory(executorDir)
+ val env = builder.environment()
+ for ((key, value) <- appDesc.command.environment) {
+ env.put(key, value)
+ }
+ // In case we are running this from within the Spark Shell, avoid creating a "scala"
+ // parent process for the executor command
+ env.put("SPARK_LAUNCH_WITH_SCALA", "0")
+ process = builder.start()
+
+ val header = "Spark Executor Command: %s\n%s\n\n".format(
+ command.mkString("\"", "\" \"", "\""), "=" * 40)
+
+ // Redirect its stdout and stderr to files
+ val stdout = new File(executorDir, "stdout")
+ redirectStream(process.getInputStream, stdout)
+
+ val stderr = new File(executorDir, "stderr")
+ Files.write(header, stderr, Charsets.UTF_8)
+ redirectStream(process.getErrorStream, stderr)
+
+ // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
+ // long-lived processes only. However, in the future, we might restart the executor a few
+ // times on the same machine.
+ val exitCode = process.waitFor()
+ val message = "Command exited with code " + exitCode
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
+ Some(exitCode))
+ } catch {
+ case interrupted: InterruptedException =>
+ logInfo("Runner thread for executor " + fullId + " interrupted")
+
+ case e: Exception => {
+ logError("Error running executor", e)
+ if (process != null) {
+ process.destroy()
+ }
+ val message = e.getClass + ": " + e.getMessage
+ worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+ }
+ }
+ }
+}
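
As an illustration of the placeholder substitution performed by substituteVariables above, the following standalone sketch (not part of this commit; the object name and the example values such as the executor id, hostname, core count, and "app.jar" argument are made up for demonstration) shows how a templated argument list from an ApplicationDescription would be rewritten into concrete values:

    // Hypothetical, self-contained sketch of the {{...}} placeholder substitution
    // applied when building the executor launch command. Example values are assumptions.
    object SubstitutionSketch {
      val execId = 7
      val host = "worker-1.example.com"
      val cores = 4

      // Mirrors ExecutorRunner.substituteVariables from the diff above.
      def substituteVariables(argument: String): String = argument match {
        case "{{EXECUTOR_ID}}" => execId.toString
        case "{{HOSTNAME}}"    => host
        case "{{CORES}}"       => cores.toString
        case other             => other
      }

      def main(args: Array[String]): Unit = {
        val templateArgs = Seq("{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "app.jar")
        println(templateArgs.map(substituteVariables))
        // Prints: List(7, worker-1.example.com, 4, app.jar)
      }
    }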