Diffstat (limited to 'core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala')
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 26
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 4ef637090c..de11771c8e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -1,7 +1,7 @@
 package spark.deploy.worker
 
 import java.io._
-import spark.deploy.{ExecutorState, ExecutorStateChanged, JobDescription}
+import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
 import akka.actor.ActorRef
 import spark.{Utils, Logging}
 import java.net.{URI, URL}
@@ -14,9 +14,9 @@ import spark.deploy.ExecutorStateChanged
  * Manages the execution of one executor process.
  */
 private[spark] class ExecutorRunner(
-    val jobId: String,
+    val appId: String,
     val execId: Int,
-    val jobDesc: JobDescription,
+    val appDesc: ApplicationDescription,
     val cores: Int,
     val memory: Int,
     val worker: ActorRef,
@@ -26,7 +26,7 @@ private[spark] class ExecutorRunner(
     val workDir: File)
   extends Logging {
 
-  val fullId = jobId + "/" + execId
+  val fullId = appId + "/" + execId
   var workerThread: Thread = null
   var process: Process = null
   var shutdownHook: Thread = null
@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
         process.destroy()
         process.waitFor()
       }
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
+      worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -74,10 +74,10 @@ private[spark] class ExecutorRunner(
   }
 
   def buildCommandSeq(): Seq[String] = {
-    val command = jobDesc.command
-    val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run";
+    val command = appDesc.command
+    val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
     val runScript = new File(sparkHome, script).getCanonicalPath
-    Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
+    Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables)
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
@@ -96,12 +96,12 @@ private[spark] class ExecutorRunner(
   }
 
   /**
-   * Download and run the executor described in our JobDescription
+   * Download and run the executor described in our ApplicationDescription
    */
   def fetchAndRunExecutor() {
     try {
       // Create the executor's working directory
-      val executorDir = new File(workDir, jobId + "/" + execId)
+      val executorDir = new File(workDir, appId + "/" + execId)
       if (!executorDir.mkdirs()) {
         throw new IOException("Failed to create directory " + executorDir)
       }
@@ -110,7 +110,7 @@ private[spark] class ExecutorRunner(
       val command = buildCommandSeq()
       val builder = new ProcessBuilder(command: _*).directory(executorDir)
       val env = builder.environment()
-      for ((key, value) <- jobDesc.command.environment) {
+      for ((key, value) <- appDesc.command.environment) {
         env.put(key, value)
       }
       env.put("SPARK_MEM", memory.toString + "m")
@@ -128,7 +128,7 @@ private[spark] class ExecutorRunner(
       // times on the same machine.
       val exitCode = process.waitFor()
       val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
+      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
         Some(exitCode))
     } catch {
       case interrupted: InterruptedException =>
@@ -140,7 +140,7 @@ private[spark] class ExecutorRunner(
           process.destroy()
         }
         val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
+        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
     }
   }
 }
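
The one behavioral change in this otherwise mechanical rename is in buildCommandSeq(): the application ID is now appended to the executor's command-line arguments before variable substitution, so the launched executor process learns which application it belongs to. Below is a minimal, self-contained Scala sketch of that composition. The placeholder name {{CORES}} and all concrete values are illustrative assumptions, not taken from this diff; only the `(command.arguments ++ Seq(appId))` shape mirrors the patched code.

object CommandSketch {
  // Values a real ExecutorRunner would carry; hardcoded here for illustration.
  val appId = "app-20130101120000-0000"  // hypothetical application ID
  val cores = 4

  // Sketch of substituteVariables: expands placeholder tokens and passes
  // everything else (including the appended appId) through unchanged.
  def substituteVariables(argument: String): String = argument match {
    case "{{CORES}}" => cores.toString  // assumed placeholder name
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val runScript = "./run"                                    // hypothetical script path
    val mainClass = "spark.executor.StandaloneExecutorBackend" // hypothetical main class
    val arguments = Seq("{{CORES}}")
    // As in the patched buildCommandSeq(): appId is appended before substitution,
    // so it arrives as the executor's final argument.
    val cmd = Seq(runScript, mainClass) ++ (arguments ++ Seq(appId)).map(substituteVariables)
    println(cmd.mkString(" "))
    // prints: ./run spark.executor.StandaloneExecutorBackend 4 app-20130101120000-0000
  }
}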