aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-05-10 12:47:24 -0600
committerseanm <sean.mcnamara@webtrends.com>2013-05-10 12:47:24 -0600
commitb95c1bdbbaeea86152e24b394a03bbbad95989d5 (patch)
tree4faf713c0c39f8e0c91a0022d85f02ecd0c194f2 /streaming
parentb42d68c8ce9f63513969297b65f4b5a2b06e6078 (diff)
downloadspark-b95c1bdbbaeea86152e24b394a03bbbad95989d5.tar.gz
spark-b95c1bdbbaeea86152e24b394a03bbbad95989d5.tar.bz2
spark-b95c1bdbbaeea86152e24b394a03bbbad95989d5.zip
count() now uses a transform instead of ConstantInputDStream
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala5
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e3a9247924..e125310861 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,10 +441,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = {
- val zero = new ConstantInputDStream(context, context.sparkContext.makeRDD(Seq((null, 0L)), 1))
- this.map(_ => (null, 1L)).union(zero).reduceByKey(_ + _).map(_._2)
- }
+ def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2)
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in