-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala       |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala |  8
4 files changed, 18 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index a72d76be52..db1dde59d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -162,7 +162,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
- <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
+ <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
{driver.desc.cores}
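
Note on the hunk above: the master UI now wraps the worker id in a link only when a worker has actually been assigned to the driver. A minimal sketch of that Option-to-XML pattern, with WorkerRef as a hypothetical stand-in for the fields of the master's WorkerInfo that the cell reads (scala-xml is a separate dependency on current Scala versions; it was part of the standard library when this code was written):

import scala.xml.{Node, Text}

// Hypothetical stand-in for the WorkerInfo fields the cell needs.
case class WorkerRef(id: String, webUiAddress: String)

object DriverWorkerCell {
  // Link to the assigned worker's web UI, or plain "None" while the driver
  // has not been placed on a worker.
  def workerCell(worker: Option[WorkerRef]): Node =
    worker.map(w => <a href={w.webUiAddress}>{w.id}</a>).getOrElse(Text("None"))

  def main(args: Array[String]): Unit = {
    println(workerCell(Some(WorkerRef("worker-20140101120000-host-7078", "http://host:8081"))))
    println(workerCell(None))
  }
}
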
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index d13d7eff09..ad70345a7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -32,6 +32,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.master.DriverState.DriverState
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -48,6 +49,10 @@ private[spark] class DriverRunner(
@volatile var process: Option[Process] = None
@volatile var killed = false
+ // Populated once finished
+ var finalState: Option[DriverState] = None
+ var finalException: Option[Exception] = None
+
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
@@ -62,8 +67,6 @@ private[spark] class DriverRunner(
def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
- var exn: Option[Exception] = None
-
try {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
@@ -79,15 +82,16 @@ private[spark] class DriverRunner(
launchDriver(command, env, driverDir, driverDesc.supervise)
}
catch {
- case e: Exception => exn = Some(e)
+ case e: Exception => finalException = Some(e)
}
- val finalState =
+ val state =
if (killed) { DriverState.KILLED }
- else if (exn.isDefined) { DriverState.FAILED }
+ else if (finalException.isDefined) { DriverState.FAILED }
else { DriverState.FINISHED }
+ finalState = Some(state)
- worker ! DriverStateChanged(driverId, finalState, exn)
+ worker ! DriverStateChanged(driverId, state, finalException)
}
}.start()
}
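
Note on the hunk above: this is the core of the patch. Instead of keeping the exception in a local variable that dies with the run thread, DriverRunner records the terminal state and any exception in fields the worker UI can read later. A minimal, self-contained sketch of that bookkeeping, assuming a simplified Runner; only the finalState/finalException fields and the KILLED/FAILED/FINISHED decision mirror the patch, and the enum below covers just the values this change touches (the real org.apache.spark.deploy.master.DriverState has more):

object DriverState extends Enumeration {
  type DriverState = Value
  val RUNNING, FINISHED, FAILED, KILLED = Value
}

class Runner(runDriver: () => Unit) {
  @volatile var killed = false

  // Populated once the driver run finishes, so the UI can later show a
  // terminal state and an optional exception instead of only a log line.
  @volatile var finalState: Option[DriverState.DriverState] = None
  @volatile var finalException: Option[Exception] = None

  def run(): Unit = {
    try {
      runDriver()
    } catch {
      case e: Exception => finalException = Some(e)
    }
    val state =
      if (killed) DriverState.KILLED
      else if (finalException.isDefined) DriverState.FAILED
      else DriverState.FINISHED
    finalState = Some(state)
    // The real DriverRunner also sends DriverStateChanged(driverId, state,
    // finalException) back to the Worker actor at this point.
  }
}

object RunnerDemo extends App {
  val ok = new Runner(() => ())
  ok.run()
  println(ok.finalState)        // Some(FINISHED)

  val boom = new Runner(() => throw new RuntimeException("user code failed"))
  boom.run()
  println(boom.finalState)      // Some(FAILED)
  println(boom.finalException)  // Some(java.lang.RuntimeException: user code failed)
}
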
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 4546e3892f..2a2b7a3881 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
+import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 93c6ad49d7..0a7f56e2d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.worker.ui
import scala.concurrent.Await
-import scala.concurrent.duration._
import scala.xml.Node
import akka.pattern.ask
@@ -27,6 +26,7 @@ import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@@ -52,7 +52,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val finishedExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
- val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
+ val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
@@ -134,6 +134,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<tr>
<td>{driver.driverId}</td>
<td>{driver.driverDesc.command.mainClass}</td>
+ <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
<td sorttable_customkey={driver.driverDesc.cores.toString}>
{driver.driverDesc.cores.toString}
</td>
@@ -144,6 +145,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
<a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
</td>
+ <td>
+ {driver.finalException.getOrElse("")}
+ </td>
</tr>
}
}
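
Note on the hunk above: putting the pieces together, the worker UI driver row gains a State column that falls back to RUNNING while the driver is still executing, and a Notes column that surfaces the recorded exception. A small sketch of those two new cells, assuming the finalState/finalException fields added in DriverRunner; DriverInfo is a hypothetical slice of DriverRunner with only the fields these cells read:

import scala.xml.Node

object WorkerDriverRowSketch {
  // Mirrors the DriverState values this patch touches.
  object DriverState extends Enumeration {
    val RUNNING, FINISHED, FAILED, KILLED = Value
  }

  // Hypothetical slice of DriverRunner: only what the two new cells need.
  case class DriverInfo(finalState: Option[DriverState.Value],
                        finalException: Option[Exception])

  // New "State" and "Notes" cells: a still-running driver has neither field
  // set, so the row falls back to RUNNING and an empty notes cell.
  def stateAndNotes(d: DriverInfo): Seq[Node] = Seq(
    <td>{d.finalState.getOrElse(DriverState.RUNNING)}</td>,
    <td>{d.finalException.getOrElse("")}</td>
  )

  def main(args: Array[String]): Unit = {
    println(stateAndNotes(DriverInfo(None, None)))
    println(stateAndNotes(DriverInfo(Some(DriverState.FAILED),
      Some(new RuntimeException("jar not found")))))
  }
}
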