author    zsxwing <zsxwing@gmail.com>          2015-09-23 19:52:02 -0700
committer Reynold Xin <rxin@databricks.com>    2015-09-23 19:52:02 -0700
commit    758c9d25e92417f8c06328c3af7ea2ef0212c79f (patch)
tree      62b08771c98fd69367cec1a675ae2c2fadaf8520 /streaming/src/main
parent    83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (diff)
[SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches
Slightly modified version of #8818; all credit goes to zsxwing.

Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8892 from tdas/SPARK-10692.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala     | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala  | 26
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala        | 19
3 files changed, 39 insertions, 16 deletions
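For context, here is a minimal sketch of how code inside the streaming package (such as the web UI) can consume the new field once this patch is applied. The listener class and its log output are illustrative, not part of the change, and failureReasons is private[streaming], so this only compiles from within org.apache.spark.streaming:

package org.apache.spark.streaming.example // hypothetical subpackage, needed to see private[streaming]

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener: after this patch, batch-completed events fire even
// when output operations fail, so their failure reasons can be surfaced here.
class FailureAwareListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    info.failureReasons.foreach { case (outputOpId, reason) =>
      println(s"Batch ${info.batchTime}: output op $outputOpId failed: $reason")
    }
  }
}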
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9922b6bc12..3c869561ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,6 +39,8 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
+ private var _failureReasons: Map[Int, String] = Map.empty
+
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -67,4 +69,12 @@ case class BatchInfo(
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+ /** Set the failure reasons corresponding to each output op in the batch */
+ private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
+ _failureReasons = reasons
+ }
+
+ /** Failure reasons corresponding to each output op in the batch */
+ private[streaming] def failureReasons = _failureReasons
}
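To make the new accessor pair concrete, here is a small usage sketch with made-up timestamps; as above, both members are private[streaming], so this only compiles inside the streaming package:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

// Hypothetical values, for illustration only.
val info = BatchInfo(
  batchTime = Time(1443063122000L),
  streamIdToInputInfo = Map.empty,
  submissionTime = 1443063122000L,
  processingStartTime = Some(1443063122100L),
  processingEndTime = Some(1443063125100L)
)
// Record that output op 0 failed, then read the reason back.
info.setFailureReason(Map(0 -> "org.apache.spark.SparkException: Task failed"))
assert(info.failureReasons == Map(0 -> "org.apache.spark.SparkException: Task failed"))

Keeping _failureReasons as a mutable field outside the constructor leaves the public case-class signature of BatchInfo unchanged, which is presumably why the patch uses a setter rather than a new constructor parameter.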
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 32d995dc42..66afbf1b11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
private def handleJobCompletion(job: Job) {
+ val jobSet = jobSets.get(job.time)
+ jobSet.handleJobCompletion(job)
+ logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+ if (jobSet.hasCompleted) {
+ jobSets.remove(jobSet.time)
+ jobGenerator.onBatchCompletion(jobSet.time)
+ logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+ jobSet.totalDelay / 1000.0, jobSet.time.toString,
+ jobSet.processingDelay / 1000.0
+ ))
+ listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+ }
job.result match {
- case Success(_) =>
- val jobSet = jobSets.get(job.time)
- jobSet.handleJobCompletion(job)
- logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
- if (jobSet.hasCompleted) {
- jobSets.remove(jobSet.time)
- jobGenerator.onBatchCompletion(jobSet.time)
- logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
- jobSet.totalDelay / 1000.0, jobSet.time.toString,
- jobSet.processingDelay / 1000.0
- ))
- listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
- }
case Failure(e) =>
reportError("Error running job " + job, e)
+ case _ =>
}
}
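The key effect of this restructuring is that the completion bookkeeping now runs no matter how the job ended, so a batch whose jobs failed still posts StreamingListenerBatchCompleted and can be cleared from the UI; the match afterwards only decides whether to report an error. A standalone sketch of that control flow, using a stand-in result instead of the real Job class:

import scala.util.{Failure, Success, Try}

// Stand-in for handleJobCompletion: completion handling is unconditional,
// and the pattern match only adds error reporting on failure.
def handleCompletion(jobId: Int, result: Try[Unit]): Unit = {
  println(s"Finished job $jobId") // runs for successes and failures alike
  result match {
    case Failure(e) => println(s"Error running job $jobId: ${e.getMessage}")
    case _ => // success: nothing further to do
  }
}

handleCompletion(1, Success(()))
handleCompletion(2, Failure(new RuntimeException("output op threw")))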
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 95833efc94..255ccf0536 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler
import scala.collection.mutable.HashSet
+import scala.util.Failure
import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
/** Class representing a set of Jobs
* that belong to the same batch.
*/
@@ -62,12 +64,23 @@ case class JobSet(
}
def toBatchInfo: BatchInfo = {
- new BatchInfo(
+ val failureReasons: Map[Int, String] = {
+ if (hasCompleted) {
+ jobs.filter(_.result.isFailure).map { job =>
+ (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
+ }.toMap
+ } else {
+ Map.empty
+ }
+ }
+ val binfo = new BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
- if (processingStartTime >= 0 ) Some(processingStartTime) else None,
- if (processingEndTime >= 0 ) Some(processingEndTime) else None
+ if (processingStartTime >= 0) Some(processingStartTime) else None,
+ if (processingEndTime >= 0) Some(processingEndTime) else None
)
+ binfo.setFailureReason(failureReasons)
+ binfo
}
}
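Finally, a self-contained sketch of the failure-map construction added to toBatchInfo, using a simplified stand-in for the scheduler's Job class (the real code formats the exception with Utils.exceptionString to capture the full stack trace; toString is used here to keep the sketch runnable):

import scala.util.{Failure, Success, Try}

// Simplified stand-in for Job: an output-operation id plus its outcome.
case class MiniJob(outputOpId: Int, result: Try[Unit])

def failureReasons(jobs: Seq[MiniJob], hasCompleted: Boolean): Map[Int, String] =
  if (hasCompleted) {
    jobs.filter(_.result.isFailure).map { job =>
      // The cast is safe: isFailure guarantees the result is a Failure.
      (job.outputOpId, job.result.asInstanceOf[Failure[_]].exception.toString)
    }.toMap
  } else {
    Map.empty // batch still running: nothing to report yet
  }

val jobs = Seq(
  MiniJob(0, Success(())),
  MiniJob(1, Failure(new RuntimeException("write to sink failed")))
)
assert(failureReasons(jobs, hasCompleted = true) ==
  Map(1 -> "java.lang.RuntimeException: write to sink failed"))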