diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-04 01:42:29 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-12-04 01:42:29 -0800 |
commit | 4106d80fb6a16713a6cd2f15ab9d60f2527d9be5 (patch) | |
tree | b2cd9717507fa8427e160022e240a044a9aa0e1d /streaming | |
parent | 5011f264fb53705c528250bd055acbc2eca2baaa (diff) | |
download | spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.tar.gz spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.tar.bz2 spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.zip |
[SPARK-12122][STREAMING] Prevent batches from being submitted twice after recovering StreamingContext from checkpoint
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #10127 from tdas/SPARK-12122.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2de035d166..8dfdc1f57b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times - val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime } + .distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach { time => |