aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-08-03 09:19:46 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-08-03 09:19:46 -0700
commitad94fbb3221fb50a613cdcaf1c8d8f08b45f4064 (patch)
treeff7bf1c30f56f64ef98dcd94bd3812fbad322dca
parentb4905c383bf24143966d8b2d0ae69ef9ee7c3b0e (diff)
downloadspark-ad94fbb3221fb50a613cdcaf1c8d8f08b45f4064.tar.gz
spark-ad94fbb3221fb50a613cdcaf1c8d8f08b45f4064.tar.bz2
spark-ad94fbb3221fb50a613cdcaf1c8d8f08b45f4064.zip
Log the launch command for Spark executors
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala16
1 files changed, 13 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 345dfe879c..1dcdf20960 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -22,6 +22,9 @@ import java.lang.System.getenv
import akka.actor.ActorRef
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
import spark.{Utils, Logging}
import spark.deploy.{ExecutorState, ApplicationDescription}
import spark.deploy.DeployMessages.ExecutorStateChanged
@@ -125,7 +128,7 @@ private[spark] class ExecutorRunner(
/** Spawn a thread that will redirect a given stream to a file */
def redirectStream(in: InputStream, file: File) {
- val out = new FileOutputStream(file)
+ val out = new FileOutputStream(file, true)
new Thread("redirect output to " + file) {
override def run() {
try {
@@ -161,9 +164,16 @@ private[spark] class ExecutorRunner(
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
+ val header = "Spark Executor Command: %s\n======================================\n\n"
+ .format(command.mkString(" "))
+
// Redirect its stdout and stderr to files
- redirectStream(process.getInputStream, new File(executorDir, "stdout"))
- redirectStream(process.getErrorStream, new File(executorDir, "stderr"))
+ val stdout = new File(executorDir, "stdout")
+ Files.write(header, stdout, Charsets.UTF_8)
+ redirectStream(process.getInputStream, stdout)
+
+ val stderr = new File(executorDir, "stderr")
+ redirectStream(process.getErrorStream, stderr)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few