author    Josh Rosen <joshrosen@databricks.com>  2014-11-03 18:18:47 -0800
committer Andrew Or <andrew@databricks.com>      2014-11-03 18:18:47 -0800
commit    4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84 (patch)
tree      d2953d26c768e91ac106ea3e84b65994f902d299 /core/src/main
parent    97a466eca0a629f17e9662ca2b59eeca99142c54 (diff)
download  spark-4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84.tar.gz
          spark-4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84.tar.bz2
          spark-4f035dd2cd6f1ec9059811f3495f3e0a8ec5fb84.zip
[SPARK-611] Display executor thread dumps in web UI
This patch allows executor thread dumps to be collected on-demand and viewed in the Spark web UI.

The thread dumps are collected using Thread.getAllStackTraces(). To allow remote thread dumps to be triggered from the web UI, I added a new `ExecutorActor` that runs inside of the Executor actor system and responds to RPCs from the driver. The driver's mechanism for obtaining a reference to this actor is a little bit hacky: it uses the block manager master actor to determine the host/port of the executor actor systems in order to construct ActorRefs to ExecutorActor. Unfortunately, I couldn't find a much cleaner way to do this without a big refactoring of the executor -> driver communication.

Screenshots:

![image](https://cloud.githubusercontent.com/assets/50748/4781793/7e7a0776-5cbf-11e4-874d-a91cd04620bd.png)
![image](https://cloud.githubusercontent.com/assets/50748/4781794/8bce76aa-5cbf-11e4-8d13-8477748c9f7e.png)
![image](https://cloud.githubusercontent.com/assets/50748/4781797/bd11a8b8-5cbf-11e4-9ad7-a7459467ec8e.png)

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2944 from JoshRosen/jstack-in-web-ui and squashes the following commits:

3c21a5d [Josh Rosen] Address review comments:
880f7f7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
f719266 [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
19707b0 [Josh Rosen] Add one comment.
127a130 [Josh Rosen] Update to use SparkContext.DRIVER_IDENTIFIER
b8e69aa [Josh Rosen] Merge remote-tracking branch 'origin/master' into jstack-in-web-ui
3dfc2d4 [Josh Rosen] Add missing file.
bc1e675 [Josh Rosen] Undo some leftover changes from the earlier approach.
f4ac1c1 [Josh Rosen] Switch to on-demand collection of thread dumps
dfec08b [Josh Rosen] Add option to disable thread dumps in UI.
4c87d7f [Josh Rosen] Use separate RPC for sending thread dumps.
2b8bdf3 [Josh Rosen] Enable thread dumps from the driver when running in non-local mode.
cc3e6b3 [Josh Rosen] Fix test code in DAGSchedulerSuite.
87b8b65 [Josh Rosen] Add new listener event for thread dumps.
8c10216 [Josh Rosen] Add missing file.
0f198ac [Josh Rosen] [SPARK-611] Display executor thread dumps in web UI
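For orientation, here is a minimal sketch of how the new API can be exercised once the patch is applied. The demo object, its local master, and its placement in the org.apache.spark package (needed because `getExecutorThreadDump` is `private[spark]`) are assumptions for illustration, not part of the patch:

```scala
// Hypothetical usage sketch (not part of the patch). Must live inside the
// org.apache.spark package, since getExecutorThreadDump is private[spark].
package org.apache.spark

object ThreadDumpDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("thread-dump-demo").setMaster("local"))
    // DRIVER_IDENTIFIER dumps the driver's own threads; passing a real
    // executor ID routes the request through the new ExecutorActor RPC.
    sc.getExecutorThreadDump(SparkContext.DRIVER_IDENTIFIER) match {
      case Some(threads) =>
        threads.foreach { t =>
          println(s"Thread ${t.threadId}: ${t.threadName} (${t.threadState})")
        }
      case None =>
        println("Thread dump unavailable (dead/unresponsive executor or RPC failure)")
    }
    sc.stop()
  }
}
```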
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                          | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                     |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala                | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala            |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala       | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala        | 73
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala                 | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                  |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                        | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala                 | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                            | 13
13 files changed, 247 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8b4db78397..40444c237b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
import java.io._
import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}
+ /**
+ * Called by the web UI to obtain executor thread dumps. This method may be expensive.
+ * Logs an error and returns None if we failed to obtain a thread dump, which could occur due
+ * to an executor being dead or unresponsive or due to network issues while sending the thread
+ * dump message back to the driver.
+ */
+ private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
+ try {
+ if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+ Some(Utils.getThreadDump())
+ } else {
+ val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+ val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
+ Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
+ AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Exception getting thread dump from executor $executorId", e)
+ None
+ }
+ }
+
private[spark] def getLocalProperties: Properties = localProperties.get()
private[spark] def setLocalProperties(props: Properties) {
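The `getExecutorThreadDump` method above leans on `AkkaUtils.askWithReply` to retry the remote ask. A rough, standalone approximation of that retry loop in plain Akka follows; the helper name and its exact structure are illustrative, not Spark's actual implementation:

```scala
// Rough approximation of an ask-with-retries helper, in plain Akka.
// AskHelper and its signature are illustrative, not Spark's AkkaUtils.
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

object AskHelper {
  def askWithRetries[T](actor: ActorRef, message: Any, retries: Int,
                        retryWaitMs: Int, timeout: FiniteDuration): T = {
    implicit val askTimeout: Timeout = Timeout(timeout)
    var lastException: Throwable = null
    for (_ <- 1 to retries) {
      try {
        // `?` returns a Future; block for at most `timeout` per attempt
        return Await.result(actor ? message, timeout).asInstanceOf[T]
      } catch {
        case e: Exception =>
          lastException = e
          Thread.sleep(retryWaitMs)
      }
    }
    throw new IllegalStateException(s"Ask failed after $retries attempts", lastException)
  }
}
```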
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 697154d762..3711824a40 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Create a new ActorSystem using driver's Spark properties to run the backend.
val driverConf = new SparkConf().setAll(props)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
- "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
+ SparkEnv.executorActorSystemName,
+ hostname, port, driverConf, new SecurityManager(driverConf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e24a15f015..8b095e23f3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
-import akka.actor.ActorSystem
+import akka.actor.{Props, ActorSystem}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
}
}
+ // Create an actor for receiving RPCs from the driver
+ private val executorActor = env.actorSystem.actorOf(
+ Props(new ExecutorActor(executorId)), "ExecutorActor")
+
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
@@ -131,6 +135,7 @@ private[spark] class Executor(
def stop() {
env.metricsSystem.report()
+ env.actorSystem.stop(executorActor)
isStopped = true
threadPool.shutdown()
if (!isLocal) {
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
new file mode 100644
index 0000000000..41925f7e97
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import akka.actor.Actor
+import org.apache.spark.Logging
+
+import org.apache.spark.util.{Utils, ActorLogReceive}
+
+/**
+ * Driver -> Executor message to trigger a thread dump.
+ */
+private[spark] case object TriggerThreadDump
+
+/**
+ * Actor that runs inside of executors to enable driver -> executor RPC.
+ */
+private[spark]
+class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {
+
+ override def receiveWithLogging = {
+ case TriggerThreadDump =>
+ sender ! Utils.getThreadDump()
+ }
+
+}
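ExecutorActor follows Akka's plain request/reply shape: a case-object message in, a `sender !` reply out. A self-contained sketch of the same pattern with made-up names (Ping, PingActor), runnable against a local ActorSystem of the Akka 2.x era this code targets:

```scala
// Sketch of the request/reply shape ExecutorActor uses; Ping and PingActor
// are made-up names, not part of the patch.
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

case object Ping

class PingActor extends Actor {
  def receive = {
    case Ping => sender ! "pong" // reply is routed back to the asking future
  }
}

object PingDemo extends App {
  val system = ActorSystem("demo")
  val ref = system.actorOf(Props(new PingActor), "PingActor")
  implicit val timeout: Timeout = Timeout(5.seconds)
  println(Await.result(ref ? Ping, 5.seconds)) // prints "pong"
  system.shutdown()
}
```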
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index d08e1419e3..b63c7f1911 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -88,6 +88,10 @@ class BlockManagerMaster(
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
+ def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+ askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
+ }
+
/**
* Remove a block from the slaves that have it. This can only be used to remove
* blocks that the driver knows about.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 5e375a2553..685b2e1144 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)
+ case GetActorSystemHostPortForExecutor(executorId) =>
+ sender ! getActorSystemHostPortForExecutor(executorId)
+
case GetMemoryStatus =>
sender ! memoryStatus
@@ -412,6 +415,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
Seq.empty
}
}
+
+ /**
+ * Returns the hostname and port of an executor's actor system, based on the Akka address of its
+ * BlockManagerSlaveActor.
+ */
+ private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+ for (
+ blockManagerId <- blockManagerIdByExecutor.get(executorId);
+ info <- blockManagerInfo.get(blockManagerId);
+ host <- info.slaveActor.path.address.host;
+ port <- info.slaveActor.path.address.port
+ ) yield {
+ (host, port)
+ }
+ }
}
@DeveloperApi
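The for-comprehension in getActorSystemHostPortForExecutor chains four Options; the lookup yields Some only when every step succeeds. A tiny illustration with made-up values:

```scala
// Illustration of the Option for-comprehension above, with made-up values:
// each generator must produce Some for the yield to run; the first None
// short-circuits the whole expression to None.
val address: Option[(String, Int)] =
  for {
    host <- Some("worker-1.example.com") // e.g. slaveActor.path.address.host
    port <- Some(52431)                  // e.g. slaveActor.path.address.port
  } yield (host, port)
// address == Some(("worker-1.example.com", 52431));
// had either generator been None, address would be None.
```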
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 291ddfcc11..3f32099d08 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+ case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
+
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
case object StopBlockManagerMaster extends ToBlockManagerMaster
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
new file mode 100644
index 0000000000..e9c755e36f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.util.Try
+import scala.xml.{Text, Node}
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
+
+ private val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val executorId = Option(request.getParameter("executorId")).getOrElse {
+ return Text(s"Missing executorId parameter")
+ }
+ val time = System.currentTimeMillis()
+ val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)
+
+ val content = maybeThreadDump.map { threadDump =>
+ val dumpRows = threadDump.map { thread =>
+ <div class="accordion-group">
+ <div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
+ <a class="accordion-toggle">
+ Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
+ </a>
+ </div>
+ <div class="accordion-body hidden">
+ <div class="accordion-inner">
+ <pre>{thread.stackTrace}</pre>
+ </div>
+ </div>
+ </div>
+ }
+
+ <div class="row-fluid">
+ <p>Updated at {UIUtils.formatDate(time)}</p>
+ {
+ // scalastyle:off
+ <p><a class="expandbutton"
+ onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
+ Expand All
+ </a></p>
+ <p><a class="expandbutton hidden"
+ onClick="$('.accordion-body').addClass('hidden'); $('.expandbutton').toggleClass('hidden')">
+ Collapse All
+ </a></p>
+ // scalastyle:on
+ }
+ <div class="accordion">{dumpRows}</div>
+ </div>
+ }.getOrElse(Text("Error fetching thread dump"))
+ UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b0e3bb3b55..048fee3ce1 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo(
totalShuffleWrite: Long,
maxMemory: Long)
-private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+private[ui] class ExecutorsPage(
+ parent: ExecutorsTab,
+ threadDumpEnabled: Boolean)
+ extends WebUIPage("") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
@@ -75,6 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
Shuffle Write
</span>
</th>
+ {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
@@ -133,6 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
+ {
+ if (threadDumpEnabled) {
+ <td>
+ <a href={s"threadDump/?executorId=${info.id}"}>Thread Dump</a>
+ </td>
+ } else {
+ Seq.empty
+ }
+ }
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 9e0e71a51a..ba97630f02 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -27,8 +27,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
+ val sc = parent.sc
+ val threadDumpEnabled =
+ sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
- attachPage(new ExecutorsPage(this))
+ attachPage(new ExecutorsPage(this, threadDumpEnabled))
+ if (threadDumpEnabled) {
+ attachPage(new ExecutorThreadDumpPage(this))
+ }
}
/**
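The spark.ui.threadDumpsEnabled flag introduced here defaults to true. A hypothetical driver setup that opts out — the flag name comes straight from the patch, the rest is illustrative:

```scala
// Disabling the new page via configuration; app name is made up.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-thread-dumps")
  .set("spark.ui.threadDumpsEnabled", "false") // hides the column and the page
val sc = new SparkContext(conf)
```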
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 79e398eb8c..10010bdfa1 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -212,4 +212,18 @@ private[spark] object AkkaUtils extends Logging {
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
+
+ def makeExecutorRef(
+ name: String,
+ conf: SparkConf,
+ host: String,
+ port: Int,
+ actorSystem: ActorSystem): ActorRef = {
+ val executorActorSystemName = SparkEnv.executorActorSystemName
+ Utils.checkHost(host, "Expected hostname")
+ val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name"
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ logInfo(s"Connecting to $name: $url")
+ Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
new file mode 100644
index 0000000000..d4e0ad93b9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ThreadStackTrace.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Used for shipping per-thread stacktraces from the executors to the driver.
+ */
+private[spark] case class ThreadStackTrace(
+ threadId: Long,
+ threadName: String,
+ threadState: Thread.State,
+ stackTrace: String)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a33046d204..6ab94af9f3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import java.io._
+import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.jar.Attributes.Name
@@ -1611,6 +1612,18 @@ private[spark] object Utils extends Logging {
s"$className: $desc\n$st"
}
+ /** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
+ def getThreadDump(): Array[ThreadStackTrace] = {
+ // We need to filter out null values here because dumpAllThreads() may return null array
+ // elements for threads that are dead / don't exist.
+ val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
+ threadInfos.sortBy(_.getThreadId).map { case threadInfo =>
+ val stackTrace = threadInfo.getStackTrace.map(_.toString).mkString("\n")
+ ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName,
+ threadInfo.getThreadState, stackTrace)
+ }
+ }
+
/**
* Convert all spark properties set in the given SparkConf to a sequence of java options.
*/
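For reference, a standalone sketch of the ThreadMXBean call that getThreadDump builds on, runnable as a plain Scala app outside Spark (the object name and output format are made up):

```scala
// Standalone sketch of the JMX call behind Utils.getThreadDump.
import java.lang.management.ManagementFactory

object JvmThreadDump extends App {
  // lockedMonitors = true, lockedSynchronizers = true, as in the patch;
  // entries may be null for threads that died mid-snapshot, hence the filter.
  val infos =
    ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
  for (info <- infos.sortBy(_.getThreadId)) {
    println(s"${info.getThreadId} ${info.getThreadName} (${info.getThreadState})")
    info.getStackTrace.take(3).foreach(frame => println(s"    at $frame"))
  }
}
```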