aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala1
6 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 9c0a626204..28c851bcfe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -101,6 +101,8 @@ object DriverClient {
def main(args: Array[String]) {
val driverArgs = new DriverClientArguments(args)
+ // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+ // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f5d6fdab5f..0528ef43a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -185,7 +185,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
- // the current status of the driver. Since we may already want to expose this.
+ // the current status of the driver. For now it's simply "fire and forget".
sender ! SubmitDriverResponse(true, "Driver successfully submitted")
}
@@ -611,7 +611,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
- /** Generate a new driver ID given a driver's submission date */
def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 6a99d7ac02..3c6fca3780 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -106,20 +106,20 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>
</div>
- <div class="row-fluid">
- <div class="span12">
- <h4> Active Drivers </h4>
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Active Drivers </h4>
- {activeDriversTable}
- </div>
+ {activeDriversTable}
</div>
+ </div>
- <div class="row-fluid">
- <div class="span12">
- <h4> Completed Drivers </h4>
- {completedDriversTable}
- </div>
- </div>;
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Completed Drivers </h4>
+ {completedDriversTable}
+ </div>
+ </div>;
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b030d6041a..28d4297299 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.DriverState
import org.apache.spark.util.Utils
/**
- * Manages the execution of one driver process.
+ * Manages the execution of one driver, including automatically restarting the driver on failure.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -133,7 +133,7 @@ private[spark] class DriverRunner(
localJarFilename
}
- /** Continue launching the supplied command until it exits zero. */
+ /** Continue launching the supplied command until it exits zero or is killed. */
def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
// Time to wait between submission retries.
var waitSeconds = 1
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b6a84fc371..42c28cf22d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -254,17 +254,14 @@ private[spark] class Worker(
case KillDriver(driverId) => {
logInfo(s"Asked to kill driver $driverId")
-
drivers.find(_._1 == driverId) match {
case Some((id, runner)) =>
runner.kill()
case None =>
logError(s"Asked to kill unknown driver $driverId")
}
-
}
-
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.FAILED =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 9055ce7a39..65251e9319 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -39,6 +39,7 @@ object DriverSubmissionTest {
properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
for (i <- 1 until numSecondsToSleep) {
+ println(s"Alive for $i out of $numSecondsToSleep seconds")
Thread.sleep(1000)
}
}