diff options
Diffstat (limited to 'core')
4 files changed, 107 insertions, 87 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 46b9f4dc7d..72d0589689 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import java.util.Date import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef @@ -36,6 +37,7 @@ private[spark] class ApplicationInfo( @transient var state: ApplicationState.Value = _ @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _ + @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _ @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ @@ -51,6 +53,7 @@ private[spark] class ApplicationInfo( endTime = -1L appSource = new ApplicationSource(this) nextExecutorId = 0 + removedExecutors = new ArrayBuffer[ExecutorInfo] } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -74,6 +77,7 @@ private[spark] class ApplicationInfo( def removeExecutor(exec: ExecutorInfo) { if (executors.contains(exec.id)) { + removedExecutors += executors(exec.id) executors -= exec.id coresGranted -= exec.cores } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index 76db61dd61..d417070c51 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -34,4 +34,19 @@ private[spark] class ExecutorInfo( } def fullId: String = application.id + "/" + id + + override def equals(other: Any): Boolean = { + other match { + case info: ExecutorInfo => + fullId == info.fullId && + worker.id == info.worker.id && + cores == info.cores && + memory == info.memory + case _ => false + } + } + + override def toString: String = fullId + + override def hashCode: Int = toString.hashCode() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index b5cd4d2ea9..34fa1429c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -25,7 +25,7 @@ import scala.xml.Node import akka.pattern.ask import org.json4s.JValue -import org.apache.spark.deploy.JsonProtocol +import org.apache.spark.deploy.{ExecutorState, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo import org.apache.spark.ui.{WebUIPage, UIUtils} @@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app }) val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") - val executors = app.executors.values.toSeq - val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq + // This includes executors that are either still running or have exited cleanly + val executors = allExecutors.filter { exec => + !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED + } + val removedExecutors = allExecutors.diff(executors) + val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors) val content = - <div class="row-fluid"> - <div class="span12"> - <ul class="unstyled"> - <li><strong>ID:</strong> {app.id}</li> - <li><strong>Name:</strong> {app.desc.name}</li> - <li><strong>User:</strong> {app.desc.user}</li> - <li><strong>Cores:</strong> - { - if (app.desc.maxCores.isEmpty) { - "Unlimited (%s granted)".format(app.coresGranted) - } else { - "%s (%s granted, %s left)".format( - app.desc.maxCores.get, app.coresGranted, app.coresLeft) - } - } - </li> - <li> - <strong>Executor Memory:</strong> - {Utils.megabytesToString(app.desc.memoryPerSlave)} - </li> - <li><strong>Submit Date:</strong> {app.submitDate}</li> - <li><strong>State:</strong> {app.state}</li> - <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li> - </ul> - </div> + <div class="row-fluid"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {app.id}</li> + <li><strong>Name:</strong> {app.desc.name}</li> + <li><strong>User:</strong> {app.desc.user}</li> + <li><strong>Cores:</strong> + { + if (app.desc.maxCores.isEmpty) { + "Unlimited (%s granted)".format(app.coresGranted) + } else { + "%s (%s granted, %s left)".format( + app.desc.maxCores.get, app.coresGranted, app.coresLeft) + } + } + </li> + <li> + <strong>Executor Memory:</strong> + {Utils.megabytesToString(app.desc.memoryPerSlave)} + </li> + <li><strong>Submit Date:</strong> {app.submitDate}</li> + <li><strong>State:</strong> {app.state}</li> + <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li> + </ul> </div> + </div> - <div class="row-fluid"> <!-- Executors --> - <div class="span12"> - <h4> Executor Summary </h4> - {executorTable} - </div> - </div>; + <div class="row-fluid"> <!-- Executors --> + <div class="span12"> + <h4> Executor Summary </h4> + {executorsTable} + { + if (removedExecutors.nonEmpty) { + <h4> Removed Executors </h4> ++ + removedExecutorsTable + } + } + </div> + </div>; UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index d4513118ce..327b905032 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -46,74 +46,62 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) - val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") + val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs") + val runningExecutors = workerState.executors val runningExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) + UIUtils.listingTable(executorHeaders, executorRow, runningExecutors) + val finishedExecutors = workerState.finishedExecutors val finishedExecutorTable = - UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) + UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors) val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse - def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) + val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) // For now we only show driver information if the user has submitted drivers to the cluster. // This is until we integrate the notion of drivers and applications in the UI. - def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0 val content = - <div class="row-fluid"> <!-- Worker Details --> - <div class="span12"> - <ul class="unstyled"> - <li><strong>ID:</strong> {workerState.workerId}</li> - <li><strong> - Master URL:</strong> {workerState.masterUrl} - </li> - <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> - <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)} - ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li> - </ul> - <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> - </div> + <div class="row-fluid"> <!-- Worker Details --> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {workerState.workerId}</li> + <li><strong> + Master URL:</strong> {workerState.masterUrl} + </li> + <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> + <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)} + ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li> + </ul> + <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> </div> - - <div class="row-fluid"> <!-- Running Executors --> - <div class="span12"> - <h4> Running Executors {workerState.executors.size} </h4> - {runningExecutorTable} - </div> - </div> - // scalastyle:off - <div> - {if (hasDrivers) - <div class="row-fluid"> <!-- Running Drivers --> - <div class="span12"> - <h4> Running Drivers {workerState.drivers.size} </h4> - {runningDriverTable} - </div> - </div> + </div> + <div class="row-fluid"> <!-- Executors and Drivers --> + <div class="span12"> + <h4> Running Executors ({runningExecutors.size}) </h4> + {runningExecutorTable} + { + if (runningDrivers.nonEmpty) { + <h4> Running Drivers ({runningDrivers.size}) </h4> ++ + runningDriverTable + } } - </div> - - <div class="row-fluid"> <!-- Finished Executors --> - <div class="span12"> - <h4> Finished Executors </h4> - {finishedExecutorTable} - </div> - </div> - - <div> - {if (hasDrivers) - <div class="row-fluid"> <!-- Finished Drivers --> - <div class="span12"> - <h4> Finished Drivers </h4> - {finishedDriverTable} - </div> - </div> + { + if (finishedExecutors.nonEmpty) { + <h4>Finished Executors ({finishedExecutors.size}) </h4> ++ + finishedExecutorTable + } } - </div>; - // scalastyle:on + { + if (finishedDrivers.nonEmpty) { + <h4> Finished Drivers ({finishedDrivers.size}) </h4> ++ + finishedDriverTable + } + } + </div> + </div>; UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format( workerState.host, workerState.port)) } @@ -122,6 +110,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { <tr> <td>{executor.execId}</td> <td>{executor.cores}</td> + <td>{executor.state}</td> <td sorttable_customkey={executor.memory.toString}> {Utils.megabytesToString(executor.memory)} </td> |