author     José Hiram Soltren <jose@cloudera.com>  2017-01-19 09:08:18 -0600
committer  Imran Rashid <irashid@cloudera.com>     2017-01-19 09:08:18 -0600
commit     640f942337e1ce87075195998bd051e19c4b50b9 (patch)
tree       b1904dc723de8a075e95cd53b3de8ca1a495efe4 /core/src/main/scala/org
parent     064fadd2a25d1c118e062e505a0ed56be31bdf34 (diff)
[SPARK-16654][CORE] Add UI coverage for Application Level Blacklisting
Builds on top of work in SPARK-8425 to update Application Level Blacklisting in the scheduler.

## What changes were proposed in this pull request?

Adds a UI to these patches by:
- defining new listener events for blacklisting and unblacklisting nodes and executors;
- sending said events at the relevant points in BlacklistTracker;
- adding JSON (de)serialization code for these events;
- augmenting the Executors UI page to show which, and how many, executors are blacklisted;
- adding a unit test to make sure events are being fired;
- adding HistoryServerSuite coverage to verify that the SHS reads these events correctly;
- updating the Executors UI to show Blacklisted/Active/Dead as a tri-state in Executors Status.

Updates .rat-excludes to pass tests.

cc squito

## How was this patch tested?

./dev/run-tests
testOnly org.apache.spark.util.JsonProtocolSuite
testOnly org.apache.spark.scheduler.BlacklistTrackerSuite
testOnly org.apache.spark.deploy.history.HistoryServerSuite
https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh

![blacklist-20161219](https://cloud.githubusercontent.com/assets/1208477/21335321/9eda320a-c623-11e6-8b8c-9c912a73c276.jpg)

Author: José Hiram Soltren <jose@cloudera.com>

Closes #16346 from jsoltren/SPARK-16654-submit.
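As a rough illustration (not part of this patch), an application could subscribe to the four new events by overriding the corresponding SparkListener callbacks introduced below. The class name and println output here are hypothetical; only the event case classes and callback signatures come from the commit:

import org.apache.spark.scheduler._

// Hypothetical listener that reacts to the new (un)blacklisting events.
class BlacklistEventLogger extends SparkListener {
  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
    println(s"Executor ${event.executorId} blacklisted after ${event.taskFailures} task failures")
  }
  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
    println(s"Executor ${event.executorId} re-enabled")
  }
  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    println(s"Node ${event.hostId} blacklisted with ${event.executorFailures} blacklisted executors")
  }
  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
    println(s"Node ${event.hostId} re-enabled")
  }
}

// Registration, e.g.: sc.addSparkListener(new BlacklistEventLogger())
// or via the spark.extraListeners configuration property.

Because EventLoggingListener also persists these events (see the diff below), the same information is replayable by the Spark History Server.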
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala     | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala        | 54
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala     |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala    | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala              |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                    |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala           | 57
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                 |  2
11 files changed, 163 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index bf7a62ea33..77d5c97a32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -48,9 +48,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
* one exception is [[nodeBlacklist()]], which can be called without holding a lock.
*/
private[scheduler] class BlacklistTracker (
+ private val listenerBus: LiveListenerBus,
conf: SparkConf,
clock: Clock = new SystemClock()) extends Logging {
+ def this(sc: SparkContext) = {
+ this(sc.listenerBus, sc.conf)
+ }
+
BlacklistTracker.validateBlacklistConfs(conf)
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
@@ -103,6 +108,7 @@ private[scheduler] class BlacklistTracker (
execsToUnblacklist.foreach { exec =>
val status = executorIdToBlacklistStatus.remove(exec).get
val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+ listenerBus.post(SparkListenerExecutorUnblacklisted(now, exec))
failedExecsOnNode.remove(exec)
if (failedExecsOnNode.isEmpty) {
nodeToBlacklistedExecs.remove(status.node)
@@ -114,7 +120,10 @@ private[scheduler] class BlacklistTracker (
// Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
s"has timed out")
- nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+ nodesToUnblacklist.foreach { node =>
+ nodeIdToBlacklistExpiryTime.remove(node)
+ listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
+ }
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
updateNextExpiryTime()
@@ -161,6 +170,8 @@ private[scheduler] class BlacklistTracker (
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+ listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
+ executorIdToFailureList.remove(exec)
updateNextExpiryTime()
// In addition to blacklisting the executor, we also update the data for failures on the
@@ -174,6 +185,7 @@ private[scheduler] class BlacklistTracker (
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+ listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index f39565edd2..af9bdefc96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -193,6 +193,22 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
}
+ override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
+ override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+ logEvent(event, flushLogger = true)
+ }
+
// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 7618dfeeed..1b12af7586 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -106,6 +106,28 @@ case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason:
extends SparkListenerEvent
@DeveloperApi
+case class SparkListenerExecutorBlacklisted(
+ time: Long,
+ executorId: String,
+ taskFailures: Int)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerExecutorUnblacklisted(time: Long, executorId: String)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeBlacklisted(
+ time: Long,
+ hostId: String,
+ executorFailures: Int)
+ extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
+ extends SparkListenerEvent
+
+@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
/**
@@ -239,6 +261,26 @@ private[spark] trait SparkListenerInterface {
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
/**
+ * Called when the driver blacklists an executor for a Spark application.
+ */
+ def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted): Unit
+
+ /**
+ * Called when the driver re-enables a previously blacklisted executor.
+ */
+ def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit
+
+ /**
+ * Called when the driver blacklists a node for a Spark application.
+ */
+ def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit
+
+ /**
+ * Called when the driver re-enables a previously blacklisted node.
+ */
+ def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit
+
+ /**
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
@@ -293,6 +335,18 @@ abstract class SparkListener extends SparkListenerInterface {
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
+ override def onExecutorBlacklisted(
+ executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
+
+ override def onExecutorUnblacklisted(
+ executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
+
+ override def onNodeBlacklisted(
+ nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
+
+ override def onNodeUnblacklisted(
+ nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
+
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 471586ac08..3ff363321e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,14 @@ private[spark] trait SparkListenerBus
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
+ case executorBlacklisted: SparkListenerExecutorBlacklisted =>
+ listener.onExecutorBlacklisted(executorBlacklisted)
+ case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
+ listener.onExecutorUnblacklisted(executorUnblacklisted)
+ case nodeBlacklisted: SparkListenerNodeBlacklisted =>
+ listener.onNodeBlacklisted(nodeBlacklisted)
+ case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
+ listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 9a8e313f9e..72ed55af41 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -63,14 +63,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}
def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
this(
sc,
maxTaskFailures,
- TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
+ TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
isLocal = isLocal)
}
@@ -717,9 +717,9 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
- private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
- if (BlacklistTracker.isBlacklistEnabled(conf)) {
- Some(new BlacklistTracker(conf))
+ private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
+ if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
+ Some(new BlacklistTracker(sc))
} else {
None
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 44a929b310..7d035b11fa 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -73,6 +73,7 @@ class ExecutorSummary private[spark](
val totalInputBytes: Long,
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
+ val isBlacklisted: Boolean,
val maxMemory: Long,
val executorLogs: Map[String, String])
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 3cc5353f47..766cc65084 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -91,6 +91,9 @@ private[spark] object ToolTips {
val TASK_TIME =
"Shaded red when garbage collection (GC) time is over 10% of task time"
+ val BLACKLISTED =
+ "Shows if this executor has been blacklisted by the scheduler due to task failures."
+
val APPLICATION_EXECUTOR_LIMIT =
"""Maximum number of executors that this application will use. This limit is finite only when
dynamic allocation is enabled. The number of granted executors may exceed the limit
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 7953d77fd7..2d1691e55c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -39,6 +39,7 @@ private[ui] case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
+ isBlacklisted: Int,
maxMemory: Long,
executorLogs: Map[String, String])
@@ -101,6 +102,7 @@ private[spark] object ExecutorsPage {
taskSummary.inputBytes,
taskSummary.shuffleRead,
taskSummary.shuffleWrite,
+ taskSummary.isBlacklisted,
maxMem,
taskSummary.executorLogs
)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 678571fd4f..8ae712f8ed 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -53,7 +53,8 @@ private[ui] case class ExecutorTaskSummary(
var shuffleRead: Long = 0L,
var shuffleWrite: Long = 0L,
var executorLogs: Map[String, String] = Map.empty,
- var isAlive: Boolean = true
+ var isAlive: Boolean = true,
+ var isBlacklisted: Boolean = false
)
/**
@@ -73,7 +74,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
- override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
+ override def onExecutorAdded(
+ executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
@@ -100,7 +102,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}
- override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+ override def onApplicationStart(
+ applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
@@ -114,13 +117,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+ override def onTaskStart(
+ taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.tasksActive += 1
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+ override def onTaskEnd(
+ taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
@@ -157,4 +162,46 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
}
}
+ private def updateExecutorBlacklist(
+ eid: String,
+ isBlacklisted: Boolean): Unit = {
+ val execTaskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ execTaskSummary.isBlacklisted = isBlacklisted
+ }
+
+ override def onExecutorBlacklisted(
+ executorBlacklisted: SparkListenerExecutorBlacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorBlacklisted.executorId, true)
+ }
+
+ override def onExecutorUnblacklisted(
+ executorUnblacklisted: SparkListenerExecutorUnblacklisted)
+ : Unit = synchronized {
+ updateExecutorBlacklist(executorUnblacklisted.executorId, false)
+ }
+
+ override def onNodeBlacklisted(
+ nodeBlacklisted: SparkListenerNodeBlacklisted)
+ : Unit = synchronized {
+ // Implicitly blacklist every executor associated with this node, and show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeBlacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, true)
+ }
+ }
+ }
+
+ override def onNodeUnblacklisted(
+ nodeUnblacklisted: SparkListenerNodeUnblacklisted)
+ : Unit = synchronized {
+ // Implicitly unblacklist every executor associated with this node, regardless of how
+ // they may have been blacklisted initially (either explicitly through executor blacklisting
+ // or implicitly through node blacklisting). Show this in the UI.
+ activeStorageStatusList.foreach { status =>
+ if (status.blockManagerId.host == nodeUnblacklisted.hostId) {
+ updateExecutorBlacklist(status.blockManagerId.executorId, false)
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 9fb3f35fd9..cd1b02addc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -85,6 +85,11 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
}}
+ <th>
+ <span data-toggle="tooltip" title={ToolTips.BLACKLISTED}>
+ Blacklisted
+ </span>
+ </th>
</thead>
<tbody>
{createExecutorTable()}
@@ -160,6 +165,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{Utils.bytesToString(v.diskBytesSpilled)}
</td>
}}
+ <td>{v.isBlacklisted}</td>
</tr>
}
case None =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 9ce8542f02..371dad966d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -43,6 +43,7 @@ private[spark] object UIData {
var shuffleWriteRecords : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
+ var isBlacklisted : Int = 0
}
class JobUIData(
@@ -92,6 +93,7 @@ private[spark] object UIData {
var shuffleWriteRecords: Long = _
var memoryBytesSpilled: Long = _
var diskBytesSpilled: Long = _
+ var isBlacklisted: Int = _
var schedulingPool: String = ""
var description: Option[String] = None