author    Tathagata Das <tathagata.das1565@gmail.com>  2012-11-19 23:22:07 +0000
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-11-19 23:22:07 +0000
commit    c97ebf64377e853ab7c616a103869a4417f25954 (patch)
tree      b915b399fa0369c7398e1916e4b003635b388daa /streaming/src
parent    3fd7b8319ba9886a74fb4abce96bbc88c98472dc (diff)
Fixed bug in the number of splits in RDD after checkpointing. Modified reduceByKeyAndWindow (naive) computation from window+reduceByKey to reduceByKey+window+reduceByKey.
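In other words, the naive reduceByKeyAndWindow plan now pre-aggregates each batch by key before windowing, so a window unions small partial sums instead of raw records, then reduces those once more. Below is a minimal, self-contained sketch of that restructuring over plain Scala collections (a batch modeled as Seq[(K, V)], a window as consecutive batches; the object and method names are illustrative, not from this patch):

```scala
object ReduceByKeyAndWindowSketch {
  type Batch[K, V] = Seq[(K, V)]

  // Per-batch pre-aggregation: collapse a batch to at most one record per key.
  def reduceByKey[K, V](batch: Batch[K, V], f: (V, V) => V): Batch[K, V] =
    batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }.toSeq

  // Old naive plan: window first (union the raw batches), then reduce once.
  def windowThenReduce[K, V](window: Seq[Batch[K, V]], f: (V, V) => V): Batch[K, V] =
    reduceByKey(window.flatten, f)

  // New naive plan: reduce each batch, window the partial results, reduce again.
  def reduceThenWindow[K, V](window: Seq[Batch[K, V]], f: (V, V) => V): Batch[K, V] =
    reduceByKey(window.map(reduceByKey(_, f)).flatten, f)

  def main(args: Array[String]): Unit = {
    val window: Seq[Batch[String, Int]] = Seq(
      Seq("a" -> 1, "a" -> 2, "b" -> 3),
      Seq("a" -> 4, "b" -> 5))
    val sum = (x: Int, y: Int) => x + y
    // An associative reduce function makes both plans produce the same totals.
    assert(windowThenReduce(window, sum).toMap == reduceThenWindow(window, sum).toMap)
    println(reduceThenWindow(window, sum).toMap) // Map(a -> 7, b -> 8)
  }
}
```

The two plans agree whenever the reduce function is associative, which reduceByKey already requires; the benefit is that each record is folded into its batch's partial result once, instead of being re-shuffled by every overlapping window that contains it.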
Diffstat (limited to 'streaming/src')
 streaming/src/main/scala/spark/streaming/DStream.scala              | 3 ++-
 streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 6 +++++-
 streaming/src/main/scala/spark/streaming/Scheduler.scala            | 2 +-
 streaming/src/main/scala/spark/streaming/WindowedDStream.scala      | 3 +++
4 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 13770aa8fd..26d5ce9198 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -321,7 +321,8 @@ extends Serializable with Logging {
         }
       }
     }
-    logInfo("Updated checkpoint data for time " + currentTime)
+    logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, " +
+      "[" + checkpointData.mkString(",") + "]")
   }
 
   /**
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index e09d27d34f..720e63bba0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
 
 import spark.{Manifests, RDD, Partitioner, HashPartitioner}
 import spark.SparkContext._
+import spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -115,7 +116,10 @@ extends Serializable {
       slideTime: Time,
       partitioner: Partitioner
     ): DStream[(K, V)] = {
-    self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
+    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+    self.reduceByKey(cleanedReduceFunc, partitioner)
+        .window(windowTime, slideTime)
+        .reduceByKey(cleanedReduceFunc, partitioner)
   }
 
   // This method is the efficient sliding window reduce operation,
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index e2dca91179..014021be61 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -17,7 +17,7 @@ extends Logging {
 
   val graph = ssc.graph
 
-  val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
+  val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
 
   val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index ce89a3f99b..e4d2a634f5 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming
 
 import spark.RDD
 import spark.rdd.UnionRDD
+import spark.storage.StorageLevel
 
 
 class WindowedDStream[T: ClassManifest](
@@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest](
     throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
 
+  parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
   def windowTime: Time = _windowTime
 
   override def dependencies = List(parent)
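Two smaller changes above are worth noting. The Scheduler now reads its job concurrency from the spark.streaming.concurrentJobs system property (the old spark.stream.concurrentJobs key is no longer read), and WindowedDStream persists its parent serialized in memory, since every parent RDD is re-read by each overlapping window that covers it. Because the Scheduler fetches the value via System.getProperty, the matching way to opt in is a plain JVM system property; a hedged sketch (the value 2 is just an example):

```scala
// Counterpart of the Scheduler's
// System.getProperty("spark.streaming.concurrentJobs", "1").toInt;
// must be set before the streaming context starts its scheduler.
System.setProperty("spark.streaming.concurrentJobs", "2")
```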