author     Tathagata Das <tathagata.das1565@gmail.com>  2013-02-10 19:48:42 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-02-10 19:48:42 -0800
commit     fd90daf850a922fe33c3638b18304d827953e2cb (patch)
tree       d58932b0e0526bb1227624d0a135c9d4826a3a38 /streaming/src/main
parent     16baea62bce62987158acce0595a0916c25b32b2 (diff)
Fixed bugs in FileInputDStream and Scheduler that occasionally failed to reprocess old files after recovering from master failure. Completely modified spark.streaming.FailureTest to test multiple master failures using file input stream.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala               2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala                 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                  8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala                       4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala  13
5 files changed, 21 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index d5a5496839..7aa9d20004 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -81,12 +81,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
  private[streaming] def generateRDDs(time: Time): Seq[Job] = {
    this.synchronized {
+      logInfo("Generating RDDs for time " + time)
      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    }
  }

  private[streaming] def forgetOldRDDs(time: Time) {
    this.synchronized {
+      logInfo("Forgetting old RDDs for time " + time)
      outputStreams.foreach(_.forgetOldMetadata(time))
    }
  }
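Note: the new log calls sit inside the same synchronized block that guards the output streams, so the log line and the traversal happen atomically with respect to other graph operations. A rough stand-alone sketch of that pattern; all names here are illustrative, not the actual DStreamGraph members:

// Sketch only: log inside the synchronized block that protects the shared collection.
object SyncLogSketch {
  private val outputStreams = scala.collection.mutable.ArrayBuffer("stream-1", "stream-2")

  def generate(time: Long): Seq[String] = this.synchronized {
    println("Generating RDDs for time " + time)
    outputStreams.map(s => s + "@" + time).toSeq
  }

  def main(args: Array[String]): Unit = println(generate(1000L))
}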
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 5acdd01e58..8b18c7bc6a 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
      SparkEnv.set(ssc.env)
      try {
        val timeTaken = job.run()
-        logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
-          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
+        logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format(
+          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0))
      } catch {
        case e: Exception =>
          logError("Running " + job + " failed", e)
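For context, the amended log line reports two separate latencies: the end-to-end delay of a batch, measured from the batch's time to job completion, and the execution time returned by job.run(). A minimal, self-contained sketch of the same arithmetic, with all values invented for illustration:

// Sketch only: relate total delay and execution time for one batch job.
object DelayExample {
  def main(args: Array[String]): Unit = {
    val batchTimeMs = System.currentTimeMillis() - 3000   // batch was due 3 s ago
    val executionMs = 1200L                                // time spent inside job.run()
    val totalDelaySecs = (System.currentTimeMillis() - batchTimeMs) / 1000.0
    val executionSecs  = executionMs / 1000.0
    // Total delay = scheduling delay + execution time for the batch.
    println("Total delay: %.5f s (execution: %.5f s)".format(totalDelaySecs, executionSecs))
  }
}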
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index b77986a3ba..23a0f0974d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -40,7 +40,11 @@ class Scheduler(ssc: StreamingContext) extends Logging {
        clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
      }
      // Reschedule the batches that were received but not processed before failure
-      ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
+      //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
+      val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
+      println(pendingTimes.mkString(", "))
+      pendingTimes.foreach(time =>
+        graph.generateRDDs(time).foreach(jobManager.runJob))
      // Restart the timer
      timer.restart(graph.zeroTime.milliseconds)
      logInfo("Scheduler's timer restarted")
@@ -64,11 +68,11 @@ class Scheduler(ssc: StreamingContext) extends Logging {
    graph.generateRDDs(time).foreach(jobManager.runJob)
    graph.forgetOldRDDs(time)
    doCheckpoint(time)
-    logInfo("Generated RDDs for time " + time)
  }

  private def doCheckpoint(time: Time) {
    if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+      logInfo("Checkpointing graph for time " + time)
      val startTime = System.currentTimeMillis()
      ssc.graph.updateCheckpointData(time)
      checkpointWriter.write(new Checkpoint(ssc, time))
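The recovery path above replays the checkpointed batch times in ascending order before the recurring timer is restarted. A rough, self-contained sketch of that replay idea; BatchTime and the loop body are stand-ins, not the actual Scheduler, graph, or jobManager APIs:

// Sketch only: replay pending batch times in order after recovery, then resume scheduling.
case class BatchTime(millis: Long)

object ReplaySketch {
  def main(args: Array[String]): Unit = {
    // Batch times that were received but not fully processed before the failure.
    val pendingTimes = Seq(BatchTime(3000), BatchTime(1000), BatchTime(2000))

    // Sort so batches are reprocessed in the order they originally arrived.
    val ordered = pendingTimes.sortBy(_.millis)

    // For each recovered time, regenerate and run the jobs for that batch.
    ordered.foreach { t =>
      println(s"Regenerating and running jobs for batch at ${t.millis} ms")
    }
    // ... then the recurring timer would be restarted from the graph's zero time.
  }
}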
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 5daeb761dd..8a6c9a5cb5 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -39,4 +39,8 @@ case class Time(private val millis: Long) {

  override def toString: String = (millis.toString + " ms")

+}
+
+object Time {
+  val ordering = Ordering.by((time: Time) => time.millis)
}
\ No newline at end of file
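The new Time.ordering is what the Scheduler change above uses to sort the pending batch times. A small stand-alone sketch of the same Ordering.by pattern; MyTime is an illustrative stand-in for spark.streaming.Time:

// Sketch only: an explicit Ordering built from a wrapper's millisecond field.
case class MyTime(millis: Long)

object MyTime {
  val ordering: Ordering[MyTime] = Ordering.by((t: MyTime) => t.millis)
}

object OrderingExample {
  def main(args: Array[String]): Unit = {
    val times = Seq(MyTime(2000), MyTime(500), MyTime(1000))
    // sorted(...) takes an explicit Ordering because MyTime itself is not Ordered.
    println(times.sorted(MyTime.ordering))   // List(MyTime(500), MyTime(1000), MyTime(2000))
  }
}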
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index c6ffb252ce..10ccb4318d 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -128,7 +128,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
  private[streaming]
  class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
-    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
    override def update() {
      hadoopFiles.clear()
@@ -139,11 +139,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
    override def restore() {
      hadoopFiles.foreach {
-        case (time, files) => {
-          logInfo("Restoring Hadoop RDD for time " + time + " from files " +
-            files.mkString("[", ",", "]") )
-          files
-          generatedRDDs += ((time, filesToRDD(files)))
+        case (t, f) => {
+          // Restore the metadata in both files and generatedRDDs
+          logInfo("Restoring files for time " + t + " - " +
+            f.mkString("[", ", ", "]") )
+          files += ((t, f))
+          generatedRDDs += ((t, filesToRDD(f)))
        }
      }
    }
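The fix restores each checkpointed (time, files) entry into both the files map and generatedRDDs, so a recovered batch can be rebuilt from the same input files instead of being silently dropped. A rough, self-contained sketch of that restore step; the maps, filesToRDD, and the Time alias below are stand-ins, not the real FileInputDStream members:

// Sketch only: put checkpointed file metadata back into BOTH bookkeeping maps on recovery.
import scala.collection.mutable.HashMap

object RestoreSketch {
  type Time = Long

  val files         = new HashMap[Time, Array[String]]
  val generatedRDDs = new HashMap[Time, Seq[String]]      // stand-in for per-batch RDDs

  def filesToRDD(fs: Array[String]): Seq[String] = fs.toSeq // stand-in for building an RDD

  def restore(checkpointed: HashMap[Time, Array[String]]): Unit = {
    checkpointed.foreach { case (t, f) =>
      println("Restoring files for time " + t + " - " + f.mkString("[", ", ", "]"))
      files += ((t, f))                     // without this, old files were forgotten after recovery
      generatedRDDs += ((t, filesToRDD(f)))
    }
  }

  def main(args: Array[String]): Unit = {
    restore(HashMap(1000L -> Array("file-1.txt"), 2000L -> Array("file-2.txt")))
    println(files.keys.toSeq.sorted.mkString(", "))
  }
}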