diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-10 19:48:42 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-10 19:48:42 -0800 |
commit | fd90daf850a922fe33c3638b18304d827953e2cb (patch) | |
tree | d58932b0e0526bb1227624d0a135c9d4826a3a38 /streaming/src/main | |
parent | 16baea62bce62987158acce0595a0916c25b32b2 (diff) | |
download | spark-fd90daf850a922fe33c3638b18304d827953e2cb.tar.gz spark-fd90daf850a922fe33c3638b18304d827953e2cb.tar.bz2 spark-fd90daf850a922fe33c3638b18304d827953e2cb.zip |
Fixed bugs in FileInputDStream and Scheduler that occasionally failed to reprocess old files after recovering from master failure. Completely modified spark.streaming.FailureTest to test multiple master failures using file input stream.
Diffstat (limited to 'streaming/src/main')
5 files changed, 21 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index d5a5496839..7aa9d20004 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -81,12 +81,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private[streaming] def generateRDDs(time: Time): Seq[Job] = { this.synchronized { + logInfo("Generating RDDs for time " + time) outputStreams.flatMap(outputStream => outputStream.generateJob(time)) } } private[streaming] def forgetOldRDDs(time: Time) { this.synchronized { + logInfo("Forgetting old RDDs for time " + time) outputStreams.foreach(_.forgetOldMetadata(time)) } } diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 5acdd01e58..8b18c7bc6a 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( - (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0)) + logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format( + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0)) } catch { case e: Exception => logError("Running " + job + " failed", e) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index b77986a3ba..23a0f0974d 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -40,7 +40,11 @@ class Scheduler(ssc: StreamingContext) extends Logging { clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } // Reschedule the batches that were received but not processed before failure - ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) + println(pendingTimes.mkString(", ")) + pendingTimes.foreach(time => + graph.generateRDDs(time).foreach(jobManager.runJob)) // Restart the timer timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") @@ -64,11 +68,11 @@ class Scheduler(ssc: StreamingContext) extends Logging { graph.generateRDDs(time).foreach(jobManager.runJob) graph.forgetOldRDDs(time) doCheckpoint(time) - logInfo("Generated RDDs for time " + time) } private def doCheckpoint(time: Time) { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { + logInfo("Checkpointing graph for time " + time) val startTime = System.currentTimeMillis() ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 5daeb761dd..8a6c9a5cb5 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -39,4 +39,8 @@ case class Time(private val millis: Long) { override def toString: String = (millis.toString + " ms") +} + +object Time { + val ordering = Ordering.by((time: Time) => time.millis) }
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index c6ffb252ce..10ccb4318d 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -128,7 +128,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K private[streaming] class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] override def update() { hadoopFiles.clear() @@ -139,11 +139,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K override def restore() { hadoopFiles.foreach { - case (time, files) => { - logInfo("Restoring Hadoop RDD for time " + time + " from files " + - files.mkString("[", ",", "]") ) - files - generatedRDDs += ((time, filesToRDD(files))) + case (t, f) => { + // Restore the metadata in both files and generatedRDDs + logInfo("Restoring files for time " + t + " - " + + f.mkString("[", ", ", "]") ) + files += ((t, f)) + generatedRDDs += ((t, filesToRDD(f))) } } } |