diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 57 |
1 files changed, 52 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 678571fd4f..8ae712f8ed 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -53,7 +53,8 @@ private[ui] case class ExecutorTaskSummary( var shuffleRead: Long = 0L, var shuffleWrite: Long = 0L, var executorLogs: Map[String, String] = Map.empty, - var isAlive: Boolean = true + var isAlive: Boolean = true, + var isBlacklisted: Boolean = false ) /** @@ -73,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList - override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { + override def onExecutorAdded( + executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap @@ -100,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false) } - override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + override def onApplicationStart( + applicationStart: SparkListenerApplicationStart): Unit = { applicationStart.driverLogs.foreach { logs => val storageStatus = activeStorageStatusList.find { s => s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || @@ -114,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar } } - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { + override def onTaskStart( + taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) taskSummary.tasksActive += 1 } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { + override def onTaskEnd( + taskEnd: SparkListenerTaskEnd): Unit = synchronized { val info = taskEnd.taskInfo if (info != null) { val eid = info.executorId @@ -157,4 +162,46 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar } } + private def updateExecutorBlacklist( + eid: String, + isBlacklisted: Boolean): Unit = { + val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) + execTaskSummary.isBlacklisted = isBlacklisted + } + + override def onExecutorBlacklisted( + executorBlacklisted: SparkListenerExecutorBlacklisted) + : Unit = synchronized { + updateExecutorBlacklist(executorBlacklisted.executorId, true) + } + + override def onExecutorUnblacklisted( + executorUnblacklisted: SparkListenerExecutorUnblacklisted) + : Unit = synchronized { + updateExecutorBlacklist(executorUnblacklisted.executorId, false) + } + + override def onNodeBlacklisted( + nodeBlacklisted: SparkListenerNodeBlacklisted) + : Unit = synchronized { + // Implicitly blacklist every executor associated with this node, and show this in the UI. + activeStorageStatusList.foreach { status => + if (status.blockManagerId.host == nodeBlacklisted.hostId) { + updateExecutorBlacklist(status.blockManagerId.executorId, true) + } + } + } + + override def onNodeUnblacklisted( + nodeUnblacklisted: SparkListenerNodeUnblacklisted) + : Unit = synchronized { + // Implicitly unblacklist every executor associated with this node, regardless of how + // they may have been blacklisted initially (either explicitly through executor blacklisting + // or implicitly through node blacklisting). Show this in the UI. + activeStorageStatusList.foreach { status => + if (status.blockManagerId.host == nodeUnblacklisted.hostId) { + updateExecutorBlacklist(status.blockManagerId.executorId, false) + } + } + } } |