diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d48b51aa69..d043200f71 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -341,9 +341,11 @@ abstract class DStream[T: ClassTag] ( */ private[streaming] def clearMetadata(time: Time) { val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + logDebug("Clearing references to old RDDs: [" + + oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]") generatedRDDs --= oldRDDs.keys if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) { - logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", ")) + logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", ")) oldRDDs.values.foreach(_.unpersist(false)) } logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + |