aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-13 21:20:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-13 21:20:49 -0800
commit6cc8592f26553525e11213830b596fc397243439 (patch)
tree99e9d180aefcdfe7b75436ef8b4f459abf03c1ed /streaming
parent0dbd411a562396e024c513936fde46b0d2f6d59d (diff)
downloadspark-6cc8592f26553525e11213830b596fc397243439.tar.gz
spark-6cc8592f26553525e11213830b596fc397243439.tar.bz2
spark-6cc8592f26553525e11213830b596fc397243439.zip
Fixed bug
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index fa117cfcf0..f9ba1f20f0 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -224,7 +224,9 @@ class WindowOperationsSuite extends TestSuiteBase {
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration)
+ val operation = (s: DStream[Int]) => {
+ s.countByWindow(windowDuration, slideDuration).map(_.toInt)
+ }
testOperation(input, operation, expectedOutput, numBatches, true)
}