about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author: Tathagata Das <tathagata.das1565@gmail.com> 2012-11-19 23:22:07 +0000
committer: Tathagata Das <tathagata.das1565@gmail.com> 2012-11-19 23:22:07 +0000
commit: c97ebf64377e853ab7c616a103869a4417f25954 (patch)
tree: b915b399fa0369c7398e1916e4b003635b388daa /streaming
parent: 3fd7b8319ba9886a74fb4abce96bbc88c98472dc (diff)
download: spark-c97ebf64377e853ab7c616a103869a4417f25954.tar.gz
spark-c97ebf64377e853ab7c616a103869a4417f25954.tar.bz2
spark-c97ebf64377e853ab7c616a103869a4417f25954.zip
Fixed bug in the number of splits in RDD after checkpointing. Modified reduceByKeyAndWindow (naive) computation from window+reduceByKey to reduceByKey+window+reduceByKey.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/DStream.scala 3
-rw-r--r-- streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala 6
-rw-r--r-- streaming/src/main/scala/spark/streaming/Scheduler.scala 2
-rw-r--r-- streaming/src/main/scala/spark/streaming/WindowedDStream.scala 3
4 files changed, 11 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 13770aa8fd..26d5ce9198 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -321,7 +321,8 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data for time " + currentTime)
+ logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, "
+ + "[" + checkpointData.mkString(",") + "]")
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index e09d27d34f..720e63bba0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
+import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
@@ -115,7 +116,10 @@ extends Serializable {
slideTime: Time,
partitioner: Partitioner
): DStream[(K, V)] = {
- self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ self.reduceByKey(cleanedReduceFunc, partitioner)
+ .window(windowTime, slideTime)
+ .reduceByKey(cleanedReduceFunc, partitioner)
}
// This method is the efficient sliding window reduce operation,
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index e2dca91179..014021be61 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -17,7 +17,7 @@ extends Logging {
val graph = ssc.graph
- val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
+ val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) {
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index ce89a3f99b..e4d2a634f5 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming
import spark.RDD
import spark.rdd.UnionRDD
+import spark.storage.StorageLevel
class WindowedDStream[T: ClassManifest](
@@ -18,6 +19,8 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
+ parent.persist(StorageLevel.MEMORY_ONLY_SER)
+
def windowTime: Time = _windowTime
override def dependencies = List(parent)