author    Patrick Wendell <pwendell@gmail.com>  2013-12-24 23:20:34 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-12-25 01:19:01 -0800
commit    760823d3937822ea4a6d6f476815442711c605fa (patch)
tree      60267167282344a82717188f1c9bd1209434e791 /core/src/main/scala/org/apache
parent    6a4acc4c2d5c510cc76049dd8727cec76a2173e8 (diff)
Adding better option parsing
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala            |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala          | 35
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala | 97
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala          | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala        |  4
9 files changed, 142 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 67435261e4..332c7e8bbf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -84,7 +84,7 @@ private[deploy] object DeployMessages {
sparkHome: String)
extends DeployMessage
- case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+ case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
extends DeployMessage
case class KillDriver(driverId: String) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 52f6b1b554..32ff6db8f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -20,7 +20,11 @@ package org.apache.spark.deploy
private[spark] class DriverDescription(
val jarUrl: String,
val mainClass: String,
- val mem: Integer) // TODO: Should this be Long?
+ val mem: Int,
+ val cores: Int,
+ val options: Seq[String],
+ val javaOptions: Seq[String],
+ val envVars: Seq[(String, String)])
extends Serializable {
override def toString: String = s"DriverDescription ($mainClass)"
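To make the widened constructor concrete, here is a minimal sketch of building one of these descriptions; all field values below are hypothetical:

    // Hypothetical values; only the shape of the new constructor matters here.
    val desc = new org.apache.spark.deploy.DriverDescription(
      "hdfs://namenode:8020/user/jars/app.jar",  // jarUrl
      "org.example.MyDriver",                    // mainClass
      1024,                                      // mem, in megabytes
      2,                                         // cores
      Seq("--port", "9999"),                     // options passed to mainClass
      Seq("-Xmx1g"),                             // javaOptions for the driver JVM
      Seq("SPARK_ENV" -> "prod"))                // envVars
    println(desc)  // prints: DriverDescription (org.example.MyDriver)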
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 482bafd0e0..dd62172103 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -34,7 +34,7 @@ import scala.concurrent.Await
import akka.actor.Actor.emptyBehavior
/**
- * Parent class for actors that to send a single message to the standalone master and then die.
+ * Actor that sends a single message to the standalone master and then shuts down.
*/
private[spark] abstract class SingleMessageClient(
actorSystem: ActorSystem, master: String, message: DeployMessage)
@@ -94,34 +94,31 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String,
}
/**
- * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
*/
object DriverClient {
def main(args: Array[String]) {
- if (args.size < 3) {
- println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
- println("usage: DriverClient kill <active-master> <driver-id>")
- System.exit(-1)
- }
+ val driverArgs = new DriverClientArguments(args)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
- "driverSubmission", Utils.localHostName(), 0)
-
- // TODO Should be configurable
- val mem = 512
+ "driverClient", Utils.localHostName(), 0)
- args(0) match {
+ driverArgs.cmd match {
case "launch" =>
- val master = args(1)
- val jarUrl = args(2)
- val mainClass = args(3)
- val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
- val client = new SubmissionClient(actorSystem, master, driverDescription)
+ val driverDescription = new DriverDescription(
+ driverArgs.jarUrl,
+ driverArgs.mainClass,
+ driverArgs.memory,
+ driverArgs.cores,
+ driverArgs.driverOptions,
+ driverArgs.driverJavaOptions,
+ driverArgs.driverEnvVars)
+ val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
case "kill" =>
- val master = args(1)
- val driverId = args(2)
+ val master = driverArgs.master
+ val driverId = driverArgs.driverId
val client = new TerminationClient(actorSystem, master, driverId)
}
actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
new file mode 100644
index 0000000000..618467ce8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class DriverClientArguments(args: Array[String]) {
+ var cmd: String = "" // 'launch' or 'kill'
+
+ // launch parameters
+ var master: String = ""
+ var jarUrl: String = ""
+ var mainClass: String = ""
+ var memory: Int = 512
+ var cores: Int = 1
+ private var _driverOptions = ListBuffer[String]()
+ private var _driverJavaOptions = ListBuffer[String]()
+ private var _driverEnvVars = ListBuffer[(String, String)]()
+ def driverOptions = _driverOptions.toSeq
+ def driverJavaOptions = _driverJavaOptions.toSeq
+ def driverEnvVars = _driverEnvVars.toSeq
+
+ // kill parameters
+ var driverId: String = ""
+
+ parse(args.toList)
+
+ def parse(args: List[String]): Unit = args match {
+ case ("--cores" | "-c") :: value :: tail =>
+ cores = value.toInt
+ parse(tail)
+
+ case ("--memory" | "-m") :: value :: tail =>
+ memory = value.toInt
+ parse(tail)
+
+ case ("--java-option" | "-j") :: value :: tail =>
+ _driverJavaOptions += value
+ parse(tail)
+
+ case ("--environment-variable" | "-e") :: value :: tail =>
+ val parts = value.split("=")
+ _driverEnvVars += ((parts(0), parts(1)))
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+ cmd = "launch"
+ master = _master
+ jarUrl = _jarUrl
+ mainClass = _mainClass
+ _driverOptions ++= tail
+
+ case "kill" :: _master :: _driverId :: tail =>
+ cmd = "kill"
+ master = _master
+ driverId = _driverId
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+
+ /**
+ * Print usage and exit JVM with the given exit code.
+ */
+ def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " +
+ "[driver options]\n" +
+ "usage: DriverClient kill <active-master> <driver-id>\n\n" +
+ "Options:\n" +
+ " -c CORES, --cores CORES Number of cores to request \n" +
+ " -m MEMORY, --memory MEMORY Megabytes of memory to request\n" +
+ " -j JAVA_OPT, --java-option JAVA_OPT Java option to pass to driver\n" +
+ " -e K=V, --environment-variable K=V Environment variable to pass to driver\n")
+ System.exit(exitCode)
+ }
+}
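Note that because the flag cases are tried against the head of the list, flags must precede the `launch` token on the command line, even though the usage string places them after it. A quick sketch of the parser on hypothetical input:

    val parsed = new DriverClientArguments(Array(
      "--cores", "2", "-m", "1024",
      "launch", "spark://masterhost:7077",
      "hdfs://namenode:8020/user/jars/app.jar", "org.example.MyDriver",
      "--appFlag", "someValue"))
    assert(parsed.cmd == "launch" && parsed.cores == 2 && parsed.memory == 1024)
    assert(parsed.driverOptions == Seq("--appFlag", "someValue"))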
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 76af332986..9bfacfc999 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -627,8 +627,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
- worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
- driver.desc.mem)
+ worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index fccc36b660..41a089ad33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -29,16 +29,15 @@ import org.apache.hadoop.conf.Configuration
import akka.actor.{ActorRef, ActorSelection}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.DriverDescription
/**
* Manages the execution of one driver process.
*/
private[spark] class DriverRunner(
val driverId: String,
- val jarUrl: String,
- val mainClass: String,
val workDir: File,
- val memory: Int,
+ val driverDesc: DriverDescription,
val worker: ActorRef)
extends Logging {
@@ -54,8 +53,9 @@ private[spark] class DriverRunner(
try {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- val command = Seq("java", "-cp", localJarFilename, mainClass)
- runCommandWithRetry(command, driverDir)
+ val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++
+ Seq(driverDesc.mainClass) ++ driverDesc.options
+ runCommandWithRetry(command, driverDesc.envVars, driverDir)
}
catch {
case e: Exception => exn = Some(e)
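For a sense of what the new assembly produces, a sketch with a hypothetical description and a fixed local jar path:

    val desc = new DriverDescription(
      "hdfs://namenode:8020/user/jars/app.jar", "org.example.MyDriver",
      1024, 2, Seq("--port", "9999"), Seq("-Xmx1g"), Seq("SPARK_ENV" -> "prod"))
    val localJarFilename = "/work/driver-20131225-0001/app.jar"  // hypothetical
    val command = Seq("java") ++ desc.javaOptions ++ Seq("-cp", localJarFilename) ++
      Seq(desc.mainClass) ++ desc.options
    // command == Seq("java", "-Xmx1g", "-cp", "/work/driver-20131225-0001/app.jar",
    //                "org.example.MyDriver", "--port", "9999")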
@@ -110,7 +110,7 @@ private[spark] class DriverRunner(
*/
def downloadUserJar(driverDir: File): String = {
- val jarPath = new Path(jarUrl)
+ val jarPath = new Path(driverDesc.jarUrl)
val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
val jarFileSystem = jarPath.getFileSystem(emptyConf)
@@ -134,17 +134,17 @@ private[spark] class DriverRunner(
}
/** Continue launching the supplied command until it exits zero. */
- def runCommandWithRetry(command: Seq[String], baseDir: File) = {
- /* Time to wait between submission retries. */
+ def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
+ // Time to wait between submission retries.
var waitSeconds = 1
- // TODO: We should distinguish between "immediate" exits and cases where it was running
- // for a long time and then exits.
var cleanExit = false
while (!cleanExit && !killed) {
Thread.sleep(waitSeconds * 1000)
+
+ logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(baseDir)
- logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+ envVars.foreach { case (k, v) => builder.environment().put(k, v) }
process = Some(builder.start())
@@ -153,12 +153,11 @@ private[spark] class DriverRunner(
redirectStream(process.get.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
- val header = "Driver Command: %s\n%s\n\n".format(
+ val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.write(header, stderr, Charsets.UTF_8)
redirectStream(process.get.getErrorStream, stderr)
-
val exitCode =
/* There is a race here I've elected to ignore for now because it's very unlikely and not
* simple to fix. This could see `killed=false` then the main thread gets a kill request
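The environment handling amounts to mutating the ProcessBuilder's inherited environment map before start(). A self-contained sketch, with hypothetical variable names:

    val envVars = Seq("SPARK_ENV" -> "prod", "MY_FLAG" -> "1")  // hypothetical
    val builder = new ProcessBuilder("java", "-version")        // java.lang.ProcessBuilder
    envVars.foreach { case (k, v) => builder.environment().put(k, v) }
    val exitCode = builder.start().waitFor()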
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a2b491a72f..dd6783a344 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -242,9 +242,9 @@ private[spark] class Worker(
}
}
- case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+ case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+ val driver = new DriverRunner(driverId, workDir, driverDesc, self)
drivers(driverId) = driver
driver.start()
@@ -278,7 +278,7 @@ private[spark] class Worker(
master ! DriverStateChanged(driverId, state, exception)
}
val driver = drivers(driverId)
- memoryUsed -= driver.memory
+ memoryUsed -= driver.driverDesc.mem
coresUsed -= 1
drivers -= driverId
finishedDrivers(driverId) = driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index e233b82585..2c37b7184d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -137,9 +137,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def driverRow(driver: DriverRunner): Seq[Node] = {
<tr>
<td>{driver.driverId}</td>
- <td>{driver.mainClass}</td>
- <td sorttable_customkey={driver.memory.toString}>
- {Utils.megabytesToString(driver.memory)}
+ <td>{driver.driverDesc.mainClass}</td>
+ <td sorttable_customkey={driver.driverDesc.mem.toString}>
+ {Utils.megabytesToString(driver.driverDesc.mem)}
</td>
<td>
<a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index d128e58797..2fd862c4c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -82,6 +82,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
s"${workDir.getPath}/$appId/$executorId/$logType"
case (None, None, Some(d)) =>
s"${workDir.getPath}/$driverId/$logType"
+ case _ =>
+ throw new Exception("Request must specify either application or driver identifiers")
}
val (startByte, endByte) = getByteRange(path, offset, byteLength)
@@ -106,6 +108,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
(s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
case (None, None, Some(d)) =>
(s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+ case _ =>
+ throw new Exception("Request must specify either application or driver identifiers")
}
val (startByte, endByte) = getByteRange(path, offset, byteLength)
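The added catch-all matters because the tuple match is not exhaustive: a request naming neither an application nor a driver would otherwise die with a bare scala.MatchError instead of a readable message. The same shape in miniature, with hypothetical identifiers:

    val (appId, executorId, driverId) =
      (None: Option[String], None: Option[String], Some("driver-20131225-0001"))
    val path = (appId, executorId, driverId) match {
      case (Some(a), Some(e), _)  => s"work/$a/$e/stdout"
      case (None, None, Some(d))  => s"work/$d/stdout"
      case _ =>
        throw new Exception("Request must specify either application or driver identifiers")
    }
    // path == "work/driver-20131225-0001/stdout"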