diff options
author | jerryshao <sshao@hortonworks.com> | 2015-12-28 10:43:23 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-12-28 10:43:23 +0000 |
commit | 8d4940092141c0909b673607f393cdd53f093ed6 (patch) | |
tree | ab9a08c8d074d8880cb873306d128ee134864433 /python/pyspark/streaming/dstream.py | |
parent | 5aa2710c1e587c340a66ed36af75214aea85a97a (diff) | |
download | spark-8d4940092141c0909b673607f393cdd53f093ed6.tar.gz spark-8d4940092141c0909b673607f393cdd53f093ed6.tar.bz2 spark-8d4940092141c0909b673607f393cdd53f093ed6.zip |
[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API
The semantics of Python countByValue is different from Scala API, it is more like countDistinctValue, so here change to make it consistent with Scala/Java API.
Author: jerryshao <sshao@hortonworks.com>
Closes #10350 from jerryshao/SPARK-12353.
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r-- | python/pyspark/streaming/dstream.py | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index adc2651740..86447f5e58 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -247,7 +247,7 @@ class DStream(object): Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream. """ - return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count() + return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) def saveAsTextFiles(self, prefix, suffix=None): """ @@ -493,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda kv: kv[1] > 0).count() + return counted.filter(lambda kv: kv[1] > 0) def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """ |