aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/dstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r--python/pyspark/streaming/dstream.py4
1 files changed, 2 insertions, 2 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index adc2651740..86447f5e58 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -247,7 +247,7 @@ class DStream(object):
Return a new DStream in which each RDD contains the counts of each
distinct value in each RDD of this DStream.
"""
- return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
+ return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
def saveAsTextFiles(self, prefix, suffix=None):
"""
@@ -493,7 +493,7 @@ class DStream(object):
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
- return counted.filter(lambda kv: kv[1] > 0).count()
+ return counted.filter(lambda kv: kv[1] > 0)
def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""