aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorDoris Xin <doris.s.xin@gmail.com>2014-07-31 20:32:57 -0700
committerXiangrui Meng <meng@databricks.com>2014-07-31 20:32:57 -0700
commitd8430148ee1f6ba02569db0538eeae473a32c78e (patch)
treed5103a5bc8f3068c48e0d581abe515560c1ecfe5 /mllib
parent8f51491ea78d8e88fc664c2eac3b4ac14226d98f (diff)
downloadspark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.gz
spark-d8430148ee1f6ba02569db0538eeae473a32c78e.tar.bz2
spark-d8430148ee1f6ba02569db0538eeae473a32c78e.zip
[SPARK-2724] Python version of RandomRDDGenerators
RandomRDDGenerators but without support for randomRDD and randomVectorRDD, which take in arbitrary DistributionGenerator. `randomRDD.py` is named to avoid collision with the built-in Python `random` package. Author: Doris Xin <doris.s.xin@gmail.com> Closes #1628 from dorx/pythonRDD and squashes the following commits: 55c6de8 [Doris Xin] review comments. all python units passed. f831d9b [Doris Xin] moved default args logic into PythonMLLibAPI 2d73917 [Doris Xin] fix for linalg.py 8663e6a [Doris Xin] reverting back to a single python file for random f47c481 [Doris Xin] docs update 687aac0 [Doris Xin] add RandomRDDGenerators.py to run-tests 4338f40 [Doris Xin] renamed randomRDD to rand and import as random 29d205e [Doris Xin] created mllib.random package bd2df13 [Doris Xin] typos 07ddff2 [Doris Xin] units passed. 23b2ecd [Doris Xin] WIP
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala97
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala90
2 files changed, 151 insertions, 36 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 954621ee8b..d2e8ccf208 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -24,10 +24,12 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -453,4 +455,99 @@ class PythonMLLibAPI extends Serializable {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
+
+ // Used by the *RDD methods to get default seed if not passed in from pyspark
+ private def getSeedOrDefault(seed: java.lang.Long): Long = {
+ if (seed == null) Utils.random.nextLong else seed
+ }
+
+ // Used by *RDD methods to get default numPartitions if not passed in from pyspark
+ private def getNumPartitionsOrDefault(numPartitions: java.lang.Integer,
+ jsc: JavaSparkContext): Int = {
+ if (numPartitions == null) {
+ jsc.sc.defaultParallelism
+ } else {
+ numPartitions
+ }
+ }
+
+ // Note: for the following methods, numPartitions and seed are boxed to allow nulls to be passed
+ // in for either argument from pyspark
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.uniformRDD()
+ */
+ def uniformRDD(jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.uniformRDD(jsc.sc, size, parts, s).map(serializeDouble)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.normalRDD()
+ */
+ def normalRDD(jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.normalRDD(jsc.sc, size, parts, s).map(serializeDouble)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.poissonRDD()
+ */
+ def poissonRDD(jsc: JavaSparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.poissonRDD(jsc.sc, mean, size, parts, s).map(serializeDouble)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.uniformVectorRDD()
+ */
+ def uniformVectorRDD(jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.normalVectorRDD()
+ */
+ def normalVectorRDD(jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(serializeDoubleVector)
+ }
+
+ /**
+ * Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD()
+ */
+ def poissonVectorRDD(jsc: JavaSparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: java.lang.Integer,
+ seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ val parts = getNumPartitionsOrDefault(numPartitions, jsc)
+ val s = getSeedOrDefault(seed)
+ RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(serializeDoubleVector)
+ }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
index d7ee2d3f46..021d651d4d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
@@ -26,14 +26,17 @@ import org.apache.spark.util.Utils
/**
* :: Experimental ::
- * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
+ * Generator methods for creating RDDs comprised of i.i.d. samples from some distribution.
*/
@Experimental
object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
+ *
+ * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
+ * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
@@ -49,7 +52,10 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
+ *
+ * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
+ * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
@@ -63,9 +69,12 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
* sc.defaultParallelism used for the number of partitions in the RDD.
*
+ * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
+ * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`.
+ *
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
@@ -77,7 +86,10 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
+ *
+ * To transform the distribution in the generated RDD from standard normal to some other normal
+ * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
@@ -93,7 +105,10 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
+ *
+ * To transform the distribution in the generated RDD from standard normal to some other normal
+ * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
@@ -107,9 +122,12 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
+ * To transform the distribution in the generated RDD from standard normal to some other normal
+ * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`.
+ *
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
@@ -121,7 +139,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
@@ -142,7 +160,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
@@ -157,7 +175,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
@@ -172,7 +190,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
@@ -192,7 +210,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
* @param generator DistributionGenerator used to populate the RDD.
@@ -210,7 +228,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
@@ -229,7 +247,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0 1.0].
*
* @param sc SparkContext used to create the RDD.
@@ -251,14 +269,14 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0 1.0].
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformVectorRDD(sc: SparkContext,
@@ -270,14 +288,14 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* uniform distribution on [0.0 1.0].
* sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
*/
@Experimental
def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
@@ -286,7 +304,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
@@ -294,7 +312,7 @@ object RandomRDDGenerators {
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext,
@@ -308,14 +326,14 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext,
@@ -327,14 +345,14 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
*/
@Experimental
def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
@@ -343,7 +361,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
@@ -352,7 +370,7 @@ object RandomRDDGenerators {
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
@@ -367,7 +385,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
* @param sc SparkContext used to create the RDD.
@@ -375,7 +393,7 @@ object RandomRDDGenerators {
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
@@ -388,7 +406,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
@@ -396,7 +414,7 @@ object RandomRDDGenerators {
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
@Experimental
def poissonVectorRDD(sc: SparkContext,
@@ -408,7 +426,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
@@ -417,7 +435,7 @@ object RandomRDDGenerators {
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,
@@ -431,7 +449,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
*
* @param sc SparkContext used to create the RDD.
@@ -439,7 +457,7 @@ object RandomRDDGenerators {
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,
@@ -452,7 +470,7 @@ object RandomRDDGenerators {
/**
* :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input DistributionGenerator.
* sc.defaultParallelism used for the number of partitions in the RDD.
*
@@ -460,7 +478,7 @@ object RandomRDDGenerators {
* @param generator DistributionGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
@Experimental
def randomVectorRDD(sc: SparkContext,