diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-04-15 12:54:55 -0600 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-04-15 12:54:55 -0600 |
commit | b42d68c8ce9f63513969297b65f4b5a2b06e6078 (patch) | |
tree | fbdb34a85e14e73afd3575fbca957d5cfe8a85fc /streaming | |
parent | 8ac9efba5a435443be9abf8ebbe867806d42c9db (diff) | |
download | spark-b42d68c8ce9f63513969297b65f4b5a2b06e6078.tar.gz spark-b42d68c8ce9f63513969297b65f4b5a2b06e6078.tar.bz2 spark-b42d68c8ce9f63513969297b65f4b5a2b06e6078.zip |
fixing Spark Streaming count() so that 0 will be emitted when there is nothing to count
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 5 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 4 |
2 files changed, 6 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e1be5ef51c..e3a9247924 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,7 +441,10 @@ abstract class DStream[T: ClassManifest] (
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+  def count(): DStream[Long] = {
+    val zero = new ConstantInputDStream(context, context.sparkContext.makeRDD(Seq((null, 0L)), 1))
+    this.map(_ => (null, 1L)).union(zero).reduceByKey(_ + _).map(_._2)
+  }
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 8fce91853c..168e1b7a55 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -90,9 +90,9 @@ class BasicOperationsSuite extends TestSuiteBase {
 
   test("count") {
     testOperation(
-      Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+      Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
       (s: DStream[Int]) => s.count(),
-      Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+      Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
     )
   }
 