diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-08 04:12:05 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-08 04:12:05 -0800 |
commit | a17cc602ac79b22457ed457023493fe82e9d39df (patch) | |
tree | db3991176da0aba713da8c3b7c6298aa67b54b02 /streaming/src | |
parent | 0b7a132d03d5a0106d85a8cca1ab28d6af9c8b55 (diff) | |
download | spark-a17cc602ac79b22457ed457023493fe82e9d39df.tar.gz spark-a17cc602ac79b22457ed457023493fe82e9d39df.tar.bz2 spark-a17cc602ac79b22457ed457023493fe82e9d39df.zip |
More bug fixes.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index cc2f08a7d1..e0567a1c19 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -22,7 +22,6 @@ import scala.reflect.ClassTag import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.conf.Configuration import org.apache.spark.Logging @@ -53,16 +52,17 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) .map(x => (x._1, x._2.getCheckpointFile.get)) + logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) // Make a copy of the existing checkpoint data (checkpointed RDDs) - //lastCheckpointFiles = checkpointFiles.clone() + // lastCheckpointFiles = checkpointFiles.clone() // If the new checkpoint data has checkpoints then replace existing with the new one - if (currentCheckpointFiles.size > 0) { + if (!currentCheckpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles + allCheckpointFiles ++= currentCheckpointFiles + timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } - allCheckpointFiles ++= currentCheckpointFiles - timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } /** @@ -92,21 +92,28 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } } */ - val lastCheckpointFileTime = timeToLastCheckpointFileTime.remove(time).get - allCheckpointFiles.filter(_._1 < lastCheckpointFileTime).foreach { - case (time, file) => - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) - } - fileSystem.delete(path, true) - allCheckpointFiles -= time - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + timeToLastCheckpointFileTime.remove(time) match { + case Some(lastCheckpointFileTime) => + logInfo("Deleting all files before " + time) + val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime) + logInfo("Files to delete:\n" + filesToDelete.mkString(",")) + filesToDelete.foreach { + case (time, file) => + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) + } + fileSystem.delete(path, true) + allCheckpointFiles -= time + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } } + case None => + logInfo("Nothing to delete") } } |