diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-23 03:15:36 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-23 03:15:36 -0800 |
commit | 666ce431aa03239d580a8c78b3a2f34a851eb413 (patch) | |
tree | b8d9123fac27ed8f4d72e3e285f687cc70223cc6 /streaming | |
parent | fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (diff) | |
download | spark-666ce431aa03239d580a8c78b3a2f34a851eb413.tar.gz spark-666ce431aa03239d580a8c78b3a2f34a851eb413.tar.bz2 spark-666ce431aa03239d580a8c78b3a2f34a851eb413.zip |
Added support for rescheduling unprocessed batches on master failure.
Diffstat (limited to 'streaming')
5 files changed, 53 insertions, 12 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 2f3adb39c2..b9eb7f8ec4 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,7 +17,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir - val checkpointDuration: Duration = ssc.checkpointDuration + val checkpointDuration = ssc.checkpointDuration + val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 3b910538e0..5acdd01e58 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -3,6 +3,8 @@ package spark.streaming import spark.Logging import spark.SparkEnv import java.util.concurrent.Executors +import collection.mutable.HashMap +import collection.mutable.ArrayBuffer private[streaming] @@ -19,15 +21,41 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { case e: Exception => logError("Running " + job + " failed", e) } + clearJob(job) } } initLogging() val jobExecutor = Executors.newFixedThreadPool(numThreads) - + val jobs = new HashMap[Time, ArrayBuffer[Job]] + def runJob(job: Job) { + jobs.synchronized { + jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job + } jobExecutor.execute(new JobHandler(ssc, job)) logInfo("Added " + job + " to queue") } + + private def clearJob(job: Job) { + jobs.synchronized { + val jobsOfTime = jobs.get(job.time) + if (jobsOfTime.isDefined) { + jobsOfTime.get -= job + if (jobsOfTime.get.isEmpty) { + jobs -= job.time + } + } else { + throw new Exception("Job finished for time " + job.time + + " but time does not exist in jobs") + } + } + } + + def getPendingTimes(): Array[Time] = { + jobs.synchronized { + jobs.keySet.toArray + } + } } diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index c04ed37de8..b77986a3ba 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -35,10 +35,13 @@ class Scheduler(ssc: StreamingContext) extends Logging { // either set the manual clock to the last checkpointed time, // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { - val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds + val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } + // Reschedule the batches that were received but not processed before failure + ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + // Restart the timer timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") } else { diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 2cf00e3baa..5781b1cc72 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -133,7 +133,7 @@ class StreamingContext private ( } } - protected[streaming] def getInitialCheckpoint(): Checkpoint = { + protected[streaming] def initialCheckpoint: Checkpoint = { if (isCheckpointPresent) cp_ else null } @@ -367,7 +367,7 @@ class StreamingContext private ( } /** - * Sstops the execution of the streams. + * Stops the execution of the streams. */ def stop() { try { diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 4f6204f205..34e51e9562 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -44,7 +44,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + /* test("network input stream") { // Start the server testServer = new TestServer(testPort) @@ -236,8 +236,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i).head.toString === expectedOutput(i)) } } - - test("file input stream with checkpoint") { + */ + test("file input stream with master failure") { // Create a temporary directory testDir = { var temp = File.createTempFile(".temp.", Random.nextInt().toString) @@ -251,11 +251,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val fileStream = ssc.textFileStream(testDir.toString) - val outputBuffer = new ArrayBuffer[Seq[Int]] - // Reduced over a large window to ensure that recovery from master failure + // Making value 3 take large time to process, to ensure that the master + // shuts down in the middle of processing the 3rd batch + val mappedStream = fileStream.map(s => { + val i = s.toInt + if (i == 3) Thread.sleep(1000) + i + }) + // Reducing over a large window to ensure that recovery from master failure // requires reprocessing of all the files seen before the failure - val reducedStream = fileStream.map(_.toInt) - .reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -275,6 +281,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + // Create files while the master is down for (i <- Seq(4, 5, 6)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(1000) @@ -293,6 +300,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(500) } Thread.sleep(1000) + logInfo("Output = " + outputStream.output.mkString(",")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() @@ -316,6 +324,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(outputBuffer(i).head === expectedOutput(i)) } } + } |