diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 15:47:01 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-10 15:47:01 -0800 |
commit | 4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d (patch) | |
tree | e62d1139d89f69c2281659d0919a566303d5d7a7 /streaming | |
parent | 82f07deeda89be2ad34e39ce83ac624c73b8d6e1 (diff) | |
parent | 7cef8435d7b6b43a33e8be684c769412186ad6ac (diff) | |
download | spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.tar.gz spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.tar.bz2 spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.zip |
Merge remote-tracking branch 'apache/master' into driver-test
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStream.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 14 |
2 files changed, 8 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 2cdd45291d..b98f4a5101 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -332,7 +332,7 @@ abstract class DStream[T: ClassTag] ( protected[streaming] def clearMetadata(time: Time) { val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) generatedRDDs --= oldRDDs.keys - logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) dependencies.foreach(_.clearMetadata(time)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 0cca6d50e6..eee9591ffc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -104,20 +104,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def getOutputStreams() = this.synchronized { outputStreams.toArray } def generateJobs(time: Time): Seq[Job] = { - logInfo("Generating jobs for time " + time) - this.synchronized { - val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) - logInfo("Generated " + jobs.length + " jobs for time " + time) - jobs + logDebug("Generating jobs for time " + time) + val jobs = this.synchronized { + outputStreams.flatMap(outputStream => outputStream.generateJob(time)) } + logDebug("Generated " + jobs.length + " jobs for time " + time) + jobs } def clearMetadata(time: Time) { - logInfo("Clearing metadata for time " + time) + logDebug("Clearing metadata for time " + time) this.synchronized { outputStreams.foreach(_.clearMetadata(time)) } - logInfo("Cleared old metadata for time " + time) + logDebug("Cleared old metadata for time " + time) } def updateCheckpointData(time: Time) { |