aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-08-23 19:33:34 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-23 19:33:34 -0700
commit8df4dad4951ca6e687df1288331909878922a55f (patch)
tree85712756692df718f235514431bd85321f0b7653 /python
parentdb436e36c4e20893de708a0bc07a5a8877c49563 (diff)
downloadspark-8df4dad4951ca6e687df1288331909878922a55f.tar.gz
spark-8df4dad4951ca6e687df1288331909878922a55f.tar.bz2
spark-8df4dad4951ca6e687df1288331909878922a55f.zip
[SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95) :: Experimental :: Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished. >>> rdd = sc.parallelize(range(1000), 10) >>> rdd.countApprox(1000, 1.0) 1000 RDD.sumApprox(self, timeout, confidence=0.95) Approximate operation to return the sum within a timeout or meet the confidence. >>> rdd = sc.parallelize(range(1000), 10) >>> r = sum(xrange(1000)) >>> (rdd.sumApprox(1000) - r) / r < 0.05 RDD.meanApprox(self, timeout, confidence=0.95) :: Experimental :: Approximate operation to return the mean within a timeout or meet the confidence. >>> rdd = sc.parallelize(range(1000), 10) >>> r = sum(xrange(1000)) / 1000.0 >>> (rdd.meanApprox(1000) - r) / r < 0.05 True Author: Davies Liu <davies.liu@gmail.com> Closes #2095 from davies/approx and squashes the following commits: e8c252b [Davies Liu] add approx API for RDD
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bdd8bc8286..9f88340d03 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -131,6 +131,22 @@ class _JavaStackTrace(object):
self._context._jsc.setCallSite(None)
+class BoundedFloat(float):
+ """
+ Bounded value is generated by approximate job, with confidence and low
+ bound and high bound.
+
+ >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
+ 100.0
+ """
+ def __new__(cls, mean, confidence, low, high):
+ obj = float.__new__(cls, mean)
+ obj.confidence = confidence
+ obj.low = low
+ obj.high = high
+ return obj
+
+
class MaxHeapQ(object):
"""
@@ -1792,6 +1808,71 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ def _is_pickled(self):
+ """ Return this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return an JavaRDD of Object by unpickling
+
+ It will convert each Python object into Java object by Pyrolite, whenever the
+ RDD is serialized in batch or not.
+ """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate version of count() that returns a potentially incomplete
+ result within a timeout, even if not all tasks have finished.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> rdd.countApprox(1000, 1.0)
+ 1000
+ """
+ drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
+ return int(drdd.sumApprox(timeout, confidence))
+
+ def sumApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate operation to return the sum within a timeout
+ or meet the confidence.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> r = sum(xrange(1000))
+ >>> (rdd.sumApprox(1000) - r) / r < 0.05
+ True
+ """
+ jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+ jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+ r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
+ return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
+ def meanApprox(self, timeout, confidence=0.95):
+ """
+ :: Experimental ::
+ Approximate operation to return the mean within a timeout
+ or meet the confidence.
+
+ >>> rdd = sc.parallelize(range(1000), 10)
+ >>> r = sum(xrange(1000)) / 1000.0
+ >>> (rdd.meanApprox(1000) - r) / r < 0.05
+ True
+ """
+ jrdd = self.map(float)._to_jrdd()
+ jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
+ r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
+ return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
+
class PipelinedRDD(RDD):