aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-06-24 21:35:50 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-06-24 21:35:50 -0700
commit575aff6b718f9447abd6dde40fd72c66b40774a8 (patch)
tree4faf713c0c39f8e0c91a0022d85f02ecd0c194f2 /streaming
parent8ac9efba5a435443be9abf8ebbe867806d42c9db (diff)
parentb95c1bdbbaeea86152e24b394a03bbbad95989d5 (diff)
downloadspark-575aff6b718f9447abd6dde40fd72c66b40774a8.tar.gz
spark-575aff6b718f9447abd6dde40fd72c66b40774a8.tar.bz2
spark-575aff6b718f9447abd6dde40fd72c66b40774a8.zip
Merge pull request #567 from Reinvigorate/sm-count-fix
Fixing count() in Spark Streaming
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala2
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala4
2 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e1be5ef51c..e125310861 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,7 +441,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+ def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2)
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..168e1b7a55 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -90,9 +90,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("count") {
testOperation(
- Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+ Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
(s: DStream[Int]) => s.count(),
- Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+ Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
)
}