diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 02:21:56 +0000 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 02:21:56 +0000 |
commit | 8e88db3ca56e6a56668b029e39c8e96b86d4dd5e (patch) | |
tree | 1128e7b153b788fe3fa040e434ad469f57c3cd4c /streaming | |
parent | 3d4474330d9cd7d7c1b1e9fc1f8678bc6ee905e9 (diff) | |
download | spark-8e88db3ca56e6a56668b029e39c8e96b86d4dd5e.tar.gz spark-8e88db3ca56e6a56668b029e39c8e96b86d4dd5e.tar.bz2 spark-8e88db3ca56e6a56668b029e39c8e96b86d4dd5e.zip |
Bug fixes to the DriverRunner and minor changes here and there.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 8 |
2 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 39e25239bf..a5a5f2e751 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas override def cleanup() { } override def restore() { - hadoopFiles.foreach { + hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, f) => { // Restore the metadata in both files and generatedRDDs logInfo("Restoring files for time " + t + " - " + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 1cd0b9b0a4..6c1df4f9c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val checkpointTime = ssc.initialCheckpoint.checkpointTime val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) val downTimes = checkpointTime.until(restartTime, batchDuration) - logInfo("Batches during down time: " + downTimes.mkString(", ")) + logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", ")) // Batches that were unprocessed before failure - val pendingTimes = ssc.initialCheckpoint.pendingTimes - logInfo("Batches pending processing: " + pendingTimes.mkString(", ")) + val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) + logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) - logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) + logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => jobScheduler.runJobs(time, graph.generateJobs(time)) ) |