aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala57
1 files changed, 52 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 678571fd4f..8ae712f8ed 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -53,7 +53,8 @@ private[ui] case class ExecutorTaskSummary(
var shuffleRead: Long = 0L,
var shuffleWrite: Long = 0L,
var executorLogs: Map[String, String] = Map.empty,
- var isAlive: Boolean = true
+ var isAlive: Boolean = true,
+ var isBlacklisted: Boolean = false
)
/**
@@ -73,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
- override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
+ override def onExecutorAdded(
+ executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
@@ -100,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}
- override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+ override def onApplicationStart(
+ applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
@@ -114,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+ override def onTaskStart(
+ taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.tasksActive += 1
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ override def onTaskEnd(
+ taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
@@ -157,4 +162,46 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
+ private def updateExecutorBlacklist(
+ eid: String,
+ isBlacklisted: Boolean): Unit = {
+ val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ execTaskSummary.isBlacklisted = isBlacklisted
+ }
+
+ override def onExecutorBlacklisted(
+ executorBlacklisted: SparkListenerExecutorBlacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorBlacklisted.executorId, true)
+ }
+
+ override def onExecutorUnblacklisted(
+ executorUnblacklisted: SparkListenerExecutorUnblacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorUnblacklisted.executorId, false)
+ }
+
+ override def onNodeBlacklisted(
+ nodeBlacklisted: SparkListenerNodeBlacklisted)
+ : Unit = synchronized {
+ // Implicitly blacklist every executor associated with this node, and show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeBlacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, true)
+ }
+ }
+ }
+
+ override def onNodeUnblacklisted(
+ nodeUnblacklisted: SparkListenerNodeUnblacklisted)
+ : Unit = synchronized {
+ // Implicitly unblacklist every executor associated with this node, regardless of how
+ // they may have been blacklisted initially (either explicitly through executor blacklisting
+ // or implicitly through node blacklisting). Show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeUnblacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, false)
+ }
+ }
+ }
}