15 files changed, 115 insertions, 53 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 599c3ac9b5..a8bc141208 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35970c2f50..0678bdd021 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1036,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
    * Capture the current user callsite and return a formatted version for printing. If the user
    * has overridden the call site, this will return the user's version.
    */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }
 
   /**
@@ -1058,11 +1060,11 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1143,11 +1145,11 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 446f369c9e..27cc60d775 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
@@ -1189,8 +1189,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
-  private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
+  @transient private[spark] val creationSite = Utils.getCallSite
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
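Note on the SparkContext change above: when user code has set the "externalCallSite" local property, getCallSite now wraps that string in a CallSite whose long form is empty, so such jobs have nothing to expand in the UI. A minimal sketch of the two paths; the property name is real, the values are invented, getCallSite is private[spark] (so this only compiles from inside the org.apache.spark package), and sc is assumed to be an existing SparkContext:

    // Hypothetical override of the call site; the string value is invented.
    sc.setLocalProperty("externalCallSite", "myQuery at QueryDsl.scala:12")
    assert(sc.getCallSite() == CallSite("myQuery at QueryDsl.scala:12", long = ""))
    // Clearing the property falls back to walking the stack via Utils.getCallSite.
    sc.setLocalProperty("externalCallSite", null)
    println(sc.getCallSite().short) // e.g. "getCallSite at Example.scala:7"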
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index 9257f48559..b755d8fb15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.Properties
 
 import org.apache.spark.TaskContext
+import org.apache.spark.util.CallSite
 
 /**
  * Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
     val finalStage: Stage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
-    val callSite: String,
+    val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c85b5a2ae..b3ebaa547d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -195,7 +195,9 @@ class DAGScheduler(
       case Some(stage) => stage
       case None =>
         val stage =
-          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
+          newOrUsedStage(
+            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
+            shuffleDep.rdd.creationSite)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
         stage
     }
@@ -212,7 +214,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: Option[ShuffleDependency[_, _, _]],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val id = nextStageId.getAndIncrement()
@@ -235,7 +237,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -413,7 +415,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null): JobWaiter[U] =
@@ -443,7 +445,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null)
@@ -452,7 +454,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite)
+        logInfo("Failed to run " + callSite.short)
         throw exception
     }
   }
@@ -461,7 +463,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
-      callSite: String,
+      callSite: CallSite,
       timeout: Long,
       properties: Properties = null)
     : PartialResult[R] =
@@ -666,7 +668,7 @@ class DAGScheduler(
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
       allowLocal: Boolean,
-      callSite: String,
+      callSite: CallSite,
       listener: JobListener,
       properties: Properties = null)
   {
@@ -674,7 +676,7 @@ class DAGScheduler(
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
       // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
     } catch {
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -685,7 +687,7 @@ class DAGScheduler(
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite.short, partitions.length, allowLocal))
+      job.jobId, callSite.short, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 23f57441b4..2b6f7e4205 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite
 
 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     allowLocal: Boolean,
-    callSite: String,
+    callSite: CallSite,
     listener: JobListener,
     properties: Properties = null)
   extends DAGSchedulerEvent
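With the two scheduler files above, a CallSite now travels with every job and stage instead of an Option[String]: result stages carry the call site captured when the action was submitted, while shuffle map stages reuse the creationSite of the RDD being shuffled (newOrUsedStage is passed shuffleDep.rdd.creationSite). An illustrative sketch with an invented lineage and invented file/line strings:

    // Two-stage job: which call site does each stage end up with?
    val pairs = sc.textFile("hdfs://...").map(w => (w, 1)) // creationSite ~ "map at WordCount.scala:2"
    pairs.reduceByKey(_ + _).collect()
    // Shuffle map stage -> pairs.creationSite, e.g. "map at WordCount.scala:2"
    // Result stage      -> the collect() call site, e.g. "collect at WordCount.scala:3"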
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 3bf9713f72..9a4be43ee2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite
 
 /**
  * A stage is a set of independent tasks all computing the same function that need to run as part
@@ -35,6 +36,11 @@ import org.apache.spark.storage.BlockManagerId
  * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  * faster on failure.
+ *
+ * The callSite provides a location in user code which relates to the stage. For a shuffle map
+ * stage, the callSite gives the user code that created the RDD being shuffled. For a result
+ * stage, the callSite gives the user code that executes the associated action (e.g. count()).
+ *
  */
 private[spark] class Stage(
     val id: Int,
@@ -43,7 +49,7 @@ private[spark] class Stage(
     val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val jobId: Int,
-    callSite: Option[String])
+    val callSite: CallSite)
   extends Logging {
 
   val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +106,8 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.getCreationSite)
+  val name = callSite.short
+  val details = callSite.long
 
   override def toString = "Stage " + id
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index b42e231e11..7644e3f351 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,12 @@ import org.apache.spark.storage.RDDInfo
  * Stores information about a stage to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
-class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
+class StageInfo(
+    val stageId: Int,
+    val name: String,
+    val numTasks: Int,
+    val rddInfos: Seq[RDDInfo],
+    val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +57,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 153434a203..a3f824a4e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,9 +91,17 @@ private[ui] class StageTableBase(
         {s.name}
       </a>
 
+    val details = if (s.details.nonEmpty) (
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +show details
+      </span>
+      <pre class="stage-details collapsed">{s.details}</pre>
+    )
+
     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div> {killLink}{nameLink}</div>)
+      .getOrElse(<div>{killLink} {nameLink} {details}</div>)
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
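Stage.name and Stage.details are now simply the two halves of the stage's CallSite, and StageTableBase renders details into a <pre class="stage-details collapsed"> whose collapsed class is toggled by the inline onclick handler; the CSS transition added at the top of this diff animates the expansion. A sketch of the mapping, with invented strings:

    val site = CallSite(
      short = "reduceByKey at WordCount.scala:3",
      long = "org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:999)\n" + // invented frame
        "WordCount$.main(WordCount.scala:3)")
    // Stage.name    == site.short -> the link text in the stage table
    // Stage.details == site.long  -> the body of the collapsible <pre> element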
"RDD Info").extract[List[JValue]].map(rddInfoFromJson) + val details = (json \ "Details").extractOpt[String].getOrElse("") val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String]) val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean] - val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos) + val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4ce28bb0cf..a2454e120a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -43,6 +43,9 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.ExecutorUncaughtExceptionHandler import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +/** CallSite represents a place in user code. It can have a short and a long form. */ +private[spark] case class CallSite(val short: String, val long: String) + /** * Various utility methods used by Spark. */ @@ -799,21 +802,12 @@ private[spark] object Utils extends Logging { */ private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r - private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, - val firstUserLine: Int, val firstUserClass: String) { - - /** Returns a printable version of the call site info suitable for logs. */ - override def toString = { - "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine) - } - } - /** * When called inside a class in the spark package, returns the name of the user code class * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. */ - def getCallSiteInfo: CallSiteInfo = { + def getCallSite: CallSite = { val trace = Thread.currentThread.getStackTrace() .filterNot(_.getMethodName.contains("getStackTrace")) @@ -824,11 +818,11 @@ private[spark] object Utils extends Logging { var lastSparkMethod = "<unknown>" var firstUserFile = "<unknown>" var firstUserLine = 0 - var finished = false - var firstUserClass = "<unknown>" + var insideSpark = true + var callStack = new ArrayBuffer[String]() :+ "<unknown>" for (el <- trace) { - if (!finished) { + if (insideSpark) { if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { lastSparkMethod = if (el.getMethodName == "<init>") { // Spark method is a constructor; get its class name @@ -836,15 +830,21 @@ private[spark] object Utils extends Logging { } else { el.getMethodName } + callStack(0) = el.toString // Put last Spark method on top of the stack trace. 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4ce28bb0cf..a2454e120a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,6 +43,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
+/** CallSite represents a place in user code. It can have a short and a long form. */
+private[spark] case class CallSite(val short: String, val long: String)
+
 /**
  * Various utility methods used by Spark.
  */
@@ -799,21 +802,12 @@ private[spark] object Utils extends Logging {
    */
   private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
 
-  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
-    val firstUserLine: Int, val firstUserClass: String) {
-
-    /** Returns a printable version of the call site info suitable for logs. */
-    override def toString = {
-      "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
-    }
-  }
-
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getCallSiteInfo: CallSiteInfo = {
+  def getCallSite: CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot(_.getMethodName.contains("getStackTrace"))
 
@@ -824,11 +818,11 @@ private[spark] object Utils extends Logging {
     var lastSparkMethod = "<unknown>"
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
-    var finished = false
-    var firstUserClass = "<unknown>"
+    var insideSpark = true
+    var callStack = new ArrayBuffer[String]() :+ "<unknown>"
 
     for (el <- trace) {
-      if (!finished) {
+      if (insideSpark) {
         if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
@@ -836,15 +830,21 @@ private[spark] object Utils extends Logging {
           } else {
             el.getMethodName
           }
+          callStack(0) = el.toString // Put last Spark method on top of the stack trace.
         } else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
-          firstUserClass = el.getClassName
-          finished = true
+          callStack += el.toString
+          insideSpark = false
         }
+      } else {
+        callStack += el.toString
       }
     }
-    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
+    CallSite(
+      short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      long = callStack.take(callStackDepth).mkString("\n"))
   }
 
   /** Return a string containing part of a file from byte 'start' to 'end'. */
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index cd3887dcc7..1fde4badda 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -70,7 +70,7 @@ package object testPackage extends Assertions {
   def runCallSiteTest(sc: SparkContext) {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val rddCreationSite = rdd.getCreationSite
-    val curCallSite = sc.getCallSite() // note: 2 lines after definition of "rdd"
+    val curCallSite = sc.getCallSite().short // note: 2 lines after definition of "rdd"
 
     val rddCreationLine = rddCreationSite match {
       case CALL_SITE_REGEX(func, file, line) => {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7506d56d7e..4536832829 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.apache.spark.util.CallSite
 
 class BuggyDAGEventProcessActor extends Actor {
   val state = 0
@@ -211,7 +212,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       allowLocal: Boolean = false,
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
     jobId
   }
 
@@ -251,7 +252,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
@@ -265,7 +266,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results.size == 0)
     assertDataStructuresEmpty
   }
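In the Utils change above, the long form is built by keeping the last Spark frame on top and then appending every frame below the Spark/user boundary, capped by the undocumented "spark.callstack.depth" system property (default 20 frames). A hedged sketch; Utils and CallSite are private[spark], so this runs only from Spark-internal code, and the printed values are invented:

    System.setProperty("spark.callstack.depth", "50") // raise the cap before capturing
    val site = org.apache.spark.util.Utils.getCallSite
    println(site.short) // e.g. "getCallSite at Example.scala:7"
    println(site.long)  // the Spark frame plus up to 49 caller frames, one per line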
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 91b4c7b0dd..c3a14f48de 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -32,12 +32,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
 
     def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
     def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3031015256..f72389b6b3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -117,6 +117,17 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }
 
+  test("Backward compatibility") {
+    // StageInfo.details was added after 1.0.0.
+    val info = makeStageInfo(1, 2, 3, 4L, 5L)
+    assert(info.details.nonEmpty)
+    val newJson = JsonProtocol.stageInfoToJson(info)
+    val oldJson = newJson.removeField { case (field, _) => field == "Details" }
+    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+    assert(info.name === newInfo.name)
+    assert("" === newInfo.details)
+  }
+
 
   /** -------------------------- *
    | Helper test running methods |
@@ -235,6 +246,7 @@ class JsonProtocolSuite extends FunSuite {
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
     }
+    assert(info1.details === info2.details)
   }
 
   private def assertEquals(info1: RDDInfo, info2: RDDInfo) {
@@ -438,7 +450,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (1 to a % 5).map { i => makeRddInfo(a % i, b % i, c % i, d % i, e % i) }
-    new StageInfo(a, "greetings", b, rddInfos)
+    new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
   private def makeTaskInfo(a: Long, b: Int, c: Long) = {
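Finally, a round-trip sketch in the spirit of the new backward-compatibility test above: details survives a serialize/deserialize cycle, and stripping the field, as an old log would, yields the empty string (the StageInfo arguments are invented):

    val info = new StageInfo(0, "count at Example.scala:9", 4, Seq.empty, "invented\ndetails")
    val json = JsonProtocol.stageInfoToJson(info)
    assert(JsonProtocol.stageInfoFromJson(json).details == info.details)
    val stripped = json.removeField { case (name, _) => name == "Details" }
    assert(JsonProtocol.stageInfoFromJson(stripped).details == "")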