diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-17 17:27:00 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-17 17:27:00 -0800 |
commit | 10c1abcb6ac42b248818fa585a9ad49c2fa4851a (patch) | |
tree | 03d150d3914319bc1351b4a233f1f50da5eeb905 /streaming | |
parent | 26fec8f0b850e7eb0b6cfe63770f2e68cd50441b (diff) | |
download | spark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.tar.gz spark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.tar.bz2 spark-10c1abcb6ac42b248818fa585a9ad49c2fa4851a.zip |
Fixed checkpointing bug in CoGroupedRDD. CoGroupSplits kept around the RDD splits of its parent RDDs, thus checkpointing its parents did not release the references to the parent splits.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 2 |
2 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 76cdf8c464..13770aa8fd 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -226,11 +226,11 @@ extends Serializable with Logging { case Some(newRDD) => if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) - logInfo("Persisting RDD for time " + time + " to " + storageLevel + " at time " + time) + logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) } if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) { newRDD.checkpoint() - logInfo("Marking RDD " + newRDD + " for time " + time + " for checkpointing at time " + time) + logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) } generatedRDDs.put(time, newRDD) Some(newRDD) diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index 246522838a..bd8c033eab 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -105,7 +105,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private[streaming] def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") - assert(batchDuration > Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } } |