author    Davies Liu <davies.liu@gmail.com>    2014-08-23 19:33:34 -0700
committer Josh Rosen <joshrosen@apache.org>    2014-08-23 19:33:34 -0700
commit    8df4dad4951ca6e687df1288331909878922a55f (patch)
tree      85712756692df718f235514431bd85321f0b7653 /core
parent    db436e36c4e20893de708a0bc07a5a8877c49563 (diff)
download  spark-8df4dad4951ca6e687df1288331909878922a55f.tar.gz
          spark-8df4dad4951ca6e687df1288331909878922a55f.tar.bz2
          spark-8df4dad4951ca6e687df1288331909878922a55f.zip
[SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95)

    :: Experimental ::
    Approximate version of count() that returns a potentially incomplete
    result within a timeout, even if not all tasks have finished.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> rdd.countApprox(1000, 1.0)
    1000

RDD.sumApprox(self, timeout, confidence=0.95)

    :: Experimental ::
    Approximate operation to return the sum within a timeout
    or meet the confidence.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> r = sum(xrange(1000))
    >>> (rdd.sumApprox(1000) - r) / r < 0.05
    True

RDD.meanApprox(self, timeout, confidence=0.95)

    :: Experimental ::
    Approximate operation to return the mean within a timeout
    or meet the confidence.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> r = sum(xrange(1000)) / 1000.0
    >>> (rdd.meanApprox(1000) - r) / r < 0.05
    True

Author: Davies Liu <davies.liu@gmail.com>

Closes #2095 from davies/approx and squashes the following commits:

e8c252b [Davies Liu] add approx API for RDD
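A minimal sketch of how the new API might be exercised from a PySpark shell, assuming
sc is an existing SparkContext and that the timeout argument is in milliseconds (as in
the underlying Scala API); the data and partition count here are illustrative only, not
taken from the patch:

    # Hypothetical session; Python 2 syntax, matching the doctests above.
    rdd = sc.parallelize(range(100000), 20)

    # An exact count() would wait for all 20 tasks to finish; countApprox()
    # returns whatever estimate is available once the timeout expires.
    print rdd.countApprox(500, confidence=0.9)

    # sumApprox() and meanApprox() return float-like values, so they can be
    # compared and combined directly, as the doctests above do.
    total = rdd.sumApprox(500)
    print total / rdd.meanApprox(500)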
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  17
1 file changed, 17 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 747023812f..ae8010300a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -749,6 +749,23 @@ private[spark] object PythonRDD extends Logging {
      }
    }
  }
+
+  /**
+   * Convert an RDD of serialized Python objects to an RDD of objects that is usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]]
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
}
private
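The Scala helper above unpickles rows sent from Python; when batched is true, each
pickled element is itself a list of objects (PySpark serializes rows in batches), which
flatMap flattens into individual objects. For orientation, a rough sketch of how the
Python side of this commit might sit on top of that helper — the method name
_to_java_object_rdd, the _jrdd and ctx attributes, and the JavaDoubleRDD plumbing are
assumptions for illustration, not taken from this diff:

    # Hypothetical RDD methods sketching the Python side of the change.
    def _to_java_object_rdd(self):
        # Hand the pickled JavaRDD[Array[Byte]] to the new Scala helper;
        # batched=True because PySpark pickles rows in batches by default.
        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, True)

    def sumApprox(self, timeout, confidence=0.95):
        # Pre-aggregate each partition to a single float so the JVM side
        # only sees doubles, then delegate to JavaDoubleRDD.sumApprox and
        # unwrap the BoundedDouble estimate that comes back.
        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
        return jdrdd.sumApprox(timeout, confidence).getFinalValue().mean()

meanApprox and countApprox would follow the same pattern, delegating to the
corresponding approximate operations already available on the JVM side.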