Diffstat (limited to 'core')
11 files changed, 98 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 645ede26a0..7750a09623 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {

   @GET
   def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.storageStatusList
+    val storageStatusList = ui.storageListener.activeStorageStatusList
     val rddInfos = ui.storageListener.rddInfoList
     rddInfos.map{rddInfo =>
       AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
       rddId: Int,
       listener: StorageListener,
       includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.storageStatusList
+    val storageStatusList = listener.activeStorageStatusList
     listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
       getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
     }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 3bdba92232..6ca59c2f3c 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
     listener.synchronized {
       // The follow codes should be protected by `listener` to make sure no executors will be
       // removed before we query their status. See SPARK-12784.
-      val storageStatusList = listener.storageStatusList
+      val storageStatusList = listener.activeStorageStatusList
       (0 until storageStatusList.size).map { statusId =>
-        ExecutorsPage.getExecInfo(listener, statusId)
+        ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index d116e68c17..909dd0c07e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
 class ExecutorSummary private[spark](
     val id: String,
     val hostPort: String,
+    val isActive: Boolean,
     val rddBlocks: Int,
     val memoryUsed: Long,
     val diskUsed: Long,
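A note on the listener.synchronized block kept in ExecutorListResource: getExecInfo re-reads the status list by index, so an executor removed between computing the index range and the per-index lookups would shift or invalidate those indices (the SPARK-12784 race the comment cites). A toy sketch of that hazard, in plain Scala with a Seq[String] standing in for the listener state; nothing below is Spark code:

    import scala.util.Try

    // Toy model of the SPARK-12784 race: the index range is computed from one
    // snapshot of the list, but each lookup re-reads mutated state.
    object IndexRaceSketch extends App {
      var statuses = Seq("driver", "exec-1")        // stands in for the status list
      val indices = 0 until statuses.size           // size captured: indices 0, 1
      statuses = statuses.filterNot(_ == "exec-1")  // a "concurrent" removal
      indices.foreach { i =>
        println(Try(statuses(i)))  // Success(driver), then Failure(IndexOutOfBoundsException)
      }
    }

Holding one lock around both the size check and the lookups, as the resource does, rules this out.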
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d98aae8ff0..f552b498a7 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import scala.collection.mutable

+import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._

@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
  * This class is thread-safe (unlike JobProgressListener)
  */
 @DeveloperApi
-class StorageStatusListener extends SparkListener {
+class StorageStatusListener(conf: SparkConf) extends SparkListener {
   // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
   private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
+  private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

   def storageStatusList: Seq[StorageStatus] = synchronized {
     executorIdToStorageStatus.values.toSeq
   }

+  def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+    deadExecutorStorageStatus.toSeq
+  }
+
   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
     executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
   override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
     synchronized {
       val executorId = blockManagerRemoved.blockManagerId.executorId
-      executorIdToStorageStatus.remove(executorId)
+      executorIdToStorageStatus.remove(executorId).foreach { status =>
+        deadExecutorStorageStatus += status
+      }
+      if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+        deadExecutorStorageStatus.trimStart(1)
+      }
     }
   }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 6cc30eeaf5..5324a76829 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -196,7 +196,7 @@ private[spark] object SparkUI {
     }

     val environmentListener = new EnvironmentListener
-    val storageStatusListener = new StorageStatusListener
+    val storageStatusListener = new StorageStatusListener(conf)
     val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
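The listener change above amounts to a bounded FIFO of dead-executor records: removed statuses are appended, and the oldest entry is trimmed once the spark.ui.retainedDeadExecutors cap (default 100) is exceeded. A minimal self-contained sketch of that retention pattern, with hypothetical names:

    import scala.collection.mutable

    // Bounded retention sketch: keep at most `retained` of the most recently
    // removed entries, dropping the oldest first (mirrors deadExecutorStorageStatus).
    class BoundedDeadList[T](retained: Int) {
      private val dead = new mutable.ListBuffer[T]()

      def record(entry: T): Unit = synchronized {
        dead += entry
        if (dead.size > retained) {
          dead.trimStart(dead.size - retained)  // drop from the front (oldest)
        }
      }

      def snapshot: Seq[T] = synchronized { dead.toList }
    }

    object BoundedDeadListDemo extends App {
      val list = new BoundedDeadList[String](retained = 2)
      Seq("exec-1", "exec-2", "exec-3").foreach(list.record)
      println(list.snapshot)  // List(exec-2, exec-3): exec-1 was trimmed
    }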
-      val _storageStatusList = listener.storageStatusList
-      val _execInfo = {
-        for (statusId <- 0 until _storageStatusList.size)
-          yield ExecutorsPage.getExecInfo(listener, statusId)
+      val _activeExecutorInfo = {
+        for (statusId <- 0 until listener.activeStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
       }
-      (_storageStatusList, _execInfo)
+      val _deadExecutorInfo = {
+        for (statusId <- 0 until listener.deadStorageStatusList.size)
+          yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+      }
+      (_activeExecutorInfo, _deadExecutorInfo)
     }
+
+    val execInfo = activeExecutorInfo ++ deadExecutorInfo
     val execInfoSorted = execInfo.sortBy(_.id)
     val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

-    val execTable =
+    val execTable = {
       <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
         <thead>
           <th>Executor ID</th>
           <th>Address</th>
+          <th>Status</th>
           <th>RDD Blocks</th>
           <th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
           <th>Disk Used</th>
@@ -98,22 +104,28 @@ private[ui] class ExecutorsPage(
           {execInfoSorted.map(execRow(_, logsExist))}
         </tbody>
       </table>
+    }

     val content =
       <div class="row">
         <div class="span12">
-          <h4>Totals for {execInfo.size} Executors</h4>
-          {execSummary(execInfo)}
+          <h4>Dead Executors({deadExecutorInfo.size})</h4>
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          <h4>Active Executors({activeExecutorInfo.size})</h4>
+          {execSummary(activeExecutorInfo)}
         </div>
       </div>
       <div class = "row">
         <div class="span12">
-          <h4>Active Executors</h4>
+          <h4>Executors</h4>
           {execTable}
         </div>
       </div>;

-    UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
+    UIUtils.headerSparkPage("Executors", content, parent)
   }

   /** Render an HTML row representing an executor */
@@ -121,9 +133,19 @@ private[ui] class ExecutorsPage(
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
+    val executorStatus =
+      if (info.isActive) {
+        "Active"
+      } else {
+        "Dead"
+      }
+
     <tr>
       <td>{info.id}</td>
       <td>{info.hostPort}</td>
+      <td sorttable_customkey={executorStatus.toString}>
+        {executorStatus}
+      </td>
       <td>{info.rddBlocks}</td>
       <td sorttable_customkey={memoryUsed.toString}>
         {Utils.bytesToString(memoryUsed)} /
@@ -161,10 +183,14 @@ private[ui] class ExecutorsPage(
       }
       {
         if (threadDumpEnabled) {
-          val encodedId = URLEncoder.encode(info.id, "UTF-8")
-          <td>
-            <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
-          </td>
+          if (info.isActive) {
+            val encodedId = URLEncoder.encode(info.id, "UTF-8")
+            <td>
+              <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+            </td>
+          } else {
+            <td> </td>
+          }
         } else {
           Seq.empty
         }
@@ -236,14 +262,13 @@ private[ui] class ExecutorsPage(
   }

   private def taskData(
-    maxTasks: Int,
-    activeTasks: Int,
-    failedTasks: Int,
-    completedTasks: Int,
-    totalTasks: Int,
-    totalDuration: Long,
-    totalGCTime: Long):
-  Seq[Node] = {
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long): Seq[Node] = {
     // Determine Color Opacity from 0.5-1
     // activeTasks range from 0 to maxTasks
     val activeTasksAlpha =
@@ -302,8 +327,15 @@ private[ui] class ExecutorsPage(
 private[spark] object ExecutorsPage {
   /** Represent an executor's info as a map given a storage status index */
-  def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
-    val status = listener.storageStatusList(statusId)
+  def getExecInfo(
+      listener: ExecutorsListener,
+      statusId: Int,
+      isActive: Boolean): ExecutorSummary = {
+    val status = if (isActive) {
+      listener.activeStorageStatusList(statusId)
+    } else {
+      listener.deadStorageStatusList(statusId)
+    }
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
     val rddBlocks = status.numBlocks
@@ -326,6 +358,7 @@ private[spark] object ExecutorsPage {
     new ExecutorSummary(
       execId,
       hostPort,
+      isActive,
       rddBlocks,
       memUsed,
       diskUsed,
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dcfebe92ed..788f35ec77 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -61,7 +61,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
   val executorToLogUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()

-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+
+  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList

   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
@@ -81,7 +83,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)

   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
     applicationStart.driverLogs.foreach { logs =>
-      val storageStatus = storageStatusList.find { s =>
+      val storageStatus = activeStorageStatusList.find { s =>
         s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
         s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
       }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index f1e28b4e1e..8f75b586e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener {
   private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing

-  def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList: Seq[RDDInfo] = synchronized {
@@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener {
   private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
     val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
     val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
-    StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
+    StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
   }
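With the split into active and dead lists, a statusId passed to getExecInfo is only meaningful together with the isActive flag that selects which list it indexes, which is why render() enumerates each list separately. A sketch of that convention, with Seq[String] standing in for Seq[StorageStatus]; hypothetical names throughout:

    object TwoListIndexingSketch extends App {
      // statusId is an offset into whichever list the flag selects.
      def pick(active: Seq[String], dead: Seq[String], statusId: Int, isActive: Boolean): String =
        if (isActive) active(statusId) else dead(statusId)

      val active = Seq("driver", "exec-2")
      val dead = Seq("exec-1")
      // Mirrors render(): enumerate each list under its own flag, then concatenate.
      val rows = active.indices.map(i => pick(active, dead, i, isActive = true)) ++
        dead.indices.map(i => pick(active, dead, i, isActive = false))
      println(rows)  // Vector(driver, exec-2, exec-1)
    }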
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 9d5d224e55..4a88eeee74 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -1,6 +1,7 @@
 [ {
   "id" : "<driver>",
   "hostPort" : "localhost:57971",
+  "isActive" : true,
   "rddBlocks" : 8,
   "memoryUsed" : 28000128,
   "diskUsed" : 0,
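The expectation file above is what the history server's executor-list endpoint is expected to serve, so REST clients see the new isActive field as well. A hedged sketch of reading that endpoint against a live UI; host, port, and application id are placeholders:

    import scala.io.Source

    // Hypothetical client read of the endpoint backed by ExecutorListResource.
    // Requires a running UI; the URL pieces below are illustrative only.
    object FetchExecutorsSketch extends App {
      val appId = "app-00000000000000-0000"
      val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
      println(Source.fromURL(url).mkString)  // each entry now carries "isActive"
    }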
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 9de434166b..14daa003bc 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.storage

-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._

@@ -29,9 +29,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   private val bm2 = BlockManagerId("fat", "duck", 2)
   private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+  private val conf = new SparkConf()

   test("block manager added/removed") {
-    val listener = new StorageStatusListener
+    conf.set("spark.ui.retainedDeadExecutors", "1")
+    val listener = new StorageStatusListener(conf)

     // Block manager add
     assert(listener.executorIdToStorageStatus.size === 0)
@@ -53,14 +55,18 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus.size === 1)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
     assert(listener.executorIdToStorageStatus.size === 0)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.deadExecutorStorageStatus.size === 1)
+    assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
   }

   test("task end without updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics = new TaskMetrics
@@ -77,7 +83,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }

   test("task end with updated blocks") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics1 = new TaskMetrics
@@ -126,7 +132,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   }

   test("unpersist RDD") {
-    val listener = new StorageStatusListener
+    val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d1dbf7c155..6b7c538ac8 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage

 import org.scalatest.BeforeAndAfter

-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
@@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {

   before {
     bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener
+    storageStatusListener = new StorageStatusListener(new SparkConf())
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)
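Finally, the cap read by the listener is an ordinary Spark configuration key, so applications can raise it when they want more dead executors kept around, as the suite above does with a value of 1. A brief sketch, with app name and master as placeholders:

    import org.apache.spark.SparkConf

    // Override the retention cap (default 100, per StorageStatusListener above).
    object RetentionConfigSketch extends App {
      val conf = new SparkConf()
        .setAppName("dead-executor-retention-demo")
        .setMaster("local[2]")
        .set("spark.ui.retainedDeadExecutors", "500")
      println(conf.get("spark.ui.retainedDeadExecutors"))  // prints 500
    }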