author     Doris Xin <doris.s.xin@gmail.com>    2014-07-31 20:32:57 -0700
committer  Xiangrui Meng <meng@databricks.com>  2014-07-31 20:32:57 -0700
commit     d8430148ee1f6ba02569db0538eeae473a32c78e (patch)
tree       d5103a5bc8f3068c48e0d581abe515560c1ecfe5 /python
parent     8f51491ea78d8e88fc664c2eac3b4ac14226d98f (diff)
download   spark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.gz
           spark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.bz2
           spark-d8430148ee1f6ba02569db0538eeae473a32c78e.zip
[SPARK-2724] Python version of RandomRDDGenerators
RandomRDDGenerators, but without support for randomRDD and randomVectorRDD, which take in an arbitrary DistributionGenerator. `randomRDD.py` is named to avoid collision with the built-in Python `random` package.

Author: Doris Xin <doris.s.xin@gmail.com>

Closes #1628 from dorx/pythonRDD and squashes the following commits:

55c6de8 [Doris Xin] review comments. all python units passed.
f831d9b [Doris Xin] moved default args logic into PythonMLLibAPI
2d73917 [Doris Xin] fix for linalg.py
8663e6a [Doris Xin] reverting back to a single python file for random
f47c481 [Doris Xin] docs update
687aac0 [Doris Xin] add RandomRDDGenerators.py to run-tests
4338f40 [Doris Xin] renamed randomRDD to rand and import as random
29d205e [Doris Xin] created mllib.random package
bd2df13 [Doris Xin] typos
07ddff2 [Doris Xin] units passed.
23b2ecd [Doris Xin] WIP
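For orientation, a minimal usage sketch of the API this patch adds, pieced together from its doctests (Python 2 syntax to match the codebase; the app name is illustrative, and a local SparkContext is assumed):

    from pyspark import SparkContext
    from pyspark.mllib.random import RandomRDDGenerators

    sc = SparkContext("local[2]", "random-rdd-demo")

    # 1000 i.i.d. samples from U[0.0, 1.0], spread over 4 partitions
    u = RandomRDDGenerators.uniformRDD(sc, 1000, 4, seed=1L)
    print u.stats()

    # a 10 x 10 matrix of standard normal samples
    mat = RandomRDDGenerators.normalVectorRDD(sc, 10, 10, seed=1L).collect()

    sc.stop()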
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/__init__.py       10
-rw-r--r--  python/pyspark/mllib/linalg.py    4
-rw-r--r--  python/pyspark/mllib/random.py  182
-rwxr-xr-x  python/run-tests                  1
4 files changed, 197 insertions, 0 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 312c75d112..c58555fc9d 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -49,6 +49,16 @@ Hive:
Main entry point for accessing data stored in Apache Hive.
"""
+# The following block allows us to import Python's random instead of mllib.random for scripts in
+# mllib that depend on top-level pyspark packages, which transitively depend on Python's random.
+# Since Python's import logic looks for modules in the current package first, we eliminate
+# mllib.random as a candidate for C{import random} by removing the first search path, the script's
+# location, in order to force the loader to look in Python's top-level modules for C{random}.
+import sys
+s = sys.path.pop(0)
+import random
+sys.path.insert(0, s)
+
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
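The search-path juggling in that hunk is easier to see outside the diff. A standalone sketch of the same trick (an assumption-laden restatement, relying on the fact that a directly-run script's own directory sits at sys.path[0]):

    import sys

    # When a script inside mllib/ is run directly, Python prepends the script's
    # directory to sys.path, so a bare `import random` would resolve to
    # mllib/random.py rather than the standard-library module. Popping that
    # entry just for the duration of the import sidesteps the shadowing.
    _script_dir = sys.path.pop(0)
    import random                      # now the stdlib random
    sys.path.insert(0, _script_dir)    # restore the original search order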
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 71f4ad1a8d..54720c2324 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -255,4 +255,8 @@ def _test():
exit(-1)
if __name__ == "__main__":
+ # remove the current path from the list of search paths to avoid importing mllib.random
+ # for C{import random}, which is done in an external dependency of pyspark during doctests.
+ import sys
+ sys.path.pop(0)
_test()
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
new file mode 100644
index 0000000000..36e710dbae
--- /dev/null
+++ b/python/pyspark/mllib/random.py
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for random data generation.
+"""
+
+
+from pyspark.rdd import RDD
+from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
+from pyspark.serializers import NoOpSerializer
+
+class RandomRDDGenerators:
+ """
+ Generator methods for creating RDDs comprised of i.i.d. samples from
+ some distribution.
+ """
+
+ @staticmethod
+ def uniformRDD(sc, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the
+ uniform distribution on [0.0, 1.0].
+
+ To transform the distribution in the generated RDD from U[0.0, 1.0]
+ to U[a, b], use
+ C{RandomRDDGenerators.uniformRDD(sc, n, p, seed)\
+ .map(lambda v: a + (b - a) * v)}
+
+ >>> x = RandomRDDGenerators.uniformRDD(sc, 100).collect()
+ >>> len(x)
+ 100
+ >>> max(x) <= 1.0 and min(x) >= 0.0
+ True
+ >>> RandomRDDGenerators.uniformRDD(sc, 100, 4).getNumPartitions()
+ 4
+ >>> parts = RandomRDDGenerators.uniformRDD(sc, 100, seed=4).getNumPartitions()
+ >>> parts == sc.defaultParallelism
+ True
+ """
+ jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
+ uniform = RDD(jrdd, sc, NoOpSerializer())
+ return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+ @staticmethod
+ def normalRDD(sc, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the standard normal
+ distribution.
+
+ To transform the distribution in the generated RDD from standard normal
+ to some other normal N(mean, sigma), use
+ C{RandomRDDGenerators.normalRDD(sc, n, p, seed)\
+ .map(lambda v: mean + sigma * v)}
+
+ >>> x = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L)
+ >>> stats = x.stats()
+ >>> stats.count()
+ 1000L
+ >>> abs(stats.mean() - 0.0) < 0.1
+ True
+ >>> abs(stats.stdev() - 1.0) < 0.1
+ True
+ """
+ jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
+ normal = RDD(jrdd, sc, NoOpSerializer())
+ return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+ @staticmethod
+ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the Poisson
+ distribution with the input mean.
+
+ >>> mean = 100.0
+ >>> x = RandomRDDGenerators.poissonRDD(sc, mean, 1000, seed=1L)
+ >>> stats = x.stats()
+ >>> stats.count()
+ 1000L
+ >>> abs(stats.mean() - mean) < 0.5
+ True
+ >>> from math import sqrt
+ >>> abs(stats.stdev() - sqrt(mean)) < 0.5
+ True
+ """
+ jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
+ poisson = RDD(jrdd, sc, NoOpSerializer())
+ return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+
+ @staticmethod
+ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the uniform distribution on [0.0, 1.0].
+
+ >>> import numpy as np
+ >>> mat = np.matrix(RandomRDDGenerators.uniformVectorRDD(sc, 10, 10).collect())
+ >>> mat.shape
+ (10, 10)
+ >>> mat.max() <= 1.0 and mat.min() >= 0.0
+ True
+ >>> RandomRDDGenerators.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
+ 4
+ """
+ jrdd = sc._jvm.PythonMLLibAPI() \
+ .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
+ uniform = RDD(jrdd, sc, NoOpSerializer())
+ return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+ @staticmethod
+ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the standard normal distribution.
+
+ >>> import numpy as np
+ >>> mat = np.matrix(RandomRDDGenerators.normalVectorRDD(sc, 100, 100, seed=1L).collect())
+ >>> mat.shape
+ (100, 100)
+ >>> abs(mat.mean() - 0.0) < 0.1
+ True
+ >>> abs(mat.std() - 1.0) < 0.1
+ True
+ """
+ jrdd = sc._jvm.PythonMLLibAPI() \
+ .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
+ normal = RDD(jrdd, sc, NoOpSerializer())
+ return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+ @staticmethod
+ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the Poisson distribution with the input mean.
+
+ >>> import numpy as np
+ >>> mean = 100.0
+ >>> rdd = RandomRDDGenerators.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
+ >>> mat = np.mat(rdd.collect())
+ >>> mat.shape
+ (100, 100)
+ >>> abs(mat.mean() - mean) < 0.5
+ True
+ >>> from math import sqrt
+ >>> abs(mat.std() - sqrt(mean)) < 0.5
+ True
+ """
+ jrdd = sc._jvm.PythonMLLibAPI() \
+ .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
+ poisson = RDD(jrdd, sc, NoOpSerializer())
+ return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
+
+
+def _test():
+ import doctest
+ from pyspark.context import SparkContext
+ globs = globals().copy()
+ # The small batch size here ensures that we see multiple batches,
+ # even in these small test examples:
+ globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
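Before the test-runner change: the normalRDD docstring above describes rescaling standard normal samples to N(mean, sigma); a short self-contained illustration of that pattern (mean and sigma are illustration values, not from the patch, and a live SparkContext `sc` is assumed):

    from pyspark.mllib.random import RandomRDDGenerators

    mean, sigma = 5.0, 2.0   # hypothetical target distribution

    # standard normal samples rescaled to N(mean, sigma), per the docstring
    samples = RandomRDDGenerators.normalRDD(sc, 1000, seed=1L) \
                                 .map(lambda v: mean + sigma * v)

    stats = samples.stats()
    assert abs(stats.mean() - mean) < 0.5
    assert abs(stats.stdev() - sigma) < 0.5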
diff --git a/python/run-tests b/python/run-tests
index 29f755fc0d..5049e15ce5 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -67,6 +67,7 @@ run_test "pyspark/mllib/_common.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/linalg.py"
+run_test "pyspark/mllib/random.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/tests.py"