author     Lianhui Wang <lianhuiwang09@gmail.com>    2016-02-23 11:08:39 -0800
committer  Andrew Or <andrew@databricks.com>         2016-02-23 11:08:39 -0800
commit     9f4263392e492b5bc0acecec2712438ff9a257b7 (patch)
tree       88c559539bf463d31d3f84d8a87794868ce88c35
parent     5d69eaf097bfb9fad9f6e4433c6cd40ba0552a56 (diff)
[SPARK-7729][UI] Executor which has been killed should also be displayed on Executor Tab
andrewor14 squito Dead executors should also be displayed on the Executor tab, as follows:

![image](https://cloud.githubusercontent.com/assets/545478/11492707/ae55d7f6-982b-11e5-919a-b62cd84684b2.png)

Author: Lianhui Wang <lianhuiwang09@gmail.com>

This patch had conflicts when merged, resolved by
Committer: Andrew Or <andrew@databricks.com>

Closes #10058 from lianhuiwang/SPARK-7729.
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala                 4
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala           4
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                            1
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala               17
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala                                    2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala                        83
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                          6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala                         4
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json    1
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala           16
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala                    4
-rw-r--r--  docs/configuration.md                                                                    7
-rw-r--r--  project/MimaExcludes.scala                                                               3
13 files changed, 108 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 645ede26a0..7750a09623 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -28,7 +28,7 @@ private[v1] class AllRDDResource(ui: SparkUI) {
@GET
def rddList(): Seq[RDDStorageInfo] = {
- val storageStatusList = ui.storageListener.storageStatusList
+ val storageStatusList = ui.storageListener.activeStorageStatusList
val rddInfos = ui.storageListener.rddInfoList
rddInfos.map{rddInfo =>
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
@@ -44,7 +44,7 @@ private[spark] object AllRDDResource {
rddId: Int,
listener: StorageListener,
includeDetails: Boolean): Option[RDDStorageInfo] = {
- val storageStatusList = listener.storageStatusList
+ val storageStatusList = listener.activeStorageStatusList
listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 3bdba92232..6ca59c2f3c 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -31,9 +31,9 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
listener.synchronized {
// The following code should be protected by `listener` to make sure no executors are
// removed before we query their status. See SPARK-12784.
- val storageStatusList = listener.storageStatusList
+ val storageStatusList = listener.activeStorageStatusList
(0 until storageStatusList.size).map { statusId =>
- ExecutorsPage.getExecInfo(listener, statusId)
+ ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index d116e68c17..909dd0c07e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -54,6 +54,7 @@ class ExecutorStageSummary private[spark](
class ExecutorSummary private[spark](
val id: String,
val hostPort: String,
+ val isActive: Boolean,
val rddBlocks: Int,
val memoryUsed: Long,
val diskUsed: Long,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d98aae8ff0..f552b498a7 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import scala.collection.mutable
+import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
@@ -29,14 +30,20 @@ import org.apache.spark.scheduler._
* This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
-class StorageStatusListener extends SparkListener {
+class StorageStatusListener(conf: SparkConf) extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+ private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
+ private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
def storageStatusList: Seq[StorageStatus] = synchronized {
executorIdToStorageStatus.values.toSeq
}
+ def deadStorageStatusList: Seq[StorageStatus] = synchronized {
+ deadExecutorStorageStatus.toSeq
+ }
+
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
@@ -87,8 +94,12 @@ class StorageStatusListener extends SparkListener {
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
- executorIdToStorageStatus.remove(executorId)
+ executorIdToStorageStatus.remove(executorId).foreach { status =>
+ deadExecutorStorageStatus += status
+ }
+ if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
+ deadExecutorStorageStatus.trimStart(1)
+ }
}
}
-
}
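
For reference, a minimal standalone sketch (not the Spark code itself) of the retention policy this hunk introduces: removed executors are appended to a buffer that is trimmed from the front once it grows past the configured limit. `retained` plays the role of `spark.ui.retainedDeadExecutors`:

```scala
import scala.collection.mutable

// Bounded dead-executor list, mirroring the trimStart(1) policy above.
class BoundedDeadList[T](retained: Int) {
  private val buf = new mutable.ListBuffer[T]()

  def add(elem: T): Unit = {
    buf += elem
    // Elements arrive one at a time, so trimming a single entry from the
    // front is enough to keep the buffer at the retention limit.
    if (buf.size > retained) {
      buf.trimStart(1)
    }
  }

  def snapshot: Seq[T] = buf.toList
}
```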
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 6cc30eeaf5..5324a76829 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -196,7 +196,7 @@ private[spark] object SparkUI {
}
val environmentListener = new EnvironmentListener
- val storageStatusListener = new StorageStatusListener
+ val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index e1f7549999..eba7a312ba 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -54,24 +54,30 @@ private[ui] class ExecutorsPage(
private val GCTimePercent = 0.1
def render(request: HttpServletRequest): Seq[Node] = {
- val (storageStatusList, execInfo) = listener.synchronized {
+ val (activeExecutorInfo, deadExecutorInfo) = listener.synchronized {
// The following code should be protected by `listener` to make sure no executors are
// removed before we query their status. See SPARK-12784.
- val _storageStatusList = listener.storageStatusList
- val _execInfo = {
- for (statusId <- 0 until _storageStatusList.size)
- yield ExecutorsPage.getExecInfo(listener, statusId)
+ val _activeExecutorInfo = {
+ for (statusId <- 0 until listener.activeStorageStatusList.size)
+ yield ExecutorsPage.getExecInfo(listener, statusId, isActive = true)
}
- (_storageStatusList, _execInfo)
+ val _deadExecutorInfo = {
+ for (statusId <- 0 until listener.deadStorageStatusList.size)
+ yield ExecutorsPage.getExecInfo(listener, statusId, isActive = false)
+ }
+ (_activeExecutorInfo, _deadExecutorInfo)
}
+
+ val execInfo = activeExecutorInfo ++ deadExecutorInfo
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
- val execTable =
+ val execTable = {
<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
<th>Executor ID</th>
<th>Address</th>
+ <th>Status</th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
<th>Disk Used</th>
@@ -98,22 +104,28 @@ private[ui] class ExecutorsPage(
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
+ }
val content =
<div class="row">
<div class="span12">
- <h4>Totals for {execInfo.size} Executors</h4>
- {execSummary(execInfo)}
+ <h4>Dead Executors({deadExecutorInfo.size})</h4>
+ </div>
+ </div>
+ <div class="row">
+ <div class="span12">
+ <h4>Active Executors({activeExecutorInfo.size})</h4>
+ {execSummary(activeExecutorInfo)}
</div>
</div>
<div class = "row">
<div class="span12">
- <h4>Active Executors</h4>
+ <h4>Executors</h4>
{execTable}
</div>
</div>;
- UIUtils.headerSparkPage("Executors (" + execInfo.size + ")", content, parent)
+ UIUtils.headerSparkPage("Executors", content, parent)
}
/** Render an HTML row representing an executor */
@@ -121,9 +133,19 @@ private[ui] class ExecutorsPage(
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
+ val executorStatus =
+ if (info.isActive) {
+ "Active"
+ } else {
+ "Dead"
+ }
+
<tr>
<td>{info.id}</td>
<td>{info.hostPort}</td>
+ <td sorttable_customkey={executorStatus.toString}>
+ {executorStatus}
+ </td>
<td>{info.rddBlocks}</td>
<td sorttable_customkey={memoryUsed.toString}>
{Utils.bytesToString(memoryUsed)} /
@@ -161,10 +183,14 @@ private[ui] class ExecutorsPage(
}
{
if (threadDumpEnabled) {
- val encodedId = URLEncoder.encode(info.id, "UTF-8")
- <td>
- <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
- </td>
+ if (info.isActive) {
+ val encodedId = URLEncoder.encode(info.id, "UTF-8")
+ <td>
+ <a href={s"threadDump/?executorId=${encodedId}"}>Thread Dump</a>
+ </td>
+ } else {
+ <td> </td>
+ }
} else {
Seq.empty
}
@@ -236,14 +262,13 @@ private[ui] class ExecutorsPage(
}
private def taskData(
- maxTasks: Int,
- activeTasks: Int,
- failedTasks: Int,
- completedTasks: Int,
- totalTasks: Int,
- totalDuration: Long,
- totalGCTime: Long):
- Seq[Node] = {
+ maxTasks: Int,
+ activeTasks: Int,
+ failedTasks: Int,
+ completedTasks: Int,
+ totalTasks: Int,
+ totalDuration: Long,
+ totalGCTime: Long): Seq[Node] = {
// Determine Color Opacity from 0.5-1
// activeTasks range from 0 to maxTasks
val activeTasksAlpha =
@@ -302,8 +327,15 @@ private[ui] class ExecutorsPage(
private[spark] object ExecutorsPage {
/** Represent an executor's info as a map given a storage status index */
- def getExecInfo(listener: ExecutorsListener, statusId: Int): ExecutorSummary = {
- val status = listener.storageStatusList(statusId)
+ def getExecInfo(
+ listener: ExecutorsListener,
+ statusId: Int,
+ isActive: Boolean): ExecutorSummary = {
+ val status = if (isActive) {
+ listener.activeStorageStatusList(statusId)
+ } else {
+ listener.deadStorageStatusList(statusId)
+ }
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.numBlocks
@@ -326,6 +358,7 @@ private[spark] object ExecutorsPage {
new ExecutorSummary(
execId,
hostPort,
+ isActive,
rddBlocks,
memUsed,
diskUsed,
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dcfebe92ed..788f35ec77 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -61,7 +61,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
val executorToLogUrls = HashMap[String, Map[String, String]]()
val executorIdToData = HashMap[String, ExecutorUIData]()
- def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+ def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+
+ def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
@@ -81,7 +83,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
- val storageStatus = storageStatusList.find { s =>
+ val storageStatus = activeStorageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index f1e28b4e1e..8f75b586e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -43,7 +43,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
- def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
+ def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
/** Filter RDD info to include only those with cached partitions */
def rddInfoList: Seq[RDDInfo] = synchronized {
@@ -54,7 +54,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
- StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
+ StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}
/**
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 9d5d224e55..4a88eeee74 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -1,6 +1,7 @@
[ {
"id" : "<driver>",
"hostPort" : "localhost:57971",
+ "isActive" : true,
"rddBlocks" : 8,
"memoryUsed" : 28000128,
"diskUsed" : 0,
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 9de434166b..14daa003bc 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -29,9 +29,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
private val bm2 = BlockManagerId("fat", "duck", 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+ private val conf = new SparkConf()
test("block manager added/removed") {
- val listener = new StorageStatusListener
+ conf.set("spark.ui.retainedDeadExecutors", "1")
+ val listener = new StorageStatusListener(conf)
// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
@@ -53,14 +55,18 @@ class StorageStatusListenerSuite extends SparkFunSuite {
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+ assert(listener.deadExecutorStorageStatus.size === 1)
+ assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+ assert(listener.deadExecutorStorageStatus.size === 1)
+ assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
}
test("task end without updated blocks") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
@@ -77,7 +83,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
}
test("task end with updated blocks") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
@@ -126,7 +132,7 @@ class StorageStatusListenerSuite extends SparkFunSuite {
}
test("unpersist RDD") {
- val listener = new StorageStatusListener
+ val listener = new StorageStatusListener(conf)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d1dbf7c155..6b7c538ac8 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkFunSuite, Success}
+import org.apache.spark.{SparkConf, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
@@ -44,7 +44,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
before {
bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener
+ storageStatusListener = new StorageStatusListener(new SparkConf())
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
diff --git a/docs/configuration.md b/docs/configuration.md
index 568eca9b5f..4b1b00720b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -584,6 +584,13 @@ Apart from these, the following properties are also available, and may be useful
How many finished batches the Spark UI and status APIs remember before garbage collecting.
</td>
</tr>
+<tr>
+ <td><code>spark.ui.retainedDeadExecutors</code></td>
+ <td>100</td>
+ <td>
+ How many dead executors the Spark UI and status APIs remember before garbage collecting.
+ </td>
+</tr>
</table>
#### Compression and Serialization
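
The new property can be set like any other UI retention knob. A minimal sketch, assuming a standard `SparkConf` setup (the app name is illustrative):

```scala
import org.apache.spark.SparkConf

// Keep only the 50 most recently removed executors on the Executors tab;
// older dead executors are garbage-collected from the UI and status APIs.
val conf = new SparkConf()
  .setAppName("retention-demo") // hypothetical app name
  .set("spark.ui.retainedDeadExecutors", "50")
```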
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 47693aa29d..b12aefcf83 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -272,6 +272,9 @@ object MimaExcludes {
// SPARK-13220 Deprecate yarn-client and yarn-cluster mode
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+ ) ++ Seq (
+ // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
)
case v if v.startsWith("1.6") =>
Seq(