aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/random.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/mllib/random.py')
-rw-r--r--python/pyspark/mllib/random.py54
1 files changed, 34 insertions, 20 deletions
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index d53c95fd59..a787e4dea2 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -19,15 +19,32 @@
Python package for random data generation.
"""
+from functools import wraps
from pyspark.rdd import RDD
-from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
__all__ = ['RandomRDDs', ]
+def serialize(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ jrdd = f(sc, *a, **kw)
+ return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ BatchedSerializer(PickleSerializer(), 1024))
+ return func
+
+
+def toArray(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ rdd = f(sc, *a, **kw)
+ return rdd.map(lambda vec: vec.toArray())
+ return func
+
+
class RandomRDDs(object):
"""
Generator methods for creating RDDs comprised of i.i.d samples from
@@ -35,6 +52,7 @@ class RandomRDDs(object):
"""
@staticmethod
+ @serialize
def uniformRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the
@@ -56,11 +74,10 @@ class RandomRDDs(object):
>>> parts == sc.defaultParallelism
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def normalRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the standard normal
@@ -80,11 +97,10 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the Poisson
@@ -101,11 +117,11 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
@staticmethod
+ @toArray
+ @serialize
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -120,12 +136,12 @@ class RandomRDDs(object):
>>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
4
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -140,12 +156,12 @@ class RandomRDDs(object):
>>> abs(mat.std() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -163,10 +179,8 @@ class RandomRDDs(object):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
def _test():