From f236ddd1a245a587d5ee331fb67cf41456ed383c Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 2 Jan 2014 18:10:37 -0800
Subject: Changes based on review feedback.

---
 .../apache/spark/deploy/client/DriverClient.scala  |  2 +-
 .../deploy/client/DriverClientArguments.scala      | 26 +++++++++++++---------
 .../apache/spark/deploy/worker/CommandUtils.scala  |  2 +-
 .../apache/spark/deploy/worker/DriverRunner.scala  | 15 +++++++++----
 .../apache/spark/deploy/worker/DriverWrapper.scala |  2 +-
 .../org/apache/spark/deploy/worker/Worker.scala    |  7 +++---
 .../cluster/SparkDeploySchedulerBackend.scala      |  2 +-
 7 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 8a4cdf07bb..e319e75bae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -62,7 +62,7 @@ object DriverClient extends Logging {
 
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+    val (actorSystem, _) = AkkaUtils.createActorSystem(
       "driverClient", Utils.localHostName(), 0)
     val master = driverArgs.master
     val response = promise[(Boolean, String)]
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 6a15422c6c..d9e1c8a1b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -23,6 +23,9 @@ import scala.collection.mutable.ListBuffer
  * Command-line parser for the driver client.
  */
 private[spark] class DriverClientArguments(args: Array[String]) {
+  val defaultCores = 1
+  val defaultMemory = 512
+
   var cmd: String = "" // 'launch' or 'kill'
 
   // launch parameters
@@ -30,8 +33,8 @@ private[spark] class DriverClientArguments(args: Array[String]) {
   var jarUrl: String = ""
   var mainClass: String = ""
   var supervise: Boolean = false
-  var memory: Int = 512
-  var cores: Int = 1
+  var memory: Int = defaultMemory
+  var cores: Int = defaultCores
   private var _driverOptions = ListBuffer[String]()
   def driverOptions = _driverOptions.toSeq
 
@@ -78,14 +81,17 @@
   def printUsageAndExit(exitCode: Int) {
     // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
     //       separately similar to in the YARN client.
-    System.err.println(
-      "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
-        "[driver options]\n" +
-      "usage: DriverClient kill <active-master> <driver-id>\n\n" +
-      "Options:\n" +
-      "   -c CORES, --cores CORES        Number of cores to request \n" +
-      "   -m MEMORY, --memory MEMORY     Megabytes of memory to request\n" +
-      "   -s, --supervise                Whether to restart the driver on failure\n")
+    val usage =
+     s"""
+      |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+      |Usage: DriverClient kill <active-master> <driver-id>
+      |
+      |Options:
+      |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
+      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+      |   -s, --supervise                Whether to restart the driver on failure
+      """.stripMargin
+    System.err.println(usage)
     System.exit(exitCode)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 785aecf1fe..7507bf8ad0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -21,7 +21,7 @@ object CommandUtils extends Logging {
   }
 
   private def getEnv(key: String, command: Command): Option[String] =
-    command.environment.get(key).orElse(Option(getenv(key)))
+    command.environment.get(key).orElse(Option(System.getenv(key)))
 
   /**
    * Attention: this must always be aligned with the environment variables in the run scripts and
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e8ae2d302b..f726089faa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,14 @@ private[spark] class DriverRunner(
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
-    val destPath = new Path(driverDir.getAbsolutePath())
-    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
     val localJarFilename = localJarFile.getAbsolutePath
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
@@ -140,8 +139,12 @@ private[spark] class DriverRunner(
   /** Launch the supplied command. */
   private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File,
                          supervise: Boolean) {
+    // Time to wait between submission retries.
     var waitSeconds = 1
+    // A run of this many seconds resets the exponential back-off.
+    val successfulRunDuration = 1
+
     var keepTrying = !killed
 
     while (keepTrying) {
@@ -161,11 +164,15 @@ private[spark] class DriverRunner(
           val stderr = new File(baseDir, "stderr")
           val header = "Launch Command: %s\n%s\n\n".format(
             command.mkString("\"", "\" \"", "\""), "=" * 40)
-          Files.write(header, stderr, Charsets.UTF_8)
+          Files.append(header, stderr, Charsets.UTF_8)
           CommandUtils.redirectStream(process.get.getErrorStream, stderr)
         }
 
+        val processStart = System.currentTimeMillis()
         val exitCode = process.get.waitFor()
+        if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
+          waitSeconds = 1
+        }
 
         if (supervise && exitCode != 0 && !killed) {
           waitSeconds = waitSeconds * 2 // exponential back-off
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 8c13b10c51..2deb21aac6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -11,7 +11,7 @@ object DriverWrapper {
   def main(args: Array[String]) {
     args.toList match {
       case workerUrl :: mainClass :: extraArgs =>
-        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
+        val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
           Utils.localHostName(), 0)
 
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4e23e0d1eb..2947ed1692 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -260,8 +260,8 @@ private[spark] class Worker(
 
     case KillDriver(driverId) => {
       logInfo(s"Asked to kill driver $driverId")
-      drivers.find(_._1 == driverId) match {
-        case Some((id, runner)) =>
+      drivers.get(driverId) match {
+        case Some(runner) =>
           runner.kill()
         case None =>
           logError(s"Asked to kill unknown driver $driverId")
@@ -280,8 +280,7 @@ private[spark] class Worker(
       masterLock.synchronized {
         master ! DriverStateChanged(driverId, state, exception)
       }
-      val driver = drivers(driverId)
-      drivers -= driverId
+      val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem
       coresUsed -= driver.driverDesc.cores
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 921b887a89..0615f7b565 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
-- 
cgit v1.2.3
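
For reference, the DriverRunner hunks above add a supervise/retry policy: the wait between
restarts doubles after each failure and resets to one second once a run survives longer than
`successfulRunDuration` seconds. Below is a minimal standalone Scala sketch of that policy,
not the patch's code; `runOnce` and `maxRetries` are illustrative stand-ins, since the real
loop keys off the `supervise` flag and a `killed` flag rather than a retry cap.

    object BackoffPolicySketch {
      // Re-runs `runOnce` after non-zero exits, with exponential back-off between attempts.
      def superviseLoop(runOnce: () => Int, successfulRunDuration: Int = 1, maxRetries: Int = 5): Unit = {
        var waitSeconds = 1          // time to wait before the next retry
        var failures = 0
        var keepTrying = true
        while (keepTrying) {
          val start = System.currentTimeMillis()
          val exitCode = runOnce()   // launch the child process and wait for it to exit
          if (System.currentTimeMillis() - start > successfulRunDuration * 1000L) {
            waitSeconds = 1          // ran long enough: reset the exponential back-off
          }
          if (exitCode != 0 && failures < maxRetries) {
            Thread.sleep(waitSeconds * 1000L)
            waitSeconds *= 2         // exponential back-off before the next attempt
            failures += 1
          } else {
            keepTrying = false
          }
        }
      }
    }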