From 740730a17901f914d0e9d470b8f40e30be33a9bb Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 10 Jan 2014 05:06:15 -0800
Subject: Fixed conf/slaves and updated docs.

---
 conf/slaves                                              |  3 ++-
 .../org/apache/spark/util/TimeStampedHashMap.scala       |  9 ++++++---
 .../apache/spark/streaming/DStreamCheckpointData.scala   | 17 +++++++++++++----
 .../spark/streaming/dstream/FileInputDStream.scala       |  3 ++-
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/conf/slaves b/conf/slaves
index 2fbb50c4a8..da0a01343d 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1 +1,2 @@
-localhost
+# A Spark Worker will be started on each of the machines listed below.
+localhost
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 9ce4ef744e..dde504fc52 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -26,9 +26,12 @@ import org.apache.spark.Logging

 /**
  * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
- * replacement of scala.collection.mutable.HashMap.
+ * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
+ * be a drop-in replacement of scala.collection.mutable.HashMap.
+ * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
+ *                             updated when it is accessed
  */
 class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
   extends Map[A, B]() with Logging {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index 1589bc19a2..671f7bbce7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])

   // Mapping of the batch time to the checkpointed RDD file of that time
   @transient private var timeToCheckpointFile = new HashMap[Time, String]
-  // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data
+  // Mapping of the batch time to the time of the oldest checkpointed RDD
+  // in that batch's checkpoint data
   @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
+
   @transient private var fileSystem : FileSystem = null

   protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
@@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     if (!checkpointFiles.isEmpty) {
       currentCheckpointFiles.clear()
       currentCheckpointFiles ++= checkpointFiles
+      // Add the current checkpoint files to the map of all checkpoint files
+      // This will be used to delete old checkpoint files
       timeToCheckpointFile ++= currentCheckpointFiles
+      // Remember the time of the oldest checkpoint RDD in current state
       timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
     }
   }

   /**
-   * Cleanup old checkpoint data. This gets called every time the graph
-   * checkpoint is initiated, but after `update` is called. Default
-   * implementation, cleans up old checkpoint files.
+   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
+   * written to the checkpoint directory.
    */
   def cleanup(time: Time) {
+    // Get the time of the oldest checkpointed RDD that was written as part of the
+    // checkpoint of `time`
     timeToOldestCheckpointFileTime.remove(time) match {
       case Some(lastCheckpointFileTime) =>
+        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
+        // This is because checkpointed RDDs older than this are not going to be needed
+        // even after the master fails, as the checkpoint data of `time` does not refer to those files
         val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
         logDebug("Files to delete:\n" + filesToDelete.mkString(","))
         filesToDelete.foreach {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4585e3f6bd..38aa119239 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
     logDebug("Cleared files are:\n" +
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
-    // Delete file times that weren't accessed in the last round of getting new files
+    // Delete file mod times that weren't accessed in the last round of getting new files
     fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
   }

@@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }

   private def getFileModTime(path: Path) = {
+    // Get file mod time from cache or fetch it from the file system
     fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
   }

--
cgit v1.2.3
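
The updated scaladoc above describes the contract of TimeStampedHashMap: every put records a timestamp, a get can optionally refresh that timestamp, and clearOldValues drops entries older than a threshold. The following is a minimal, self-contained sketch of those semantics; SimpleTimeStampedMap and its members are hypothetical names used only for illustration, not Spark's actual TimeStampedHashMap (which, as the diff shows, extends scala.collection.mutable.Map).

import scala.collection.mutable

// Hypothetical sketch only -- not Spark's TimeStampedHashMap.
// Each value is stored together with the time it was inserted; when
// updateTimeStampOnGet is true, reading a key also refreshes its timestamp,
// so clearOldValues evicts entries that have not been accessed recently.
class SimpleTimeStampedMap[A, B](updateTimeStampOnGet: Boolean = false) {

  private val internal = new mutable.HashMap[A, (B, Long)]()

  def put(key: A, value: B): Unit =
    internal(key) = (value, System.currentTimeMillis())

  def get(key: A): Option[B] = internal.get(key).map { case (value, _) =>
    if (updateTimeStampOnGet) internal(key) = (value, System.currentTimeMillis())
    value
  }

  def getOrElseUpdate(key: A, default: => B): B =
    get(key).getOrElse { val value = default; put(key, value); value }

  // Remove every entry whose timestamp is strictly older than threshTime (epoch millis)
  def clearOldValues(threshTime: Long): Unit = {
    val oldKeys = internal.collect { case (key, (_, t)) if t < threshTime => key }
    oldKeys.foreach(internal.remove)
  }
}

This is the usage pattern visible in the FileInputDStream hunks: getFileModTime caches modification times with getOrElseUpdate, and the cleanup path calls fileModTimes.clearOldValues(lastNewFileFindingTime - 1) so that mod times not touched during the latest directory scan are dropped.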
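
The comments added to cleanup encode a retention rule: once the checkpoint for batch `time` has been written, any checkpointed RDD file older than the oldest file referenced by that checkpoint can no longer be needed for recovery, so it may be deleted. Below is a simplified restatement of that rule, with batch times modeled as plain Longs and a println standing in for the actual deletion (the real code deletes through the Hadoop FileSystem handle declared in the diff); all names here are illustrative.

import scala.collection.mutable

// Simplified sketch of the checkpoint-file retention rule
object CheckpointCleanupSketch {
  // batch time -> checkpoint file written for that batch
  val timeToCheckpointFile = mutable.HashMap[Long, String]()
  // batch time -> time of the oldest checkpointed RDD referenced by that batch's checkpoint
  val timeToOldestCheckpointFileTime = mutable.HashMap[Long, Long]()

  // Called after the checkpoint for `time` has been written out
  def cleanup(time: Long): Unit =
    timeToOldestCheckpointFileTime.remove(time) match {
      case Some(lastCheckpointFileTime) =>
        // Files older than lastCheckpointFileTime are not referenced by the checkpoint
        // of `time`, so they are never needed again, even for recovery after a failure
        val filesToDelete = timeToCheckpointFile.filter { case (t, _) => t < lastCheckpointFileTime }
        filesToDelete.foreach { case (t, file) =>
          println(s"deleting checkpoint file $file for batch $t")  // real code deletes via FileSystem
          timeToCheckpointFile -= t
        }
      case None =>
        println(s"no checkpoint bookkeeping recorded for time $time")
    }
}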