diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-25 00:54:34 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-25 01:19:25 -0800 |
commit | c9c0f745afcf00c17fa073e4ca6dd9433400be95 (patch) | |
tree | c868a10213451ce3b37065171a6bd583d9861ad3 | |
parent | b2b7514ba31be9c18f107ed466849e6360ded7c6 (diff) | |
download | spark-c9c0f745afcf00c17fa073e4ca6dd9433400be95.tar.gz spark-c9c0f745afcf00c17fa073e4ca6dd9433400be95.tar.bz2 spark-c9c0f745afcf00c17fa073e4ca6dd9433400be95.zip |
Minor style clean-up
6 files changed, 17 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 9c0a626204..28c851bcfe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -101,6 +101,8 @@ object DriverClient { def main(args: Array[String]) { val driverArgs = new DriverClientArguments(args) + // TODO: See if we can initialize akka so return messages are sent back using the same TCP + // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, boundPort) = AkkaUtils.createActorSystem( "driverClient", Utils.localHostName(), 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f5d6fdab5f..0528ef43a1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -185,7 +185,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act schedule() // TODO: It might be good to instead have the submission client poll the master to determine - // the current status of the driver. Since we may already want to expose this. + // the current status of the driver. For now it's simply "fire and forget". sender ! SubmitDriverResponse(true, "Driver successfully submitted") } @@ -611,7 +611,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - /** Generate a new driver ID given a driver's submission date */ def newDriverId(submitDate: Date): String = { val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber) nextDriverNumber += 1 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 6a99d7ac02..3c6fca3780 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -106,20 +106,20 @@ private[spark] class IndexPage(parent: MasterWebUI) { </div> </div> - <div class="row-fluid"> - <div class="span12"> - <h4> Active Drivers </h4> + <div class="row-fluid"> + <div class="span12"> + <h4> Active Drivers </h4> - {activeDriversTable} - </div> + {activeDriversTable} </div> + </div> - <div class="row-fluid"> - <div class="span12"> - <h4> Completed Drivers </h4> - {completedDriversTable} - </div> - </div>; + <div class="row-fluid"> + <div class="span12"> + <h4> Completed Drivers </h4> + {completedDriversTable} + </div> + </div>; UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index b030d6041a..28d4297299 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.DriverState import org.apache.spark.util.Utils /** - * Manages the execution of one driver process. + * Manages the execution of one driver, including automatically restarting the driver on failure. */ private[spark] class DriverRunner( val driverId: String, @@ -133,7 +133,7 @@ private[spark] class DriverRunner( localJarFilename } - /** Continue launching the supplied command until it exits zero. */ + /** Continue launching the supplied command until it exits zero or is killed. */ def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = { // Time to wait between submission retries. var waitSeconds = 1 diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b6a84fc371..42c28cf22d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -254,17 +254,14 @@ private[spark] class Worker( case KillDriver(driverId) => { logInfo(s"Asked to kill driver $driverId") - drivers.find(_._1 == driverId) match { case Some((id, runner)) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } - } - case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.FAILED => diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala index 9055ce7a39..65251e9319 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -39,6 +39,7 @@ object DriverSubmissionTest { properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println) for (i <- 1 until numSecondsToSleep) { + println(s"Alive for $i out of $numSecondsToSleep seconds") Thread.sleep(1000) } } |