author     José Hiram Soltren <jose@cloudera.com>    2017-01-19 09:08:18 -0600
committer  Imran Rashid <irashid@cloudera.com>       2017-01-19 09:08:18 -0600
commit     640f942337e1ce87075195998bd051e19c4b50b9 (patch)
tree       b1904dc723de8a075e95cd53b3de8ca1a495efe4 /core/src/main
parent     064fadd2a25d1c118e062e505a0ed56be31bdf34 (diff)
[SPARK-16654][CORE] Add UI coverage for Application Level Blacklisting
Builds on top of work in SPARK-8425 to update Application Level Blacklisting in the scheduler.

## What changes were proposed in this pull request?

Adds a UI to these patches by:
- defining new listener events for blacklisting and unblacklisting both nodes and executors;
- sending those events at the relevant points in BlacklistTracker;
- adding JSON (de)serialization code for these events;
- augmenting the Executors UI page to show which, and how many, executors are blacklisted;
- adding a unit test to make sure events are being fired;
- adding HistoryServerSuite coverage to verify that the SHS reads these events correctly;
- updating the Executor UI to show Blacklisted/Active/Dead as a tri-state in Executors Status.

Also updates .rat-excludes to pass tests.

@squito

## How was this patch tested?

./dev/run-tests
testOnly org.apache.spark.util.JsonProtocolSuite
testOnly org.apache.spark.scheduler.BlacklistTrackerSuite
testOnly org.apache.spark.deploy.history.HistoryServerSuite

Manual test script: https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh

![blacklist-20161219](https://cloud.githubusercontent.com/assets/1208477/21335321/9eda320a-c623-11e6-8b8c-9c912a73c276.jpg)

Author: José Hiram Soltren <jose@cloudera.com>

Closes #16346 from jsoltren/SPARK-16654-submit.
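For context, a minimal sketch (not part of this commit; the class name and log output are assumptions) of how an application could consume the new executor-blacklist events through the `SparkListener` callbacks this patch adds:

```scala
import org.apache.spark.scheduler._

// Hypothetical listener that reacts to the new executor blacklist events.
class BlacklistLoggingListener extends SparkListener {
  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
    println(s"executor ${event.executorId} blacklisted after ${event.taskFailures} task failures")

  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit =
    println(s"executor ${event.executorId} unblacklisted")
}

// Registered on a live SparkContext, e.g.:
//   sc.addSparkListener(new BlacklistLoggingListener)
```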
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/java/org/apache/spark/SparkFirehoseListener.java                 | 20
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html |  5
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage.js            | 39
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala          | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala      | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala             | 54
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala          |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala         | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                   |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                         |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala               |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                | 57
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala               |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                      |  2
14 files changed, 215 insertions, 24 deletions
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 97eed611e8..9fe97b4d9c 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -114,6 +114,26 @@ public class SparkFirehoseListener implements SparkListenerInterface {
}
@Override
+ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
+ onEvent(executorBlacklisted);
+ }
+
+ @Override
+ public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
+ onEvent(executorUnblacklisted);
+ }
+
+ @Override
+ public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
+ onEvent(nodeBlacklisted);
+ }
+
+ @Override
+ public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
+ onEvent(nodeUnblacklisted);
+ }
+
+ @Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}
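`SparkFirehoseListener` routes every callback, including the four added above, into a single `onEvent` method. A minimal sketch of a hypothetical subclass (assumed class name, not from this commit):

```scala
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerExecutorBlacklisted,
  SparkListenerNodeBlacklisted}

// Hypothetical subclass: the final overrides above guarantee that blacklist
// events reach onEvent like any other SparkListenerEvent.
class BlacklistCountingListener extends SparkFirehoseListener {
  @volatile private var count = 0L

  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerExecutorBlacklisted | _: SparkListenerNodeBlacklisted => count += 1
    case _ => // all other events are ignored
  }

  def blacklistEventCount: Long = count
}
```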
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 64ea719141..4e83d6d564 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -45,6 +45,11 @@ limitations under the License.
title="Bytes and records written to disk in order to be read by a shuffle in a future stage.">
Shuffle Write</span>
</th>
+ <th>
+ <span data-toggle="tooltip" data-placement="left"
+ title="Number of executors blacklisted by the scheduler due to task failures.">
+ Blacklisted</span>
+ </th>
</thead>
<tbody>
</tbody>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index fe5db6aa26..7dbfe32de9 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -182,7 +182,7 @@ $(document).ready(function () {
executorsSummary = $("#active-executors");
getStandAloneppId(function (appId) {
-
+
var endPoint = createRESTEndPoint(appId);
$.getJSON(endPoint, function (response, status, jqXHR) {
var summary = [];
@@ -202,7 +202,8 @@ $(document).ready(function () {
var allTotalInputBytes = 0;
var allTotalShuffleRead = 0;
var allTotalShuffleWrite = 0;
-
+ var allTotalBlacklisted = 0;
+
var activeExecCnt = 0;
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
@@ -219,7 +220,8 @@ $(document).ready(function () {
var activeTotalInputBytes = 0;
var activeTotalShuffleRead = 0;
var activeTotalShuffleWrite = 0;
-
+ var activeTotalBlacklisted = 0;
+
var deadExecCnt = 0;
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
@@ -236,7 +238,8 @@ $(document).ready(function () {
var deadTotalInputBytes = 0;
var deadTotalShuffleRead = 0;
var deadTotalShuffleWrite = 0;
-
+ var deadTotalBlacklisted = 0;
+
response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
@@ -254,6 +257,7 @@ $(document).ready(function () {
allTotalInputBytes += exec.totalInputBytes;
allTotalShuffleRead += exec.totalShuffleRead;
allTotalShuffleWrite += exec.totalShuffleWrite;
+ allTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
if (exec.isActive) {
activeExecCnt += 1;
activeRDDBlocks += exec.rddBlocks;
@@ -271,6 +275,7 @@ $(document).ready(function () {
activeTotalInputBytes += exec.totalInputBytes;
activeTotalShuffleRead += exec.totalShuffleRead;
activeTotalShuffleWrite += exec.totalShuffleWrite;
+ activeTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
} else {
deadExecCnt += 1;
deadRDDBlocks += exec.rddBlocks;
@@ -288,9 +293,10 @@ $(document).ready(function () {
deadTotalInputBytes += exec.totalInputBytes;
deadTotalShuffleRead += exec.totalShuffleRead;
deadTotalShuffleWrite += exec.totalShuffleWrite;
+ deadTotalBlacklisted += exec.isBlacklisted ? 1 : 0;
}
});
-
+
var totalSummary = {
"execCnt": ( "Total(" + allExecCnt + ")"),
"allRDDBlocks": allRDDBlocks,
@@ -307,7 +313,8 @@ $(document).ready(function () {
"allTotalGCTime": allTotalGCTime,
"allTotalInputBytes": allTotalInputBytes,
"allTotalShuffleRead": allTotalShuffleRead,
- "allTotalShuffleWrite": allTotalShuffleWrite
+ "allTotalShuffleWrite": allTotalShuffleWrite,
+ "allTotalBlacklisted": allTotalBlacklisted
};
var activeSummary = {
"execCnt": ( "Active(" + activeExecCnt + ")"),
@@ -325,7 +332,8 @@ $(document).ready(function () {
"allTotalGCTime": activeTotalGCTime,
"allTotalInputBytes": activeTotalInputBytes,
"allTotalShuffleRead": activeTotalShuffleRead,
- "allTotalShuffleWrite": activeTotalShuffleWrite
+ "allTotalShuffleWrite": activeTotalShuffleWrite,
+ "allTotalBlacklisted": activeTotalBlacklisted
};
var deadSummary = {
"execCnt": ( "Dead(" + deadExecCnt + ")" ),
@@ -343,12 +351,13 @@ $(document).ready(function () {
"allTotalGCTime": deadTotalGCTime,
"allTotalInputBytes": deadTotalInputBytes,
"allTotalShuffleRead": deadTotalShuffleRead,
- "allTotalShuffleWrite": deadTotalShuffleWrite
+ "allTotalShuffleWrite": deadTotalShuffleWrite,
+ "allTotalBlacklisted": deadTotalBlacklisted
};
-
+
var data = {executors: response, "execSummary": [activeSummary, deadSummary, totalSummary]};
$.get(createTemplateURI(appId), function (template) {
-
+
executorsSummary.append(Mustache.render($(template).filter("#executors-summary-template").html(), data));
var selector = "#active-executors-table";
var conf = {
@@ -360,7 +369,12 @@ $(document).ready(function () {
}
},
{data: 'hostPort'},
- {data: 'isActive', render: formatStatus},
+ {data: 'isActive', render: function (data, type, row) {
+ if (type !== 'display') return data;
+ if (row.isBlacklisted) return "Blacklisted";
+ else return formatStatus (data, type);
+ }
+ },
{data: 'rddBlocks'},
{
data: function (row, type) {
@@ -474,7 +488,8 @@ $(document).ready(function () {
},
{data: 'allTotalInputBytes', render: formatBytes},
{data: 'allTotalShuffleRead', render: formatBytes},
- {data: 'allTotalShuffleWrite', render: formatBytes}
+ {data: 'allTotalShuffleWrite', render: formatBytes},
+ {data: 'allTotalBlacklisted'}
],
"paging": false,
"searching": false,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index bf7a62ea33..77d5c97a32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -48,9 +48,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
* one exception is [[nodeBlacklist()]], which can be called without holding a lock.
*/
private[scheduler] class BlacklistTracker (
+ private val listenerBus: LiveListenerBus,
conf: SparkConf,
clock: Clock = new SystemClock()) extends Logging {
+ def this(sc: SparkContext) = {
+ this(sc.listenerBus, sc.conf)
+ }
+
BlacklistTracker.validateBlacklistConfs(conf)
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
@@ -103,6 +108,7 @@ private[scheduler] class BlacklistTracker (
execsToUnblacklist.foreach { exec =>
val status = executorIdToBlacklistStatus.remove(exec).get
val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+ listenerBus.post(SparkListenerExecutorUnblacklisted(now, exec))
failedExecsOnNode.remove(exec)
if (failedExecsOnNode.isEmpty) {
nodeToBlacklistedExecs.remove(status.node)
@@ -114,7 +120,10 @@ private[scheduler] class BlacklistTracker (
// Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
s"has timed out")
- nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+ nodesToUnblacklist.foreach { node =>
+ nodeIdToBlacklistExpiryTime.remove(node)
+ listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+ }
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
updateNextExpiryTime()
@@ -161,6 +170,8 @@ private[scheduler] class BlacklistTracker (
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+ listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+ executorIdToFailureList.remove(exec)
updateNextExpiryTime()
// In addition to blacklisting the executor, we also update the data for failures on the
@@ -174,6 +185,7 @@ private[scheduler] class BlacklistTracker (
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+ listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index f39565edd2..af9bdefc96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -193,6 +193,22 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
}
+ override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7618dfeeed..1b12af7586 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -106,6 +106,28 @@ case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason:
extends SparkListenerEvent
@DeveloperApi
+case class SparkListenerExecutorBlacklisted(
+ time: Long,
+ executorId: String,
+ taskFailures: Int)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorUnblacklisted(time: Long, executorId: String)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeBlacklisted(
+ time: Long,
+ hostId: String,
+ executorFailures: Int)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
+ extends SparkListenerEvent
+
+@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
/**
@@ -239,6 +261,26 @@ private[spark] trait SparkListenerInterface {
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
/**
+ * Called when the driver blacklists an executor for a Spark application.
+ */
+ def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit
+
+ /**
+ * Called when the driver re-enables a previously blacklisted executor.
+ */
+ def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit
+
+ /**
+ * Called when the driver blacklists a node for a Spark application.
+ */
+ def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit
+
+ /**
+ * Called when the driver re-enables a previously blacklisted node.
+ */
+ def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
+
+ /**
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
@@ -293,6 +335,18 @@ abstract class SparkListener extends SparkListenerInterface {
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
+ override def onExecutorBlacklisted(
+ executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
+
+ override def onExecutorUnblacklisted(
+ executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
+
+ override def onNodeBlacklisted(
+ nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
+
+ override def onNodeUnblacklisted(
+ nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
+
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
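Because the new methods are concrete no-ops on the abstract `SparkListener` class above, existing subclasses keep compiling unchanged; only direct implementors of `SparkListenerInterface` must add the four methods. A hypothetical subclass can therefore override just the callback it cares about:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerNodeBlacklisted}

// Assumed example class: inherits no-op implementations for every other callback.
class NodeBlacklistAlerter extends SparkListener {
  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
    System.err.println(
      s"node ${event.hostId} blacklisted; ${event.executorFailures} executors affected")
}
```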
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 471586ac08..3ff363321e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,14 @@ private[spark] trait SparkListenerBus
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
+ case executorBlacklisted: SparkListenerExecutorBlacklisted =>
+ listener.onExecutorBlacklisted(executorBlacklisted)
+ case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
+ listener.onExecutorUnblacklisted(executorUnblacklisted)
+ case nodeBlacklisted: SparkListenerNodeBlacklisted =>
+ listener.onNodeBlacklisted(nodeBlacklisted)
+ case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
+ listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 9a8e313f9e..72ed55af41 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -63,14 +63,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}
def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
this(
sc,
maxTaskFailures,
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
isLocal = isLocal)
}
@@ -717,9 +717,9 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
- private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
- if (BlacklistTracker.isBlacklistEnabled(conf)) {
- Some(new BlacklistTracker(conf))
+ private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
+ if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+ Some(new BlacklistTracker(sc))
} else {
None
}
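`maybeCreateBlacklistTracker` only instantiates the tracker when blacklisting is enabled, so none of the new events fire by default. A sketch of the opt-in configuration (using the `spark.blacklist.enabled` key introduced by SPARK-8425; tuning keys omitted):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Blacklisting is disabled by default; enabling it causes TaskSchedulerImpl to
// build a BlacklistTracker, which posts the new events to the listener bus.
val conf = new SparkConf()
  .setAppName("blacklist-ui-demo")
  .set("spark.blacklist.enabled", "true")
val sc = new SparkContext(conf)
```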
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 44a929b310..7d035b11fa 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -73,6 +73,7 @@ class ExecutorSummary private[spark](
val totalInputBytes: Long,
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
+ val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 3cc5353f47..766cc65084 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -91,6 +91,9 @@ private[spark] object ToolTips {
val TASK_TIME =
"Shaded red when garbage collection (GC) time is over 10% of task time"
+ val BLACKLISTED =
+ "Shows if this executor has been blacklisted by the scheduler due to task failures."
+
val APPLICATION_EXECUTOR_LIMIT =
"""Maximum number of executors that this application will use. This limit is finite only when
dynamic allocation is enabled. The number of granted executors may exceed the limit
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 7953d77fd7..2d1691e55c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -39,6 +39,7 @@ private[ui] case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
+ isBlacklisted: Int,
maxMemory: Long,
executorLogs: Map[String, String])
@@ -101,6 +102,7 @@ private[spark] object ExecutorsPage {
taskSummary.inputBytes,
taskSummary.shuffleRead,
taskSummary.shuffleWrite,
+ taskSummary.isBlacklisted,
maxMem,
taskSummary.executorLogs
)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 678571fd4f..8ae712f8ed 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -53,7 +53,8 @@ private[ui] case class ExecutorTaskSummary(
var shuffleRead: Long = 0L,
var shuffleWrite: Long = 0L,
var executorLogs: Map[String, String] = Map.empty,
- var isAlive: Boolean = true
+ var isAlive: Boolean = true,
+ var isBlacklisted: Boolean = false
)
/**
@@ -73,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
- override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
+ override def onExecutorAdded(
+ executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
@@ -100,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}
- override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+ override def onApplicationStart(
+ applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
@@ -114,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+ override def onTaskStart(
+ taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.tasksActive += 1
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ override def onTaskEnd(
+ taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
@@ -157,4 +162,46 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
+ private def updateExecutorBlacklist(
+ eid: String,
+ isBlacklisted: Boolean): Unit = {
+ val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ execTaskSummary.isBlacklisted = isBlacklisted
+ }
+
+ override def onExecutorBlacklisted(
+ executorBlacklisted: SparkListenerExecutorBlacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorBlacklisted.executorId, true)
+ }
+
+ override def onExecutorUnblacklisted(
+ executorUnblacklisted: SparkListenerExecutorUnblacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorUnblacklisted.executorId, false)
+ }
+
+ override def onNodeBlacklisted(
+ nodeBlacklisted: SparkListenerNodeBlacklisted)
+ : Unit = synchronized {
+ // Implicitly blacklist every executor associated with this node, and show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeBlacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, true)
+ }
+ }
+ }
+
+ override def onNodeUnblacklisted(
+ nodeUnblacklisted: SparkListenerNodeUnblacklisted)
+ : Unit = synchronized {
+ // Implicitly unblacklist every executor associated with this node, regardless of how
+ // they may have been blacklisted initially (either explicitly through executor blacklisting
+ // or implicitly through node blacklisting). Show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeUnblacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, false)
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 9fb3f35fd9..cd1b02addc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -85,6 +85,11 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
}}
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.BLACKLISTED}>
+ Blacklisted
+ </span>
+ </th>
</thead>
<tbody>
{createExecutorTable()}
@@ -160,6 +165,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{Utils.bytesToString(v.diskBytesSpilled)}
</td>
}}
+ <td>{v.isBlacklisted}</td>
</tr>
}
case None =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 9ce8542f02..371dad966d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -43,6 +43,7 @@ private[spark] object UIData {
var shuffleWriteRecords : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
+ var isBlacklisted : Int = 0
}
class JobUIData(
@@ -92,6 +93,7 @@ private[spark] object UIData {
var shuffleWriteRecords: Long = _
var memoryBytesSpilled: Long = _
var diskBytesSpilled: Long = _
+ var isBlacklisted: Int = _
var schedulingPool: String = ""
var description: Option[String] = None