aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRJ Nowling <rnowling@gmail.com>2015-01-08 15:03:43 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-08 15:03:43 -0800
commitc9c8b219ad81c4c30bc1598ff35b01f964570c29 (patch)
treed9d7aa3396e35e86cabbaaa78d88a65e8e3321c5
parenta00af6bec57b8df8b286aaa5897232475aef441c (diff)
downloadspark-c9c8b219ad81c4c30bc1598ff35b01f964570c29.tar.gz
spark-c9c8b219ad81c4c30bc1598ff35b01f964570c29.tar.bz2
spark-c9c8b219ad81c4c30bc1598ff35b01f964570c29.zip
[SPARK-4891][PySpark][MLlib] Add gamma/log normal/exp dist sampling to P...
...ySpark MLlib This is a follow up to PR3680 https://github.com/apache/spark/pull/3680 . Author: RJ Nowling <rnowling@gmail.com> Closes #3955 from rnowling/spark4891 and squashes the following commits: 1236a01 [RJ Nowling] Fix Python style issues 7a01a78 [RJ Nowling] Fix Python style issues 174beab [RJ Nowling] [SPARK-4891][PySpark][MLlib] Add gamma/log normal/exp dist sampling to PySpark MLlib
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala88
-rw-r--r--python/pyspark/mllib/rand.py187
2 files changed, 275 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index c4e5fd8e46..555da8c7e7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -625,6 +625,21 @@ class PythonMLLibAPI extends Serializable {
}
/**
+ * Java stub for Python mllib RandomRDDGenerators.logNormalRDD()
+ */
+ def logNormalRDD(jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Double] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.logNormalRDD(jsc.sc, mean, std, size, parts, s)
+ }
+
+
+ /**
* Java stub for Python mllib RandomRDDGenerators.poissonRDD()
*/
def poissonRDD(jsc: JavaSparkContext,
@@ -638,6 +653,33 @@ class PythonMLLibAPI extends Serializable {
}
/**
+ * Java stub for Python mllib RandomRDDGenerators.exponentialRDD()
+ */
+ def exponentialRDD(jsc: JavaSparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Double] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.exponentialRDD(jsc.sc, mean, size, parts, s)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.gammaRDD()
+ */
+ def gammaRDD(jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Double] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.gammaRDD(jsc.sc, shape, scale, size, parts, s)
+ }
+
+ /**
* Java stub for Python mllib RandomRDDGenerators.uniformVectorRDD()
*/
def uniformVectorRDD(jsc: JavaSparkContext,
@@ -664,6 +706,22 @@ class PythonMLLibAPI extends Serializable {
}
/**
+ * Java stub for Python mllib RandomRDDGenerators.logNormalVectorRDD()
+ */
+ def logNormalVectorRDD(jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Vector] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, parts, s)
+ }
+
+
+ /**
* Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD()
*/
def poissonVectorRDD(jsc: JavaSparkContext,
@@ -677,6 +735,36 @@ class PythonMLLibAPI extends Serializable {
RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
}
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.exponentialVectorRDD()
+ */
+ def exponentialVectorRDD(jsc: JavaSparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Vector] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.exponentialVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.gammaVectorRDD()
+ */
+ def gammaVectorRDD(jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Vector] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, parts, s)
+ }
+
+
}
/**
diff --git a/python/pyspark/mllib/rand.py b/python/pyspark/mllib/rand.py
index cb4304f921..20ee9d78bf 100644
--- a/python/pyspark/mllib/rand.py
+++ b/python/pyspark/mllib/rand.py
@@ -100,6 +100,38 @@ class RandomRDDs(object):
return callMLlibFunc("normalRDD", sc._jsc, size, numPartitions, seed)
@staticmethod
+ def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the log normal
+ distribution with the input mean and standard distribution.
+
+ :param sc: SparkContext used to create the RDD.
+ :param mean: mean for the log Normal distribution
+ :param std: std for the log Normal distribution
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ log N(mean, std).
+
+ >>> from math import sqrt, exp
+ >>> mean = 0.0
+ >>> std = 1.0
+ >>> expMean = exp(mean + 0.5 * std * std)
+ >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
+ >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L)
+ >>> stats = x.stats()
+ >>> stats.count()
+ 1000L
+ >>> abs(stats.mean() - expMean) < 0.5
+ True
+ >>> from math import sqrt
+ >>> abs(stats.stdev() - expStd) < 0.5
+ True
+ """
+ return callMLlibFunc("logNormalRDD", sc._jsc, float(mean), float(std),
+ size, numPartitions, seed)
+
+ @staticmethod
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the Poisson
@@ -126,6 +158,63 @@ class RandomRDDs(object):
return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)
@staticmethod
+ def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the Exponential
+ distribution with the input mean.
+
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or 1 / lambda, for the Exponential distribution.
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).
+
+ >>> mean = 2.0
+ >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L)
+ >>> stats = x.stats()
+ >>> stats.count()
+ 1000L
+ >>> abs(stats.mean() - mean) < 0.5
+ True
+ >>> from math import sqrt
+ >>> abs(stats.stdev() - sqrt(mean)) < 0.5
+ True
+ """
+ return callMLlibFunc("exponentialRDD", sc._jsc, float(mean), size, numPartitions, seed)
+
+ @staticmethod
+ def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of i.i.d. samples from the Gamma
+ distribution with the input shape and scale.
+
+ :param sc: SparkContext used to create the RDD.
+ :param shape: shape (> 0) parameter for the Gamma distribution
+ :param scale: scale (> 0) parameter for the Gamma distribution
+ :param size: Size of the RDD.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale).
+
+ >>> from math import sqrt
+ >>> shape = 1.0
+ >>> scale = 2.0
+ >>> expMean = shape * scale
+ >>> expStd = sqrt(shape * scale * scale)
+ >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L)
+ >>> stats = x.stats()
+ >>> stats.count()
+ 1000L
+ >>> abs(stats.mean() - expMean) < 0.5
+ True
+ >>> abs(stats.stdev() - expStd) < 0.5
+ True
+ """
+ return callMLlibFunc("gammaRDD", sc._jsc, float(shape),
+ float(scale), size, numPartitions, seed)
+
+ @staticmethod
@toArray
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
@@ -177,6 +266,40 @@ class RandomRDDs(object):
@staticmethod
@toArray
+ def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the log normal distribution.
+
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean of the log normal distribution
+ :param std: Standard Deviation of the log normal distribution
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ log `N(mean, std)`.
+
+ >>> import numpy as np
+ >>> from math import sqrt, exp
+ >>> mean = 0.0
+ >>> std = 1.0
+ >>> expMean = exp(mean + 0.5 * std * std)
+ >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
+ >>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std, \
+ 100, 100, seed=1L).collect())
+ >>> mat.shape
+ (100, 100)
+ >>> abs(mat.mean() - expMean) < 0.1
+ True
+ >>> abs(mat.std() - expStd) < 0.1
+ True
+ """
+ return callMLlibFunc("logNormalVectorRDD", sc._jsc, float(mean), float(std),
+ numRows, numCols, numPartitions, seed)
+
+ @staticmethod
+ @toArray
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -205,6 +328,70 @@ class RandomRDDs(object):
return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
numPartitions, seed)
+ @staticmethod
+ @toArray
+ def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the Exponential distribution with the input mean.
+
+ :param sc: SparkContext used to create the RDD.
+ :param mean: Mean, or 1 / lambda, for the Exponential distribution.
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean).
+
+ >>> import numpy as np
+ >>> mean = 0.5
+ >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L)
+ >>> mat = np.mat(rdd.collect())
+ >>> mat.shape
+ (100, 100)
+ >>> abs(mat.mean() - mean) < 0.5
+ True
+ >>> from math import sqrt
+ >>> abs(mat.std() - sqrt(mean)) < 0.5
+ True
+ """
+ return callMLlibFunc("exponentialVectorRDD", sc._jsc, float(mean), numRows, numCols,
+ numPartitions, seed)
+
+ @staticmethod
+ @toArray
+ def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=None):
+ """
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the Gamma distribution.
+
+ :param sc: SparkContext used to create the RDD.
+ :param shape: Shape (> 0) of the Gamma distribution
+ :param scale: Scale (> 0) of the Gamma distribution
+ :param numRows: Number of Vectors in the RDD.
+ :param numCols: Number of elements in each Vector.
+ :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ :param seed: Random seed (default: a random long integer).
+ :return: RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale).
+
+ >>> import numpy as np
+ >>> from math import sqrt
+ >>> shape = 1.0
+ >>> scale = 2.0
+ >>> expMean = shape * scale
+ >>> expStd = sqrt(shape * scale * scale)
+ >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, \
+ 100, 100, seed=1L).collect())
+ >>> mat.shape
+ (100, 100)
+ >>> abs(mat.mean() - expMean) < 0.1
+ True
+ >>> abs(mat.std() - expStd) < 0.1
+ True
+ """
+ return callMLlibFunc("gammaVectorRDD", sc._jsc, float(shape), float(scale),
+ numRows, numCols, numPartitions, seed)
+
def _test():
import doctest