diff options
Diffstat (limited to 'core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala')
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 47d3390928..5e53d95ac2 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -19,14 +19,15 @@ package spark.deploy.worker import java.io._ import java.lang.System.getenv -import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription} + import akka.actor.ActorRef + +import com.google.common.base.Charsets +import com.google.common.io.Files + import spark.{Utils, Logging} -import java.net.{URI, URL} -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.hadoop.conf.Configuration -import scala.Some -import spark.deploy.ExecutorStateChanged +import spark.deploy.{ExecutorState, ApplicationDescription} +import spark.deploy.DeployMessages.ExecutorStateChanged /** * Manages the execution of one executor process. @@ -39,13 +40,11 @@ private[spark] class ExecutorRunner( val memory: Int, val worker: ActorRef, val workerId: String, - val hostPort: String, + val host: String, val sparkHome: File, val workDir: File) extends Logging { - Utils.checkHostPort(hostPort, "Expected hostport") - val fullId = appId + "/" + execId var workerThread: Thread = null var process: Process = null @@ -91,7 +90,7 @@ private[spark] class ExecutorRunner( /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { case "{{EXECUTOR_ID}}" => execId.toString - case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1 + case "{{HOSTNAME}}" => host case "{{CORES}}" => cores.toString case other => other } @@ -113,6 +112,7 @@ private[spark] class ExecutorRunner( val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH") .map(p => List("-Djava.library.path=" + p)) .getOrElse(Nil) + val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil) val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") @@ -122,12 +122,12 @@ private[spark] class ExecutorRunner( Seq(sparkHome + "/bin/compute-classpath" + ext), extraEnvironment=appDesc.command.environment) - Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts + Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ def redirectStream(in: InputStream, file: File) { - val out = new FileOutputStream(file) + val out = new FileOutputStream(file, true) new Thread("redirect output to " + file) { override def run() { try { @@ -163,9 +163,16 @@ private[spark] class ExecutorRunner( env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() + val header = "Spark Executor Command: %s\n%s\n\n".format( + command.mkString("\"", "\" \"", "\""), "=" * 40) + // Redirect its stdout and stderr to files - redirectStream(process.getInputStream, new File(executorDir, "stdout")) - redirectStream(process.getErrorStream, new File(executorDir, "stderr")) + val stdout = new File(executorDir, "stdout") + redirectStream(process.getInputStream, stdout) + + val stderr = new File(executorDir, "stderr") + Files.write(header, stderr, Charsets.UTF_8) + redirectStream(process.getErrorStream, stderr) // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run // long-lived processes only. However, in the future, we might restart the executor a few |