author     Tathagata Das <tathagata.das1565@gmail.com>   2012-11-02 12:12:25 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-11-02 12:12:25 -0700
commit     3fb5c9ee24302edf02df130bd0dfd0463cf6c0a4 (patch)
tree       b3bbaff1da74829272644d52bd61ed8a414c286a /streaming
parent     1b900183c8bb4063d8ae7bd5134fdadd52b3a155 (diff)
Fixed serialization bug in countByWindow, added countByKey and countByKeyAndWindow, and added test cases for them.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                 |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala    |  23
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala       |  18
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala           |   8
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala   | 181
5 files changed, 186 insertions, 48 deletions
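
A minimal usage sketch of the operations this commit touches, assuming a pair stream wordStream: DStream[(String, Int)] built on a StreamingContext, with import spark.streaming.StreamingContext._ in scope for the pair-stream conversions (the stream name and window sizes are hypothetical):

// countByWindow is the operation whose serialization bug is fixed below:
// count of all elements in a 10-second window that slides every 2 seconds.
val totals: DStream[Int] = wordStream.countByWindow(Seconds(10), Seconds(2))

// countByKey and countByKeyAndWindow are the operations added by this commit:
// occurrences of each key per batch, and over a sliding window.
val perKey: DStream[(String, Long)] = wordStream.countByKey()
val perKeyWindowed: DStream[(String, Long)] = wordStream.countByKeyAndWindow(Seconds(10), Seconds(2))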
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 175ebf104f..a4921bb1a2 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -338,9 +338,7 @@ extends Serializable with Logging {
}
def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
- def add(v1: Int, v2: Int) = (v1 + v2)
- def subtract(v1: Int, v2: Int) = (v1 - v2)
- this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime)
+ this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
}
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
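
The countByWindow change above replaces the locally defined add and subtract methods with anonymous function literals. A likely explanation for the serialization bug named in the commit message (an inference, not stated in the patch): a def nested inside an instance method is lifted to a method of the enclosing class, so add _ yields a function value that keeps a reference to the enclosing DStream and pulls the stream, and whatever it references, into the task closure; the plain _ + _ literal has no such outer reference. A standalone sketch of the pattern (Outer and its field are hypothetical):

class Outer {                                       // stand-in for a class that should not end up in task closures
  val heavyState = new Array[Byte](1 << 20)
  def adderViaLocalDef(): (Int, Int) => Int = {
    def add(a: Int, b: Int) = a + b                 // lifted to a method on Outer
    add _                                           // this function value captures `this`
  }
  def adderViaLambda(): (Int, Int) => Int = _ + _   // no reference back to Outer
}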
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index f88247708b..e09d27d34f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -53,14 +53,18 @@ extends Serializable {
combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
- private def combineByKey[C: ClassManifest](
+ def combineByKey[C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner) : ShuffledDStream[K, V, C] = {
+ partitioner: Partitioner) : DStream[(K, C)] = {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
}
+ def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
+ self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+ }
+
def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
}
@@ -157,6 +161,21 @@ extends Serializable {
self, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner)
}
+ def countByKeyAndWindow(
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int = self.ssc.sc.defaultParallelism
+ ): DStream[(K, Long)] = {
+
+ self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
+ (x: Long, y: Long) => x + y,
+ (x: Long, y: Long) => x - y,
+ windowTime,
+ slideTime,
+ numPartitions
+ )
+ }
+
// TODO:
//
//
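
Two notes on the PairDStreamFunctions changes above: combineByKey goes from private to public and now returns the abstract DStream[(K, C)] rather than the concrete ShuffledDStream, so callers do not depend on the internal stream class; countByKey and countByKeyAndWindow are thin wrappers that map every record to (key, 1L) and reduce. A hypothetical direct use of the newly public combineByKey, computing a per-batch average per key (the stream name and partitioner choice are assumed):

import spark.HashPartitioner

// pairStream: DStream[(String, Int)], with StreamingContext._ imported for the conversion
val avgPerKey = pairStream.combineByKey[(Int, Int)](
  (v: Int) => (v, 1),                                             // createCombiner: first value seen
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),          // mergeValue: fold in another value
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2),   // mergeCombiner: merge partial results
  new HashPartitioner(2)
).map { case (k, (sum, count)) => (k, sum.toDouble / count) }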
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 6f6b18a790..c17254b809 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -51,6 +51,15 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
@@ -101,6 +110,15 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
assert(outputBuffer.size === expectedOutput.size)
for (i <- 0 until outputBuffer.size) {
assert(outputBuffer(i).size === 1)
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c1b7772e7b..c9bc454f91 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -16,13 +16,14 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
def compute(validTime: Time): Option[RDD[T]] = {
logInfo("Computing RDD for time " + validTime)
- val rdd = if (currentIndex < input.size) {
- ssc.sc.makeRDD(input(currentIndex), numPartitions)
+ val index = ((validTime - zeroTime) / slideTime - 1).toInt
+ val rdd = if (index < input.size) {
+ ssc.sc.makeRDD(input(index), numPartitions)
} else {
ssc.sc.makeRDD(Seq[T](), numPartitions)
}
logInfo("Created RDD " + rdd.id)
- currentIndex += 1
+ //currentIndex += 1
Some(rdd)
}
}
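
The TestInputStream change above derives which block of test input to serve from the batch time itself rather than from a mutable counter, so the RDD produced for a given validTime is deterministic even if compute is invoked more than once or out of order. A small worked example of the index formula, using assumed values (zeroTime = 0 ms, one batch per second):

val zeroTime  = 0L      // ms, time of the zeroth batch
val slideTime = 1000L   // ms, batch interval
val validTime = 3000L   // ms, the batch being computed
val index = ((validTime - zeroTime) / slideTime - 1).toInt
// index == 2, so this batch is built from input(2)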
@@ -96,7 +97,6 @@ trait TestSuiteBase extends FunSuite with Logging {
ssc
}
-
def runStreams[V: ClassManifest](
ssc: StreamingContext,
numBatches: Int,
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 90d67844bb..d7d8d5bd36 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -1,6 +1,7 @@
package spark.streaming
import spark.streaming.StreamingContext._
+import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
@@ -8,6 +9,8 @@ class WindowOperationsSuite extends TestSuiteBase {
override def maxWaitTimeMillis() = 20000
+ override def batchDuration() = Seconds(1)
+
val largerSlideInput = Seq(
Seq(("a", 1)),
Seq(("a", 2)), // 1st window from here
@@ -19,7 +22,7 @@ class WindowOperationsSuite extends TestSuiteBase {
Seq() // 4th window from here
)
- val largerSlideOutput = Seq(
+ val largerSlideReduceOutput = Seq(
Seq(("a", 3)),
Seq(("a", 10)),
Seq(("a", 18)),
@@ -42,7 +45,23 @@ class WindowOperationsSuite extends TestSuiteBase {
Seq()
)
- val bigOutput = Seq(
+ val bigGroupByOutput = Seq(
+ Seq(("a", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+ Seq(("a", Seq(1))),
+ Seq(("a", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+ Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+ Seq(("a", Seq(1)))
+ )
+
+
+ val bigReduceOutput = Seq(
Seq(("a", 1)),
Seq(("a", 2), ("b", 1)),
Seq(("a", 2), ("b", 2), ("c", 1)),
@@ -59,13 +78,14 @@ class WindowOperationsSuite extends TestSuiteBase {
/*
The output of the reduceByKeyAndWindow with inverse reduce function is
- difference from the naive reduceByKeyAndWindow. Even if the count of a
+ different from the naive reduceByKeyAndWindow. Even if the count of a
particular key is 0, the key does not get eliminated from the RDDs of
ReducedWindowedDStream. This causes the number of keys in these RDDs to
increase forever. A more generalized version that allows elimination of
keys should be considered.
*/
- val bigOutputInv = Seq(
+
+ val bigReduceInvOutput = Seq(
Seq(("a", 1)),
Seq(("a", 2), ("b", 1)),
Seq(("a", 2), ("b", 2), ("c", 1)),
@@ -80,38 +100,37 @@ class WindowOperationsSuite extends TestSuiteBase {
Seq(("a", 1), ("b", 0), ("c", 0))
)
- def testReduceByKeyAndWindow(
- name: String,
- input: Seq[Seq[(String, Int)]],
- expectedOutput: Seq[Seq[(String, Int)]],
- windowTime: Time = batchDuration * 2,
- slideTime: Time = batchDuration
- ) {
- test("reduceByKeyAndWindow - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
- val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
- }
- testOperation(input, operation, expectedOutput, numBatches, true)
- }
- }
+ // Testing window operation
- def testReduceByKeyAndWindowInv(
- name: String,
- input: Seq[Seq[(String, Int)]],
- expectedOutput: Seq[Seq[(String, Int)]],
- windowTime: Time = batchDuration * 2,
- slideTime: Time = batchDuration
- ) {
- test("reduceByKeyAndWindowInv - " + name) {
- val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
- val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist()
- }
- testOperation(input, operation, expectedOutput, numBatches, true)
- }
- }
+ testWindow(
+ "basic window",
+ Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+ Seq( Seq(0), Seq(0, 1), Seq(1, 2), Seq(2, 3), Seq(3, 4), Seq(4, 5))
+ )
+ testWindow(
+ "tumbling window",
+ Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+ Seq( Seq(0, 1), Seq(2, 3), Seq(4, 5)),
+ Seconds(2),
+ Seconds(2)
+ )
+
+ testWindow(
+ "larger window",
+ Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+ Seq( Seq(0, 1), Seq(0, 1, 2, 3), Seq(2, 3, 4, 5), Seq(4, 5)),
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testWindow(
+ "non-overlapping window",
+ Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+ Seq( Seq(1, 2), Seq(4, 5)),
+ Seconds(2),
+ Seconds(3)
+ )
// Testing naive reduceByKeyAndWindow (without invertible function)
@@ -142,13 +161,12 @@ class WindowOperationsSuite extends TestSuiteBase {
testReduceByKeyAndWindow(
"larger slide time",
largerSlideInput,
- largerSlideOutput,
+ largerSlideReduceOutput,
Seconds(4),
Seconds(2)
)
- testReduceByKeyAndWindow("big test", bigInput, bigOutput)
-
+ testReduceByKeyAndWindow("big test", bigInput, bigReduceOutput)
// Testing reduceByKeyAndWindow (with invertible reduce function)
@@ -179,10 +197,95 @@ class WindowOperationsSuite extends TestSuiteBase {
testReduceByKeyAndWindowInv(
"larger slide time",
largerSlideInput,
- largerSlideOutput,
+ largerSlideReduceOutput,
Seconds(4),
Seconds(2)
)
- testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
+ testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+
+ test("groupByKeyAndWindow") {
+ val input = bigInput
+ val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
+ val windowTime = Seconds(2)
+ val slideTime = Seconds(1)
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[(String, Int)]) => {
+ s.groupByKeyAndWindow(windowTime, slideTime)
+ .map(x => (x._1, x._2.toSet))
+ .persist()
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+
+ test("countByWindow") {
+ val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() )
+ val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
+ val windowTime = Seconds(2)
+ val slideTime = Seconds(1)
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+
+ test("countByKeyAndWindow") {
+ val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
+ val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
+ val windowTime = Seconds(2)
+ val slideTime = Seconds(1)
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[(String, Int)]) => {
+ s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+
+
+ // Helper functions
+
+ def testWindow(
+ name: String,
+ input: Seq[Seq[Int]],
+ expectedOutput: Seq[Seq[Int]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("window - " + name) {
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
+
+ def testReduceByKeyAndWindow(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow - " + name) {
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
+
+ def testReduceByKeyAndWindowInv(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindowInv - " + name) {
+ val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist()
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
}
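
The comment in the WindowOperationsSuite diff notes that the invertible reduceByKeyAndWindow keeps keys whose count has fallen to 0 (hence ("b", 0) and ("c", 0) in bigReduceInvOutput): each window is derived from the previous one by adding the batch that slid in and subtracting the batch that slid out, rather than being recomputed from the window's raw data. A plain-Map sketch of that incremental update (assumed semantics for illustration, not the ReducedWindowedDStream code):

val previousWindow = Map("a" -> 2, "b" -> 1)   // counts in the old window
val entering       = Map("a" -> 1)             // batch sliding in
val leaving        = Map("b" -> 1)             // batch sliding out
val newWindow = (previousWindow.keySet ++ entering.keySet).map { k =>
  k -> (previousWindow.getOrElse(k, 0) + entering.getOrElse(k, 0) - leaving.getOrElse(k, 0))
}.toMap
// newWindow == Map("a" -> 3, "b" -> 0): "b" lingers with count 0 instead of being dropped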