author     Patrick Wendell <pwendell@gmail.com>   2014-01-02 18:10:37 -0800
committer  Patrick Wendell <pwendell@gmail.com>   2014-01-06 17:15:52 -0800
commit     f236ddd1a245a587d5ee331fb67cf41456ed383c (patch)
tree       5740f1e6224783f7b27b42d663f5f8aa14dc0594
parent     7a99702ce2fb04c4d76f0ce9f6df6608e0a5cce1 (diff)
Changes based on review feedback.
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala                      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala             | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala                      |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala                      | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala                     |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                            |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala   |  2
7 files changed, 34 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 8a4cdf07bb..e319e75bae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -62,7 +62,7 @@ object DriverClient extends Logging {
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0)
val master = driverArgs.master
val response = promise[(Boolean, String)]
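
Note: the hunk above replaces an unused boundPort binding with an underscore. A minimal standalone sketch of that pattern (names here are illustrative, not Spark's):

    object UnusedTupleBinding {
      // Returns a pair; this caller only needs the first element.
      def createServer(): (String, Int) = ("server-1", 4040)

      def main(args: Array[String]): Unit = {
        // Binding the unneeded port to _ documents that it is intentionally
        // ignored and avoids an unused-variable warning.
        val (name, _) = createServer()
        println(name)
      }
    }
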
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 6a15422c6c..d9e1c8a1b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -23,6 +23,9 @@ import scala.collection.mutable.ListBuffer
* Command-line parser for the driver client.
*/
private[spark] class DriverClientArguments(args: Array[String]) {
+ val defaultCores = 1
+ val defaultMemory = 512
+
var cmd: String = "" // 'launch' or 'kill'
// launch parameters
@@ -30,8 +33,8 @@ private[spark] class DriverClientArguments(args: Array[String]) {
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = false
- var memory: Int = 512
- var cores: Int = 1
+ var memory: Int = defaultMemory
+ var cores: Int = defaultCores
private var _driverOptions = ListBuffer[String]()
def driverOptions = _driverOptions.toSeq
@@ -78,14 +81,17 @@ private[spark] class DriverClientArguments(args: Array[String]) {
def printUsageAndExit(exitCode: Int) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
- System.err.println(
- "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
- "[driver options]\n" +
- "usage: DriverClient kill <active-master> <driver-id>\n\n" +
- "Options:\n" +
- " -c CORES, --cores CORES Number of cores to request \n" +
- " -m MEMORY, --memory MEMORY Megabytes of memory to request\n" +
- " -s, --supervise Whether to restart the driver on failure\n")
+ val usage =
+ s"""
+ |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+ |Usage: DriverClient kill <active-master> <driver-id>
+ |
+ |Options:
+ | -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
+ | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
+ | -s, --supervise Whether to restart the driver on failure
+ """.stripMargin
+ System.err.println(usage)
System.exit(exitCode)
}
}
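
Note: the reworked usage message above builds a multi-line string with s-interpolation and stripMargin so the printed defaults stay in sync with the fields. A minimal sketch of that pattern, with illustrative option names rather than DriverClient's:

    object UsageMessage {
      val defaultCores = 1
      val defaultMemory = 512

      def printUsageAndExit(exitCode: Int): Unit = {
        // s-interpolation substitutes the default values; stripMargin drops
        // everything up to and including the leading '|' on each line.
        val usage =
          s"""
            |Usage: tool [options] <arg>
            |
            |Options:
            |  -c CORES, --cores CORES     Number of cores to request (default: $defaultCores)
            |  -m MEMORY, --memory MEMORY  Megabytes of memory to request (default: $defaultMemory)
          """.stripMargin
        System.err.println(usage)
        System.exit(exitCode)
      }

      def main(args: Array[String]): Unit = printUsageAndExit(1)
    }
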
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 785aecf1fe..7507bf8ad0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -21,7 +21,7 @@ object CommandUtils extends Logging {
}
private def getEnv(key: String, command: Command): Option[String] =
- command.environment.get(key).orElse(Option(getenv(key)))
+ command.environment.get(key).orElse(Option(System.getenv(key)))
/**
* Attention: this must always be aligned with the environment variables in the run scripts and
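
Note: the getEnv change above switches the fallback to a fully qualified System.getenv call. A small self-contained sketch of the lookup-with-fallback pattern (the Map parameter stands in for Command.environment):

    object EnvLookup {
      // Prefer a value from the per-command environment; otherwise fall back
      // to the process environment. Option(...) guards against getenv's null.
      def getEnv(key: String, commandEnv: Map[String, String]): Option[String] =
        commandEnv.get(key).orElse(Option(System.getenv(key)))

      def main(args: Array[String]): Unit = {
        println(getEnv("JAVA_OPTS", Map("JAVA_OPTS" -> "-Xmx512m"))) // Some(-Xmx512m)
        println(getEnv("PATH", Map.empty))                           // falls back to the process env
      }
    }
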
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e8ae2d302b..f726089faa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,14 @@ private[spark] class DriverRunner(
val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
val jarFileSystem = jarPath.getFileSystem(emptyConf)
- val destPath = new Path(driverDir.getAbsolutePath())
- val destFileSystem = destPath.getFileSystem(emptyConf)
+ val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
val localJarFilename = localJarFile.getAbsolutePath
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
- FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+ FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
@@ -140,8 +139,12 @@ private[spark] class DriverRunner(
/** Launch the supplied command. */
private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File,
supervise: Boolean) {
+
// Time to wait between submission retries.
var waitSeconds = 1
+ // A run of this many seconds resets the exponential back-off.
+ val successfulRunDuration = 1
+
var keepTrying = !killed
while (keepTrying) {
@@ -161,11 +164,15 @@ private[spark] class DriverRunner(
val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
- Files.write(header, stderr, Charsets.UTF_8)
+ Files.append(header, stderr, Charsets.UTF_8)
CommandUtils.redirectStream(process.get.getErrorStream, stderr)
}
+ val processStart = System.currentTimeMillis()
val exitCode = process.get.waitFor()
+ if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+ waitSeconds = 1
+ }
if (supervise && exitCode != 0 && !killed) {
waitSeconds = waitSeconds * 2 // exponential back-off
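
Note: the DriverRunner hunks above add a reset to the supervise loop: the wait between restarts doubles on each failure, but drops back to one second once a run lasts longer than successfulRunDuration. A simplified standalone sketch of that back-off behaviour (runOnce is a hypothetical stand-in for launching and waiting on the child process):

    object SuperviseBackoff {
      // Hypothetical stand-in for starting the driver process and waiting for it.
      def runOnce(): Int = { Thread.sleep(100); 1 }

      def main(args: Array[String]): Unit = {
        var waitSeconds = 1            // time to wait between submission retries
        val successfulRunDuration = 1  // a run of this many seconds resets the back-off

        for (_ <- 1 to 3) {
          val processStart = System.currentTimeMillis()
          val exitCode = runOnce()

          // A sufficiently long run means the driver was healthy, so the next
          // failure starts the back-off from one second again.
          if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
            waitSeconds = 1
          }
          if (exitCode != 0) {
            waitSeconds = waitSeconds * 2   // exponential back-off
            println(s"restarting in $waitSeconds seconds")
            Thread.sleep(waitSeconds * 1000L)
          }
        }
      }
    }
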
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 8c13b10c51..2deb21aac6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -11,7 +11,7 @@ object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
case workerUrl :: mainClass :: extraArgs =>
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
+ val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0)
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4e23e0d1eb..2947ed1692 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -260,8 +260,8 @@ private[spark] class Worker(
case KillDriver(driverId) => {
logInfo(s"Asked to kill driver $driverId")
- drivers.find(_._1 == driverId) match {
- case Some((id, runner)) =>
+ drivers.get(driverId) match {
+ case Some(runner) =>
runner.kill()
case None =>
logError(s"Asked to kill unknown driver $driverId")
@@ -280,8 +280,7 @@ private[spark] class Worker(
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
}
- val driver = drivers(driverId)
- drivers -= driverId
+ val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
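
Note: the Worker changes above switch the drivers bookkeeping from a linear find / -= to HashMap get / remove, which look the entry up by key directly and make the missing-key case explicit. A small sketch of the same pattern (the Runner class and printlns are illustrative only):

    import scala.collection.mutable

    object DriverBookkeeping {
      final case class Runner(id: String) { def kill(): Unit = println(s"killing $id") }

      val drivers = mutable.HashMap[String, Runner]()

      def killDriver(driverId: String): Unit =
        drivers.get(driverId) match {
          case Some(runner) => runner.kill()
          case None         => println(s"Asked to kill unknown driver $driverId")
        }

      def driverFinished(driverId: String): Unit = {
        // remove returns Option[Runner]; .get keeps the original drivers(driverId)
        // semantics of throwing if the entry is unexpectedly absent.
        val driver = drivers.remove(driverId).get
        println(s"cleaned up ${driver.id}")
      }

      def main(args: Array[String]): Unit = {
        drivers("driver-20140102-0001") = Runner("driver-20140102-0001")
        killDriver("driver-20140102-0001")
        driverFinished("driver-20140102-0001")
        killDriver("driver-20140102-0002")
      }
    }
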
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 921b887a89..0615f7b565 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
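
Note: the last hunk moves the {{WORKER_URL}} placeholder to the end of the executor argument list. Purely as an illustration (not the CoarseGrainedExecutorBackend parser), a trailing position is the easiest place to keep an optional or late-added parameter when matching positional arguments:

    object ArgOrder {
      def main(args: Array[String]): Unit = {
        args.toList match {
          // Required arguments first; the trailing one can be absent without
          // shifting the positions of the others.
          case driverUrl :: executorId :: hostname :: cores :: rest =>
            val workerUrl = rest.headOption
            println(s"driver=$driverUrl executor=$executorId host=$hostname cores=$cores worker=$workerUrl")
          case _ =>
            System.err.println("Usage: <driver-url> <executor-id> <hostname> <cores> [worker-url]")
        }
      }
    }
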