diff options
author | zsxwing <zsxwing@gmail.com> | 2015-09-23 19:52:02 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-09-23 19:52:02 -0700 |
commit | 758c9d25e92417f8c06328c3af7ea2ef0212c79f (patch) | |
tree | 62b08771c98fd69367cec1a675ae2c2fadaf8520 /streaming/src/main/scala | |
parent | 83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (diff) | |
download | spark-758c9d25e92417f8c06328c3af7ea2ef0212c79f.tar.gz spark-758c9d25e92417f8c06328c3af7ea2ef0212c79f.tar.bz2 spark-758c9d25e92417f8c06328c3af7ea2ef0212c79f.zip |
[SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches
Slightly modified version of #8818, all credit goes to zsxwing
Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #8892 from tdas/SPARK-10692.
Diffstat (limited to 'streaming/src/main/scala')
3 files changed, 39 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 9922b6bc12..3c869561ef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -39,6 +39,8 @@ case class BatchInfo( processingEndTime: Option[Long] ) { + private var _failureReasons: Map[Int, String] = Map.empty + @deprecated("Use streamIdToInputInfo instead", "1.5.0") def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords) @@ -67,4 +69,12 @@ case class BatchInfo( * The number of recorders received by the receivers in this batch. */ def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum + + /** Set the failure reasons corresponding to every output ops in the batch */ + private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = { + _failureReasons = reasons + } + + /** Failure reasons corresponding to every output ops in the batch */ + private[streaming] def failureReasons = _failureReasons }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 32d995dc42..66afbf1b11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } private def handleJobCompletion(job: Job) { + val jobSet = jobSets.get(job.time) + jobSet.handleJobCompletion(job) + logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) + if (jobSet.hasCompleted) { + jobSets.remove(jobSet.time) + jobGenerator.onBatchCompletion(jobSet.time) + logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( + jobSet.totalDelay / 1000.0, jobSet.time.toString, + jobSet.processingDelay / 1000.0 + )) + listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) + } job.result match { - case Success(_) => - val jobSet = jobSets.get(job.time) - jobSet.handleJobCompletion(job) - logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) - if (jobSet.hasCompleted) { - jobSets.remove(jobSet.time) - jobGenerator.onBatchCompletion(jobSet.time) - logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( - jobSet.totalDelay / 1000.0, jobSet.time.toString, - jobSet.processingDelay / 1000.0 - )) - listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) - } case Failure(e) => reportError("Error running job " + job, e) + case _ => } }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 95833efc94..255ccf0536 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.HashSet +import scala.util.Failure import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils /** Class representing a set of Jobs * belong to the same batch. 
@@ -62,12 +64,23 @@ case class JobSet( } def toBatchInfo: BatchInfo = { - new BatchInfo( + val failureReasons: Map[Int, String] = { + if (hasCompleted) { + jobs.filter(_.result.isFailure).map { job => + (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception)) + }.toMap + } else { + Map.empty + } + } + val binfo = new BatchInfo( time, streamIdToInputInfo, submissionTime, - if (processingStartTime >= 0 ) Some(processingStartTime) else None, - if (processingEndTime >= 0 ) Some(processingEndTime) else None + if (processingStartTime >= 0) Some(processingStartTime) else None, + if (processingEndTime >= 0) Some(processingEndTime) else None ) + binfo.setFailureReason(failureReasons) + binfo } } |