aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-10-13 21:02:24 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-10-13 21:02:24 -0700
commitb08708e6fcb59a09b36c5b8e3e7a4aa98f7ad050 (patch)
tree9499bd6129e3cbdc278e8d6ad1968994d1767511 /streaming
parente95ff45b53bf995d89f1825b9581cc18a083a438 (diff)
downloadspark-b08708e6fcb59a09b36c5b8e3e7a4aa98f7ad050.tar.gz
spark-b08708e6fcb59a09b36c5b8e3e7a4aa98f7ad050.tar.bz2
spark-b08708e6fcb59a09b36c5b8e3e7a4aa98f7ad050.zip
Fixed bugs in the streaming testsuites.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala18
-rw-r--r--streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala7
2 files changed, 18 insertions, 7 deletions
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
index 9b953d9dae..965b58c03f 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -39,22 +39,28 @@ class DStreamBasicSuite extends DStreamSuiteBase {
test("stateful operations") {
val inputData =
Seq(
+ Seq("a"),
+ Seq("a", "b"),
Seq("a", "b", "c"),
- Seq("a", "b", "c"),
- Seq("a", "b", "c")
+ Seq("a", "b"),
+ Seq("a"),
+ Seq()
)
val outputData =
Seq(
- Seq(("a", 1), ("b", 1), ("c", 1)),
- Seq(("a", 2), ("b", 2), ("c", 2)),
- Seq(("a", 3), ("b", 3), ("c", 3))
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 3), ("b", 2), ("c", 1)),
+ Seq(("a", 4), ("b", 3), ("c", 1)),
+ Seq(("a", 5), ("b", 3), ("c", 1)),
+ Seq(("a", 5), ("b", 3), ("c", 1))
)
val updateStateOp = (s: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: RichInt) => {
var newState = 0
- if (values != null) newState += values.reduce(_ + _)
+ if (values != null && values.size > 0) newState += values.reduce(_ + _)
if (state != null) newState += state.self
//println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
new RichInt(newState)
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
index 1c4ea14b1d..59fe36baf0 100644
--- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -45,14 +45,19 @@ trait DStreamSuiteBase extends FunSuite with Logging {
val clock = ssc.scheduler.clock
if (clock.isInstanceOf[ManualClock]) {
- clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds)
+ clock.asInstanceOf[ManualClock].addToTime((input.size - 1) * batchDuration.milliseconds)
}
val startTime = System.currentTimeMillis()
while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(500)
}
+ println("output.size = " + output.size)
+ println("output")
+ output.foreach(x => println("[" + x.mkString(",") + "]"))
+
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
if (useSet) {