aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorAhmed Mahran <ahmed.mahran@mashin.io>2016-07-22 12:39:12 +0100
committerSean Owen <sowen@cloudera.com>2016-07-22 12:39:12 +0100
commit2c72a4432b335f44a95feb340cebfd29488d1eb1 (patch)
tree285b685334dcef04cbf44d73e451912e56ceb062 /streaming/src
parente1bd70f44b11141b000821e9754efeabc14f24a5 (diff)
downloadspark-2c72a4432b335f44a95feb340cebfd29488d1eb1.tar.gz
spark-2c72a4432b335f44a95feb340cebfd29488d1eb1.tar.bz2
spark-2c72a4432b335f44a95feb340cebfd29488d1eb1.zip
[SPARK-16487][STREAMING] Fix some batches might not get marked as fully processed in JobGenerator
## What changes were proposed in this pull request? In `JobGenerator`, the code reads like that some batches might not get marked as fully processed. In the following flowchart, the batch should get marked fully processed before endpoint C however it is not. Currently, this does not actually cause an issue, as the condition `(time - zeroTime) is multiple of checkpoint duration?` always evaluates to `true` as the `checkpoint duration` is always set to be equal to the `batch duration`. ![Flowchart](https://s31.postimg.org/udy9lti2j/spark_streaming_job_generator.png) This PR fixes this issue so as to improve code readability and to avoid any potential issue in case there is any future change making checkpoint duration to be set different from batch duration. Author: Ahmed Mahran <ahmed.mahran@mashin.io> Closes #14145 from ahmed-mahran/b-mark-batch-fully-processed.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 19c88f1ee0..10d64f98ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -287,12 +287,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
markBatchFullyProcessed(time)
}
- /** Perform checkpoint for the give `time`. */
+ /** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
+ } else if (clearCheckpointDataLater) {
+ markBatchFullyProcessed(time)
}
}