diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 11:59:19 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 11:59:19 -0800 |
commit | 915d9931fea72296ed50765ad3f7a88d254d7d60 (patch) | |
tree | 23244219b4c19f3eb2b92dfa341b29d69533e4df /streaming/src | |
parent | 86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff) | |
parent | c0694291c81ad775918421941a80a00ca9593a38 (diff) | |
download | spark-915d9931fea72296ed50765ad3f7a88d254d7d60.tar.gz spark-915d9931fea72296ed50765ad3f7a88d254d7d60.tar.bz2 spark-915d9931fea72296ed50765ad3f7a88d254d7d60.zip |
Merge pull request #373 from Reinvigorate/sm-updateStateByKey
StateDStream changes to give updateStateByKey consistent behavior
Diffstat (limited to 'streaming/src')
4 files changed, 78 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..3cec35cb37 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -283,17 +283,31 @@ class StreamingContext private ( } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval - * @param defaultRDD Default RDD is returned by the DStream when the queue is empty * @tparam T Type of objects in the RDD */ def queueStream[T: ClassManifest]( queue: Queue[RDD[T]], - oneAtATime: Boolean = true, - defaultRDD: RDD[T] = null + oneAtATime: Boolean = true + ): DStream[T] = { + queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) + } + + /** + * Creates an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassManifest]( + queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] ): DStream[T] = { val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) registerInputStream(inputStream) diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index b4506c74aa..db62955036 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( //logDebug("Generating state RDD for time " + validTime) return Some(stateRDD) } - case None => { // If parent RDD does not exist, then return old state RDD - return Some(prevStateRDD) + case None => { // If parent RDD does not exist + + // Re-apply the update function to the old state RDD + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, S)]) => { + val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) + updateFuncLocal(i) + } + val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) + return Some(stateRDD) } } } diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index bfdf32c73e..d98b840b8e 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -165,6 +165,51 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation(inputData, updateStateOperation, outputData, true) } + test("updateStateByKey - object lifecycle") { + val inputData = + Seq( + Seq("a","b"), + null, + Seq("a","c","a"), + Seq("c"), + null, + null + ) + + val outputData = + Seq( + Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 3), ("c", 1)), + Seq(("a", 3), ("c", 2)), + Seq(("c", 2)), + Seq() + ) + + val updateStateOperation = (s: DStream[String]) => { + class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable + + // updateFunc clears a state when a StateObject is seen without new values twice in a row + val updateFunc = (values: Seq[Int], state: Option[StateObject]) => { + val stateObj = state.getOrElse(new StateObject) + values.foldLeft(0)(_ + _) match { + case 0 => stateObj.expireCounter += 1 // no new values + case n => { // has new values, increment and reset expireCounter + stateObj.counter += n + stateObj.expireCounter = 0 + } + } + stateObj.expireCounter match { + case 2 => None // seen twice with no new values, give it the boot + case _ => Option(stateObj) + } + } + s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter) + } + + testOperation(inputData, updateStateOperation, outputData, true) + } + test("forgetting of RDDs - map and window operations") { assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 49129f3964..c2733831b2 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -28,6 +28,11 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ logInfo("Computing RDD for time " + validTime) val index = ((validTime - zeroTime) / slideDuration - 1).toInt val selectedInput = if (index < input.size) input(index) else Seq[T]() + + // lets us test cases where RDDs are not created + if (selectedInput == null) + return None + val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) logInfo("Created RDD " + rdd.id + " with " + selectedInput) Some(rdd) |