diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 05:06:15 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 05:06:15 -0800 |
commit | 740730a17901f914d0e9d470b8f40e30be33a9bb (patch) | |
tree | 2785b33228ffd96b4d87dae491d34d20ccd3856a /streaming | |
parent | 38d75e18fa48bd230e2ff9478e87274fb0cceb9f (diff) | |
download | spark-740730a17901f914d0e9d470b8f40e30be33a9bb.tar.gz spark-740730a17901f914d0e9d470b8f40e30be33a9bb.tar.bz2 spark-740730a17901f914d0e9d470b8f40e30be33a9bb.zip |
Fixed conf/slaves and updated docs.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala | 17 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 |
2 files changed, 15 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 1589bc19a2..671f7bbce7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) // Mapping of the batch time to the checkpointed RDD file of that time @transient private var timeToCheckpointFile = new HashMap[Time, String] - // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data + // Mapping of the batch time to the time of the oldest checkpointed RDD + // in that batch's checkpoint data @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] + @transient private var fileSystem : FileSystem = null protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] @@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles + // Add the current checkpoint files to the map of all checkpoint files + // This will be used to delete old checkpoint files timeToCheckpointFile ++= currentCheckpointFiles + // Remember the time of the oldest checkpoint RDD in current state timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } } /** - * Cleanup old checkpoint data. This gets called every time the graph - * checkpoint is initiated, but after `update` is called. Default - * implementation, cleans up old checkpoint files. + * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been + * written to the checkpoint directory. */ def cleanup(time: Time) { + // Get the time of the oldest checkpointed RDD that was written as part of the + // checkpoint of `time` timeToOldestCheckpointFileTime.remove(time) match { case Some(lastCheckpointFileTime) => + // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime` + // This is because checkpointed RDDs older than this are not going to be needed + // even after master fails, as the checkpoint data of `time` does not refer to those files val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 4585e3f6bd..38aa119239 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) logDebug("Cleared files are:\n" + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) - // Delete file times that weren't accessed in the last round of getting new files + // Delete file mod times that weren't accessed in the last round of getting new files fileModTimes.clearOldValues(lastNewFileFindingTime - 1) } @@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } private def getFileModTime(path: Path) = { + // Get file mod time from cache or fetch it from the file system fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) } |