author     Tathagata Das <tathagata.das1565@gmail.com>  2014-01-08 04:12:05 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-01-08 04:12:05 -0800
commit     a17cc602ac79b22457ed457023493fe82e9d39df (patch)
tree       db3991176da0aba713da8c3b7c6298aa67b54b02 /streaming
parent     0b7a132d03d5a0106d85a8cca1ab28d6af9c8b55 (diff)
More bug fixes.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala | 45
1 file changed, 26 insertions(+), 19 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index cc2f08a7d1..e0567a1c19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
@@ -53,16 +52,17 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
.map(x => (x._1, x._2.getCheckpointFile.get))
+ logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
// Make a copy of the existing checkpoint data (checkpointed RDDs)
- //lastCheckpointFiles = checkpointFiles.clone()
+ // lastCheckpointFiles = checkpointFiles.clone()
// If the new checkpoint data has checkpoints then replace existing with the new one
- if (currentCheckpointFiles.size > 0) {
+ if (!currentCheckpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
+ allCheckpointFiles ++= currentCheckpointFiles
+ timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
}
- allCheckpointFiles ++= currentCheckpointFiles
- timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
}
/**
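
The hunk above moves the bookkeeping (allCheckpointFiles and timeToLastCheckpointFileTime) inside the guard, so it only runs when checkpoint files actually exist. A minimal sketch of that guarded-update pattern, using simplified stand-in maps rather than the real DStreamCheckpointData fields (Long keys in place of Time, and the incoming map as the guard):

import scala.collection.mutable

// Hypothetical stand-ins for the per-DStream bookkeeping maps.
val currentCheckpointFiles = mutable.HashMap[Long, String]()
val allCheckpointFiles = mutable.HashMap[Long, String]()
val timeToLastCheckpointFileTime = mutable.HashMap[Long, Long]()

def update(time: Long, checkpointFiles: Map[Long, String]): Unit = {
  if (checkpointFiles.nonEmpty) {
    // Replace the current snapshot with the newly checkpointed files ...
    currentCheckpointFiles.clear()
    currentCheckpointFiles ++= checkpointFiles
    // ... and only then record them globally and remember the oldest time,
    // so a batch with no checkpoints no longer touches the bookkeeping maps.
    allCheckpointFiles ++= currentCheckpointFiles
    timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min
  }
}
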
@@ -92,21 +92,28 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
}
*/
- val lastCheckpointFileTime = timeToLastCheckpointFileTime.remove(time).get
- allCheckpointFiles.filter(_._1 < lastCheckpointFileTime).foreach {
- case (time, file) =>
- try {
- val path = new Path(file)
- if (fileSystem == null) {
- fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
- }
- fileSystem.delete(path, true)
- allCheckpointFiles -= time
- logInfo("Deleted checkpoint file '" + file + "' for time " + time)
- } catch {
- case e: Exception =>
- logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
+ timeToLastCheckpointFileTime.remove(time) match {
+ case Some(lastCheckpointFileTime) =>
+ logInfo("Deleting all files before " + time)
+ val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime)
+ logInfo("Files to delete:\n" + filesToDelete.mkString(","))
+ filesToDelete.foreach {
+ case (time, file) =>
+ try {
+ val path = new Path(file)
+ if (fileSystem == null) {
+ fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
+ }
+ fileSystem.delete(path, true)
+ allCheckpointFiles -= time
+ logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+ } catch {
+ case e: Exception =>
+ logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
+ }
}
+ case None =>
+ logInfo("Nothing to delete")
}
}
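
The second hunk replaces timeToLastCheckpointFileTime.remove(time).get with a match on the returned Option, so a time with no recorded entry logs "Nothing to delete" instead of throwing NoSuchElementException. A self-contained sketch of that pattern, with a hypothetical map and a println stand-in for Spark's Logging trait:

import scala.collection.mutable

val timeToLastCheckpointFileTime = mutable.HashMap[Long, Long]()

def logInfo(msg: String): Unit = println(msg)  // stand-in for the Logging trait

def cleanup(time: Long): Unit = {
  // remove() returns an Option; calling .get on None would throw.
  timeToLastCheckpointFileTime.remove(time) match {
    case Some(lastCheckpointFileTime) =>
      logInfo("Deleting all files before " + lastCheckpointFileTime)
      // ... filter the recorded checkpoint files for entries older than
      // lastCheckpointFileTime and delete them, as the hunk above does ...
    case None =>
      logInfo("Nothing to delete")
  }
}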