From 26fec8f0b850e7eb0b6cfe63770f2e68cd50441b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 13 Nov 2012 11:05:57 -0800 Subject: Fixed bug in MappedValuesRDD, and set default graph checkpoint interval to be batch duration. --- .../src/main/scala/spark/streaming/DStream.scala | 2 +- .../spark/streaming/ReducedWindowedDStream.scala | 2 +- .../scala/spark/streaming/StreamingContext.scala | 23 +++++++++++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 7e6f73dd7d..76cdf8c464 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -230,7 +230,7 @@ extends Serializable with Logging { } if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) { newRDD.checkpoint() - logInfo("Marking RDD for time " + time + " for checkpointing at time " + time) + logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) Some(newRDD) diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index b07d51fa6b..8b484e6acf 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -57,7 +57,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( override def checkpoint(interval: Time): DStream[(K, V)] = { super.checkpoint(interval) - reducedStream.checkpoint(interval) + //reducedStream.checkpoint(interval) this } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index ef6a05a392..7a9a71f303 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -85,7 +85,7 @@ final class StreamingContext ( graph.setRememberDuration(duration) } - def checkpoint(dir: String, interval: Time) { + def checkpoint(dir: String, interval: Time = null) { if (dir != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir)) checkpointDir = dir @@ -186,12 +186,29 @@ final class StreamingContext ( graph.addOutputStream(outputStream) } + def validate() { + assert(graph != null, "Graph is null") + graph.validate() + + assert( + checkpointDir == null || checkpointInterval != null, + "Checkpoint directory has been set, but the graph checkpointing interval has " + + "not been set. Please use StreamingContext.checkpoint() to set the interval." + ) + + + } + + /** * This function starts the execution of the streams. */ def start() { - assert(graph != null, "Graph is null") - graph.validate() + if (checkpointDir != null && checkpointInterval == null && graph != null) { + checkpointInterval = graph.batchDuration + } + + validate() val networkInputStreams = graph.getInputStreams().filter(s => s match { case n: NetworkInputDStream[_] => true -- cgit v1.2.3