path: root/core
author    Daniel Darabos <darabos.daniel@gmail.com>    2014-06-17 00:08:05 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-06-17 00:08:05 -0700
commit    23a12ce20c55653b08b16e6159ab31d2ca88acf1
tree      82ea030ca187555266dac0aab70ce2c8d1491395
parent    8cd04c3eecc2dd827ea163dcd5e08af9912fa323
SPARK-2035: Store call stack for stages, display it on the UI.
I'm not sure about the test -- I get a lot of unrelated failures for some reason. I'll try to sort it out. But hopefully the automation will test this for me if I send a pull request :). I'll attach a demo HTML in [Jira](https://issues.apache.org/jira/browse/SPARK-2035).

Author: Daniel Darabos <darabos.daniel@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #981 from darabos/darabos-call-stack and squashes the following commits:

f7c6bfa [Daniel Darabos] Fix bad merge. I undid 83c226d454 by Doris.
3d0a48d [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
b857849 [Daniel Darabos] Style: Break long line.
ecb5690 [Daniel Darabos] Include the last Spark method in the full stack trace. Otherwise it is not visible if the stage name is overridden.
d00a85b [Patrick Wendell] Make call sites for stages non-optional and well defined
b9eba24 [Daniel Darabos] Make StageInfo.details non-optional. Add JSON serialization code for the new field. Verify JSON backward compatibility.
4312828 [Daniel Darabos] Remove Mima excludes for CallSite. They should be unnecessary now, with SPARK-2070 fixed.
0920750 [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
a4b1faf [Daniel Darabos] Add Mima exclusions for the CallSite changes it has picked up. They are private methods/classes, so we ought to be safe.
932f810 [Daniel Darabos] Use empty CallSite instead of null in DAGSchedulerSuite. Outside of testing, this parameter always originates in SparkContext.scala, and will never be null.
ccd89d1 [Daniel Darabos] Fix long lines.
ac173e4 [Daniel Darabos] Hide "show details" if there are no details to show.
6182da6 [Daniel Darabos] Set a configurable limit on maximum call stack depth. It can be useful in memory-constrained situations with large numbers of stages.
8fe2e34 [Daniel Darabos] Store call stack for stages, display it on the UI.
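For orientation before the diff: the patch replaces the plain call-site string with a two-field CallSite (defined in Utils.scala below). The short form becomes the stage name; the long form is the depth-limited user stack shown behind "+show details" in the web UI. A minimal illustrative sketch -- the file names, line numbers, and frames below are made up, not taken from this commit:

    private[spark] case class CallSite(short: String, long: String)

    // A value shaped like this is captured by Utils.getCallSite for each job/stage:
    val cs = CallSite(
      short = "count at MyApp.scala:17",               // becomes Stage.name, shown in logs and the stage table
      long  = "org.apache.spark.rdd.RDD.count(...)\n" +
              "MyApp$.main(MyApp.scala:17)")           // becomes Stage.details, shown under "+show details"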
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/resources/org/apache/spark/ui/static/webui.css | 21
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 24
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 11
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala | 9
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/util/Utils.scala | 32
-rw-r--r-- core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 7
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 4
-rw-r--r-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 14
15 files changed, 115 insertions(+), 53 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 599c3ac9b5..a8bc141208 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -87,3 +87,24 @@ span.kill-link {
span.kill-link a {
color: gray;
}
+
+span.expand-details {
+ font-size: 10pt;
+ cursor: pointer;
+ color: grey;
+ float: right;
+}
+
+.stage-details {
+ max-height: 100px;
+ overflow-y: auto;
+ margin: 0;
+ transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+ max-height: 0;
+ padding-top: 0;
+ padding-bottom: 0;
+ border: none;
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35970c2f50..0678bdd021 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1036,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
*/
- private[spark] def getCallSite(): String = {
- val defaultCallSite = Utils.getCallSiteInfo
- Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+ private[spark] def getCallSite(): CallSite = {
+ Option(getLocalProperty("externalCallSite")) match {
+ case Some(callSite) => CallSite(callSite, long = "")
+ case None => Utils.getCallSite
+ }
}
/**
@@ -1058,11 +1060,11 @@ class SparkContext(config: SparkConf) extends Logging {
}
val callSite = getCallSite
val cleanedFunc = clean(func)
- logInfo("Starting job: " + callSite)
+ logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
- logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+ logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}
@@ -1143,11 +1145,11 @@ class SparkContext(config: SparkConf) extends Logging {
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
val callSite = getCallSite
- logInfo("Starting job: " + callSite)
+ logInfo("Starting job: " + callSite.short)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
- logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+ logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
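
A hypothetical usage sketch of the getCallSite() branch above: the "externalCallSite" local property (the name read by the code above) lets a caller label its jobs explicitly. The label text is made up, and clearing the property with null is an assumption about setLocalProperty, not something this commit touches:

    sc.setLocalProperty("externalCallSite", "nightly ETL: load events")
    sc.textFile("hdfs:///events").count()          // logged as "Starting job: nightly ETL: load events"
    sc.setLocalProperty("externalCallSite", null)  // assumption: null clears the override, restoring automatic capture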
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 446f369c9e..27cc60d775 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
@@ -1189,8 +1189,8 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
- @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
- private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
+ @transient private[spark] val creationSite = Utils.getCallSite
+ private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index 9257f48559..b755d8fb15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.TaskContext
+import org.apache.spark.util.CallSite
/**
* Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
- val callSite: String,
+ val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c85b5a2ae..b3ebaa547d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -195,7 +195,9 @@ class DAGScheduler(
case Some(stage) => stage
case None =>
val stage =
- newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
+ newOrUsedStage(
+ shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
+ shuffleDep.rdd.creationSite)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
@@ -212,7 +214,7 @@ class DAGScheduler(
numTasks: Int,
shuffleDep: Option[ShuffleDependency[_, _, _]],
jobId: Int,
- callSite: Option[String] = None)
+ callSite: CallSite)
: Stage =
{
val id = nextStageId.getAndIncrement()
@@ -235,7 +237,7 @@ class DAGScheduler(
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
- callSite: Option[String] = None)
+ callSite: CallSite)
: Stage =
{
val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -413,7 +415,7 @@ class DAGScheduler(
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
- callSite: String,
+ callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null): JobWaiter[U] =
@@ -443,7 +445,7 @@ class DAGScheduler(
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
- callSite: String,
+ callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
@@ -452,7 +454,7 @@ class DAGScheduler(
waiter.awaitResult() match {
case JobSucceeded => {}
case JobFailed(exception: Exception) =>
- logInfo("Failed to run " + callSite)
+ logInfo("Failed to run " + callSite.short)
throw exception
}
}
@@ -461,7 +463,7 @@ class DAGScheduler(
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
- callSite: String,
+ callSite: CallSite,
timeout: Long,
properties: Properties = null)
: PartialResult[R] =
@@ -666,7 +668,7 @@ class DAGScheduler(
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
- callSite: String,
+ callSite: CallSite,
listener: JobListener,
properties: Properties = null)
{
@@ -674,7 +676,7 @@ class DAGScheduler(
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
- finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+ finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -685,7 +687,7 @@ class DAGScheduler(
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
- job.jobId, callSite, partitions.length, allowLocal))
+ job.jobId, callSite.short, partitions.length, allowLocal))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 23f57441b4..2b6f7e4205 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite
/**
* Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
- callSite: String,
+ callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 3bf9713f72..9a4be43ee2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite
/**
* A stage is a set of independent tasks all computing the same function that need to run as part
@@ -35,6 +36,11 @@ import org.apache.spark.storage.BlockManagerId
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
+ *
+ * The callSite provides a location in user code which relates to the stage. For a shuffle map
+ * stage, the callSite gives the user code that created the RDD being shuffled. For a result
+ * stage, the callSite gives the user code that executes the associated action (e.g. count()).
+ *
*/
private[spark] class Stage(
val id: Int,
@@ -43,7 +49,7 @@ private[spark] class Stage(
val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val jobId: Int,
- callSite: Option[String])
+ val callSite: CallSite)
extends Logging {
val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +106,8 @@ private[spark] class Stage(
id
}
- val name = callSite.getOrElse(rdd.getCreationSite)
+ val name = callSite.short
+ val details = callSite.long
override def toString = "Stage " + id
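
An illustrative reading of the new doc comment on Stage (the pipeline below is made up): a shuffle map stage's call site points at the user line that created the RDD being shuffled, while a result stage's call site points at the action:

    val pairs = sc.textFile("words.txt").map(w => (w, 1))  // shuffle map stage call site: this line (creation site of the shuffled RDD)
    pairs.reduceByKey(_ + _).collect()                     // result stage call site: this line (the action)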
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index b42e231e11..7644e3f351 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,12 @@ import org.apache.spark.storage.RDDInfo
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
-class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
+class StageInfo(
+ val stageId: Int,
+ val name: String,
+ val numTasks: Int,
+ val rddInfos: Seq[RDDInfo],
+ val details: String) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +57,6 @@ private[spark] object StageInfo {
def fromStage(stage: Stage): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
- new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
+ new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 153434a203..a3f824a4e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,9 +91,17 @@ private[ui] class StageTableBase(
{s.name}
</a>
+ val details = if (s.details.nonEmpty) (
+ <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +show details
+ </span>
+ <pre class="stage-details collapsed">{s.details}</pre>
+ )
+
listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
- .getOrElse(<div> {killLink}{nameLink}</div>)
+ .getOrElse(<div>{killLink} {nameLink} {details}</div>)
}
protected def stageRow(s: StageInfo): Seq[Node] = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 09825087bb..7cecbfe62a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
+ ("Details" -> stageInfo.details) ~
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
@@ -469,12 +470,13 @@ private[spark] object JsonProtocol {
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+ val details = (json \ "Details").extractOpt[String].getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
- val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
+ val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4ce28bb0cf..a2454e120a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,6 +43,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+/** CallSite represents a place in user code. It can have a short and a long form. */
+private[spark] case class CallSite(val short: String, val long: String)
+
/**
* Various utility methods used by Spark.
*/
@@ -799,21 +802,12 @@ private[spark] object Utils extends Logging {
*/
private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
- private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
- val firstUserLine: Int, val firstUserClass: String) {
-
- /** Returns a printable version of the call site info suitable for logs. */
- override def toString = {
- "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
- }
- }
-
/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*/
- def getCallSiteInfo: CallSiteInfo = {
+ def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot(_.getMethodName.contains("getStackTrace"))
@@ -824,11 +818,11 @@ private[spark] object Utils extends Logging {
var lastSparkMethod = "<unknown>"
var firstUserFile = "<unknown>"
var firstUserLine = 0
- var finished = false
- var firstUserClass = "<unknown>"
+ var insideSpark = true
+ var callStack = new ArrayBuffer[String]() :+ "<unknown>"
for (el <- trace) {
- if (!finished) {
+ if (insideSpark) {
if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
@@ -836,15 +830,21 @@ private[spark] object Utils extends Logging {
} else {
el.getMethodName
}
+ callStack(0) = el.toString // Put last Spark method on top of the stack trace.
} else {
firstUserLine = el.getLineNumber
firstUserFile = el.getFileName
- firstUserClass = el.getClassName
- finished = true
+ callStack += el.toString
+ insideSpark = false
}
+ } else {
+ callStack += el.toString
}
}
- new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+ val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
+ CallSite(
+ short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+ long = callStack.take(callStackDepth).mkString("\n"))
}
/** Return a string containing part of a file from byte 'start' to 'end'. */
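
The stack depth cap above is read from a plain JVM system property, so a tuning sketch (property name and default of 20 taken from the code above) is simply:

    // Keep deeper user stacks in the UI, or lower the cap when many stages
    // must be held in memory at once.
    System.setProperty("spark.callstack.depth", "50")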
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index cd3887dcc7..1fde4badda 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -70,7 +70,7 @@ package object testPackage extends Assertions {
def runCallSiteTest(sc: SparkContext) {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
val rddCreationSite = rdd.getCreationSite
- val curCallSite = sc.getCallSite() // note: 2 lines after definition of "rdd"
+ val curCallSite = sc.getCallSite().short // note: 2 lines after definition of "rdd"
val rddCreationLine = rddCreationSite match {
case CALL_SITE_REGEX(func, file, line) => {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7506d56d7e..4536832829 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.apache.spark.util.CallSite
class BuggyDAGEventProcessActor extends Actor {
val state = 0
@@ -211,7 +212,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
allowLocal: Boolean = false,
listener: JobListener = jobListener): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
jobId
}
@@ -251,7 +252,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
override def toString = "DAGSchedulerSuite Local RDD"
}
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+ runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}
@@ -265,7 +266,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
override def toString = "DAGSchedulerSuite Local RDD"
}
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+ runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
assert(results.size == 0)
assertDataStructuresEmpty
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 91b4c7b0dd..c3a14f48de 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -32,12 +32,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
def createStageStartEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+ val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
SparkListenerStageSubmitted(stageInfo)
}
def createStageEndEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+ val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
SparkListenerStageCompleted(stageInfo)
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3031015256..f72389b6b3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -117,6 +117,17 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
+ test("Backward compatibility") {
+ // StageInfo.details was added after 1.0.0.
+ val info = makeStageInfo(1, 2, 3, 4L, 5L)
+ assert(info.details.nonEmpty)
+ val newJson = JsonProtocol.stageInfoToJson(info)
+ val oldJson = newJson.removeField { case (field, _) => field == "Details" }
+ val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+ assert(info.name === newInfo.name)
+ assert("" === newInfo.details)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -235,6 +246,7 @@ class JsonProtocolSuite extends FunSuite {
(0 until info1.rddInfos.size).foreach { i =>
assertEquals(info1.rddInfos(i), info2.rddInfos(i))
}
+ assert(info1.details === info2.details)
}
private def assertEquals(info1: RDDInfo, info2: RDDInfo) {
@@ -438,7 +450,7 @@ class JsonProtocolSuite extends FunSuite {
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val rddInfos = (1 to a % 5).map { i => makeRddInfo(a % i, b % i, c % i, d % i, e % i) }
- new StageInfo(a, "greetings", b, rddInfos)
+ new StageInfo(a, "greetings", b, rddInfos, "details")
}
private def makeTaskInfo(a: Long, b: Int, c: Long) = {