author    Tathagata Das <tathagata.das1565@gmail.com>  2012-08-31 03:47:34 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-08-31 03:47:34 -0700
commit    2d01d38a4199590145551a108903a3ac7cffcceb (patch)
tree      ef8318c379f15170e21c5e84064c841d9d44638d
parent    4db3a967669a53de4c4b79b4c0b70daa5accb682 (diff)
Added StateDStream, corresponding stateful stream operations, and test cases. Also refactored a few PairDStreamFunctions methods.
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                   |  31
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala      | 162
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala    |  13
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                 |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala              |  83
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala |   4
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuite.scala              |  90
7 files changed, 303 insertions, 82 deletions
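
The headline addition is updateStateByKey, which keeps a per-key state that is carried forward and updated on every batch. A minimal usage sketch, modelled on the stateful-operations test case added at the end of this patch; the StreamingContext setup follows the WordCountNetwork example, the host, port, and application name are placeholders, and output registration is omitted:

    import scala.runtime.RichInt
    import spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local", "StatefulWordCount")
    ssc.setBatchDuration(Seconds(2))
    val words = ssc.createNetworkTextStream("localhost", 9999).flatMap(_.split(" "))

    // State must be an AnyRef (hence RichInt rather than Int); a key that has
    // no previous state receives null.
    val updateFunc = (values: Seq[Int], state: RichInt) => {
      var newState = 0
      if (values != null) newState += values.reduce(_ + _)
      if (state != null) newState += state.self
      new RichInt(newState)
    }

    // Running word count: each batch's counts are folded into the stored state.
    val runningCounts = words.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc)
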
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3a57488f9b..8c06345933 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -3,13 +3,12 @@ package spark.streaming
import spark.streaming.StreamingContext._
import spark.RDD
-import spark.BlockRDD
import spark.UnionRDD
import spark.Logging
-import spark.SparkContext
import spark.SparkContext._
import spark.storage.StorageLevel
-
+import spark.Partitioner
+
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -95,12 +94,12 @@ extends Logging with Serializable {
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
private def isTimeValid (time: Time): Boolean = {
- if (!isInitialized)
+ if (!isInitialized) {
throw new Exception (this.toString + " has not been initialized")
- if ((time - zeroTime).isMultipleOf(slideTime)) {
- true
- } else {
+ } else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
false
+ } else {
+ true
}
}
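
The reworked check also rejects times earlier than zeroTime. A minimal sketch of the same logic, with Time reduced to a Long offset for clarity (names are illustrative):

    // Valid only if not before zeroTime and an exact multiple of slideTime away from it.
    def isValid(time: Long, zeroTime: Long, slideTime: Long): Boolean =
      time >= zeroTime && (time - zeroTime) % slideTime == 0
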
@@ -119,7 +118,7 @@ extends Logging with Serializable {
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
- case None =>
+ case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
@@ -138,6 +137,7 @@ extends Logging with Serializable {
} else {
None
}
+ }
}
}
@@ -361,24 +361,19 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- numPartitions: Int)
+ partitioner: Partitioner)
extends DStream [(K,C)] (parent.ssc) {
override def dependencies = List(parent)
override def slideTime: Time = parent.slideTime
+
+
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- val newrdd = {
- if (numPartitions > 0) {
- rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, numPartitions)
- } else {
- rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner)
- }
- }
- Some(newrdd)
+ case Some(rdd) =>
+ Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
case None => None
}
}
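
ShuffledDStream now takes a Partitioner instead of a partition count and passes it straight through to combineByKey on the parent RDD. A minimal sketch of the equivalent RDD-level call, assuming a SparkContext named sc:

    import spark.HashPartitioner
    import spark.SparkContext._

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val summed = pairs.combineByKey[Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiner
      new HashPartitioner(2))         // explicit partitioner replaces the old numPartitions argument
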
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index d2887c3aea..13db34ac80 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,71 +1,169 @@
package spark.streaming
import scala.collection.mutable.ArrayBuffer
+import spark.Partitioner
+import spark.HashPartitioner
import spark.streaming.StreamingContext._
+import javax.annotation.Nullable
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)])
extends Serializable {
def ssc = stream.ssc
+ def defaultPartitioner(numPartitions: Int = stream.ssc.sc.defaultParallelism) = {
+ new HashPartitioner(numPartitions)
+ }
+
/* ---------------------------------- */
/* DStream operations for key-value pairs */
/* ---------------------------------- */
-
- def groupByKey(numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+
+ def groupByKey(): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ groupByKey(defaultPartitioner())
+ }
+
+ def groupByKey(numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ groupByKey(defaultPartitioner(numPartitions))
+ }
+
+ def groupByKey(partitioner: Partitioner): ShuffledDStream[K, V, ArrayBuffer[V]] = {
def createCombiner(v: V) = ArrayBuffer[V](v)
def mergeValue(c: ArrayBuffer[V], v: V) = (c += v)
def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2)
- combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, numPartitions)
+ combineByKey[ArrayBuffer[V]](createCombiner _, mergeValue _, mergeCombiner _, partitioner)
}
-
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledDStream[K, V, V] = {
+
+ def reduceByKey(reduceFunc: (V, V) => V): ShuffledDStream[K, V, V] = {
+ reduceByKey(reduceFunc, defaultPartitioner())
+ }
+
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
+ }
+
+ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): ShuffledDStream[K, V, V] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
- combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, numPartitions)
+ combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
private def combineByKey[C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- numPartitions: Int) : ShuffledDStream[K, V, C] = {
- new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, numPartitions)
+ partitioner: Partitioner) : ShuffledDStream[K, V, C] = {
+ new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, partitioner)
+ }
+
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
+ }
+
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
}
def groupByKeyAndWindow(
- windowTime: Time,
- slideTime: Time,
- numPartitions: Int = 0): ShuffledDStream[K, V, ArrayBuffer[V]] = {
- stream.window(windowTime, slideTime).groupByKey(numPartitions)
+ windowTime: Time,
+ slideTime: Time,
+ partitioner: Partitioner
+ ): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ stream.window(windowTime, slideTime).groupByKey(partitioner)
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time): ShuffledDStream[K, V, V] = {
+ reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
+ }
+
+ def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
}
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
- numPartitions: Int = 0): ShuffledDStream[K, V, V] = {
- stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions)
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ partitioner: Partitioner
+ ): ShuffledDStream[K, V, V] = {
+ stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
}
- // This method is the efficient sliding window reduce operation,
- // which requires the specification of an inverse reduce function,
- // so that new elements introduced in the window can be "added" using
- // reduceFunc to the previous window's result and old elements can be
+ // This method is the efficient sliding window reduce operation,
+ // which requires the specification of an inverse reduce function,
+ // so that new elements introduced in the window can be "added" using
+ // reduceFunc to the previous window's result and old elements can be
// "subtracted using invReduceFunc.
def reduceByKeyAndWindow(
- reduceFunc: (V, V) => V,
- invReduceFunc: (V, V) => V,
- windowTime: Time,
- slideTime: Time,
- numPartitions: Int): ReducedWindowedDStream[K, V] = {
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time
+ ): ReducedWindowedDStream[K, V] = {
+
+ reduceByKeyAndWindow(
+ reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner())
+ }
+
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int
+ ): ReducedWindowedDStream[K, V] = {
+
+ reduceByKeyAndWindow(
+ reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
+ }
+
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ partitioner: Partitioner
+ ): ReducedWindowedDStream[K, V] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
new ReducedWindowedDStream[K, V](
- stream,
- ssc.sc.clean(reduceFunc),
- ssc.sc.clean(invReduceFunc),
- windowTime,
- slideTime,
- numPartitions)
+ stream, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner)
+ }
+
+ // TODO:
+ //
+ //
+ //
+ //
+ def updateStateByKey[S <: AnyRef : ClassManifest](
+ updateFunc: (Seq[V], S) => S
+ ): StateDStream[K, V, S] = {
+ updateStateByKey(updateFunc, defaultPartitioner())
+ }
+
+ def updateStateByKey[S <: AnyRef : ClassManifest](
+ updateFunc: (Seq[V], S) => S,
+ numPartitions: Int
+ ): StateDStream[K, V, S] = {
+ updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
+ }
+
+ def updateStateByKey[S <: AnyRef : ClassManifest](
+ updateFunc: (Seq[V], S) => S,
+ partitioner: Partitioner
+ ): StateDStream[K, V, S] = {
+ val func = (iterator: Iterator[(K, Seq[V], S)]) => {
+ iterator.map(tuple => (tuple._1, updateFunc(tuple._2, tuple._3)))
+ }
+ updateStateByKey(func, partitioner, true)
+ }
+
+ def updateStateByKey[S <: AnyRef : ClassManifest](
+ updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
+ partitioner: Partitioner,
+ rememberPartitioner: Boolean
+ ): StateDStream[K, V, S] = {
+ new StateDStream(stream, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
}
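
A hedged usage sketch of the incremental windowed reduce defined above, assuming a DStream[(String, Int)] named wordCounts and StreamingContext._ imported so the pair-stream operations are available; the window and slide durations are illustrative:

    val windowedCounts = wordCounts.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,   // reduceFunc: fold values entering the window in
      (a: Int, b: Int) => a - b,   // invReduceFunc: fold values leaving the window out
      Seconds(30),                 // windowTime
      Seconds(10))                 // slideTime
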
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 896e7dbafb..191d264b2b 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -5,7 +5,7 @@ import spark.streaming.StreamingContext._
import spark.RDD
import spark.UnionRDD
import spark.CoGroupedRDD
-import spark.HashPartitioner
+import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
@@ -17,8 +17,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
invReduceFunc: (V, V) => V,
_windowTime: Time,
_slideTime: Time,
- numPartitions: Int)
-extends DStream[(K,V)](parent.ssc) {
+ partitioner: Partitioner
+ ) extends DStream[(K,V)](parent.ssc) {
if (!_windowTime.isMultipleOf(parent.slideTime))
throw new Exception("The window duration of ReducedWindowedDStream (" + _slideTime + ") " +
@@ -28,7 +28,7 @@ extends DStream[(K,V)](parent.ssc) {
throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val reducedStream = parent.reduceByKey(reduceFunc, numPartitions)
+ val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
val allowPartialWindows = true
//reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
@@ -104,7 +104,7 @@ extends DStream[(K,V)](parent.ssc) {
if (reducedRDDs.size == 0) {
throw new Exception("Could not generate the first RDD for time " + validTime)
}
- return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(reduceFunc, numPartitions))
+ return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(partitioner, reduceFunc))
}
}
}
@@ -137,8 +137,7 @@ extends DStream[(K,V)](parent.ssc) {
}
t -= reducedStream.slideTime
}
-
- val partitioner = new HashPartitioner(numPartitions)
+
val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
allRDDs += previousWindowRDD
allRDDs ++= oldRDDs
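
The update that ReducedWindowedDStream performs on each slide follows the comment in PairDStreamFunctions: the previous window's reduced value is combined with the newly arrived batches via reduceFunc, and the batches that slid out of the window are removed via invReduceFunc. A worked example with a plain Int sum (the numbers are illustrative only):

    val reduceFunc    = (a: Int, b: Int) => a + b   // combine values entering the window
    val invReduceFunc = (a: Int, b: Int) => a - b   // remove values leaving the window

    val previousWindow = 10   // reduced value of the previous window
    val arrived        = 4    // reduced value of batches that slid in
    val departed       = 3    // reduced value of batches that slid out
    val newWindow = invReduceFunc(reduceFunc(previousWindow, arrived), departed)  // 11
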
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index da50e22719..12e52bf56c 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -41,7 +41,7 @@ extends Logging {
def generateRDDs (time: Time) {
println("\n-----------------------------------------------------\n")
- logInfo("Generating RDDs for time " + time)
+ logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
case Some(job) => submitJob(job)
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
new file mode 100644
index 0000000000..eabb33d89e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -0,0 +1,83 @@
+package spark.streaming
+
+import spark.RDD
+import spark.Partitioner
+import spark.MapPartitionsRDD
+import spark.SparkContext._
+import javax.annotation.Nullable
+
+
+class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
+ parent: DStream[(K, V)],
+ updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
+ partitioner: Partitioner,
+ rememberPartitioner: Boolean
+ ) extends DStream[(K, S)](parent.ssc) {
+
+ class SpecialMapPartitionsRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: Iterator[T] => Iterator[U])
+ extends MapPartitionsRDD(prev, f) {
+ override val partitioner = if (rememberPartitioner) prev.partitioner else None
+ }
+
+ override def dependencies = List(parent)
+
+ override def slideTime = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K, S)]] = {
+
+ // Try to get the previous state RDD
+ getOrCompute(validTime - slideTime) match {
+
+ case Some(prevStateRDD) => { // If previous state RDD exists
+
+ // Define the function for the mapPartition operation on cogrouped RDD;
+ // first map the cogrouped tuple to tuples of required type,
+ // and then apply the update function
+ val func = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val i = iterator.map(t => {
+ (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
+ })
+ updateFunc(i)
+ }
+
+ // Try to get the parent RDD
+ parent.getOrCompute(validTime) match {
+ case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
+ val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, func)
+ logDebug("Generating state RDD for time " + validTime)
+ return Some(stateRDD)
+ }
+ case None => { // If parent RDD does not exist, then return old state RDD
+ logDebug("Generating state RDD for time " + validTime + " (no change)")
+ return Some(prevStateRDD)
+ }
+ }
+ }
+
+ case None => { // If previous session RDD does not exist (first input data)
+
+ // Define the function for the mapPartition operation on grouped RDD;
+ // first map the grouped tuple to tuples of required type,
+ // and then apply the update function
+ val func = (iterator: Iterator[(K, Seq[V])]) => {
+ updateFunc(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
+ }
+
+ // Try to get the parent RDD
+ parent.getOrCompute(validTime) match {
+ case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+ val groupedRDD = parentRDD.groupByKey(partitioner)
+ val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, func)
+ logDebug("Generating state RDD for time " + validTime + " (first)")
+ return Some(sessionRDD)
+ }
+ case None => { // If parent RDD does not exist, then nothing to do!
+ logDebug("Not generating state RDD (no previous state, no parent)")
+ return None
+ }
+ }
+ }
+ }
+ }
+}
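
A minimal sketch of what the cogroup path above produces per key before updateFunc runs: each (key, (newValues, previousStates)) element is flattened to (key, newValues, previousStateOrNull). Types and values here are illustrative:

    import scala.runtime.RichInt

    val cogrouped: (String, (Seq[Int], Seq[RichInt])) = ("a", (Seq(1, 1), Seq(new RichInt(2))))
    val (key, (newValues, prevStates)) = cogrouped
    val prevState = prevStates.headOption.getOrElse(null.asInstanceOf[RichInt])
    // prevState is null the first time a key appears, which is why the update
    // functions in the test below guard with "if (state != null)".
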
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index 0a33a05bae..0aa5294a17 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -14,8 +14,8 @@ object WordCountNetwork {
val ssc = new StreamingContext(args(0), "WordCountNetwork")
ssc.setBatchDuration(Seconds(2))
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in the input stream of \n-delimited text (e.g. generated by 'nc')
val lines = ssc.createNetworkTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
index e9dc377263..d5eb20b37e 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
@@ -1,14 +1,14 @@
package spark.streaming
import spark.Logging
-import spark.RDD
+import spark.streaming.StreamingContext._
import spark.streaming.util.ManualClock
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SynchronizedQueue
+import scala.runtime.RichInt
class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
@@ -20,7 +20,9 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
def testOp[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
- expectedOutput: Seq[Seq[V]]) {
+ expectedOutput: Seq[Seq[V]],
+ useSet: Boolean = false
+ ) {
try {
ssc = new StreamingContext("local", "test")
ssc.setBatchDuration(Milliseconds(batchDurationMillis))
@@ -33,45 +35,89 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
clock.addToTime(input.size * batchDurationMillis)
- Thread.sleep(100)
+ Thread.sleep(1000)
val output = new ArrayBuffer[Seq[V]]()
while(outputQueue.size > 0) {
val rdd = outputQueue.take()
output += (rdd.collect())
}
+
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
- assert(output(i).toList === expectedOutput(i).toList)
+ if (useSet) {
+ assert(output(i).toSet === expectedOutput(i).toSet)
+ } else {
+ assert(output(i).toList === expectedOutput(i).toList)
+ }
}
} finally {
ssc.stop()
}
}
-
- test("basic operations") {
- val inputData = Array(1 to 4, 5 to 8, 9 to 12)
+
+ test("map-like operations") {
+ val inputData = Seq(1 to 4, 5 to 8, 9 to 12)
// map
testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
// flatMap
- testOp(inputData, (r: DStream[Int]) => r.flatMap(x => Array(x, x * 2)),
- inputData.map(_.flatMap(x => Array(x, x * 2)))
+ testOp(
+ inputData,
+ (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
+ inputData.map(_.flatMap(x => Array(x, x * 2)))
)
}
-}
-object DStreamSuite {
- def main(args: Array[String]) {
- try {
- val r = new DStreamSuite()
- val inputData = Array(1 to 4, 5 to 8, 9 to 12)
- r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
-
- } catch {
- case e: Exception => e.printStackTrace()
+ test("shuffle-based operations") {
+ // reduceByKey
+ testOp(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ true
+ )
+
+ // reduce
+ testOp(
+ Seq(1 to 4, 5 to 8, 9 to 12),
+ (s: DStream[Int]) => s.reduce(_ + _),
+ Seq(Seq(10), Seq(26), Seq(42))
+ )
+ }
+
+ test("window-based operations") {
+
+ }
+
+
+ test("stateful operations") {
+ val inputData =
+ Seq(
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c")
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 2)),
+ Seq(("a", 3), ("b", 3), ("c", 3))
+ )//.map(array => array.toSeq.map(x => (x._1, new RichInt(x._2))))
+
+ val updateStateOp = (s: DStream[String]) => {
+ val updateFunc = (values: Seq[Int], state: RichInt) => {
+ var newState = 0
+ if (values != null) newState += values.reduce(_ + _)
+ if (state != null) newState += state.self
+ //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+ new RichInt(newState)
+ }
+ s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
}
- System.exit(0)
+
+ testOp(inputData, updateStateOp, outputData, true)
}
-} \ No newline at end of file
+}