 core/src/main/scala/org/apache/spark/FutureAction.scala                |   2
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 121
 core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala         |   8
 core/src/main/scala/org/apache/spark/scheduler/JobResult.scala         |   3
 core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala         |   2
 core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala     |   2
 core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala         |   9
 core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala |  23
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala           |  11
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala |  45
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala      |   3
 11 files changed, 151 insertions(+), 78 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index f2decd14ef..2eec09cd1c 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -141,7 +141,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
private def awaitResult(): Try[T] = {
jobWaiter.awaitResult() match {
case JobSucceeded => scala.util.Success(resultFunc)
- case JobFailed(e: Exception, _) => scala.util.Failure(e)
+ case JobFailed(e: Exception) => scala.util.Failure(e)
}
}
}
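
With failedStageId gone, every consumer of JobFailed collapses to a one-field match,
as in the hunks below. A minimal standalone sketch of the new shape, using local
stand-ins for the private[spark] JobResult types rather than the real ones:

    import scala.util.{Failure, Success, Try}

    // Local stand-ins mirroring the patched private[spark] definitions.
    sealed trait JobResult
    case object JobSucceeded extends JobResult
    case class JobFailed(exception: Exception) extends JobResult

    object AwaitResultSketch extends App {
      // Callers now bind only the exception; the old `, _` for failedStageId is gone.
      def awaitResultLike[T](result: JobResult, resultFunc: => T): Try[T] = result match {
        case JobSucceeded => Success(resultFunc)
        case JobFailed(e) => Failure(e)
      }

      println(awaitResultLike(JobSucceeded, 42))                     // Success(42)
      println(awaitResultLike(JobFailed(new Exception("boom")), 42)) // Failure(...: boom)
    }
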
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c96d7435a7..c41d6d75a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -342,22 +342,24 @@ class DAGScheduler(
}
/**
- * Removes job and any stages that are not needed by any other job. Returns the set of ids for
- * stages that were removed. The associated tasks for those stages need to be cancelled if we
- * got here via job cancellation.
+ * Removes state for job and any stages that are not needed by any other job. Does not
+ * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
+ *
+ * @param job The job whose state to clean up.
+ * @param resultStage Specifies the result stage for the job; if set to None, this method
+ * searches resultStageToJob to find and clean up the appropriate result stage.
*/
- private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
- val registeredStages = jobIdToStageIds(jobId)
- val independentStages = new HashSet[Int]()
- if (registeredStages.isEmpty) {
- logError("No stages registered for job " + jobId)
+ private def cleanupStateForJobAndIndependentStages(job: ActiveJob, resultStage: Option[Stage]) {
+ val registeredStages = jobIdToStageIds.get(job.jobId)
+ if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
+ logError("No stages registered for job " + job.jobId)
} else {
- stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
+ stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
case (stageId, jobSet) =>
- if (!jobSet.contains(jobId)) {
+ if (!jobSet.contains(job.jobId)) {
logError(
"Job %d not registered for stage %d even though that stage was registered for the job"
- .format(jobId, stageId))
+ .format(job.jobId, stageId))
} else {
def removeStage(stageId: Int) {
// data structures based on Stage
@@ -394,23 +396,28 @@ class DAGScheduler(
.format(stageId, stageIdToStage.size))
}
- jobSet -= jobId
+ jobSet -= job.jobId
if (jobSet.isEmpty) { // no other job needs this stage
- independentStages += stageId
removeStage(stageId)
}
}
}
}
- independentStages.toSet
- }
+ jobIdToStageIds -= job.jobId
+ jobIdToActiveJob -= job.jobId
+ activeJobs -= job
- private def jobIdToStageIdsRemove(jobId: Int) {
- if (!jobIdToStageIds.contains(jobId)) {
- logDebug("Trying to remove unregistered job " + jobId)
+ if (resultStage.isEmpty) {
+ // Clean up result stages.
+ val resultStagesForJob = resultStageToJob.keySet.filter(
+ stage => resultStageToJob(stage).jobId == job.jobId)
+ if (resultStagesForJob.size != 1) {
+ logWarning(
+ s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)")
+ }
+ resultStageToJob --= resultStagesForJob
} else {
- removeJobAndIndependentStages(jobId)
- jobIdToStageIds -= jobId
+ resultStageToJob -= resultStage.get
}
}
@@ -460,7 +467,7 @@ class DAGScheduler(
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded => {}
- case JobFailed(exception: Exception, _) =>
+ case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite)
throw exception
}
@@ -606,7 +613,16 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1)))
+ // Tell the listeners that all of the running stages have ended. Don't bother
+ // cancelling the stages because if the DAG scheduler is stopped, the entire application
+ // is in the process of getting stopped.
+ val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+ runningStages.foreach { stage =>
+ val info = stageToInfos(stage)
+ info.stageFailed(stageFailedMessage)
+ listenerBus.post(SparkListenerStageCompleted(info))
+ }
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
}
return true
}
@@ -676,7 +692,7 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
- jobResult = JobFailed(e, job.finalStage.id)
+ jobResult = JobFailed(e)
job.listener.jobFailed(e)
} finally {
val s = job.finalStage
@@ -826,11 +842,8 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- jobIdToActiveJob -= stage.jobId
- activeJobs -= job
- resultStageToJob -= stage
markStageAsFinished(stage)
- jobIdToStageIdsRemove(job.jobId)
+ cleanupStateForJobAndIndependentStages(job, Some(stage))
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
@@ -982,7 +995,7 @@ class DAGScheduler(
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
- failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled")
+ failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
}
}
@@ -999,7 +1012,8 @@ class DAGScheduler(
stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
- failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
+ failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
+ Some(resultStage))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -1008,28 +1022,45 @@ class DAGScheduler(
/**
* Fails a job and all stages that are only used by that job, and cleans up relevant state.
+ *
+ * @param resultStage The result stage for the job, if known. Used to clean up state for the job
+ * slightly more efficiently than when not specified.
*/
- private def failJobAndIndependentStages(job: ActiveJob, failureReason: String) {
+ private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
+ resultStage: Option[Stage]) {
val error = new SparkException(failureReason)
job.listener.jobFailed(error)
- // Cancel all tasks in independent stages.
- val independentStages = removeJobAndIndependentStages(job.jobId)
- independentStages.foreach(taskScheduler.cancelTasks)
-
- // Clean up remaining state we store for the job.
- jobIdToActiveJob -= job.jobId
- activeJobs -= job
- jobIdToStageIds -= job.jobId
- val resultStagesForJob = resultStageToJob.keySet.filter(
- stage => resultStageToJob(stage).jobId == job.jobId)
- if (resultStagesForJob.size != 1) {
- logWarning(
- s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)")
+ // Cancel all independent, running stages.
+ val stages = jobIdToStageIds(job.jobId)
+ if (stages.isEmpty) {
+ logError("No stages registered for job " + job.jobId)
}
- resultStageToJob --= resultStagesForJob
+ stages.foreach { stageId =>
+ val jobsForStage = stageIdToJobIds.get(stageId)
+ if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
+ logError(
+ "Job %d not registered for stage %d even though that stage was registered for the job"
+ .format(job.jobId, stageId))
+ } else if (jobsForStage.get.size == 1) {
+ if (!stageIdToStage.contains(stageId)) {
+ logError(s"Missing Stage for stage with id $stageId")
+ } else {
+ // This is the only job that uses this stage, so fail the stage if it is running.
+ val stage = stageIdToStage(stageId)
+ if (runningStages.contains(stage)) {
+ taskScheduler.cancelTasks(stageId)
+ val stageInfo = stageToInfos(stage)
+ stageInfo.stageFailed(failureReason)
+ listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+ }
+ }
+ }
+ }
+
+ cleanupStateForJobAndIndependentStages(job, resultStage)
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
}
/**
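
The reference counting at the heart of cleanupStateForJobAndIndependentStages, reduced
to a standalone sketch: plain mutable maps stand in for the scheduler's private state,
and all names here are illustrative rather than the real fields.

    import scala.collection.mutable.{HashMap, HashSet}

    object CleanupSketch extends App {
      // Simplified stand-ins for the scheduler's bookkeeping maps.
      val jobIdToStageIds = new HashMap[Int, HashSet[Int]]()
      val stageIdToJobIds = new HashMap[Int, HashSet[Int]]()

      /** Drop jobId from every stage it registered; drop stages no other job still needs. */
      def cleanupJob(jobId: Int) {
        for (stageIds <- jobIdToStageIds.get(jobId); stageId <- stageIds) {
          stageIdToJobIds.get(stageId).foreach { jobSet =>
            jobSet -= jobId
            if (jobSet.isEmpty) stageIdToJobIds -= stageId // no other job needs this stage
          }
        }
        jobIdToStageIds -= jobId
      }

      // Stage 1 is shared by jobs 0 and 1; stage 0 belongs to job 0 alone.
      jobIdToStageIds(0) = HashSet(0, 1)
      jobIdToStageIds(1) = HashSet(1)
      stageIdToJobIds(0) = HashSet(0)
      stageIdToJobIds(1) = HashSet(0, 1)

      cleanupJob(0)
      assert(!stageIdToJobIds.contains(0))     // stage 0 left with its only job
      assert(stageIdToJobIds(1) == HashSet(1)) // stage 1 survives, still owned by job 1
    }
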
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 5cecf9416b..7c5053998f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -191,7 +191,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
*/
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageId = stageCompleted.stageInfo.stageId
- stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
+ if (stageCompleted.stageInfo.failureReason.isEmpty) {
+ stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=COMPLETED")
+ } else {
+ stageLogInfo(stageId, s"STAGE_ID=$stageId STATUS=FAILED")
+ }
}
/**
@@ -227,7 +231,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
var info = "JOB_ID=" + jobId
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception, _) =>
+ case JobFailed(exception) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index 3cf4e3077e..047bd27056 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -24,5 +24,4 @@ private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-// A failed stage ID of -1 means there is not a particular stage that caused the failure
-private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
+private[spark] case class JobFailed(exception: Exception) extends JobResult
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 8007b54187..e9bfee2248 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -64,7 +64,7 @@ private[spark] class JobWaiter[T](
override def jobFailed(exception: Exception): Unit = synchronized {
_jobFinished = true
- jobResult = JobFailed(exception, -1)
+ jobResult = JobFailed(exception)
this.notifyAll()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d4eb0ac88d..d42e67742a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -71,7 +71,7 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
*/
trait SparkListener {
/**
- * Called when a stage is completed, with information on the completed stage
+ * Called when a stage completes successfully or fails, with information on the completed stage.
*/
def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 8115a7ed78..eec409b182 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -26,8 +26,17 @@ private[spark]
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
+ /** Time when all tasks in the stage completed or when the stage was cancelled. */
var completionTime: Option[Long] = None
+ /** If the stage failed, the reason why. */
+ var failureReason: Option[String] = None
+
var emittedTaskSizeWarning = false
+
+ def stageFailed(reason: String) {
+ failureReason = Some(reason)
+ completionTime = Some(System.currentTimeMillis)
+ }
}
private[spark]
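
Success and failure now surface through the same StageInfo, distinguished only by
failureReason. A minimal mirror of the patched fields (illustrative; the real class
is private[spark]):

    class StageInfoSketch(val stageId: Int, val name: String) {
      var completionTime: Option[Long] = None
      var failureReason: Option[String] = None

      def stageFailed(reason: String) {
        failureReason = Some(reason)
        completionTime = Some(System.currentTimeMillis)
      }
    }

    object StageInfoDemo extends App {
      val info = new StageInfoSketch(0, "map at <console>:1")
      info.stageFailed("Stage cancelled because SparkContext was shut down")
      // Listeners such as JobLogger above branch on failureReason: empty means success.
      val status = if (info.failureReason.isEmpty) "COMPLETED" else "FAILED"
      println(s"STAGE_ID=${info.stageId} STATUS=$status") // STAGE_ID=0 STATUS=FAILED
    }
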
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 048f671c87..5167e20ea3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -74,8 +74,13 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
// Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
activeStages.remove(stageId)
- completedStages += stage
- trimIfNecessary(completedStages)
+ if (stage.failureReason.isEmpty) {
+ completedStages += stage
+ trimIfNecessary(completedStages)
+ } else {
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ }
}
/** If stages is too large, remove and garbage collect old stages */
@@ -215,20 +220,6 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
}
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
- jobEnd.jobResult match {
- case JobFailed(_, stageId) =>
- activeStages.get(stageId).foreach { s =>
- // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
- activeStages.remove(s.stageId)
- poolToActiveStages(stageIdToPool(stageId)).remove(s.stageId)
- failedStages += s
- trimIfNecessary(failedStages)
- }
- case _ =>
- }
- }
-
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
val schedulingModeName =
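
With the onJobEnd special case deleted, a listener that wants failed stages receives
them the same way it receives successful ones. A hypothetical sketch of such a
listener; it sits under org.apache.spark only because StageInfo is still
private[spark] at this commit:

    package org.apache.spark.sketch

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Mirrors what the removed onJobEnd hook used to reconstruct from JobFailed.
    class StageOutcomeListener extends SparkListener {
      val succeeded = mutable.Set[Int]()
      val failed = mutable.Set[Int]()

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        val info = stageCompleted.stageInfo
        if (info.failureReason.isEmpty) succeeded += info.stageId
        else failed += info.stageId // previously reachable only via JobFailed's stage ID
      }
    }

Registered with sc.addSparkListener, exactly as the new DAGSchedulerSuite listener
below is.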
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2155a8888c..19654892bf 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -166,12 +166,14 @@ private[spark] object JsonProtocol {
val rddInfo = rddInfoToJson(stageInfo.rddInfo)
val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
+ val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
("Stage ID" -> stageInfo.stageId) ~
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
+ ("Failure Reason" -> failureReason) ~
("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
}
@@ -259,9 +261,7 @@ private[spark] object JsonProtocol {
val json = jobResult match {
case JobSucceeded => Utils.emptyJson
case jobFailed: JobFailed =>
- val exception = exceptionToJson(jobFailed.exception)
- ("Exception" -> exception) ~
- ("Failed Stage ID" -> jobFailed.failedStageId)
+ JObject("Exception" -> exceptionToJson(jobFailed.exception))
}
("Result" -> result) ~ json
}
@@ -442,11 +442,13 @@ private[spark] object JsonProtocol {
val rddInfo = rddInfoFromJson(json \ "RDD Info")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
+ val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfo)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
+ stageInfo.failureReason = failureReason
stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
stageInfo
}
@@ -561,8 +563,7 @@ private[spark] object JsonProtocol {
case `jobSucceeded` => JobSucceeded
case `jobFailed` =>
val exception = exceptionFromJson(json \ "Exception")
- val failedStageId = (json \ "Failed Stage ID").extract[Int]
- new JobFailed(exception, failedStageId)
+ new JobFailed(exception)
}
}
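
On the wire, the optional field behaves as in this self-contained json4s sketch (the
same library JsonProtocol builds on): a missing failureReason maps to JNothing and
simply disappears from the rendered object, which is why the reader above fetches it
with Utils.jsonOption.

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object FailureReasonJsonSketch extends App {
      // The same Option handling the patch adds: None becomes JNothing.
      def failureReasonToJson(failureReason: Option[String]): JValue =
        failureReason.map(JString(_)).getOrElse(JNothing)

      val failed: JValue =
        ("Stage ID" -> 1) ~ ("Failure Reason" -> failureReasonToJson(Some("fetch failed")))
      val succeeded: JValue =
        ("Stage ID" -> 2) ~ ("Failure Reason" -> failureReasonToJson(None))

      println(compact(render(failed)))    // {"Stage ID":1,"Failure Reason":"fetch failed"}
      println(compact(render(succeeded))) // {"Stage ID":2}  (JNothing fields dropped)
    }
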
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e3026bffb..a74724d785 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -64,6 +64,21 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def defaultParallelism() = 2
}
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
+ val sparkListener = new SparkListener() {
+ val successfulStages = new HashSet[Int]()
+ val failedStages = new HashSet[Int]()
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
+ val stageInfo = stageCompleted.stageInfo
+ if (stageInfo.failureReason.isEmpty) {
+ successfulStages += stageInfo.stageId
+ } else {
+ failedStages += stageInfo.stageId
+ }
+ }
+ }
+
var mapOutputTracker: MapOutputTrackerMaster = null
var scheduler: DAGScheduler = null
@@ -89,13 +104,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** The list of results that DAGScheduler has collected. */
val results = new HashMap[Int, Any]()
var failure: Exception = _
- val listener = new JobListener() {
+ val jobListener = new JobListener() {
override def taskSucceeded(index: Int, result: Any) = results.put(index, result)
override def jobFailed(exception: Exception) = { failure = exception }
}
before {
sc = new SparkContext("local", "DAGSchedulerSuite")
+ sparkListener.successfulStages.clear()
+ sparkListener.failedStages.clear()
+ sc.addSparkListener(sparkListener)
taskSets.clear()
cancelledStages.clear()
cacheLocations.clear()
@@ -187,7 +205,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
allowLocal: Boolean = false,
- listener: JobListener = listener): Int = {
+ listener: JobListener = jobListener): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
return jobId
@@ -231,7 +249,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def toString = "DAGSchedulerSuite Local RDD"
}
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
+ runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}
@@ -262,6 +280,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
submit(makeRdd(1, Nil), Array(0))
failed(taskSets(0), "some failure")
assert(failure.getMessage === "Job aborted due to stage failure: some failure")
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.contains(0))
+ assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty
}
@@ -270,6 +291,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val jobId = submit(rdd, Array(0))
cancel(jobId)
assert(failure.getMessage === s"Job $jobId cancelled")
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.contains(0))
+ assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty
}
@@ -354,6 +378,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val stageFailureMessage = "Exception failure in map stage"
failed(taskSets(0), stageFailureMessage)
assert(failure.getMessage === s"Job aborted due to stage failure: $stageFailureMessage")
+
+ // Listener bus should get told about the map stage failing, but not the reduce stage
+ // (since the reduce stage hasn't been started yet).
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.failedStages.contains(1))
+ assert(sparkListener.failedStages.size === 1)
+
assertDataStructuresEmpty
}
@@ -400,6 +431,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
failed(taskSets(0), stageFailureMessage)
assert(cancelledStages.contains(1))
+
+ // Make sure the listeners got told about both failed stages.
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ assert(sparkListener.successfulStages.isEmpty)
+ assert(sparkListener.failedStages.contains(1))
+ assert(sparkListener.failedStages.contains(3))
+ assert(sparkListener.failedStages.size === 2)
+
assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
assertDataStructuresEmpty
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 7bab7da8fe..0342a8aff3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {
// JobResult
val exception = new Exception("Out of Memory! Please restock film.")
exception.setStackTrace(stackTrace)
- val jobFailed = JobFailed(exception, 2)
+ val jobFailed = JobFailed(exception)
testJobResult(JobSucceeded)
testJobResult(jobFailed)
@@ -294,7 +294,6 @@ class JsonProtocolSuite extends FunSuite {
(result1, result2) match {
case (JobSucceeded, JobSucceeded) =>
case (r1: JobFailed, r2: JobFailed) =>
- assert(r1.failedStageId === r2.failedStageId)
assertEquals(r1.exception, r2.exception)
case _ => fail("Job results don't match in types!")
}