aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-14 09:50:57 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-01-14 09:50:57 -0800
commit501e99ef0fbd2f2165095548fe67a3447ccbfc91 (patch)
tree3805cc1ccc2f0074b220dca06c2821743cabfaa9 /core
parent56cdbd654d54bf07a063a03a5c34c4165818eeb2 (diff)
downloadspark-501e99ef0fbd2f2165095548fe67a3447ccbfc91.tar.gz
spark-501e99ef0fbd2f2165095548fe67a3447ccbfc91.tar.bz2
spark-501e99ef0fbd2f2165095548fe67a3447ccbfc91.zip
[SPARK-12784][UI] Fix Spark UI IndexOutOfBoundsException with dynamic allocation
Add `listener.synchronized` to get `storageStatusList` and `execInfo` atomically. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10728 from zsxwing/SPARK-12784.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala13
2 files changed, 17 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
index 8ad4656b4d..3bdba92232 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
@GET
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
- val storageStatusList = listener.storageStatusList
- (0 until storageStatusList.size).map { statusId =>
- ExecutorsPage.getExecInfo(listener, statusId)
+ listener.synchronized {
+ // The follow codes should be protected by `listener` to make sure no executors will be
+ // removed before we query their status. See SPARK-12784.
+ val storageStatusList = listener.storageStatusList
+ (0 until storageStatusList.size).map { statusId =>
+ ExecutorsPage.getExecInfo(listener, statusId)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 1a29b0f412..7072a152d6 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -52,12 +52,19 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val storageStatusList = listener.storageStatusList
+ val (storageStatusList, execInfo) = listener.synchronized {
+ // The follow codes should be protected by `listener` to make sure no executors will be
+ // removed before we query their status. See SPARK-12784.
+ val _storageStatusList = listener.storageStatusList
+ val _execInfo = {
+ for (statusId <- 0 until _storageStatusList.size)
+ yield ExecutorsPage.getExecInfo(listener, statusId)
+ }
+ (_storageStatusList, _execInfo)
+ }
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
- val execInfo = for (statusId <- 0 until storageStatusList.size) yield
- ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty