author     Tathagata Das <tathagata.das1565@gmail.com>  2013-02-07 11:59:19 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-02-07 11:59:19 -0800
commit     915d9931fea72296ed50765ad3f7a88d254d7d60 (patch)
tree       23244219b4c19f3eb2b92dfa341b29d69533e4df
parent     86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff)
parent     c0694291c81ad775918421941a80a00ca9593a38 (diff)
Merge pull request #373 from Reinvigorate/sm-updateStateByKey
StateDStream changes to give updateStateByKey consistent behavior
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala      22
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala  12
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala  45
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala          5
4 files changed, 78 insertions(+), 6 deletions(-)
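
What this merge changes, in short: StateDStream previously skipped the user's update function for any batch in which the parent DStream produced no RDD, so keys could never expire during idle batches, even though the same function did run with an empty value sequence whenever an RDD existed but carried no values for a key. The merge re-applies the update function to the old state in both cases. A minimal sketch of the resulting contract, using a hypothetical counter-style update function that is not part of this patch:

    // Hedged sketch: count values per key; expire a key on the first batch
    // that brings it no new values. After this merge, idle batches actually
    // reach this function; before, they skipped it entirely.
    def update(newValues: Seq[Int], state: Option[Int]): Option[Int] =
      if (newValues.isEmpty) None                    // no new data: drop state
      else Some(state.getOrElse(0) + newValues.sum)  // accumulate new values
    // wired up via: pairDStream.updateStateByKey[Int](update)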
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..3cec35cb37 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -283,17 +283,31 @@ class StreamingContext private (
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
- * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
- oneAtATime: Boolean = true,
- defaultRDD: RDD[T] = null
+ oneAtATime: Boolean = true
+ ): DStream[T] = {
+ queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
+ }
+
+ /**
+ * Creates an input stream from a queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD returned by the DStream when the queue is empty. Set to null if no RDD should be returned when the queue is empty
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T: ClassManifest](
+ queue: Queue[RDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: RDD[T]
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
registerInputStream(inputStream)
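
The hunk above splits queueStream into two overloads: the two-argument form now defaults to an empty one-partition RDD instead of null, so downstream operators see an (empty) RDD even when the queue is drained, while the explicit three-argument form keeps the old escape hatch of passing null to emit nothing. A hedged usage sketch; ssc and the queue contents are invented, not part of this patch:

    // Hypothetical usage of the two queueStream overloads.
    import scala.collection.mutable.Queue
    import spark.RDD
    val queue = new Queue[RDD[Int]]()
    queue += ssc.sc.makeRDD(Seq(1, 2, 3), 2)
    // Two-arg form: an empty queue now yields an empty RDD, never null.
    val s1 = ssc.queueStream(queue, oneAtATime = true)
    // Three-arg form: explicit default, or null for no RDD when empty.
    val s2 = ssc.queueStream(queue, true, ssc.sc.makeRDD(Seq(0), 1))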
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index b4506c74aa..db62955036 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
//logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
- case None => { // If parent RDD does not exist, then return old state RDD
- return Some(prevStateRDD)
+ case None => { // If parent RDD does not exist
+
+ // Re-apply the update function to the old state RDD
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, S)]) => {
+ val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+ updateFuncLocal(i)
+ }
+ val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+ return Some(stateRDD)
}
}
}
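
In the new case None branch, each stored (key, state) pair is re-wrapped as (key, empty value sequence, Some(state)) and pushed back through the user's update function, rather than returning prevStateRDD untouched. That re-wrap is what lets state expire in batches with no input. A stand-alone sketch of the transformation finalFunc applies per partition, with made-up data:

    // Illustrative only: the re-wrap performed by finalFunc above.
    val prevState = Iterator(("a", 3), ("b", 1))   // stored (K, S) pairs
    val rewrapped = prevState.map(t => (t._1, Seq[Int](), Option(t._2)))
    // yields ("a", Seq(), Some(3)) and ("b", Seq(), Some(1)): each key is
    // presented to the update function with no new values.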
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index bfdf32c73e..d98b840b8e 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -165,6 +165,51 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData, updateStateOperation, outputData, true)
}
+ test("updateStateByKey - object lifecycle") {
+ val inputData =
+ Seq(
+ Seq("a","b"),
+ null,
+ Seq("a","c","a"),
+ Seq("c"),
+ null,
+ null
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 3), ("c", 1)),
+ Seq(("a", 3), ("c", 2)),
+ Seq(("c", 2)),
+ Seq()
+ )
+
+ val updateStateOperation = (s: DStream[String]) => {
+ class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
+
+ // updateFunc clears a state when a StateObject is seen without new values twice in a row
+ val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
+ val stateObj = state.getOrElse(new StateObject)
+ values.foldLeft(0)(_ + _) match {
+ case 0 => stateObj.expireCounter += 1 // no new values
+ case n => { // has new values, increment and reset expireCounter
+ stateObj.counter += n
+ stateObj.expireCounter = 0
+ }
+ }
+ stateObj.expireCounter match {
+ case 2 => None // seen twice with no new values, give it the boot
+ case _ => Option(stateObj)
+ }
+ }
+ s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+ }
+
+ testOperation(inputData, updateStateOperation, outputData, true)
+ }
+
test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
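
Tracing key "b" through the expected output above shows the new semantics at work: batch 1 sets its counter to 1; batch 2 delivers no RDD at all, yet the re-applied update function still runs, bumps expireCounter to 1, and keeps ("b", 1) in the output; batch 3's RDD contains no "b", so expireCounter reaches 2, the function returns None, and "b" drops out. Under the old behavior the null batch would have skipped the function, leaving "b" unable to expire on schedule.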
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 49129f3964..c2733831b2 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -28,6 +28,11 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
logInfo("Computing RDD for time " + validTime)
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+ // lets us test cases where RDDs are not created
+ if (selectedInput == null)
+ return None
+
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
logInfo("Created RDD " + rdd.id + " with " + selectedInput)
Some(rdd)
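
Returning None from compute means the test input stream produces no RDD at all for that batch, as opposed to an empty RDD. The null entries in a test's input sequence therefore exercise exactly the case None branch added to StateDStream above.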