diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala | 18 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala | 7 |
2 files changed, 18 insertions, 7 deletions
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala index 9b953d9dae..965b58c03f 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala @@ -39,22 +39,28 @@ class DStreamBasicSuite extends DStreamSuiteBase { test("stateful operations") { val inputData = Seq( + Seq("a"), + Seq("a", "b"), Seq("a", "b", "c"), - Seq("a", "b", "c"), - Seq("a", "b", "c") + Seq("a", "b"), + Seq("a"), + Seq() ) val outputData = Seq( - Seq(("a", 1), ("b", 1), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 2)), - Seq(("a", 3), ("b", 3), ("c", 3)) + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 3), ("b", 2), ("c", 1)), + Seq(("a", 4), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)) ) val updateStateOp = (s: DStream[String]) => { val updateFunc = (values: Seq[Int], state: RichInt) => { var newState = 0 - if (values != null) newState += values.reduce(_ + _) + if (values != null && values.size > 0) newState += values.reduce(_ + _) if (state != null) newState += state.self //println("values = " + values + ", state = " + state + ", " + " new state = " + newState) new RichInt(newState) diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala index 1c4ea14b1d..59fe36baf0 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala @@ -45,14 +45,19 @@ trait DStreamSuiteBase extends FunSuite with Logging { val clock = ssc.scheduler.clock if (clock.isInstanceOf[ManualClock]) { - clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds) + clock.asInstanceOf[ManualClock].addToTime((input.size - 1) * batchDuration.milliseconds) } val startTime = System.currentTimeMillis() while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) Thread.sleep(500) } + println("output.size = " + output.size) + println("output") + output.foreach(x => println("[" + x.mkString(",") + "]")) + assert(output.size === expectedOutput.size) for (i <- 0 until output.size) { if (useSet) { |