aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala14
1 files changed, 8 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b7d114bc16..2fa3bf7d52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -201,18 +201,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
- jobSets.remove(jobSet.time)
- jobGenerator.onBatchCompletion(jobSet.time)
- logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
- jobSet.totalDelay / 1000.0, jobSet.time.toString,
- jobSet.processingDelay / 1000.0
- ))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
job.result match {
case Failure(e) =>
reportError("Error running job " + job, e)
case _ =>
+ if (jobSet.hasCompleted) {
+ jobSets.remove(jobSet.time)
+ jobGenerator.onBatchCompletion(jobSet.time)
+ logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+ jobSet.totalDelay / 1000.0, jobSet.time.toString,
+ jobSet.processingDelay / 1000.0
+ ))
+ }
}
}