diff options
author | Doris Xin <doris.s.xin@gmail.com> | 2014-07-31 20:32:57 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-07-31 20:32:57 -0700 |
commit | d8430148ee1f6ba02569db0538eeae473a32c78e (patch) | |
tree | d5103a5bc8f3068c48e0d581abe515560c1ecfe5 /python | |
parent | 8f51491ea78d8e88fc664c2eac3b4ac14226d98f (diff) | |
download | spark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.gz spark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.bz2 spark-d8430148ee1f6ba02569db0538eeae473a32c78e.zip |
[SPARK-2724] Python version of RandomRDDGenerators
RandomRDDGenerators but without support for randomRDD and randomVectorRDD, which take in arbitrary DistributionGenerator.
`randomRDD.py` is named to avoid collision with the built-in Python `random` package.
Author: Doris Xin <doris.s.xin@gmail.com>
Closes #1628 from dorx/pythonRDD and squashes the following commits:
55c6de8 [Doris Xin] review comments. all python units passed.
f831d9b [Doris Xin] moved default args logic into PythonMLLibAPI
2d73917 [Doris Xin] fix for linalg.py
8663e6a [Doris Xin] reverting back to a single python file for random
f47c481 [Doris Xin] docs update
687aac0 [Doris Xin] add RandomRDDGenerators.py to run-tests
4338f40 [Doris Xin] renamed randomRDD to rand and import as random
29d205e [Doris Xin] created mllib.random package
bd2df13 [Doris Xin] typos
07ddff2 [Doris Xin] units passed.
23b2ecd [Doris Xin] WIP
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/__init__.py | 10 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg.py | 4 | ||||
-rw-r--r-- | python/pyspark/mllib/random.py | 182 | ||||
-rwxr-xr-x | python/run-tests | 1 |
4 files changed, 197 insertions, 0 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 312c75d112..c58555fc9d 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -49,6 +49,16 @@ Hive: Main entry point for accessing data stored in Apache Hive.. """ +# The following block allows us to import python's random instead of mllib.random for scripts in +# mllib that depend on top level pyspark packages, which transitively depend on python's random. +# Since Python's import logic looks for modules in the current package first, we eliminate +# mllib.random as a candidate for C{import random} by removing the first search path, the script's +# location, in order to force the loader to look in Python's top-level modules for C{random}. +import sys +s = sys.path.pop(0) +import random +sys.path.insert(0, s) + from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SQLContext diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 71f4ad1a8d..54720c2324 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -255,4 +255,8 @@ def _test(): exit(-1) if __name__ == "__main__": + # remove current path from list of search paths to avoid importing mllib.random + # for C{import random}, which is done in an external dependency of pyspark during doctests. + import sys + sys.path.pop(0) _test() diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py new file mode 100644 index 0000000000..36e710dbae --- /dev/null +++ b/python/pyspark/mllib/random.py @@ -0,0 +1,182 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python package for random data generation. +""" + + +from pyspark.rdd import RDD +from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector +from pyspark.serializers import NoOpSerializer + +class RandomRDDGenerators: + """ + Generator methods for creating RDDs comprised of i.i.d samples from + some distribution. + """ + + @staticmethod + def uniformRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d. samples from the + uniform distribution on [0.0, 1.0]. + + To transform the distribution in the generated RDD from U[0.0, 1.0] + to U[a, b], use + C{RandomRDDGenerators.uniformRDD(sc, n, p, seed)\ + .map(lambda v: a + (b - a) * v)} + + >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect() + >>> len(x) + 100 + >>> max(x) <= 1.0 and min(x) >= 0.0 + True + >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions() + 4 + >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions() + >>> parts == sc.defaultParallelism + True + """ + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def normalRDD(sc, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the standard normal + distribution. + + To transform the distribution in the generated RDD from standard normal + to some other normal N(mean, sigma), use + C{RandomRDDGenerators.normal(sc, n, p, seed)\ + .map(lambda v: mean + sigma * v)} + + >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L) + >>> stats = x.stats() + >>> stats.count() + 1000L + >>> abs(stats.mean() - 0.0) < 0.1 + True + >>> abs(stats.stdev() - 1.0) < 0.1 + True + """ + jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def poissonRDD(sc, mean, size, numPartitions=None, seed=None): + """ + Generates an RDD comprised of i.i.d samples from the Poisson + distribution with the input mean. + + >>> mean = 100.0 + >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L) + >>> stats = x.stats() + >>> stats.count() + 1000L + >>> abs(stats.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(stats.stdev() - sqrt(mean)) < 0.5 + True + """ + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) + + @staticmethod + def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the uniform distribution on [0.0 1.0]. + + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect()) + >>> mat.shape + (10, 10) + >>> mat.max() <= 1.0 and mat.min() >= 0.0 + True + >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions() + 4 + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) + return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the standard normal distribution. + + >>> import numpy as np + >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - 0.0) < 0.1 + True + >>> abs(mat.std() - 1.0) < 0.1 + True + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) + return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + @staticmethod + def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): + """ + Generates an RDD comprised of vectors containing i.i.d samples drawn + from the Poisson distribution with the input mean. + + >>> import numpy as np + >>> mean = 100.0 + >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L) + >>> mat = np.mat(rdd.collect()) + >>> mat.shape + (100, 100) + >>> abs(mat.mean() - mean) < 0.5 + True + >>> from math import sqrt + >>> abs(mat.std() - sqrt(mean)) < 0.5 + True + """ + jrdd = sc._jvm.PythonMLLibAPI() \ + .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) + return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) + + +def _test(): + import doctest + from pyspark.context import SparkContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/run-tests b/python/run-tests index 29f755fc0d..5049e15ce5 100755 --- a/python/run-tests +++ b/python/run-tests @@ -67,6 +67,7 @@ run_test "pyspark/mllib/_common.py" run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" run_test "pyspark/mllib/linalg.py" +run_test "pyspark/mllib/random.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" run_test "pyspark/mllib/tests.py" |