author	zsxwing <zsxwing@gmail.com>	2015-09-23 19:52:02 -0700
committer	Reynold Xin <rxin@databricks.com>	2015-09-23 19:52:02 -0700
commit	758c9d25e92417f8c06328c3af7ea2ef0212c79f (patch)
tree	62b08771c98fd69367cec1a675ae2c2fadaf8520 /streaming
parent	83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (diff)
[SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches
Slightly modified version of #8818; all credit goes to zsxwing.

Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8892 from tdas/SPARK-10692.
Diffstat (limited to 'streaming')
-rw-r--r--	streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala	| 10
-rw-r--r--	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala	| 26
-rw-r--r--	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala	| 19
-rw-r--r--	streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala	| 76
4 files changed, 115 insertions(+), 16 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9922b6bc12..3c869561ef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,6 +39,8 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
+ private var _failureReasons: Map[Int, String] = Map.empty
+
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -67,4 +69,12 @@ case class BatchInfo(
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+ /** Set the failure reasons corresponding to every output op in the batch */
+ private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
+ _failureReasons = reasons
+ }
+
+ /** Failure reasons corresponding to every output op in the batch */
+ private[streaming] def failureReasons = _failureReasons
}
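
Since failureReasons is private[streaming], only code inside the org.apache.spark.streaming package (such as the streaming UI's listener) can read it. As a minimal sketch of how a consumer might use the new accessor, with a hypothetical listener placed under that package:

package org.apache.spark.streaming.ui

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener sketch: failureReasons maps each failed output op id
// to the string rendering of its exception, and is empty for successful batches.
private[streaming] class FailedBatchTracker extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val reasons = batchCompleted.batchInfo.failureReasons
    reasons.foreach { case (outputOpId, reason) =>
      println(s"Batch ${batchCompleted.batchInfo.batchTime}: output op $outputOpId failed: $reason")
    }
  }
}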
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 32d995dc42..66afbf1b11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
private def handleJobCompletion(job: Job) {
+ val jobSet = jobSets.get(job.time)
+ jobSet.handleJobCompletion(job)
+ logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+ if (jobSet.hasCompleted) {
+ jobSets.remove(jobSet.time)
+ jobGenerator.onBatchCompletion(jobSet.time)
+ logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+ jobSet.totalDelay / 1000.0, jobSet.time.toString,
+ jobSet.processingDelay / 1000.0
+ ))
+ listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+ }
job.result match {
- case Success(_) =>
- val jobSet = jobSets.get(job.time)
- jobSet.handleJobCompletion(job)
- logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
- if (jobSet.hasCompleted) {
- jobSets.remove(jobSet.time)
- jobGenerator.onBatchCompletion(jobSet.time)
- logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
- jobSet.totalDelay / 1000.0, jobSet.time.toString,
- jobSet.processingDelay / 1000.0
- ))
- listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
- }
case Failure(e) =>
reportError("Error running job " + job, e)
+ case _ =>
}
}
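
The key behavioral change here: the batch-completion bookkeeping (marking the job done, removing the finished JobSet, posting StreamingListenerBatchCompleted) now runs for every finished job, not only successful ones, so a batch whose jobs all failed still produces a batch-completed event carrying the failure reasons. The match on job.result is left only to decide whether to report an error. A standalone sketch of the same control-flow pattern, with hypothetical names and no Spark dependencies:

import scala.util.{Failure, Success, Try}

// Bookkeeping runs unconditionally; the Try is matched only for error reporting.
def handleCompletion[T](result: Try[T]): Unit = {
  println("bookkeeping: job marked finished, batch may be posted as completed")
  result match {
    case Failure(e) => println(s"reporting error: ${e.getMessage}")
    case Success(_) => // nothing extra to do on success
  }
}

handleCompletion(Try(42))                                   // bookkeeping only
handleCompletion(Try(throw new RuntimeException("boom")))   // bookkeeping + error report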
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 95833efc94..255ccf0536 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler
import scala.collection.mutable.HashSet
+import scala.util.Failure
import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
/** Class representing a set of Jobs
* belonging to the same batch.
@@ -62,12 +64,23 @@ case class JobSet(
}
def toBatchInfo: BatchInfo = {
- new BatchInfo(
+ val failureReasons: Map[Int, String] = {
+ if (hasCompleted) {
+ jobs.filter(_.result.isFailure).map { job =>
+ (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
+ }.toMap
+ } else {
+ Map.empty
+ }
+ }
+ val binfo = new BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
- if (processingStartTime >= 0 ) Some(processingStartTime) else None,
- if (processingEndTime >= 0 ) Some(processingEndTime) else None
+ if (processingStartTime >= 0) Some(processingStartTime) else None,
+ if (processingEndTime >= 0) Some(processingEndTime) else None
)
+ binfo.setFailureReason(failureReasons)
+ binfo
}
}
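
The failure map is computed only once the whole batch has completed: jobs whose Try result is a Failure are kept, and each output op id is mapped to a string form of its exception via Utils.exceptionString. A minimal standalone model of that computation (the Job case class below is a hypothetical stand-in for the scheduler's Job, and toString is a simplification of Utils.exceptionString):

import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for org.apache.spark.streaming.scheduler.Job.
case class Job(outputOpId: Int, result: Try[Unit])

// Mirrors JobSet.toBatchInfo: keep failed jobs, map output op id -> reason.
def failureReasons(jobs: Seq[Job]): Map[Int, String] =
  jobs.collect { case Job(id, Failure(e)) => id -> e.toString }.toMap

val jobs = Seq(
  Job(0, Success(())),
  Job(1, Failure(new RuntimeException("output op 1 failed")))
)
println(failureReasons(jobs))
// Map(1 -> java.lang.RuntimeException: output op 1 failed)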
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d840c349bb..d8fd2ced3b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,69 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
+ test("onBatchCompleted with successful batch") {
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+
+ val failureReasons = startStreamingContextAndCollectFailureReasons(ssc)
+ assert(failureReasons != null && failureReasons.isEmpty,
+ "A successful batch should not set failureReasons")
+ }
+
+ test("onBatchCompleted with failed batch and one failed job") {
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD { _ =>
+ throw new RuntimeException("This is a failed job")
+ }
+
+ // Check if failureReasons contains the correct error message
+ val failureReasons = startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+ assert(failureReasons != null)
+ assert(failureReasons.size === 1)
+ assert(failureReasons.contains(0))
+ assert(failureReasons(0).contains("This is a failed job"))
+ }
+
+ test("onBatchCompleted with failed batch and multiple failed jobs") {
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD { _ =>
+ throw new RuntimeException("This is a failed job")
+ }
+ inputStream.foreachRDD { _ =>
+ throw new RuntimeException("This is another failed job")
+ }
+
+ // Check if failureReasons contains the correct error messages
+ val failureReasons =
+ startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+ assert(failureReasons != null)
+ assert(failureReasons.size === 2)
+ assert(failureReasons.contains(0))
+ assert(failureReasons.contains(1))
+ assert(failureReasons(0).contains("This is a failed job"))
+ assert(failureReasons(1).contains("This is another failed job"))
+ }
+
+ private def startStreamingContextAndCollectFailureReasons(
+ _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
+ val failureReasonsCollector = new FailureReasonsCollector()
+ _ssc.addStreamingListener(failureReasonsCollector)
+ val batchCounter = new BatchCounter(_ssc)
+ _ssc.start()
+ // Wait until at least one batch has completed
+ batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+ if (isFailed) {
+ intercept[RuntimeException] {
+ _ssc.awaitTerminationOrTimeout(10000)
+ }
+ }
+ _ssc.stop()
+ failureReasonsCollector.failureReasons
+ }
+
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for (i <- 1 until seq.size) {
@@ -205,3 +268,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
}
def onStop() { }
}
+
+/**
+ * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons`
+ * field.
+ */
+class FailureReasonsCollector extends StreamingListener {
+
+ @volatile var failureReasons: Map[Int, String] = null
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ failureReasons = batchCompleted.batchInfo.failureReasons
+ }
+}