author    CodingCat <zhunansjtu@gmail.com>    2017-01-16 18:33:20 -0800
committer Shixiong Zhu <shixiong@databricks.com>    2017-01-16 18:33:20 -0800
commit    f8db8945f25cb884278ff8841bac5f6f28f0dec6 (patch)
tree      e2873e39e8c6c1926a15576d5fe78068e5921884 /streaming
parent    c84f7d3e1b845bc1e595ce9a6e2de663c2d218f4 (diff)
[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets
## What changes were proposed in this pull request?

The current implementation of Spark Streaming considers a batch completed regardless of the results of its jobs (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203).

Consider the following case: a micro batch contains two jobs, each reading from a different Kafka topic. One of the jobs fails due to a problem in the user-defined logic, after the other has finished successfully (see the sketch after this message).

1. The main thread in the Spark Streaming application executes the line mentioned above,
2. another thread (the checkpoint writer) writes a checkpoint file immediately after that line is executed,
3. and, due to the current error-handling mechanism in Spark Streaming, the StreamingContext is closed (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214).

When the user recovers from the checkpoint file, the data being processed by the failed job is never reprocessed, because the JobSet containing the failed job was removed (taken as completed) before the checkpoint was constructed.

This PR fixes the issue by removing a JobSet from JobScheduler.jobSets only when all jobs in that JobSet have finished successfully.

## How was this patch tested?

Existing tests.

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <zhunansjtu@gmail.com>

Closes #16542 from CodingCat/SPARK-18905.
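To make the scenario concrete, here is a minimal sketch of a driver program that can hit this bug, with two socket streams standing in for the two Kafka topics; the host names, ports, application name, and checkpoint path are hypothetical placeholders:

```scala
// Minimal sketch of the failure scenario: two output operations (two jobs)
// per micro batch; one succeeds, the other fails in user-defined logic.
// Socket streams stand in for the two Kafka topics; hosts, ports, and the
// checkpoint path are hypothetical placeholders.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FailedJobSetRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("failed-jobset-repro")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/repro-checkpoint")

    val topicA = ssc.socketTextStream("hostA", 9999)  // stands in for Kafka topic A
    val topicB = ssc.socketTextStream("hostB", 9999)  // stands in for Kafka topic B

    // Job 1: finishes successfully.
    topicA.foreachRDD { rdd => rdd.count() }

    // Job 2: fails in user-defined logic. Before this patch, the batch's
    // JobSet was removed (and could be checkpointed as complete) even though
    // this job failed, so its data was never reprocessed after recovery.
    topicB.foreachRDD { rdd =>
      rdd.foreach { _ => throw new RuntimeException("failure in user logic") }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```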
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 14 ++++++++------

1 file changed, 8 insertions(+), 6 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b7d114bc16..2fa3bf7d52 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -201,18 +201,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
-      jobSets.remove(jobSet.time)
-      jobGenerator.onBatchCompletion(jobSet.time)
-      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-        jobSet.totalDelay / 1000.0, jobSet.time.toString,
-        jobSet.processingDelay / 1000.0
-      ))
       listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
     }
     job.result match {
       case Failure(e) =>
         reportError("Error running job " + job, e)
       case _ =>
+        if (jobSet.hasCompleted) {
+          jobSets.remove(jobSet.time)
+          jobGenerator.onBatchCompletion(jobSet.time)
+          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+            jobSet.totalDelay / 1000.0, jobSet.time.toString,
+            jobSet.processingDelay / 1000.0
+          ))
+        }
     }
   }
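For readability, here is a sketch of how the affected method reads once this patch is applied, reconstructed from the hunk above. The method signature and the first three lines fall outside the hunk and are assumptions inferred from the surrounding context, not part of the diff:

```scala
// Sketch of JobScheduler.handleJobCompletion after this patch, reconstructed
// from the hunk above. The signature and the lines before the first context
// line are assumptions, not shown in the diff.
private def handleJobCompletion(job: Job, completedTime: Long): Unit = {
  val jobSet = jobSets.get(job.time)  // assumed lookup of the job's batch
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      // A failed job leaves the JobSet in jobSets, so a checkpoint written
      // now still contains this batch and it is re-run after recovery.
      reportError("Error running job " + job, e)
    case _ =>
      // Only a fully successful JobSet is removed and reported as complete.
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
```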