about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2012-11-13 11:05:57 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2012-11-13 11:05:57 -0800
commit 26fec8f0b850e7eb0b6cfe63770f2e68cd50441b (patch)
tree 7b783760eac6e9eb3297831666715aff2e818ab0 /streaming
parent c3ccd14cf8d7c5a867992758b74922890408541e (diff)
downloadspark-26fec8f0b850e7eb0b6cfe63770f2e68cd50441b.tar.gz
spark-26fec8f0b850e7eb0b6cfe63770f2e68cd50441b.tar.bz2
spark-26fec8f0b850e7eb0b6cfe63770f2e68cd50441b.zip
Fixed bug in MappedValuesRDD, and set default graph checkpoint interval to be batch duration.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala               | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/StreamingContext.scala       | 23
3 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 7e6f73dd7d..76cdf8c464 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -230,7 +230,7 @@ extends Serializable with Logging {
}
if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
newRDD.checkpoint()
- logInfo("Marking RDD for time " + time + " for checkpointing at time " + time)
+ logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index b07d51fa6b..8b484e6acf 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -57,7 +57,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
override def checkpoint(interval: Time): DStream[(K, V)] = {
super.checkpoint(interval)
- reducedStream.checkpoint(interval)
+ //reducedStream.checkpoint(interval)
this
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ef6a05a392..7a9a71f303 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -85,7 +85,7 @@ final class StreamingContext (
graph.setRememberDuration(duration)
}
- def checkpoint(dir: String, interval: Time) {
+ def checkpoint(dir: String, interval: Time = null) {
if (dir != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
checkpointDir = dir
@@ -186,12 +186,29 @@ final class StreamingContext (
graph.addOutputStream(outputStream)
}
+ def validate() {
+ assert(graph != null, "Graph is null")
+ graph.validate()
+
+ assert(
+ checkpointDir == null || checkpointInterval != null,
+ "Checkpoint directory has been set, but the graph checkpointing interval has " +
+ "not been set. Please use StreamingContext.checkpoint() to set the interval."
+ )
+
+
+ }
+
+
/**
* This function starts the execution of the streams.
*/
def start() {
- assert(graph != null, "Graph is null")
- graph.validate()
+ if (checkpointDir != null && checkpointInterval == null && graph != null) {
+ checkpointInterval = graph.batchDuration
+ }
+
+ validate()
val networkInputStreams = graph.getInputStreams().filter(s => s match {
case n: NetworkInputDStream[_] => true