about summary refs log tree commit diff
path: root/streaming/src/main
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2013-01-23 03:15:36 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2013-01-23 03:15:36 -0800
commit 666ce431aa03239d580a8c78b3a2f34a851eb413 (patch)
tree b8d9123fac27ed8f4d72e3e285f687cc70223cc6 /streaming/src/main
parent fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (diff)
download spark-666ce431aa03239d580a8c78b3a2f34a851eb413.tar.gz
spark-666ce431aa03239d580a8c78b3a2f34a851eb413.tar.bz2
spark-666ce431aa03239d580a8c78b3a2f34a851eb413.zip
Added support for rescheduling unprocessed batches on master failure.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/spark/streaming/Checkpoint.scala 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/JobManager.scala 30
-rw-r--r-- streaming/src/main/scala/spark/streaming/Scheduler.scala 5
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala 4
4 files changed, 37 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 2f3adb39c2..b9eb7f8ec4 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,7 +17,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
- val checkpointDuration: Duration = ssc.checkpointDuration
+ val checkpointDuration = ssc.checkpointDuration
+ val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
def validate() {
assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 3b910538e0..5acdd01e58 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -3,6 +3,8 @@ package spark.streaming
import spark.Logging
import spark.SparkEnv
import java.util.concurrent.Executors
+import collection.mutable.HashMap
+import collection.mutable.ArrayBuffer
private[streaming]
@@ -19,15 +21,41 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
case e: Exception =>
logError("Running " + job + " failed", e)
}
+ clearJob(job)
}
}
initLogging()
val jobExecutor = Executors.newFixedThreadPool(numThreads)
-
+ val jobs = new HashMap[Time, ArrayBuffer[Job]]
+
def runJob(job: Job) {
+ jobs.synchronized {
+ jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job
+ }
jobExecutor.execute(new JobHandler(ssc, job))
logInfo("Added " + job + " to queue")
}
+
+ private def clearJob(job: Job) {
+ jobs.synchronized {
+ val jobsOfTime = jobs.get(job.time)
+ if (jobsOfTime.isDefined) {
+ jobsOfTime.get -= job
+ if (jobsOfTime.get.isEmpty) {
+ jobs -= job.time
+ }
+ } else {
+ throw new Exception("Job finished for time " + job.time +
+ " but time does not exist in jobs")
+ }
+ }
+ }
+
+ def getPendingTimes(): Array[Time] = {
+ jobs.synchronized {
+ jobs.keySet.toArray
+ }
+ }
}
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index c04ed37de8..b77986a3ba 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -35,10 +35,13 @@ class Scheduler(ssc: StreamingContext) extends Logging {
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
- val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
+ // Reschedule the batches that were received but not processed before failure
+ ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
+ // Restart the timer
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
} else {
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 2cf00e3baa..5781b1cc72 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -133,7 +133,7 @@ class StreamingContext private (
}
}
- protected[streaming] def getInitialCheckpoint(): Checkpoint = {
+ protected[streaming] def initialCheckpoint: Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
@@ -367,7 +367,7 @@ class StreamingContext private (
}
/**
- * Sstops the execution of the streams.
+ * Stops the execution of the streams.
*/
def stop() {
try {