author     haoyuan <haoyuan@eecs.berkeley.edu>  2012-09-07 02:18:33 +0000
committer  haoyuan <haoyuan@eecs.berkeley.edu>  2012-09-07 02:18:33 +0000
commit     0681bbc5d9e353e6400087d36a758e89d423bca3 (patch)
tree       852c245440dd80f0f0fd52183479593cde80eac4 /streaming
parent     db08a362aae68682f9105f9e5568bc9b9d9faaab (diff)
parent     4a7bde6865cf22af060f20a9619c516b811c80f2 (diff)
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                  26
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala                  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala  221
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala          38
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala        67
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuite.scala            123
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala         68
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala      188
9 files changed, 424 insertions(+), 313 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 20f1c4db20..3973ca1520 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -143,7 +143,7 @@ extends Logging with Serializable {
/**
* This method generates a SparkStreaming job for the given time
- * and may require to be overriden by subclasses
+ * and may need to be overridden by subclasses
*/
def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -208,7 +208,7 @@ extends Logging with Serializable {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- def toQueue = {
+ def toBlockingQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
queue.add(rdd)
@@ -256,6 +256,28 @@ extends Logging with Serializable {
def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
+ def slice(interval: Interval): Seq[RDD[T]] = {
+ slice(interval.beginTime, interval.endTime)
+ }
+
+ // Get all the RDDs between fromTime and toTime (both inclusive)
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+
+ val rdds = new ArrayBuffer[RDD[T]]()
+ var time = toTime.floor(slideTime)
+
+
+ while (time >= zeroTime && time >= fromTime) {
+ getOrCompute(time) match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not get old reduced RDD for time " + time)
+ }
+ time -= slideTime
+ }
+
+ rdds.toSeq
+ }
+
def register() {
ssc.registerOutputStream(this)
}
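
The new slice(fromTime, toTime) walks backwards from toTime.floor(slideTime) in steps of slideTime, collecting one RDD per batch, newest first. A minimal usage sketch, not part of the commit itself, with hypothetical times and assuming a one-second slide duration and that every batch in the requested range has been generated:

    // `stream` is assumed to be a DStream[Int] with slideTime = Seconds(1),
    // created from an existing StreamingContext `ssc` (both hypothetical)
    val rdds = stream.slice(Seconds(3), Seconds(5))  // RDDs for times 5s, 4s, 3s
    val windowRDD = new UnionRDD(ssc.sc, rdds)       // union them, as WindowedDStream now does
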
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index 87b8437b3d..ffb7725ac9 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -9,6 +9,10 @@ case class Interval(beginTime: Time, endTime: Time) {
new Interval(beginTime + time, endTime + time)
}
+ def - (time: Time): Interval = {
+ new Interval(beginTime - time, endTime - time)
+ }
+
def < (that: Interval): Boolean = {
if (this.duration != that.duration) {
throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 191d264b2b..b0beaba94d 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
- parent: DStream[(K, V)],
+ @transient parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
_windowTime: Time,
@@ -28,9 +28,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
- val allowPartialWindows = true
- //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ @transient val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
override def dependencies = List(reducedStream)
@@ -44,174 +42,95 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
checkpointInterval: Time): DStream[(K,V)] = {
super.persist(storageLevel, checkpointLevel, checkpointInterval)
reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval)
+ this
}
-
+
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-
- // Notation:
+ val currentTime = validTime
+ val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
+ val previousWindow = currentWindow - slideTime
+
+ logDebug("Window time = " + windowTime)
+ logDebug("Slide time = " + slideTime)
+ logDebug("ZeroTime = " + zeroTime)
+ logDebug("Current window = " + currentWindow)
+ logDebug("Previous window = " + previousWindow)
+
// _____________________________
- // | previous window _________|___________________
- // |___________________| current window | --------------> Time
+ // | previous window _________|___________________
+ // |___________________| current window | --------------> Time
// |_____________________________|
- //
+ //
// |________ _________| |________ _________|
// | |
// V V
- // old time steps new time steps
+ // old RDDs new RDDs
//
- def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
- val beginTime =
- if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
- parent.zeroTime
- } else {
- endTime - windowTime
- }
- Interval(beginTime, endTime)
- }
-
- val currentTime = validTime
- val currentWindow = getAdjustedWindow(currentTime, windowTime)
- val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)
-
- logInfo("Current window = " + currentWindow)
- logInfo("Slide time = " + slideTime)
- logInfo("Previous window = " + previousWindow)
- logInfo("Parent.zeroTime = " + parent.zeroTime)
-
- if (allowPartialWindows) {
- if (currentTime - slideTime <= parent.zeroTime) {
- reducedStream.getOrCompute(currentTime) match {
- case Some(rdd) => return Some(rdd)
- case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
- }
- }
- } else {
- if (previousWindow.beginTime < parent.zeroTime) {
- if (currentWindow.beginTime < parent.zeroTime) {
- return None
- } else {
- // If this is the first feasible window, then generate reduced value in the naive manner
- val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
- var t = currentWindow.endTime
- while (t > currentWindow.beginTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => reducedRDDs += rdd
- case None => throw new Exception("Could not get reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
- }
- if (reducedRDDs.size == 0) {
- throw new Exception("Could not generate the first RDD for time " + validTime)
- }
- return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(partitioner, reduceFunc))
- }
- }
- }
-
- // Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
- case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
- }
- val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
- val newRDDs = new ArrayBuffer[RDD[(_, _)]]()
-
// Get the RDDs of the reduced values in "old time steps"
- var t = currentWindow.beginTime
- while (t > previousWindow.beginTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get old reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
- }
+ val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
+ logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- t = currentWindow.endTime
- while (t > previousWindow.endTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get new reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
+ val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
+ logDebug("# new RDDs = " + newRDDs.size)
+
+ // Get the RDD of the reduced value of the previous window
+ val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+
+ // Make the list of RDDs that need to be cogrouped together for reducing their reduced values
+ val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
+
+ // Cogroup the reduced RDDs and merge the reduced values
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+ val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValuesFunc)
+
+ Some(mergedValuesRDD)
+ }
+
+ def mergeValues(numOldValues: Int, numNewValues: Int)(seqOfValues: Seq[Seq[V]]): V = {
+
+ if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ throw new Exception("Unexpected number of sequences of reduced values")
}
- val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
- allRDDs += previousWindowRDD
- allRDDs ++= oldRDDs
- allRDDs ++= newRDDs
-
-
- val numOldRDDs = oldRDDs.size
- val numNewRDDs = newRDDs.size
- logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
- logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
- val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
- val (key, value) = x
- logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
- if (value.size != 1 + numOldRDDs + numNewRDDs) {
- throw new Exception("Number of groups not odd!")
- }
+ // Get the reduced values of the "old time steps" that will be removed from the current window
+ val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+
+ // Get the reduced values of the "new time steps" that will be added to the current window
+ val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
- // old values = reduced values of the "old time steps" that are eliminated from current window
- // new values = reduced values of the "new time steps" that are introduced to the current window
- // previous value = reduced value of the previous window
-
- /*val numOldValues = (value.size - 1) / 2*/
- // Getting reduced values "old time steps"
- val oldValues =
- (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
- // Getting reduced values "new time steps"
- val newValues =
- (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))
-
- // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
- if (value(0).size == 0 && oldValues.size != 0) {
- throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
+ if (seqOfValues(0).isEmpty) {
+
+ // If previous window's reduce value does not exist, then at least new values should exist
+ if (newValues.isEmpty) {
+ throw new Exception("Neither previous window has value for key, nor new values found")
}
- // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
- if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
- throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
+ // Reduce the new values
+ // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+ return newValues.reduce(reduceFunc)
+ } else {
+
+ // Get the previous window's reduced value
+ var tempValue = seqOfValues(0).head
+
+ // If old values exist, then inverse-reduce them from the previous value
+ if (!oldValues.isEmpty) {
+ // println("old values = " + oldValues.map(_.toString).reduce(_ + " " + _))
+ tempValue = invReduceFunc(tempValue, oldValues.reduce(reduceFunc))
}
- // Logic to generate the final reduced value for current window:
- //
- // If previous window did not have reduced value for the key
- // Then, return reduced value of "new time steps" as the final value
- // Else, reduced value exists in previous window
- // If "old" time steps did not have reduced value for the key
- // Then, reduce previous window's reduced value with that of "new time steps" for final value
- // Else, reduced values exists in "old time steps"
- // If "new values" did not have reduced value for the key
- // Then, inverse-reduce "old values" from previous window's reduced value for final value
- // Else, all 3 values exist, combine all of them together
- //
- logDebug("# old values = " + oldValues.size + ", # new values = " + newValues)
- val finalValue = {
- if (value(0).size == 0) {
- newValues.reduce(reduceFunc)
- } else {
- val prevValue = value(0)(0)
- logDebug("prev value = " + prevValue)
- if (oldValues.size == 0) {
- // assuming newValue.size > 0 (all 3 cannot be zero, as checked earlier)
- val temp = newValues.reduce(reduceFunc)
- reduceFunc(prevValue, temp)
- } else if (newValues.size == 0) {
- invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
- } else {
- val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
- reduceFunc(tempValue, newValues.reduce(reduceFunc))
- }
- }
+ // If new values exist, then reduce them with the previous value
+ if (!newValues.isEmpty) {
+ // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+ tempValue = reduceFunc(tempValue, newValues.reduce(reduceFunc))
}
- (key, finalValue)
- })
- //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
- Some(newRDD)
+ // println("final value = " + tempValue)
+ return tempValue
+ }
}
}
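
The rewritten compute() implements the incremental update sketched in the diagram above: take the previous window's reduced value, inverse-reduce the batches that slid out of the window (the "old RDDs"), and reduce in the batches that slid in (the "new RDDs"). A rough, self-contained per-key sketch of what mergeValues does, assuming Int counts with an additive reduce function (hypothetical numbers):

    // stand-ins for the functions passed to reduceByKeyAndWindow
    val reduceFunc: (Int, Int) => Int = _ + _
    val invReduceFunc: (Int, Int) => Int = _ - _
    val prev    = 2           // previous window's reduced count for some key
    val oldVals = Seq(1)      // counts from the batches that slid out of the window
    val newVals = Seq(1, 1)   // counts from the batches that slid into the window
    // subtract what left, add what arrived: (2 - 1) + (1 + 1) = 3
    val current = reduceFunc(invReduceFunc(prev, oldVals.reduce(reduceFunc)),
                             newVals.reduce(reduceFunc))
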
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 12e52bf56c..00136685d5 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -40,7 +40,7 @@ extends Logging {
}
def generateRDDs (time: Time) {
- println("\n-----------------------------------------------------\n")
+ logInfo("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 6c791fcfc1..93c1291691 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -1,12 +1,8 @@
package spark.streaming
-import spark.streaming.StreamingContext._
-
import spark.RDD
import spark.UnionRDD
-import spark.SparkContext._
-import scala.collection.mutable.ArrayBuffer
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
@@ -22,8 +18,6 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val allowPartialWindows = true
-
override def dependencies = List(parent)
def windowTime: Time = _windowTime
@@ -31,36 +25,8 @@ class WindowedDStream[T: ClassManifest](
override def slideTime: Time = _slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
- val parentRDDs = new ArrayBuffer[RDD[T]]()
- val windowEndTime = validTime
- val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) {
- parent.zeroTime
- } else {
- windowEndTime - windowTime
- }
-
- logInfo("Window = " + windowStartTime + " - " + windowEndTime)
- logInfo("Parent.zeroTime = " + parent.zeroTime)
-
- if (windowStartTime >= parent.zeroTime) {
- // Walk back through time, from the 'windowEndTime' to 'windowStartTime'
- // and get all parent RDDs from the parent DStream
- var t = windowEndTime
- while (t > windowStartTime) {
- parent.getOrCompute(t) match {
- case Some(rdd) => parentRDDs += rdd
- case None => throw new Exception("Could not generate parent RDD for time " + t)
- }
- t -= parent.slideTime
- }
- }
-
- // Do a union of all parent RDDs to generate the new RDD
- if (parentRDDs.size > 0) {
- Some(new UnionRDD(ssc.sc, parentRDDs))
- } else {
- None
- }
+ val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
+ Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
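
With slice() in place, a plain window is just the union of the parent's RDDs that fall inside the current interval. A small worked example with hypothetical parameters: for a two-second window sliding every second over a parent with one-second batches, at validTime = 10s the window is Interval(10s - 2s + 1s, 10s) = Interval(9s, 10s), i.e. the last two batches:

    // hypothetical: windowTime = Seconds(2), slideTime = Seconds(1), parent.slideTime = Seconds(1)
    val currentWindow = Interval(Seconds(10) - Seconds(2) + Seconds(1), Seconds(10))
    // compute() then returns Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
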
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
new file mode 100644
index 0000000000..9b953d9dae
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -0,0 +1,67 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+import scala.runtime.RichInt
+
+class DStreamBasicSuite extends DStreamSuiteBase {
+
+ test("map-like operations") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+
+ // map
+ testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))
+
+ // flatMap
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
+ input.map(_.flatMap(x => Array(x, x * 2)))
+ )
+ }
+
+ test("shuffle-based operations") {
+ // reduceByKey
+ testOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ true
+ )
+
+ // reduce
+ testOperation(
+ Seq(1 to 4, 5 to 8, 9 to 12),
+ (s: DStream[Int]) => s.reduce(_ + _),
+ Seq(Seq(10), Seq(26), Seq(42))
+ )
+ }
+
+ test("stateful operations") {
+ val inputData =
+ Seq(
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c")
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 2)),
+ Seq(("a", 3), ("b", 3), ("c", 3))
+ )
+
+ val updateStateOp = (s: DStream[String]) => {
+ val updateFunc = (values: Seq[Int], state: RichInt) => {
+ var newState = 0
+ if (values != null) newState += values.reduce(_ + _)
+ if (state != null) newState += state.self
+ //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+ new RichInt(newState)
+ }
+ s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+ }
+
+ testOperation(inputData, updateStateOp, outputData, true)
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
deleted file mode 100644
index fc00952afe..0000000000
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-package spark.streaming
-
-import spark.Logging
-import spark.streaming.StreamingContext._
-import spark.streaming.util.ManualClock
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import scala.collection.mutable.ArrayBuffer
-import scala.runtime.RichInt
-
-class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
-
- var ssc: StreamingContext = null
- val batchDurationMillis = 1000
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
- def testOp[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- expectedOutput: Seq[Seq[V]],
- useSet: Boolean = false
- ) {
- try {
- ssc = new StreamingContext("local", "test")
- ssc.setBatchDuration(Milliseconds(batchDurationMillis))
-
- val inputStream = ssc.createQueueStream(input.map(ssc.sc.makeRDD(_, 2)).toIterator)
- val outputStream = operation(inputStream)
- val outputQueue = outputStream.toQueue
-
- ssc.start()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.addToTime(input.size * batchDurationMillis)
-
- Thread.sleep(1000)
-
- val output = new ArrayBuffer[Seq[V]]()
- while(outputQueue.size > 0) {
- val rdd = outputQueue.take()
- output += (rdd.collect())
- }
-
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- if (useSet) {
- assert(output(i).toSet === expectedOutput(i).toSet)
- } else {
- assert(output(i).toList === expectedOutput(i).toList)
- }
- }
- } finally {
- ssc.stop()
- }
- }
-
- test("map-like operations") {
- val inputData = Seq(1 to 4, 5 to 8, 9 to 12)
-
- // map
- testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
-
- // flatMap
- testOp(
- inputData,
- (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
- inputData.map(_.flatMap(x => Array(x, x * 2)))
- )
- }
-
- test("shuffle-based operations") {
- // reduceByKey
- testOp(
- Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
- (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
- Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
- true
- )
-
- // reduce
- testOp(
- Seq(1 to 4, 5 to 8, 9 to 12),
- (s: DStream[Int]) => s.reduce(_ + _),
- Seq(Seq(10), Seq(26), Seq(42))
- )
- }
-
- test("window-based operations") {
-
- }
-
-
- test("stateful operations") {
- val inputData =
- Seq(
- Seq("a", "b", "c"),
- Seq("a", "b", "c"),
- Seq("a", "b", "c")
- )
-
- val outputData =
- Seq(
- Seq(("a", 1), ("b", 1), ("c", 1)),
- Seq(("a", 2), ("b", 2), ("c", 2)),
- Seq(("a", 3), ("b", 3), ("c", 3))
- )
-
- val updateStateOp = (s: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: RichInt) => {
- var newState = 0
- if (values != null) newState += values.reduce(_ + _)
- if (state != null) newState += state.self
- println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
- new RichInt(newState)
- }
- s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
- }
-
- testOp(inputData, updateStateOp, outputData, true)
- }
-}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
new file mode 100644
index 0000000000..1c4ea14b1d
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -0,0 +1,68 @@
+package spark.streaming
+
+import spark.{RDD, Logging}
+import util.ManualClock
+import collection.mutable.ArrayBuffer
+import org.scalatest.FunSuite
+import scala.collection.mutable.Queue
+
+
+trait DStreamSuiteBase extends FunSuite with Logging {
+
+ def batchDuration() = Seconds(1)
+
+ def maxWaitTimeMillis() = 10000
+
+ def testOperation[U: ClassManifest, V: ClassManifest](
+ input: Seq[Seq[U]],
+ operation: DStream[U] => DStream[V],
+ expectedOutput: Seq[Seq[V]],
+ useSet: Boolean = false
+ ) {
+
+ val manualClock = true
+
+ if (manualClock) {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
+ val ssc = new StreamingContext("local", "test")
+
+ try {
+ ssc.setBatchDuration(Milliseconds(batchDuration))
+
+ val inputQueue = new Queue[RDD[U]]()
+ inputQueue ++= input.map(ssc.sc.makeRDD(_, 2))
+ val emptyRDD = ssc.sc.makeRDD(Seq[U](), 2)
+
+ val inputStream = ssc.createQueueStream(inputQueue, true, emptyRDD)
+ val outputStream = operation(inputStream)
+
+ val output = new ArrayBuffer[Seq[V]]()
+ outputStream.foreachRDD(rdd => output += rdd.collect())
+
+ ssc.start()
+
+ val clock = ssc.scheduler.clock
+ if (clock.isInstanceOf[ManualClock]) {
+ clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ Thread.sleep(500)
+ }
+
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ if (useSet) {
+ assert(output(i).toSet === expectedOutput(i).toSet)
+ } else {
+ assert(output(i).toList === expectedOutput(i).toList)
+ }
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
new file mode 100644
index 0000000000..061cab2cbb
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
@@ -0,0 +1,188 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+
+class DStreamWindowSuite extends DStreamSuiteBase {
+
+ val largerSlideInput = Seq(
+ Seq(("a", 1)), // 1st window from here
+ Seq(("a", 2)),
+ Seq(("a", 3)), // 2nd window from here
+ Seq(("a", 4)),
+ Seq(("a", 5)), // 3rd window from here
+ Seq(("a", 6)),
+ Seq(), // 4th window from here
+ Seq(),
+ Seq() // 5th window from here
+ )
+
+ val largerSlideOutput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 6)),
+ Seq(("a", 14)),
+ Seq(("a", 15)),
+ Seq(("a", 6))
+ )
+
+ val bigInput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(),
+ Seq(("a", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1)),
+ Seq()
+ )
+
+ val bigOutput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1))
+ )
+
+ /*
+ The output of the reduceByKeyAndWindow with inverse reduce function is
+ different from the naive reduceByKeyAndWindow. Even if the count of a
+ particular key is 0, the key does not get eliminated from the RDDs of
+ ReducedWindowedDStream. This causes the number of keys in these RDDs to
+ increase forever. A more generalized version that allows elimination of
+ keys should be considered.
+ */
+ val bigOutputInv = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0))
+ )
+
+ def testReduceByKeyAndWindow(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+ def testReduceByKeyAndWindowInv(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindowInv - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+
+ // Testing naive reduceByKeyAndWindow (without invertible function)
+
+ testReduceByKeyAndWindow(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+
+ testReduceByKeyAndWindow(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() )
+ )
+
+ testReduceByKeyAndWindow(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testReduceByKeyAndWindow("big test", bigInput, bigOutput)
+
+
+ // Testing reduceByKeyAndWindow (with invertible reduce function)
+
+ testReduceByKeyAndWindowInv(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
+}
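
As the comment above bigOutputInv notes, the invertible path never drops a key: once a key has entered the window its count is driven down by the inverse reduce, but the (key, 0) pair itself survives, which is why bigOutputInv contains ("b", 0) and ("c", 0) where the naive bigOutput simply omits those keys. A tiny self-contained sketch of the arithmetic behind ("c", 0), with hypothetical counts:

    val invReduceFunc: (Int, Int) => Int = _ - _
    val prev    = 1                          // "c" had count 1 in the previous window
    val current = invReduceFunc(prev, 1)     // the batch containing "c" slides out; nothing slides in
    // current == 0, but the key is still emitted, hence ("c", 0)
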