aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala9
1 files changed, 4 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 365a6bc417..e73837eb96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Updates the checkpoint data of the DStream. This gets called every time
* the graph checkpoint is initiated. Default implementation records the
- * checkpoint files to which the generate RDDs of the DStream has been saved.
+ * checkpoint files at which the generated RDDs of the DStream have been saved.
*/
def update(time: Time) {
@@ -103,16 +103,15 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Restore the checkpoint data. This gets called once when the DStream graph
- * (along with its DStreams) are being restored from a graph checkpoint file.
+ * (along with its output DStreams) is being restored from a graph checkpoint file.
* Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
// Create RDDs from the checkpoint data
currentCheckpointFiles.foreach {
- case(time, file) => {
+ case(time, file) =>
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
- }
}
}