diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-08-23 19:33:34 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-08-23 19:33:34 -0700 |
commit | 8df4dad4951ca6e687df1288331909878922a55f (patch) | |
tree | 85712756692df718f235514431bd85321f0b7653 /core | |
parent | db436e36c4e20893de708a0bc07a5a8877c49563 (diff) | |
download | spark-8df4dad4951ca6e687df1288331909878922a55f.tar.gz spark-8df4dad4951ca6e687df1288331909878922a55f.tar.bz2 spark-8df4dad4951ca6e687df1288331909878922a55f.zip |
[SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95)
:: Experimental ::
Approximate version of count() that returns a potentially incomplete
result within a timeout, even if not all tasks have finished.
>>> rdd = sc.parallelize(range(1000), 10)
>>> rdd.countApprox(1000, 1.0)
1000
RDD.sumApprox(self, timeout, confidence=0.95)
Approximate operation to return the sum within a timeout
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000))
>>> (rdd.sumApprox(1000) - r) / r < 0.05
RDD.meanApprox(self, timeout, confidence=0.95)
:: Experimental ::
Approximate operation to return the mean within a timeout
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
>>> r = sum(xrange(1000)) / 1000.0
>>> (rdd.meanApprox(1000) - r) / r < 0.05
True
Author: Davies Liu <davies.liu@gmail.com>
Closes #2095 from davies/approx and squashes the following commits:
e8c252b [Davies Liu] add approx API for RDD
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 747023812f..ae8010300a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging { } } } + + /** + * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark. + */ + def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + iter.flatMap { row => + val obj = unpickle.loads(row) + if (batched) { + obj.asInstanceOf[JArrayList[_]] + } else { + Seq(obj) + } + } + }.toJavaRDD() + } } private |