author    Patrick Wendell <pwendell@gmail.com>    2013-12-28 11:08:37 -0800
committer    Patrick Wendell <pwendell@gmail.com>    2013-12-29 11:14:36 -0800
commit    35f6dc252a8961189837e79914f305d0745a8792 (patch)
tree    921cf9dca3db07baf9a00a1cb90a44804b2bfb2f /core/src/main/scala/org/apache
parent    c8c8b42a6fde0d59217b264bb2439751696c467f (diff)
Changes to allow fate sharing of drivers/executors and workers.
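The mechanism behind this: each child process (a driver launched through the new DriverWrapper, or an executor launched through CoarseGrainedExecutorBackend) starts a WorkerWatcher actor pointed at its parent Worker's Akka URL, and the JVM exits as soon as that association is lost. A minimal sketch of the pattern, mirroring the DriverWrapper code added below; the object name, the "child" actor-system name, and the example URL are illustrative only, and the package line is just there so the private[spark] classes are visible.

package org.apache.spark.deploy.worker

import akka.actor.Props

import org.apache.spark.util.{AkkaUtils, Utils}

// Hypothetical example object; not part of this patch.
object FateSharingSketch {
  def main(args: Array[String]) {
    // The Worker substitutes {{WORKER_URL}} with its own Akka URL before launch
    val workerUrl = args(0) // e.g. akka.tcp://sparkWorker@host:port/user/Worker
    val (actorSystem, _) = AkkaUtils.createActorSystem("child", Utils.localHostName(), 0)
    // WorkerWatcher exits the JVM if the association with the Worker is severed
    actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
    // ... do the real work of the process here ...
    actorSystem.awaitTermination()
  }
}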
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--    core/src/main/scala/org/apache/spark/SparkContext.scala    1
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala    2
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala    7
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala    18
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala    23
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/master/Master.scala    2
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala    2
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala    63
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala    50
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala    30
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala    67
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala    13
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala    47
-rw-r--r--    core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala    2
-rw-r--r--    core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala    27
-rw-r--r--    core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala    2
16 files changed, 229 insertions, 127 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d94c..41f810dde6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -622,6 +622,7 @@ class SparkContext(
} else {
val uri = new URI(path)
key = uri.getScheme match {
+ // TODO: Have this load jars that are available on the driver
// A JAR file which exists only on the driver node
case null | "file" =>
if (SparkHadoopUtil.get.isYarnMode()) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 7bfc3779ab..34460d359d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,7 +158,7 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
- // Actor System to Worker
+ // Liveness checks in various places
case object SendHeartbeat
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 32ff6db8f0..aba81ec27c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -19,13 +19,10 @@ package org.apache.spark.deploy
private[spark] class DriverDescription(
val jarUrl: String,
- val mainClass: String,
val mem: Int,
val cores: Int,
- val options: Seq[String],
- val javaOptions: Seq[String],
- val envVars: Seq[(String, String)])
+ val command: Command)
extends Serializable {
- override def toString: String = s"DriverDescription ($mainClass)"
+ override def toString: String = s"DriverDescription (${command.mainClass})"
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 8f19294849..7e75563e60 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -17,12 +17,14 @@
package org.apache.spark.deploy.client
+import scala.collection.JavaConversions._
+import scala.collection.mutable.Map
import scala.concurrent._
import akka.actor._
import org.apache.spark.Logging
-import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -68,14 +70,20 @@ object DriverClient extends Logging {
driverArgs.cmd match {
case "launch" =>
+ // TODO: Could modify env here to pass a flag indicating Spark is in deploy-driver mode
+ // then use that to load jars locally
+ val env = Map[String, String]()
+ System.getenv().foreach{case (k, v) => env(k) = v}
+
+ val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+ val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
+ driverArgs.driverOptions, env)
+
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
- driverArgs.mainClass,
driverArgs.memory,
driverArgs.cores,
- driverArgs.driverOptions,
- driverArgs.driverJavaOptions,
- driverArgs.driverEnvVars)
+ command)
driver ! RequestSubmitDriver(driverDescription)
case "kill" =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 0c84cc9a4a..3875838319 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -32,11 +32,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
var memory: Int = 512
var cores: Int = 1
private var _driverOptions = ListBuffer[String]()
- private var _driverJavaOptions = ListBuffer[String]()
- private var _driverEnvVars = ListBuffer[(String, String)]()
def driverOptions = _driverOptions.toSeq
- def driverJavaOptions = _driverJavaOptions.toSeq
- def driverEnvVars = _driverEnvVars.toSeq
// kill parameters
var driverId: String = ""
@@ -52,19 +48,6 @@ private[spark] class DriverClientArguments(args: Array[String]) {
memory = value.toInt
parse(tail)
- case ("--java-option" | "-j") :: value :: tail =>
- _driverJavaOptions += value
- parse(tail)
-
- case ("--environment-variable" | "-e") :: value :: tail =>
- val parts = value.split("=")
- if (parts.length != 2) {
- println(s"Error - invalid environment variable (expecting K=V): $value")
- printUsageAndExit(1)
- }
- _driverEnvVars += ((parts(0), parts(1)))
- parse(tail)
-
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
@@ -92,7 +75,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
// 1) Create an uber jar with your application and dependencies (excluding Spark)
// 2) You'll need to add this jar using addJar(X) inside of your spark context
- // TODO: It wouldnt be too hard to allow users to submit their app and dependency jars
+ // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
System.err.println(
"usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
@@ -100,9 +83,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
"usage: DriverClient kill <active-master> <driver-id>\n\n" +
"Options:\n" +
" -c CORES, --cores CORES Number of cores to request \n" +
- " -m MEMORY, --memory MEMORY Megabytes of memory to request\n" +
- " -o JAVA_OPT, --java-option JAVA_OPT JVM option to pass to driver\n" +
- " -e K=V, --environment-variable K=V Environment variable to pass to driver\n")
+ " -m MEMORY, --memory MEMORY Megabytes of memory to request\n")
System.exit(exitCode)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a0db2a23be..efb9bf49f4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -179,7 +179,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
sender ! SubmitDriverResponse(false, msg)
} else {
- logInfo("Driver submitted " + description.mainClass)
+ logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 951fc679ef..a72d76be52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -170,7 +170,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
- <td>{driver.desc.mainClass}</td>
+ <td>{driver.desc.command.mainClass}</td>
</tr>
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
new file mode 100644
index 0000000000..785aecf1fe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -0,0 +1,63 @@
+package org.apache.spark.deploy.worker
+
+import java.io.{File, FileOutputStream, IOException, InputStream}
+import java.lang.System._
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.Command
+import org.apache.spark.util.Utils
+
+/**
+ ** Utilities for running commands with the spark classpath.
+ */
+object CommandUtils extends Logging {
+ private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+ val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
+
+ // SPARK-698: do not call the run.cmd script, as process.destroy()
+ // fails to kill a process tree on Windows
+ Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
+ command.arguments
+ }
+
+ private def getEnv(key: String, command: Command): Option[String] =
+ command.environment.get(key).orElse(Option(getenv(key)))
+
+ /**
+ * Attention: this must always be aligned with the environment variables in the run scripts and
+ * the way the JAVA_OPTS are assembled there.
+ */
+ def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+ val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
+ .map(p => List("-Djava.library.path=" + p))
+ .getOrElse(Nil)
+ val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+ val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
+ val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+
+ // Figure out our classpath with the external compute-classpath script
+ val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+ val classPath = Utils.executeAndGetOutput(
+ Seq(sparkHome + "/bin/compute-classpath" + ext),
+ extraEnvironment=command.environment)
+
+ Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+ }
+
+ /** Spawn a thread that will redirect a given stream to a file */
+ def redirectStream(in: InputStream, file: File) {
+ val out = new FileOutputStream(file, true)
+ // TODO: It would be nice to add a shutdown hook here that explains why the output is
+ // terminating. Otherwise if the worker dies the executor logs will silently stop.
+ new Thread("redirect output to " + file) {
+ override def run() {
+ try {
+ Utils.copyStream(in, out, true)
+ } catch {
+ case e: IOException =>
+ logInfo("Redirection to " + file + " closed: " + e.getMessage)
+ }
+ }
+ }.start()
+ }
+}
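CommandUtils pulls the java-command assembly and output redirection out of ExecutorRunner so that DriverRunner can share it. A hedged sketch of how a caller might use it; the object and method names are illustrative, the ProcessBuilder wiring mirrors what the two runners below do, and the package line only serves to make the private[spark] method visible.

package org.apache.spark.deploy.worker

import java.io.File

import org.apache.spark.deploy.Command

// Hypothetical example object; not part of this patch.
object CommandUtilsUsageSketch {
  // Turn a Command into a running JVM, the way DriverRunner and ExecutorRunner do
  def launch(command: Command, memory: Int, sparkHome: String, workDir: File): Process = {
    val cmdSeq = CommandUtils.buildCommandSeq(command, memory, sparkHome)
    val builder = new ProcessBuilder(cmdSeq: _*).directory(workDir)
    // Expose the Command's environment to the child process
    for ((key, value) <- command.environment) builder.environment().put(key, value)
    val process = builder.start()
    // Capture the child's stdout/stderr in the work directory
    CommandUtils.redirectStream(process.getInputStream, new File(workDir, "stdout"))
    CommandUtils.redirectStream(process.getErrorStream, new File(workDir, "stderr"))
    process
  }
}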
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 402ad53f18..8950fb71a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
import java.io._
+import scala.collection.mutable.Map
+
import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files
@@ -26,10 +28,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.Logging
-import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
-import org.apache.spark.util.Utils
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -37,8 +38,10 @@ import org.apache.spark.util.Utils
private[spark] class DriverRunner(
val driverId: String,
val workDir: File,
+ val sparkHome: File,
val driverDesc: DriverDescription,
- val worker: ActorRef)
+ val worker: ActorRef,
+ val workerUrl: String)
extends Logging {
@volatile var process: Option[Process] = None
@@ -53,9 +56,17 @@ private[spark] class DriverRunner(
try {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- val command = Seq("java") ++ driverDesc.javaOptions ++ Seq(s"-Xmx${driverDesc.mem}m") ++
- Seq("-cp", localJarFilename) ++ Seq(driverDesc.mainClass) ++ driverDesc.options
- runCommandWithRetry(command, driverDesc.envVars, driverDir)
+
+ // Make sure user application jar is on the classpath
+ // TODO: This could eventually exploit ability for driver to add jars
+ val env = Map(driverDesc.command.environment.toSeq: _*)
+ env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
+ val newCommand = Command(driverDesc.command.mainClass,
+ driverDesc.command.arguments.map(substituteVariables), env)
+
+ val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
+ sparkHome.getAbsolutePath)
+ runCommandWithRetry(command, env, driverDir)
}
catch {
case e: Exception => exn = Some(e)
@@ -79,26 +90,17 @@ private[spark] class DriverRunner(
}
}
- /** Spawn a thread that will redirect a given stream to a file */
- def redirectStream(in: InputStream, file: File) {
- val out = new FileOutputStream(file, true)
- new Thread("redirect output to " + file) {
- override def run() {
- try {
- Utils.copyStream(in, out, true)
- } catch {
- case e: IOException =>
- logInfo("Redirection to " + file + " closed: " + e.getMessage)
- }
- }
- }.start()
+ /** Replace variables in a command argument passed to us */
+ private def substituteVariables(argument: String): String = argument match {
+ case "{{WORKER_URL}}" => workerUrl
+ case other => other
}
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
*/
- def createWorkingDirectory(): File = {
+ private def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
if (!driverDir.exists() && !driverDir.mkdirs()) {
throw new IOException("Failed to create directory " + driverDir)
@@ -110,7 +112,7 @@ private[spark] class DriverRunner(
* Download the user jar into the supplied directory and return its local path.
* Will throw an exception if there are errors downloading the jar.
*/
- def downloadUserJar(driverDir: File): String = {
+ private def downloadUserJar(driverDir: File): String = {
val jarPath = new Path(driverDesc.jarUrl)
@@ -136,7 +138,7 @@ private[spark] class DriverRunner(
}
/** Continue launching the supplied command until it exits zero or is killed. */
- def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) {
+ private def runCommandWithRetry(command: Seq[String], envVars: Map[String, String], baseDir: File) {
// Time to wait between submission retries.
var waitSeconds = 1
var cleanExit = false
@@ -153,13 +155,13 @@ private[spark] class DriverRunner(
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
- redirectStream(process.get.getInputStream, stdout)
+ CommandUtils.redirectStream(process.get.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.write(header, stderr, Charsets.UTF_8)
- redirectStream(process.get.getErrorStream, stderr)
+ CommandUtils.redirectStream(process.get.getErrorStream, stderr)
}
val exitCode = process.get.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
new file mode 100644
index 0000000000..8c13b10c51
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.deploy.worker
+
+import akka.actor._
+
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Utility object for launching driver programs such that they share fate with the Worker process.
+ */
+object DriverWrapper {
+ def main(args: Array[String]) {
+ args.toList match {
+ case workerUrl :: mainClass :: extraArgs =>
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
+ Utils.localHostName(), 0)
+ actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+
+ // Delegate to supplied main class
+ val clazz = Class.forName(args(1))
+ val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ mainMethod.invoke(null, extraArgs.toArray[String])
+
+ actorSystem.awaitTermination()
+
+ case _ =>
+ System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+ System.exit(-1)
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index fff9cb60c7..2e61d394ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -18,17 +18,15 @@
package org.apache.spark.deploy.worker
import java.io._
-import java.lang.System.getenv
import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ExecutorState, ApplicationDescription, Command}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
/**
* Manages the execution of one executor process.
@@ -44,16 +42,17 @@ private[spark] class ExecutorRunner(
val host: String,
val sparkHome: File,
val workDir: File,
+ val workerUrl: String,
var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
- var shutdownHook: Thread = null
- private def getAppEnv(key: String): Option[String] =
- appDesc.command.environment.get(key).orElse(Option(getenv(key)))
+ // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
+ // make sense to remove this in the future.
+ var shutdownHook: Thread = null
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -92,57 +91,13 @@ private[spark] class ExecutorRunner(
/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
+ case "{{WORKER_URL}}" => workerUrl
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => host
case "{{CORES}}" => cores.toString
case other => other
}
- def buildCommandSeq(): Seq[String] = {
- val command = appDesc.command
- val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
- // SPARK-698: do not call the run.cmd script, as process.destroy()
- // fails to kill a process tree on Windows
- Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
- (command.arguments ++ Seq(appId)).map(substituteVariables)
- }
-
- /**
- * Attention: this must always be aligned with the environment variables in the run scripts and
- * the way the JAVA_OPTS are assembled there.
- */
- def buildJavaOpts(): Seq[String] = {
- val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
- .map(p => List("-Djava.library.path=" + p))
- .getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
- val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
- val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
-
- // Figure out our classpath with the external compute-classpath script
- val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(
- Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment=appDesc.command.environment)
-
- Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
- }
-
- /** Spawn a thread that will redirect a given stream to a file */
- def redirectStream(in: InputStream, file: File) {
- val out = new FileOutputStream(file, true)
- new Thread("redirect output to " + file) {
- override def run() {
- try {
- Utils.copyStream(in, out, true)
- } catch {
- case e: IOException =>
- logInfo("Redirection to " + file + " closed: " + e.getMessage)
- }
- }
- }.start()
- }
-
/**
* Download and run the executor described in our ApplicationDescription
*/
@@ -155,7 +110,9 @@ private[spark] class ExecutorRunner(
}
// Launch the process
- val command = buildCommandSeq()
+ val fullCommand = new Command(appDesc.command.mainClass,
+ appDesc.command.arguments.map(substituteVariables), appDesc.command.environment)
+ val command = CommandUtils.buildCommandSeq(fullCommand, memory, sparkHome.getAbsolutePath)
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
@@ -172,11 +129,11 @@ private[spark] class ExecutorRunner(
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
- redirectStream(process.getInputStream, stdout)
+ CommandUtils.redirectStream(process.getInputStream, stdout)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, Charsets.UTF_8)
- redirectStream(process.getErrorStream, stderr)
+ CommandUtils.redirectStream(process.getErrorStream, stderr)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 21ec881f46..4e23e0d1eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -45,6 +45,8 @@ private[spark] class Worker(
cores: Int,
memory: Int,
masterUrls: Array[String],
+ actorSystemName: String,
+ actorName: String,
workDirPath: String = null)
extends Actor with Logging {
import context.dispatcher
@@ -68,6 +70,7 @@ private[spark] class Worker(
var masterAddress: Address = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
+ val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName)
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
@@ -190,6 +193,9 @@ private[spark] class Worker(
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
+ case Heartbeat =>
+ logInfo(s"Received heartbeat from driver ${sender.path}")
+
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
@@ -202,7 +208,7 @@ private[spark] class Worker(
} else {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+ self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -244,7 +250,7 @@ private[spark] class Worker(
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, workDir, driverDesc, self)
+ val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()
@@ -322,9 +328,10 @@ private[spark] object Worker {
: (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+ val actorName = "Worker"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir), name = "Worker")
+ masterUrls, systemName, actorName, workDir), name = actorName)
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
new file mode 100644
index 0000000000..e4352f1ce2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -0,0 +1,47 @@
+package org.apache.spark.deploy.worker
+
+import akka.actor.{Actor, Address, AddressFromURIString}
+import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+
+/**
+ * Actor which connects to a worker process and terminates the JVM if the connection is severed.
+ * Provides fate sharing between a worker and its associated child processes.
+ */
+private[spark] class WorkerWatcher(workerUrl: String) extends Actor with Logging {
+ override def preStart() {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+
+ logInfo(s"Connecting to worker $workerUrl")
+ val worker = context.actorSelection(workerUrl)
+ worker ! SendHeartbeat // need to send a message here to initiate connection
+ }
+
+ // Lets us filter events only from the worker actor
+ private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
+ private def isWorker(address: Address) = address.hostPort == expectedHostPort
+
+ override def receive = {
+ case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
+ logInfo(s"Successfully connected to $workerUrl")
+
+ case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
+ if isWorker(remoteAddress) =>
+ // These logs may not be seen if the worker (and associated pipe) has died
+ logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
+ logError(s"Error was: $cause")
+ System.exit(-1)
+
+ case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
+ // This log message will never be seen
+ logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+ System.exit(-1)
+
+ case e: AssociationEvent =>
+ // pass through association events relating to other remote actor systems
+
+ case e => logWarning(s"Received unexpected actor system event: $e")
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 35a15074db..93c6ad49d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -133,7 +133,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def driverRow(driver: DriverRunner): Seq[Node] = {
<tr>
<td>{driver.driverId}</td>
- <td>{driver.driverDesc.mainClass}</td>
+ <td>{driver.driverDesc.command.mainClass}</td>
<td sorttable_customkey={driver.driverDesc.cores.toString}>
{driver.driverDesc.cores.toString}
</td>
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4c44..eb1199ed57 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -24,8 +24,9 @@ import akka.remote._
import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
@@ -91,7 +92,8 @@ private[spark] class CoarseGrainedExecutorBackend(
}
private[spark] object CoarseGrainedExecutorBackend {
- def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+ def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
+ workerUrl: Option[String]) {
// Debug code
Utils.checkHost(hostname)
@@ -105,17 +107,24 @@ private[spark] object CoarseGrainedExecutorBackend {
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
+ workerUrl.foreach{ url =>
+ actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+ }
actorSystem.awaitTermination()
}
def main(args: Array[String]) {
- if (args.length < 4) {
- //the reason we allow the last appid argument is to make it easy to kill rogue executors
- System.err.println(
- "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
- "[<appid>]")
- System.exit(1)
+ args.length match {
+ case x if x < 4 =>
+ System.err.println(
+ // Worker url is used in spark standalone mode to enforce fate-sharing with worker
+ "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
+ "<cores> [<workerUrl>]")
+ System.exit(1)
+ case 4 =>
+ run(args(0), args(1), args(2), args(3).toInt, None)
+ case x if x > 4 =>
+ run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
}
- run(args(0), args(1), args(2), args(3).toInt)
}
}
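The executor backend now takes an optional worker URL, used only in standalone mode so the executor shares fate with its Worker. A hedged example call against the new run signature; every value is illustrative, and the package line is only there because the object is private[spark].

package org.apache.spark.executor

// Hypothetical example object; not part of this patch.
object BackendLaunchSketch {
  def main(args: Array[String]) {
    // Illustrative values; the trailing Option is the new worker URL used for fate sharing
    CoarseGrainedExecutorBackend.run(
      "akka.tcp://spark@driver-host:51234/user/CoarseGrainedScheduler", // driverUrl
      "3",             // executorId
      "executor-host", // hostname
      4,               // cores
      Some("akka.tcp://sparkWorker@worker-host:41234/user/Worker"))     // workerUrl
  }
}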
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 4da49c0735..921b887a89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
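The scheduler above ships template arguments; the Worker-side runners substitute the real values just before launching, which is how each executor (and wrapped driver) learns the worker URL it should watch. A minimal standalone sketch of that substitution, following ExecutorRunner.substituteVariables; the helper object here is illustrative, not part of the patch.

// Hypothetical example object; not part of this patch.
object SubstitutionSketch {
  // Replace the scheduler's placeholders with values known only on the Worker
  def substitute(args: Seq[String], workerUrl: String, execId: Int,
                 host: String, cores: Int): Seq[String] = args.map {
    case "{{WORKER_URL}}"  => workerUrl
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case other             => other
  }
}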