aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-08-26 13:04:30 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-26 13:04:30 -0700
commit3cedc4f4d78e093fd362085e0a077bb9e4f28ca5 (patch)
tree36d5201e77ca023c384eed78e72e60fccc639163 /python/pyspark/rdd.py
parent8856c3d86009295be871989a5dc7270f31b420cd (diff)
downloadspark-3cedc4f4d78e093fd362085e0a077bb9e4f28ca5.tar.gz
spark-3cedc4f4d78e093fd362085e0a077bb9e4f28ca5.tar.bz2
spark-3cedc4f4d78e093fd362085e0a077bb9e4f28ca5.zip
[SPARK-2871] [PySpark] add histgram() API
RDD.histogram(buckets) Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1. If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) inseration to O(1) per element(where n = # buckets). Buckets must be sorted and not contain any duplicates, must be at least two elements. If `buckets` is a number, it will generates buckets which is evenly spaced between the minimum and maximum of the RDD. For example, if the min value is 0 and the max is 100, given buckets as 2, the resulting buckets will be [0,50) [50,100]. buckets must be at least 1 If the RDD contains infinity, NaN throws an exception If the elements in RDD do not vary (max == min) always returns a single bucket. It will return an tuple of buckets and histogram. >>> rdd = sc.parallelize(range(51)) >>> rdd.histogram(2) ([0, 25, 50], [25, 26]) >>> rdd.histogram([0, 5, 25, 50]) ([0, 5, 25, 50], [5, 20, 26]) >>> rdd.histogram([0, 15, 30, 45, 60], True) ([0, 15, 30, 45, 60], [15, 15, 15, 6]) >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) >>> rdd.histogram(("a", "b", "c")) (('a', 'b', 'c'), [2, 2]) closes #122, it's duplicated. Author: Davies Liu <davies.liu@gmail.com> Closes #2091 from davies/histgram and squashes the following commits: a322f8a [Davies Liu] fix deprecation of e.message 84e85fa [Davies Liu] remove evenBuckets, add more tests (including str) d9a0722 [Davies Liu] address comments 0e18a2d [Davies Liu] add histgram() API
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py129
1 files changed, 128 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1374f74968..3a2e7649e6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -32,7 +32,7 @@ import warnings
import heapq
import bisect
from random import Random
-from math import sqrt, log
+from math import sqrt, log, isinf, isnan
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
@@ -886,6 +886,133 @@ class RDD(object):
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
+ def histogram(self, buckets):
+ """
+ Compute a histogram using the provided buckets. The buckets
+ are all open to the right except for the last which is closed.
+ e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
+ which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
+ and 50 we would have a histogram of 1,0,1.
+
+ If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
+ this can be switched from an O(log n) inseration to O(1) per
+ element(where n = # buckets).
+
+ Buckets must be sorted and not contain any duplicates, must be
+ at least two elements.
+
+ If `buckets` is a number, it will generates buckets which are
+ evenly spaced between the minimum and maximum of the RDD. For
+ example, if the min value is 0 and the max is 100, given buckets
+ as 2, the resulting buckets will be [0,50) [50,100]. buckets must
+ be at least 1 If the RDD contains infinity, NaN throws an exception
+ If the elements in RDD do not vary (max == min) always returns
+ a single bucket.
+
+ It will return an tuple of buckets and histogram.
+
+ >>> rdd = sc.parallelize(range(51))
+ >>> rdd.histogram(2)
+ ([0, 25, 50], [25, 26])
+ >>> rdd.histogram([0, 5, 25, 50])
+ ([0, 5, 25, 50], [5, 20, 26])
+ >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
+ ([0, 15, 30, 45, 60], [15, 15, 15, 6])
+ >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
+ >>> rdd.histogram(("a", "b", "c"))
+ (('a', 'b', 'c'), [2, 2])
+ """
+
+ if isinstance(buckets, (int, long)):
+ if buckets < 1:
+ raise ValueError("number of buckets must be >= 1")
+
+ # filter out non-comparable elements
+ def comparable(x):
+ if x is None:
+ return False
+ if type(x) is float and isnan(x):
+ return False
+ return True
+
+ filtered = self.filter(comparable)
+
+ # faster than stats()
+ def minmax(a, b):
+ return min(a[0], b[0]), max(a[1], b[1])
+ try:
+ minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
+ except TypeError as e:
+ if " empty " in str(e):
+ raise ValueError("can not generate buckets from empty RDD")
+ raise
+
+ if minv == maxv or buckets == 1:
+ return [minv, maxv], [filtered.count()]
+
+ try:
+ inc = (maxv - minv) / buckets
+ except TypeError:
+ raise TypeError("Can not generate buckets with non-number in RDD")
+
+ if isinf(inc):
+ raise ValueError("Can not generate buckets with infinite value")
+
+ # keep them as integer if possible
+ if inc * buckets != maxv - minv:
+ inc = (maxv - minv) * 1.0 / buckets
+
+ buckets = [i * inc + minv for i in range(buckets)]
+ buckets.append(maxv) # fix accumulated error
+ even = True
+
+ elif isinstance(buckets, (list, tuple)):
+ if len(buckets) < 2:
+ raise ValueError("buckets should have more than one value")
+
+ if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
+ raise ValueError("can not have None or NaN in buckets")
+
+ if sorted(buckets) != list(buckets):
+ raise ValueError("buckets should be sorted")
+
+ if len(set(buckets)) != len(buckets):
+ raise ValueError("buckets should not contain duplicated values")
+
+ minv = buckets[0]
+ maxv = buckets[-1]
+ even = False
+ inc = None
+ try:
+ steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
+ except TypeError:
+ pass # objects in buckets do not support '-'
+ else:
+ if max(steps) - min(steps) < 1e-10: # handle precision errors
+ even = True
+ inc = (maxv - minv) / (len(buckets) - 1)
+
+ else:
+ raise TypeError("buckets should be a list or tuple or number(int or long)")
+
+ def histogram(iterator):
+ counters = [0] * len(buckets)
+ for i in iterator:
+ if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
+ continue
+ t = (int((i - minv) / inc) if even
+ else bisect.bisect_right(buckets, i) - 1)
+ counters[t] += 1
+ # add last two together
+ last = counters.pop()
+ counters[-1] += last
+ return [counters]
+
+ def mergeCounters(a, b):
+ return [i + j for i, j in zip(a, b)]
+
+ return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
+
def mean(self):
"""
Compute the mean of this RDD's elements.