diff options
Diffstat (limited to 'python/pyspark/mllib/random.py')
-rw-r--r-- | python/pyspark/mllib/random.py | 54 |
1 files changed, 34 insertions, 20 deletions
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index d53c95fd59..a787e4dea2 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -19,15 +19,32 @@ Python package for random data generation. """ +from functools import wraps from pyspark.rdd import RDD -from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector -from pyspark.serializers import NoOpSerializer +from pyspark.serializers import BatchedSerializer, PickleSerializer __all__ = ['RandomRDDs', ] +def serialize(f): + @wraps(f) + def func(sc, *a, **kw): + jrdd = f(sc, *a, **kw) + return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc, + BatchedSerializer(PickleSerializer(), 1024)) + return func + + +def toArray(f): + @wraps(f) + def func(sc, *a, **kw): + rdd = f(sc, *a, **kw) + return rdd.map(lambda vec: vec.toArray()) + return func + + class RandomRDDs(object): """ Generator methods for creating RDDs comprised of i.i.d samples from @@ -35,6 +52,7 @@ class RandomRDDs(object): """ @staticmethod + @serialize def uniformRDD(sc, size, numPartitions=None, seed=None): """ Generates an RDD comprised of i.i.d. samples from the @@ -56,11 +74,10 @@ class RandomRDDs(object): >>> parts == sc.defaultParallelism True """ - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) - return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + return sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) @staticmethod + @serialize def normalRDD(sc, size, numPartitions=None, seed=None): """ Generates an RDD comprised of i.i.d. samples from the standard normal @@ -80,11 +97,10 @@ class RandomRDDs(object): >>> abs(stats.stdev() - 1.0) < 0.1 True """ - jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) - return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) + return sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) @staticmethod + @serialize def poissonRDD(sc, mean, size, numPartitions=None, seed=None): """ Generates an RDD comprised of i.i.d. samples from the Poisson @@ -101,11 +117,11 @@ class RandomRDDs(object): >>> abs(stats.stdev() - sqrt(mean)) < 0.5 True """ - jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) - return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) + return sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) @staticmethod + @toArray + @serialize def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): """ Generates an RDD comprised of vectors containing i.i.d. samples drawn @@ -120,12 +136,12 @@ class RandomRDDs(object): >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() 4 """ - jrdd = sc._jvm.PythonMLLibAPI() \ + return sc._jvm.PythonMLLibAPI() \ .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) - return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod + @toArray + @serialize def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): """ Generates an RDD comprised of vectors containing i.i.d. samples drawn @@ -140,12 +156,12 @@ class RandomRDDs(object): >>> abs(mat.std() - 1.0) < 0.1 True """ - jrdd = sc._jvm.PythonMLLibAPI() \ + return sc._jvm.PythonMLLibAPI() \ .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) - return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod + @toArray + @serialize def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): """ Generates an RDD comprised of vectors containing i.i.d. samples drawn @@ -163,10 +179,8 @@ class RandomRDDs(object): >>> abs(mat.std() - sqrt(mean)) < 0.5 True """ - jrdd = sc._jvm.PythonMLLibAPI() \ + return sc._jvm.PythonMLLibAPI() \ .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) - return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) def _test(): |