about | summary | refs | log | tree | commit | diff
path: root/streaming/src/main
diff options
context:
space:
mode:
author: Tathagata Das <tathagata.das1565@gmail.com> 2015-12-04 01:42:29 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-12-04 01:42:29 -0800
commit: 4106d80fb6a16713a6cd2f15ab9d60f2527d9be5 (patch)
tree: b2cd9717507fa8427e160022e240a044a9aa0e1d /streaming/src/main
parent: 5011f264fb53705c528250bd055acbc2eca2baaa (diff)
download: spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.tar.gz
spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.tar.bz2
spark-4106d80fb6a16713a6cd2f15ab9d60f2527d9be5.zip
[SPARK-12122][STREAMING] Prevent batches from being submitted twice after recovering StreamingContext from checkpoint
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #10127 from tdas/SPARK-12122.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 3
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 2de035d166..8dfdc1f57b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
- val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
+ .distinct.sorted(Time.ordering)
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>