From 83d273023b03faa0ceacd69956a132f40d247bc1 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 26 Aug 2014 13:04:30 -0700 Subject: [SPARK-2871] [PySpark] add histgram() API RDD.histogram(buckets) Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1,0,1. If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) inseration to O(1) per element(where n = # buckets). Buckets must be sorted and not contain any duplicates, must be at least two elements. If `buckets` is a number, it will generates buckets which is evenly spaced between the minimum and maximum of the RDD. For example, if the min value is 0 and the max is 100, given buckets as 2, the resulting buckets will be [0,50) [50,100]. buckets must be at least 1 If the RDD contains infinity, NaN throws an exception If the elements in RDD do not vary (max == min) always returns a single bucket. It will return an tuple of buckets and histogram. >>> rdd = sc.parallelize(range(51)) >>> rdd.histogram(2) ([0, 25, 50], [25, 26]) >>> rdd.histogram([0, 5, 25, 50]) ([0, 5, 25, 50], [5, 20, 26]) >>> rdd.histogram([0, 15, 30, 45, 60], True) ([0, 15, 30, 45, 60], [15, 15, 15, 6]) >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) >>> rdd.histogram(("a", "b", "c")) (('a', 'b', 'c'), [2, 2]) closes #122, it's duplicated. Author: Davies Liu Closes #2091 from davies/histgram and squashes the following commits: a322f8a [Davies Liu] fix deprecation of e.message 84e85fa [Davies Liu] remove evenBuckets, add more tests (including str) d9a0722 [Davies Liu] address comments 0e18a2d [Davies Liu] add histgram() API (cherry picked from commit 3cedc4f4d78e093fd362085e0a077bb9e4f28ca5) Signed-off-by: Josh Rosen --- python/pyspark/rdd.py | 129 +++++++++++++++++++++++++++++++++++++++++++++++- python/pyspark/tests.py | 104 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 1 deletion(-) (limited to 'python') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f0706d846d..bfefc26123 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -32,7 +32,7 @@ import warnings import heapq import bisect from random import Random -from math import sqrt, log +from math import sqrt, log, isinf, isnan from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ @@ -856,6 +856,133 @@ class RDD(object): return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) + def histogram(self, buckets): + """ + Compute a histogram using the provided buckets. The buckets + are all open to the right except for the last which is closed. + e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], + which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 + and 50 we would have a histogram of 1,0,1. + + If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), + this can be switched from an O(log n) inseration to O(1) per + element(where n = # buckets). + + Buckets must be sorted and not contain any duplicates, must be + at least two elements. + + If `buckets` is a number, it will generates buckets which are + evenly spaced between the minimum and maximum of the RDD. For + example, if the min value is 0 and the max is 100, given buckets + as 2, the resulting buckets will be [0,50) [50,100]. buckets must + be at least 1 If the RDD contains infinity, NaN throws an exception + If the elements in RDD do not vary (max == min) always returns + a single bucket. + + It will return an tuple of buckets and histogram. + + >>> rdd = sc.parallelize(range(51)) + >>> rdd.histogram(2) + ([0, 25, 50], [25, 26]) + >>> rdd.histogram([0, 5, 25, 50]) + ([0, 5, 25, 50], [5, 20, 26]) + >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets + ([0, 15, 30, 45, 60], [15, 15, 15, 6]) + >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) + >>> rdd.histogram(("a", "b", "c")) + (('a', 'b', 'c'), [2, 2]) + """ + + if isinstance(buckets, (int, long)): + if buckets < 1: + raise ValueError("number of buckets must be >= 1") + + # filter out non-comparable elements + def comparable(x): + if x is None: + return False + if type(x) is float and isnan(x): + return False + return True + + filtered = self.filter(comparable) + + # faster than stats() + def minmax(a, b): + return min(a[0], b[0]), max(a[1], b[1]) + try: + minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax) + except TypeError as e: + if " empty " in str(e): + raise ValueError("can not generate buckets from empty RDD") + raise + + if minv == maxv or buckets == 1: + return [minv, maxv], [filtered.count()] + + try: + inc = (maxv - minv) / buckets + except TypeError: + raise TypeError("Can not generate buckets with non-number in RDD") + + if isinf(inc): + raise ValueError("Can not generate buckets with infinite value") + + # keep them as integer if possible + if inc * buckets != maxv - minv: + inc = (maxv - minv) * 1.0 / buckets + + buckets = [i * inc + minv for i in range(buckets)] + buckets.append(maxv) # fix accumulated error + even = True + + elif isinstance(buckets, (list, tuple)): + if len(buckets) < 2: + raise ValueError("buckets should have more than one value") + + if any(i is None or isinstance(i, float) and isnan(i) for i in buckets): + raise ValueError("can not have None or NaN in buckets") + + if sorted(buckets) != list(buckets): + raise ValueError("buckets should be sorted") + + if len(set(buckets)) != len(buckets): + raise ValueError("buckets should not contain duplicated values") + + minv = buckets[0] + maxv = buckets[-1] + even = False + inc = None + try: + steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)] + except TypeError: + pass # objects in buckets do not support '-' + else: + if max(steps) - min(steps) < 1e-10: # handle precision errors + even = True + inc = (maxv - minv) / (len(buckets) - 1) + + else: + raise TypeError("buckets should be a list or tuple or number(int or long)") + + def histogram(iterator): + counters = [0] * len(buckets) + for i in iterator: + if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv: + continue + t = (int((i - minv) / inc) if even + else bisect.bisect_right(buckets, i) - 1) + counters[t] += 1 + # add last two together + last = counters.pop() + counters[-1] += last + return [counters] + + def mergeCounters(a, b): + return [i + j for i, j in zip(a, b)] + + return buckets, self.mapPartitions(histogram).reduce(mergeCounters) + def mean(self): """ Compute the mean of this RDD's elements. diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 51bfbb47e5..1db922f513 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -364,6 +364,110 @@ class TestRDDFunctions(PySparkTestCase): self.assertEquals(a.count(), b.count()) self.assertRaises(Exception, lambda: a.zip(b).count()) + def test_histogram(self): + # empty + rdd = self.sc.parallelize([]) + self.assertEquals([0], rdd.histogram([0, 10])[1]) + self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1]) + self.assertRaises(ValueError, lambda: rdd.histogram(1)) + + # out of range + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0], rdd.histogram([0, 10])[1]) + self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1]) + + # in range with one bucket + rdd = self.sc.parallelize(range(1, 5)) + self.assertEquals([4], rdd.histogram([0, 10])[1]) + self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1]) + + # in range with one bucket exact match + self.assertEquals([4], rdd.histogram([1, 4])[1]) + + # out of range with two buckets + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1]) + + # out of range with two uneven buckets + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1]) + + # in range with two buckets + rdd = self.sc.parallelize([1, 2, 3, 5, 6]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + + # in range with two bucket and None + rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + + # in range with two uneven buckets + rdd = self.sc.parallelize([1, 2, 3, 5, 6]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1]) + + # mixed range with two uneven buckets + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01]) + self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1]) + + # mixed range with four uneven buckets + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1]) + self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + + # mixed range with uneven buckets and NaN + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, + 199.0, 200.0, 200.1, None, float('nan')]) + self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + + # out of range with infinite buckets + rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")]) + self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1]) + + # invalid buckets + self.assertRaises(ValueError, lambda: rdd.histogram([])) + self.assertRaises(ValueError, lambda: rdd.histogram([1])) + self.assertRaises(ValueError, lambda: rdd.histogram(0)) + self.assertRaises(TypeError, lambda: rdd.histogram({})) + + # without buckets + rdd = self.sc.parallelize(range(1, 5)) + self.assertEquals(([1, 4], [4]), rdd.histogram(1)) + + # without buckets single element + rdd = self.sc.parallelize([1]) + self.assertEquals(([1, 1], [1]), rdd.histogram(1)) + + # without bucket no range + rdd = self.sc.parallelize([1] * 4) + self.assertEquals(([1, 1], [4]), rdd.histogram(1)) + + # without buckets basic two + rdd = self.sc.parallelize(range(1, 5)) + self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2)) + + # without buckets with more requested than elements + rdd = self.sc.parallelize([1, 2]) + buckets = [1 + 0.2 * i for i in range(6)] + hist = [1, 0, 0, 0, 1] + self.assertEquals((buckets, hist), rdd.histogram(5)) + + # invalid RDDs + rdd = self.sc.parallelize([1, float('inf')]) + self.assertRaises(ValueError, lambda: rdd.histogram(2)) + rdd = self.sc.parallelize([float('nan')]) + self.assertRaises(ValueError, lambda: rdd.histogram(2)) + + # string + rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2) + self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1]) + self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1)) + self.assertRaises(TypeError, lambda: rdd.histogram(2)) + + # mixed RDD + rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2) + self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1]) + self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1]) + self.assertEquals(([1, "b"], [5]), rdd.histogram(1)) + self.assertRaises(TypeError, lambda: rdd.histogram(2)) + class TestIO(PySparkTestCase): -- cgit v1.2.3