author    Daniel Darabos <darabos.daniel@gmail.com>  2014-06-17 00:08:05 -0700
committer Patrick Wendell <pwendell@gmail.com>       2014-06-17 00:08:05 -0700
commit    23a12ce20c55653b08b16e6159ab31d2ca88acf1 (patch)
tree      82ea030ca187555266dac0aab70ce2c8d1491395 /core/src/test
parent    8cd04c3eecc2dd827ea163dcd5e08af9912fa323 (diff)
SPARK-2035: Store call stack for stages, display it on the UI.
I'm not sure about the test -- I get a lot of unrelated failures for some reason. I'll try to sort it out. But hopefully the automation will test this for me if I send a pull request :). I'll attach a demo HTML in [Jira](https://issues.apache.org/jira/browse/SPARK-2035).

Author: Daniel Darabos <darabos.daniel@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #981 from darabos/darabos-call-stack and squashes the following commits:

f7c6bfa [Daniel Darabos] Fix bad merge. I undid 83c226d454 by Doris.
3d0a48d [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
b857849 [Daniel Darabos] Style: Break long line.
ecb5690 [Daniel Darabos] Include the last Spark method in the full stack trace. Otherwise it is not visible if the stage name is overridden.
d00a85b [Patrick Wendell] Make call sites for stages non-optional and well defined
b9eba24 [Daniel Darabos] Make StageInfo.details non-optional. Add JSON serialization code for the new field. Verify JSON backward compatibility.
4312828 [Daniel Darabos] Remove Mima excludes for CallSite. They should be unnecessary now, with SPARK-2070 fixed.
0920750 [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
a4b1faf [Daniel Darabos] Add Mima exclusions for the CallSite changes it has picked up. They are private methods/classes, so we ought to be safe.
932f810 [Daniel Darabos] Use empty CallSite instead of null in DAGSchedulerSuite. Outside of testing, this parameter always originates in SparkContext.scala, and will never be null.
ccd89d1 [Daniel Darabos] Fix long lines.
ac173e4 [Daniel Darabos] Hide "show details" if there are no details to show.
6182da6 [Daniel Darabos] Set a configurable limit on maximum call stack depth. It can be useful in memory-constrained situations with large numbers of stages.
8fe2e34 [Daniel Darabos] Store call stack for stages, display it on the UI.
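The gist of the change, as a minimal Scala sketch: a call site pairs a one-line summary (the stage name on the UI) with a full, depth-limited stack trace (shown behind "show details"). The CallSite(short, long) shape matches the constructor calls in the diff below; the capture() helper and its default depth are hypothetical illustrations, not Spark's actual code.

case class CallSite(short: String, long: String)

object CallSite {
  // Hypothetical helper: capture the current stack, truncated to maxDepth
  // frames, mirroring the configurable depth limit from commit 6182da6.
  def capture(maxDepth: Int = 20): CallSite = {
    // drop(2) skips getStackTrace and capture itself.
    val frames = Thread.currentThread.getStackTrace.drop(2).take(maxDepth)
    val short = frames.headOption.map(_.toString).getOrElse("<unknown>")
    CallSite(short, frames.mkString("\n"))
  }
}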
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala            |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala      |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala           | 14
4 files changed, 20 insertions, 7 deletions
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index cd3887dcc7..1fde4badda 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -70,7 +70,7 @@ package object testPackage extends Assertions {
   def runCallSiteTest(sc: SparkContext) {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val rddCreationSite = rdd.getCreationSite
-    val curCallSite = sc.getCallSite() // note: 2 lines after definition of "rdd"
+    val curCallSite = sc.getCallSite().short // note: 2 lines after definition of "rdd"

     val rddCreationLine = rddCreationSite match {
       case CALL_SITE_REGEX(func, file, line) => {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7506d56d7e..4536832829 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.apache.spark.util.CallSite

 class BuggyDAGEventProcessActor extends Actor {
   val state = 0
@@ -211,7 +212,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       allowLocal: Boolean = false,
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
     jobId
   }

@@ -251,7 +252,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
@@ -265,7 +266,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results.size == 0)
     assertDataStructuresEmpty
   }
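The hunks above replace null call sites with CallSite("", ""), per commit 932f810. A brief sketch of why a well-defined empty value beats null here, again assuming the CallSite(short, long) shape; describeStage is a hypothetical consumer, not Spark code.

case class CallSite(short: String, long: String)

// A hypothetical consumer: no null checks needed, and an empty long form maps
// naturally onto hiding the "show details" link (commit ac173e4).
def describeStage(cs: CallSite): String =
  if (cs.long.isEmpty) cs.short
  else s"${cs.short} (stack trace available)"

// In the tests above, CallSite("", "") stands in for a value that, outside of
// testing, always originates in SparkContext.scala and is never null.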
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 91b4c7b0dd..c3a14f48de 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -32,12 +32,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)

     def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }

     def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
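These hunks show StageInfo gaining a fifth constructor argument for the call-stack details. A sketch of the constructor shape implied by the diff; the parameter names are descriptive guesses, RDDInfo is stubbed, and the real StageInfo carries more state than shown here.

// Stub standing in for org.apache.spark.storage.RDDInfo, for illustration only.
case class RDDInfo(id: Int, name: String)

// Constructor shape implied by the calls in this diff; the real class has
// further fields and methods not visible here.
class StageInfo(
    val stageId: Int,
    val name: String,
    val numTasks: Int,
    val rddInfos: Seq[RDDInfo],
    val details: String)  // new, non-optional: "" means "no details to show"

// Mirroring the tests above: pass "" rather than null when there are no details.
val stageInfo = new StageInfo(0, "0", 0, Seq.empty, "")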
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3031015256..f72389b6b3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -117,6 +117,17 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }

+  test("Backward compatibility") {
+    // StageInfo.details was added after 1.0.0.
+    val info = makeStageInfo(1, 2, 3, 4L, 5L)
+    assert(info.details.nonEmpty)
+    val newJson = JsonProtocol.stageInfoToJson(info)
+    val oldJson = newJson.removeField { case (field, _) => field == "Details" }
+    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+    assert(info.name === newInfo.name)
+    assert("" === newInfo.details)
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -235,6 +246,7 @@ class JsonProtocolSuite extends FunSuite {
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
     }
+    assert(info1.details === info2.details)
   }

   private def assertEquals(info1: RDDInfo, info2: RDDInfo) {
@@ -438,7 +450,7 @@ class JsonProtocolSuite extends FunSuite {
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (1 to a % 5).map { i => makeRddInfo(a % i, b % i, c % i, d % i, e % i) }
-    new StageInfo(a, "greetings", b, rddInfos)
+    new StageInfo(a, "greetings", b, rddInfos, "details")
   }

   private def makeTaskInfo(a: Long, b: Int, c: Long) = {
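The backward-compatibility test above drops the "Details" field from the JSON and expects deserialization to fall back to an empty string. A sketch of that fallback using json4s (the library the test's removeField call comes from); JsonProtocol's actual implementation may differ.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Read an optional "Details" field, defaulting to "" when an old-format
// (pre-details) event log omits it. Sketch only; JsonProtocol may differ.
def detailsFromJson(json: JValue): String =
  json \ "Details" match {
    case JString(details) => details
    case _ => ""  // JNothing (field absent) or unexpected type
  }

// An old-format stage info without "Details" yields "".
val oldStyle = parse("""{"Stage ID": 1, "Stage Name": "greetings"}""")
assert(detailsFromJson(oldStyle) == "")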