aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-12-26 14:14:49 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-12-26 14:39:39 -0800
commit5c1b4f64052e8fae0d942def4d6085a971faee4e (patch)
treec0b6ba7c5a4e9209b954b8242cc5852b95f772b7 /core/src/main/scala/org/apache
parentc23d640516e05a32f1380cdd3d35bf948c92cd60 (diff)
downloadspark-5c1b4f64052e8fae0d942def4d6085a971faee4e.tar.gz
spark-5c1b4f64052e8fae0d942def4d6085a971faee4e.tar.bz2
spark-5c1b4f64052e8fae0d942def4d6085a971faee4e.zip
Minor fixes
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala8
3 files changed, 25 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index d2f3c092fb..8f19294849 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -90,7 +90,6 @@ object DriverClient extends Logging {
case e: TimeoutException => (false, s"Master $master failed to respond in time")
}
if (success) logInfo(message) else logError(message)
- actorSystem.stop(driver)
actorSystem.shutdown()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7f9ad8a7ef..a0db2a23be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.deploy.master.DriverState.DriverState
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -268,21 +269,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case DriverStateChanged(driverId, state, exception) => {
- if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
- state == DriverState.KILLED)) {
- throw new Exception(s"Received unexpected state update for driver $driverId: $state")
- }
- drivers.find(_.id == driverId) match {
- case Some(driver) => {
- drivers -= driver
- completedDrivers += driver
- persistenceEngine.removeDriver(driver)
- driver.state = state
- driver.exception = exception
- driver.worker.foreach(w => w.removeDriver(driver))
- }
- case None =>
- logWarning(s"Got driver update for unknown driver $driverId")
+ state match {
+ case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
+ removeDriver(driverId, state, exception)
+ case _ =>
+ throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}
@@ -638,6 +629,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}
+
+ def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
+ drivers.find(d => d.id == driverId) match {
+ case Some(driver) =>
+ logInfo(s"Removing driver: $driverId")
+ drivers -= driver
+ completedDrivers += driver
+ persistenceEngine.removeDriver(driver)
+ driver.state = finalState
+ driver.exception = exception
+ driver.worker.foreach(w => w.removeDriver(driver))
+ case None =>
+ logWarning(s"Asked to remove unknown driver: $driverId")
+ }
+ }
}
private[spark] object Master {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index c8cafac3b6..35a15074db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -53,10 +53,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
- val runningDriverTable =
- UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
- def finishedDriverTable =
- UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+ val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+ val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
+ val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+ def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
val content =
<div class="row-fluid"> <!-- Worker Details -->