author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 05:06:15 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 05:06:15 -0800
commit    740730a17901f914d0e9d470b8f40e30be33a9bb (patch)
tree      2785b33228ffd96b4d87dae491d34d20ccd3856a
parent    38d75e18fa48bd230e2ff9478e87274fb0cceb9f (diff)
Fixed conf/slaves and updated docs.
 conf/slaves                                                                         |  3 ++-
 core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala                 |  9 ++++++---
 streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala    | 17 +++++++++++----
 streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala |  3 ++-
 4 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/conf/slaves b/conf/slaves
index 2fbb50c4a8..da0a01343d 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1 +1,2 @@
-localhost
+# A Spark Worker will be started on each of the machines listed below.
+localhost
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 9ce4ef744e..dde504fc52 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -26,9 +26,12 @@ import org.apache.spark.Logging
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
- * time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
- * replacement of scala.collection.mutable.HashMap.
+ * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
+ * be a drop-in replacement for scala.collection.mutable.HashMap.
+ * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
+ *                             updated when it is accessed.
*/
class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
extends Map[A, B]() with Logging {
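
The semantics documented above can be illustrated with a condensed, hypothetical re-implementation. SimpleTimeStampedMap below is not the class in this patch; the real TimeStampedHashMap implements the full scala.collection.mutable.Map interface and mixes in Logging.

    import scala.collection.mutable

    // Illustrative sketch only: stores a (value, timestamp) pair per key and, when
    // updateTimeStampOnGet is enabled, refreshes the timestamp on every read.
    class SimpleTimeStampedMap[A, B](updateTimeStampOnGet: Boolean = false) {
      private val internal = new mutable.HashMap[A, (B, Long)]

      def put(key: A, value: B): Unit =
        internal(key) = (value, System.currentTimeMillis())

      def get(key: A): Option[B] = internal.get(key).map { case (value, _) =>
        if (updateTimeStampOnGet) internal(key) = (value, System.currentTimeMillis())
        value
      }

      // Drop every pair whose timestamp is older than threshTime.
      def clearOldValues(threshTime: Long): Unit =
        internal.filter { case (_, (_, t)) => t < threshTime }.keys.foreach(internal.remove)
    }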
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
index 1589bc19a2..671f7bbce7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
@@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// Mapping of the batch time to the checkpointed RDD file of that time
@transient private var timeToCheckpointFile = new HashMap[Time, String]
- // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data
+ // Mapping of the batch time to the time of the oldest checkpointed RDD
+ // in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
+
@transient private var fileSystem : FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
@@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
+ // Add the current checkpoint files to the map of all checkpoint files
+ // This will be used to delete old checkpoint files
timeToCheckpointFile ++= currentCheckpointFiles
+ // Remember the time of the oldest checkpoint RDD in current state
timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
}
}
/**
- * Cleanup old checkpoint data. This gets called every time the graph
- * checkpoint is initiated, but after `update` is called. Default
- * implementation, cleans up old checkpoint files.
+ * Clean up old checkpoint data. This gets called after a checkpoint of `time` has been
+ * written to the checkpoint directory.
*/
def cleanup(time: Time) {
+ // Get the time of the oldest checkpointed RDD that was written as part of the
+ // checkpoint of `time`
timeToOldestCheckpointFileTime.remove(time) match {
case Some(lastCheckpointFileTime) =>
+ // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`.
+ // Checkpointed RDDs older than this are not going to be needed even if the master fails,
+ // as the checkpoint data of `time` does not refer to those files.
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
filesToDelete.foreach {
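
To see how `update` and `cleanup` work together, here is a condensed sketch of the bookkeeping with simplified types: plain Long batch times instead of Time, a println instead of the actual file deletion, and a hypothetical object name.

    import scala.collection.mutable.HashMap

    object CheckpointCleanupSketch {
      // batch time -> checkpoint file written for that batch
      val timeToCheckpointFile = new HashMap[Long, String]
      // batch time -> time of the oldest checkpointed RDD in that batch's data
      val timeToOldestCheckpointFileTime = new HashMap[Long, Long]

      // Mirrors `update`: record the new files and the oldest RDD time they cover
      // (assumes currentCheckpointFiles is non-empty, as guarded in the patch).
      def update(time: Long, currentCheckpointFiles: Map[Long, String]): Unit = {
        timeToCheckpointFile ++= currentCheckpointFiles
        timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min
      }

      // Mirrors `cleanup`: files older than the oldest RDD referenced by the
      // checkpoint of `time` cannot be needed on recovery, so delete them.
      def cleanup(time: Long): Unit =
        timeToOldestCheckpointFileTime.remove(time).foreach { lastCheckpointFileTime =>
          val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
          filesToDelete.keys.foreach(timeToCheckpointFile.remove)
          println("Would delete: " + filesToDelete.values.mkString(", "))
        }
    }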
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 4585e3f6bd..38aa119239 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
- // Delete file times that weren't accessed in the last round of getting new files
+ // Delete file mod times that weren't accessed in the last round of getting new files
fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
}
@@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
private def getFileModTime(path: Path) = {
+ // Get file mod time from cache or fetch it from the file system
fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}
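
Taken together with the TimeStampedHashMap change, this caching works because fileModTimes is evidently constructed with updateTimeStampOnGet enabled: every getOrElseUpdate refreshes the entry's timestamp, so clearOldValues(lastNewFileFindingTime - 1) evicts exactly the paths that were not touched during the latest scan. A self-contained sketch of that pattern, with a stub standing in for fs.getFileStatus(path).getModificationTime():

    import scala.collection.mutable

    object ModTimeCacheSketch {
      // path -> (modification time, time of last access)
      private val cache = new mutable.HashMap[String, (Long, Long)]

      private def fetchModTimeFromFs(path: String): Long = 0L  // stub for the FileSystem call

      def getFileModTime(path: String): Long = {
        val modTime = cache.get(path).map(_._1).getOrElse(fetchModTimeFromFs(path))
        cache(path) = (modTime, System.currentTimeMillis())  // refresh on every access
        modTime
      }

      // Called after each scan for new files: evict entries whose last access
      // predates the scan, i.e. paths that no longer appear in the directory.
      def clearOldValues(threshTime: Long): Unit =
        cache.filter { case (_, (_, lastAccess)) => lastAccess < threshTime }.keys.foreach(cache.remove)
    }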