aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-11-17 17:27:00 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-11-17 17:27:00 -0800
commit10c1abcb6ac42b248818fa585a9ad49c2fa4851a (patch)
tree03d150d3914319bc1351b4a233f1f50da5eeb905 /streaming
parent26fec8f0b850e7eb0b6cfe63770f2e68cd50441b (diff)
downloadspark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.tar.gz
spark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.tar.bz2
spark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.zip
Fixed checkpointing bug in CoGroupedRDD. CoGroupSplits kept around the RDD splits of its parent RDDs, thus checkpointing its parents did not release the references to the parent splits.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/DStreamGraph.scala2
2 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 76cdf8c464..13770aa8fd 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -226,11 +226,11 @@ extends Serializable with Logging {
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
- logInfo("Persisting RDD for time " + time + " to " + storageLevel + " at time " + time)
+ logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time)
}
if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
newRDD.checkpoint()
- logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time)
+ logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 246522838a..bd8c033eab 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -105,7 +105,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private[streaming] def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
- assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
}