aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--python/pyspark/tests.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 51bfbb47e5..1db922f513 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -364,6 +364,110 @@ class TestRDDFunctions(PySparkTestCase):
self.assertEquals(a.count(), b.count())
self.assertRaises(Exception, lambda: a.zip(b).count())
+ def test_histogram(self):
+ # empty
+ rdd = self.sc.parallelize([])
+ self.assertEquals([0], rdd.histogram([0, 10])[1])
+ self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+ self.assertRaises(ValueError, lambda: rdd.histogram(1))
+
+ # out of range
+ rdd = self.sc.parallelize([10.01, -0.01])
+ self.assertEquals([0], rdd.histogram([0, 10])[1])
+ self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])
+
+ # in range with one bucket
+ rdd = self.sc.parallelize(range(1, 5))
+ self.assertEquals([4], rdd.histogram([0, 10])[1])
+ self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])
+
+ # in range with one bucket exact match
+ self.assertEquals([4], rdd.histogram([1, 4])[1])
+
+ # out of range with two buckets
+ rdd = self.sc.parallelize([10.01, -0.01])
+ self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])
+
+ # out of range with two uneven buckets
+ rdd = self.sc.parallelize([10.01, -0.01])
+ self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+
+ # in range with two buckets
+ rdd = self.sc.parallelize([1, 2, 3, 5, 6])
+ self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+
+ # in range with two bucket and None
+ rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
+ self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+
+ # in range with two uneven buckets
+ rdd = self.sc.parallelize([1, 2, 3, 5, 6])
+ self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])
+
+ # mixed range with two uneven buckets
+ rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
+ self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])
+
+ # mixed range with four uneven buckets
+ rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
+ self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+
+ # mixed range with uneven buckets and NaN
+ rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
+ 199.0, 200.0, 200.1, None, float('nan')])
+ self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+
+ # out of range with infinite buckets
+ rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
+ self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])
+
+ # invalid buckets
+ self.assertRaises(ValueError, lambda: rdd.histogram([]))
+ self.assertRaises(ValueError, lambda: rdd.histogram([1]))
+ self.assertRaises(ValueError, lambda: rdd.histogram(0))
+ self.assertRaises(TypeError, lambda: rdd.histogram({}))
+
+ # without buckets
+ rdd = self.sc.parallelize(range(1, 5))
+ self.assertEquals(([1, 4], [4]), rdd.histogram(1))
+
+ # without buckets single element
+ rdd = self.sc.parallelize([1])
+ self.assertEquals(([1, 1], [1]), rdd.histogram(1))
+
+ # without bucket no range
+ rdd = self.sc.parallelize([1] * 4)
+ self.assertEquals(([1, 1], [4]), rdd.histogram(1))
+
+ # without buckets basic two
+ rdd = self.sc.parallelize(range(1, 5))
+ self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
+
+ # without buckets with more requested than elements
+ rdd = self.sc.parallelize([1, 2])
+ buckets = [1 + 0.2 * i for i in range(6)]
+ hist = [1, 0, 0, 0, 1]
+ self.assertEquals((buckets, hist), rdd.histogram(5))
+
+ # invalid RDDs
+ rdd = self.sc.parallelize([1, float('inf')])
+ self.assertRaises(ValueError, lambda: rdd.histogram(2))
+ rdd = self.sc.parallelize([float('nan')])
+ self.assertRaises(ValueError, lambda: rdd.histogram(2))
+
+ # string
+ rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
+ self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
+ self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
+ self.assertRaises(TypeError, lambda: rdd.histogram(2))
+
+ # mixed RDD
+ rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
+ self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
+ self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
+ self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
+ self.assertRaises(TypeError, lambda: rdd.histogram(2))
+
class TestIO(PySparkTestCase):