diff options
6 files changed, 14 insertions, 5 deletions
diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template index 6b4094c515..1ea9ba5541 100755 --- a/conf/streaming-env.sh.template +++ b/conf/streaming-env.sh.template @@ -11,7 +11,7 @@ SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC" -# Using of Kryo serialization can improve serialization performance +# Using Kryo serialization can improve serialization performance # and therefore the throughput of the Spark Streaming programs. However, # using Kryo serialization with custom classes may required you to # register the classes with Kryo. Refer to the Spark documentation diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6af8c377b5..8af6c9bd6a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -222,12 +222,13 @@ abstract class RDD[T: ClassManifest]( rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString rdd.saveAsObjectFile(checkpointFile) rdd.synchronized { - rdd.checkpointRDD = context.objectFile[T](checkpointFile) + rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size) rdd.checkpointRDDSplits = rdd.checkpointRDD.splits rdd.changeDependencies(rdd.checkpointRDD) rdd.shouldCheckpoint = false rdd.isCheckpointInProgress = false rdd.isCheckpointed = true + println("Done checkpointing RDD " + rdd.id + ", " + rdd) } } } diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 13770aa8fd..26d5ce9198 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -321,7 +321,8 @@ extends Serializable with Logging { } } } - logInfo("Updated checkpoint data for time " + currentTime) + logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, " + + "[" + checkpointData.mkString(",") + "]") } /** diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index e09d27d34f..720e63bba0 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._ import spark.{Manifests, RDD, Partitioner, HashPartitioner} import spark.SparkContext._ +import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer @@ -115,7 +116,10 @@ extends Serializable { slideTime: Time, partitioner: Partitioner ): DStream[(K, V)] = { - self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner) + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + self.reduceByKey(cleanedReduceFunc, partitioner) + .window(windowTime, slideTime) + .reduceByKey(cleanedReduceFunc, partitioner) } // This method is the efficient sliding window reduce operation, diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index e2dca91179..014021be61 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -17,7 +17,7 @@ extends Logging { val graph = ssc.graph - val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt + val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) { diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala index ce89a3f99b..e4d2a634f5 100644 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala @@ -2,6 +2,7 @@ package spark.streaming import spark.RDD import spark.rdd.UnionRDD +import spark.storage.StorageLevel class WindowedDStream[T: ClassManifest]( @@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest]( throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + parent.persist(StorageLevel.MEMORY_ONLY_SER) + def windowTime: Time = _windowTime override def dependencies = List(parent) |