path: root/streaming
author     Tathagata Das <tathagata.das1565@gmail.com>  2012-09-06 19:06:59 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-09-06 19:06:59 -0700
commit     4a7bde6865cf22af060f20a9619c516b811c80f2 (patch)
tree       5b49ecb45facfa639c7d1d7c68cd52881baa8862 /streaming
parent     203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8 (diff)
Fixed bugs and added testcases for naive reduceByKeyAndWindow.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala            |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala          |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala    |  38
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala  |   2
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala | 179
5 files changed, 140 insertions(+), 87 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 50b9458fae..3973ca1520 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -256,11 +256,17 @@ extends Logging with Serializable {
def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
+ def slice(interval: Interval): Seq[RDD[T]] = {
+ slice(interval.beginTime, interval.endTime)
+ }
+
+ // Get all the RDDs between fromTime and toTime (both inclusive)
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
var time = toTime.floor(slideTime)
+
while (time >= zeroTime && time >= fromTime) {
getOrCompute(time) match {
case Some(rdd) => rdds += rdd
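
The new slice(interval) overload above simply delegates to slice(fromTime, toTime), which snaps toTime down to the nearest slide boundary and then walks backwards one slideTime per step, collecting each materialized RDD. A minimal standalone sketch of that walk, with Long milliseconds standing in for the project's Time class and Strings standing in for RDDs (the names and the skip-on-missing behavior are illustrative, not the real API -- the hunk above cuts off before the None case):

import scala.collection.mutable.ArrayBuffer

// Sketch of the backward walk inside slice(); a missing RDD is skipped here.
def sliceSketch(zeroTime: Long, slideTime: Long, fromTime: Long, toTime: Long,
                getOrCompute: Long => Option[String]): Seq[String] = {
  val rdds = new ArrayBuffer[String]()
  var time = (toTime / slideTime) * slideTime   // toTime.floor(slideTime)
  while (time >= zeroTime && time >= fromTime) {
    getOrCompute(time).foreach(rdds += _)       // newest RDDs are collected first
    time -= slideTime
  }
  rdds.toSeq
}

// sliceSketch(0L, 1000L, 7000L, 10500L, t => Some("rdd@" + t))
// returns Seq("rdd@10000", "rdd@9000", "rdd@8000", "rdd@7000")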
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 12e52bf56c..00136685d5 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -40,7 +40,7 @@ extends Logging {
}
def generateRDDs (time: Time) {
- println("\n-----------------------------------------------------\n")
+ logInfo("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 6c791fcfc1..93c1291691 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -1,12 +1,8 @@
package spark.streaming
-import spark.streaming.StreamingContext._
-
import spark.RDD
import spark.UnionRDD
-import spark.SparkContext._
-import scala.collection.mutable.ArrayBuffer
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
@@ -22,8 +18,6 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be a multiple of the slide duration of the parent DStream (" + parent.slideTime + ")")
- val allowPartialWindows = true
-
override def dependencies = List(parent)
def windowTime: Time = _windowTime
@@ -31,36 +25,8 @@ class WindowedDStream[T: ClassManifest](
override def slideTime: Time = _slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
- val parentRDDs = new ArrayBuffer[RDD[T]]()
- val windowEndTime = validTime
- val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) {
- parent.zeroTime
- } else {
- windowEndTime - windowTime
- }
-
- logInfo("Window = " + windowStartTime + " - " + windowEndTime)
- logInfo("Parent.zeroTime = " + parent.zeroTime)
-
- if (windowStartTime >= parent.zeroTime) {
- // Walk back through time, from the 'windowEndTime' to 'windowStartTime'
- // and get all parent RDDs from the parent DStream
- var t = windowEndTime
- while (t > windowStartTime) {
- parent.getOrCompute(t) match {
- case Some(rdd) => parentRDDs += rdd
- case None => throw new Exception("Could not generate parent RDD for time " + t)
- }
- t -= parent.slideTime
- }
- }
-
- // Do a union of all parent RDDs to generate the new RDD
- if (parentRDDs.size > 0) {
- Some(new UnionRDD(ssc.sc, parentRDDs))
- } else {
- None
- }
+ val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
+ Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
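
The rewritten compute above reduces to two lines: build the window as an Interval ending at validTime and starting one parent slide after validTime - windowTime, then union whatever the parent's slice returns. A quick check of the boundary arithmetic, as a sketch with plain Long seconds standing in for the Time type (the concrete values are assumed for illustration):

// Boundary arithmetic from the rewritten compute(), illustrative sketch.
val windowTime = 4L      // the window covers 4 seconds
val parentSlide = 1L     // the parent produces one RDD per second
val validTime = 10L      // the batch being computed
val windowStart = validTime - windowTime + parentSlide   // = 7
// slice(Interval(7, 10)) gathers the parent RDDs at t = 7, 8, 9, 10
val batchTimes = windowStart to validTime by parentSlide
assert(batchTimes == (7L to 10L))

Starting the window at validTime - windowTime + parent.slideTime rather than at validTime - windowTime is what keeps it exactly windowTime wide: the RDD at time t holds the batch that ends at t, so the four one-second batches ending at 7 through 10 together span (6, 10].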
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
index 2634c9b405..9b953d9dae 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -56,7 +56,7 @@ class DStreamBasicSuite extends DStreamSuiteBase {
var newState = 0
if (values != null) newState += values.reduce(_ + _)
if (state != null) newState += state.self
- println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+ //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
new RichInt(newState)
}
s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
index c0e054418c..061cab2cbb 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
@@ -4,47 +4,6 @@ import spark.streaming.StreamingContext._
class DStreamWindowSuite extends DStreamSuiteBase {
- def testReduceByKeyAndWindow(
- name: String,
- input: Seq[Seq[(String, Int)]],
- expectedOutput: Seq[Seq[(String, Int)]],
- windowTime: Time = Seconds(2),
- slideTime: Time = Seconds(1)
- ) {
- test("reduceByKeyAndWindow - " + name) {
- testOperation(
- input,
- (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(),
- expectedOutput,
- true
- )
- }
- }
-
- testReduceByKeyAndWindow(
- "basic reduction",
- Seq(Seq(("a", 1), ("a", 3)) ),
- Seq(Seq(("a", 4)) )
- )
-
- testReduceByKeyAndWindow(
- "key already in window and new value added into window",
- Seq( Seq(("a", 1)), Seq(("a", 1)) ),
- Seq( Seq(("a", 1)), Seq(("a", 2)) )
- )
-
- testReduceByKeyAndWindow(
- "new key added into window",
- Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
- Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
- )
-
- testReduceByKeyAndWindow(
- "key removed from window",
- Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
- Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
- )
-
val largerSlideInput = Seq(
Seq(("a", 1)), // 1st window from here
Seq(("a", 2)),
@@ -65,14 +24,6 @@ class DStreamWindowSuite extends DStreamSuiteBase {
Seq(("a", 6))
)
- testReduceByKeyAndWindow(
- "larger slide time",
- largerSlideInput,
- largerSlideOutput,
- Seconds(4),
- Seconds(2)
- )
-
val bigInput = Seq(
Seq(("a", 1)),
Seq(("a", 1), ("b", 1)),
@@ -93,6 +44,29 @@ class DStreamWindowSuite extends DStreamSuiteBase {
Seq(("a", 2), ("b", 1)),
Seq(("a", 2), ("b", 2), ("c", 1)),
Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1))
+ )
+
+ /*
+ The output of reduceByKeyAndWindow with an inverse reduce function differs
+ from that of the naive reduceByKeyAndWindow. Even if the count of a
+ particular key drops to 0, the key does not get eliminated from the RDDs of
+ ReducedWindowedDStream. This causes the number of keys in these RDDs to
+ grow forever. A more generalized version that allows elimination of
+ keys should be considered.
+ */
+ val bigOutputInv = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
Seq(("a", 2), ("b", 1), ("c", 0)),
Seq(("a", 1), ("b", 0), ("c", 0)),
Seq(("a", 1), ("b", 0), ("c", 0)),
@@ -103,5 +77,112 @@ class DStreamWindowSuite extends DStreamSuiteBase {
Seq(("a", 1), ("b", 0), ("c", 0))
)
+ def testReduceByKeyAndWindow(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+ def testReduceByKeyAndWindowInv(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindowInv - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+
+ // Testing naive reduceByKeyAndWindow (without invertible function)
+
+ testReduceByKeyAndWindow(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+
+ testReduceByKeyAndWindow(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() )
+ )
+
+ testReduceByKeyAndWindow(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
testReduceByKeyAndWindow("big test", bigInput, bigOutput)
+
+
+ // Testing reduceByKeyAndWindow (with invertible reduce function)
+
+ testReduceByKeyAndWindowInv(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
}
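
The "key removed from window" pair of tests above pins down exactly the behavior the comment near bigOutputInv describes: the naive version recomputes each window from scratch, so once "a" has no values left in the window it vanishes (expected Seq()), while the invertible version subtracts the departing values and leaves ("a", 0) behind. The contrast can be reproduced outside Spark; a self-contained Scala sketch that models batches as Maps rather than RDDs (illustrative only, not the DStream API):

val input = Seq(Map("a" -> 1), Map("a" -> 1), Map.empty[String, Int], Map.empty[String, Int])
val window = 2  // batches per window

// Naive: re-reduce the last `window` batches each step; absent keys vanish.
val naive = input.indices.map { i =>
  val batches = input.slice(math.max(0, i - window + 1), i + 1)
  batches.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
}
// naive == Seq(Map(a -> 1), Map(a -> 2), Map(a -> 1), Map())

// Invertible: add the new batch, subtract the batch leaving the window;
// keys are never dropped, so a count can linger at 0 forever.
val inv = input.indices.scanLeft(Map.empty[String, Int]) { (acc, i) =>
  val added = input(i).foldLeft(acc) { case (m, (k, v)) =>
    m + (k -> (m.getOrElse(k, 0) + v))
  }
  val leaving = if (i - window >= 0) input(i - window) else Map.empty[String, Int]
  leaving.foldLeft(added) { case (m, (k, v)) => m + (k -> (m(k) - v)) }
}.tail
// inv == Seq(Map(a -> 1), Map(a -> 2), Map(a -> 1), Map(a -> 0))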