author: Xiangrui Meng <meng@databricks.com> 2014-08-19 16:06:48 -0700
committer: Xiangrui Meng <meng@databricks.com> 2014-08-19 16:06:48 -0700
commit: 825d4fe47b9c4d48de88622dd48dcf83beb8b80a (patch)
tree: d51775e9f88bff51458e57a5ec16de6e0b93b91a
parent: d7e80c2597d4a9cae2e0cb35a86f7889323f4cbb (diff)
[SPARK-3136][MLLIB] Create Java-friendly methods in RandomRDDs
Though we don't use default arguments for methods in RandomRDDs, they are still not easy for Java users to use because the output type is either `RDD[Double]` or `RDD[Vector]`. Java users should expect `JavaDoubleRDD` and `JavaRDD[Vector]`, respectively. We should create dedicated methods for Java users, and allow default arguments in Scala methods in RandomRDDs, to make life easier for both Java and Scala users. This PR also contains documentation for random data generation.

brkyvz

Author: Xiangrui Meng <meng@databricks.com>

Closes #2041 from mengxr/stat-doc and squashes the following commits:

fc5eedf [Xiangrui Meng] add missing comma
ffde810 [Xiangrui Meng] address comments
aef6d07 [Xiangrui Meng] add doc for random data generation
b99d94b [Xiangrui Meng] add java-friendly methods to RandomRDDs
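The default-argument handling described in the commit message can be sketched without Spark. The snippet below is only an illustration under stated assumptions: `DEFAULT_PARALLELISM` is a hypothetical stand-in for `sc.defaultParallelism`, `normal_partitions` is a made-up helper that returns plain Python lists rather than an RDD, and the per-partition seeding is an illustrative choice, not necessarily what `RandomRDD` does. The only rule taken from the patch itself is the one documented for `numPartitionsOrDefault`: a positive `numPartitions` is used as given, anything else falls back to the default.

```python
import random

# Hypothetical stand-in for `sc.defaultParallelism`; not a value taken from Spark.
DEFAULT_PARALLELISM = 4


def num_partitions_or_default(num_partitions, default=DEFAULT_PARALLELISM):
    """Return num_partitions if it is positive, otherwise the default."""
    return num_partitions if num_partitions > 0 else default


def normal_partitions(size, num_partitions=0, seed=None):
    """Draw `size` standard normal values, split across partitions (plain lists)."""
    n = num_partitions_or_default(num_partitions)
    # A master RNG hands out one seed per partition (assumed scheme, for illustration only).
    master = random.Random(seed)
    partitions = []
    for i in range(n):
        # Spread the remainder over the first partitions so sizes differ by at most one.
        count = size // n + (1 if i < size % n else 0)
        rng = random.Random(master.getrandbits(64))
        partitions.append([rng.gauss(0.0, 1.0) for _ in range(count)])
    return partitions


if __name__ == "__main__":
    # num_partitions is omitted, so the sentinel 0 resolves to DEFAULT_PARALLELISM.
    parts = normal_partitions(10, seed=42)
    print(len(parts), [len(p) for p in parts])
```

Using `0` as the sentinel keeps a single signature for callers that can rely on default arguments, while callers that cannot (Java, in the case of this patch) get explicit overloads that forward to it.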
-rw-r--r--  docs/mllib-guide.md  2
-rw-r--r--  docs/mllib-stats.md  74
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala  18
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala  476
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java  134
-rw-r--r--  python/pyspark/mllib/random.py  20

6 files changed, 418 insertions, 306 deletions
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 23d5a0c460..ca0a84a8c5 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -9,7 +9,7 @@ filtering, dimensionality reduction, as well as underlying optimization primitiv
* [Data types](mllib-basics.html)
* [Basic statistics](mllib-stats.html)
- * data generators
+ * random data generation
* stratified sampling
* summary statistics
* hypothesis testing
diff --git a/docs/mllib-stats.md b/docs/mllib-stats.md
index ca9ef46c15..f25dca746b 100644
--- a/docs/mllib-stats.md
+++ b/docs/mllib-stats.md
@@ -25,7 +25,79 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Statistics Functionality
\newcommand{\zero}{\mathbf{0}}
\]`
-## Data Generators
+## Random data generation
+
+Random data generation is useful for randomized algorithms, prototyping, and performance testing.
+MLlib supports generating random RDDs with i.i.d. values drawn from a given distribution:
+uniform, standard normal, or Poisson.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+[`RandomRDDs`](api/scala/index.html#org.apache.spark.mllib.random.RandomRDDs) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD whose values follow the standard normal
+distribution `N(0, 1)`, and then maps it to `N(1, 4)`.
+
+{% highlight scala %}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.random.RandomRDDs._
+
+val sc: SparkContext = ...
+
+// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+val u = normalRDD(sc, 1000000L, 10)
+// Apply a transform to get a random double RDD following `N(1, 4)`.
+val v = u.map(x => 1.0 + 2.0 * x)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+[`RandomRDDs`](api/java/index.html#org.apache.spark.mllib.random.RandomRDDs) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD whose values follow the standard normal
+distribution `N(0, 1)`, and then maps it to `N(1, 4)`.
+
+{% highlight java %}
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.DoubleFunction;
+import static org.apache.spark.mllib.random.RandomRDDs.*;
+
+JavaSparkContext jsc = ...
+
+// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
+// Apply a transform to get a random double RDD following `N(1, 4)`.
+// `mapToDouble` keeps the result a JavaDoubleRDD; `map` would return a JavaRDD<Double>.
+JavaDoubleRDD v = u.mapToDouble(
+  new DoubleFunction<Double>() {
+    public double call(Double x) {
+      return 1.0 + 2.0 * x;
+    }
+  });
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+[`RandomRDDs`](api/python/pyspark.mllib.random.RandomRDDs-class.html) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD whose values follow the standard normal
+distribution `N(0, 1)`, and then maps it to `N(1, 4)`.
+
+{% highlight python %}
+from pyspark.mllib.random import RandomRDDs
+
+sc = ... # SparkContext
+
+# Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+# standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+u = RandomRDDs.normalRDD(sc, 1000000L, 10)
+# Apply a transform to get a random double RDD following `N(1, 4)`.
+v = u.map(lambda x: 1.0 + 2.0 * x)
+{% endhighlight %}
+</div>
+
+</div>
## Stratified Sampling
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
index 9cab49f6ed..28179fbc45 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
@@ -20,14 +20,14 @@ package org.apache.spark.mllib.random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Trait for random data generators that generate i.i.d. data.
*/
-@Experimental
+@DeveloperApi
trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
/**
@@ -43,10 +43,10 @@ trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from U[0.0, 1.0]
*/
-@Experimental
+@DeveloperApi
class UniformGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -62,10 +62,10 @@ class UniformGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the standard normal distribution.
*/
-@Experimental
+@DeveloperApi
class StandardNormalGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -81,12 +81,12 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the Poisson distribution with the given mean.
*
* @param mean mean for the Poisson distribution.
*/
-@Experimental
+@DeveloperApi
class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
private var rng = new Poisson(mean, new DRand)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index 3627036952..c5f4b08432 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -20,9 +20,10 @@ package org.apache.spark.mllib.random
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.mllib.rdd.{RandomRDD, RandomVectorRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -34,335 +35,279 @@ import org.apache.spark.util.Utils
object RandomRDDs {
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution `U(0.0, 1.0)`.
*
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
+ * To transform the distribution in the generated RDD from `U(0.0, 1.0)` to `U(a, b)`, use
+ * `RandomRDDs.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+ def uniformRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
val uniform = new UniformGenerator()
- randomRDD(sc, uniform, size, numPartitions, seed)
+ randomRDD(sc, uniform, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformRDD]].
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def uniformJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaRDD]] with the default seed.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
- uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#uniformJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
- val normal = new StandardNormalGenerator()
- randomRDD(sc, normal, size, numPartitions, seed)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
*
* To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`.
+ * `N(mean, sigma^2^)`, use `RandomRDDs.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def normalRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val normal = new StandardNormalGenerator()
+ randomRDD(sc, normal, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * Java-friendly version of [[RandomRDDs#normalRDD]].
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
- normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext,
- mean: Double,
- size: Long,
- numPartitions: Int,
- seed: Long): RDD[Double] = {
- val poisson = new PoissonGenerator(mean)
- randomRDD(sc, poisson, size, numPartitions, seed)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
- poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
- poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
+ def poissonRDD(
+ sc: SparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val poisson = new PoissonGenerator(mean)
+ randomRDD(sc, poisson, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * Java-friendly version of [[RandomRDDs#poissonRDD]].
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
numPartitions: Int,
- seed: Long): RDD[T] = {
- new RandomRDD[T](sc, size, numPartitions, generator, seed)
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaRDD]] with the default seed.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
- numPartitions: Int): RDD[T] = {
- randomRDD[T](sc, generator, size, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
+ * [[RandomRDDs#poissonJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def poissonJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size))
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples produced by generator.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
+ @DeveloperApi
+ def randomRDD[T: ClassTag](
+ sc: SparkContext,
generator: RandomDataGenerator[T],
- size: Long): RDD[T] = {
- randomRDD[T](sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[T] = {
+ new RandomRDD[T](sc, size, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
// TODO Generate RDD[Vector] from multivariate distributions.
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
+ * uniform distribution `U(0.0, 1.0)`.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformVectorRDD(
+ sc: SparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val uniform = new UniformGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, uniform, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformVectorRDD]].
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default number of partitions and the default seed.
+ */
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+ */
+ def normalVectorRDD(
+ sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val normal = new StandardNormalGenerator()
+ randomVectorRDD(sc, normal, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#normalVectorRDD]].
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int,
- seed: Long): RDD[Vector] = {
- val uniform = new StandardNormalGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ seed: Long): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
@@ -370,124 +315,85 @@ object RandomRDDs {
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonVectorRDD(
+ sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
- randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, poisson, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * Java-friendly version of [[RandomRDDs#poissonVectorRDD]].
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
- numCols: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
numRows: Long,
- numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
- new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
+ numCols: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
+ * input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
- @Experimental
+ @DeveloperApi
def randomVectorRDD(sc: SparkContext,
generator: RandomDataGenerator[Double],
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ new RandomVectorRDD(
+ sc, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * Returns `numPartitions` if it is positive, or `sc.defaultParallelism` otherwise.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
- numRows: Long,
- numCols: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols,
- sc.defaultParallelism, Utils.random.nextLong)
+ private def numPartitionsOrDefault(sc: SparkContext, numPartitions: Int): Int = {
+ if (numPartitions > 0) numPartitions else sc.defaultMinPartitions
}
}
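The fallback rule in `numPartitionsOrDefault` above can be sketched in a few lines of Python. This is an illustration only: `num_partitions_or_default` and `default_partitions` are hypothetical names, not part of Spark or PySpark. Note that the Scala helper in this diff falls back to `sc.defaultMinPartitions` in its body, while its comment names `sc.defaultParallelism`. The sketch takes the fallback value as a parameter and does not settle which of the two is intended.

```python
def num_partitions_or_default(num_partitions, default_partitions):
    """Return num_partitions if it is positive, else the supplied default.

    Hypothetical helper mirroring the Scala logic; not a Spark API.
    """
    if num_partitions > 0:
        return num_partitions
    return default_partitions


# A non-positive value (0 is the Scala default argument) acts as the
# "use the default" sentinel; a positive value is passed through unchanged.
print(num_partitions_or_default(0, 8))
print(num_partitions_or_default(4, 8))
```

Using `0` as a sentinel lets a single Scala method with default arguments replace the earlier family of overloads, at the cost of silently treating any non-positive request as "use the default".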
diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
new file mode 100644
index 0000000000..a725736ca1
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vector;
+import static org.apache.spark.mllib.random.RandomRDDs.*;
+
+public class JavaRandomRDDsSuite {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaRandomRDDsSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Test
+ public void testUniformRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = uniformJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = uniformJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = uniformJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testNormalRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = normalJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = normalJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = normalJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testPoissonRDD() {
+ double mean = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = poissonJavaRDD(sc, mean, m);
+ JavaDoubleRDD rdd2 = poissonJavaRDD(sc, mean, m, p);
+ JavaDoubleRDD rdd3 = poissonJavaRDD(sc, mean, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUniformVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = uniformJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = uniformJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = uniformJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNormalVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = normalJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = normalJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = normalJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoissonVectorRDD() {
+ double mean = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = poissonJavaVectorRDD(sc, mean, m, n);
+ JavaRDD<Vector> rdd2 = poissonJavaVectorRDD(sc, mean, m, n, p);
+ JavaRDD<Vector> rdd3 = poissonJavaVectorRDD(sc, mean, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+}
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 3f3b19053d..4dc1a4a912 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -35,10 +35,10 @@ class RandomRDDs:
def uniformRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the
- uniform distribution on [0.0, 1.0].
+ uniform distribution U(0.0, 1.0).
- To transform the distribution in the generated RDD from U[0.0, 1.0]
- to U[a, b], use
+ To transform the distribution in the generated RDD from U(0.0, 1.0)
+ to U(a, b), use
C{RandomRDDs.uniformRDD(sc, n, p, seed)\
.map(lambda v: a + (b - a) * v)}
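The affine map in the docstring above can be checked without a Spark cluster. The following is a minimal sketch, assuming the standard-library `random` module as a stand-in for the values an RDD from `uniformRDD` would hold; `scale_uniform`, the seed, and the bounds `2.0` and `5.0` are illustrative choices, not part of PySpark.

```python
import random


def scale_uniform(v, a, b):
    """Map a sample v drawn from U(0.0, 1.0) to U(a, b)."""
    return a + (b - a) * v


# Stand-in for RandomRDDs.uniformRDD(sc, n, p, seed).collect():
# plain Python floats in [0.0, 1.0) from a seeded generator.
rng = random.Random(42)
samples = [rng.random() for _ in range(1000)]

# Same lambda body as in the docstring, applied locally.
a, b = 2.0, 5.0
scaled = [scale_uniform(v, a, b) for v in samples]
print(min(scaled), max(scaled))
```

With an actual RDD the same function would be passed to `.map(...)`, as the docstring shows.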
@@ -60,11 +60,11 @@ class RandomRDDs:
@staticmethod
def normalRDD(sc, size, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of i.i.d samples from the standard normal
+ Generates an RDD comprised of i.i.d. samples from the standard normal
distribution.
To transform the distribution in the generated RDD from standard normal
- to some other normal N(mean, sigma), use
+ to some other normal N(mean, sigma^2), use
C{RandomRDDs.normalRDD(sc, n, p, seed)\
.map(lambda v: mean + sigma * v)}
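The `mean + sigma * v` map in the docstring above shifts and scales standard normal samples. Here `sigma` is the standard deviation, so the result follows N(mean, sigma^2) in the variance notation the docstring uses. The following is a minimal sketch, assuming the standard-library `random.gauss` as a stand-in for the RDD contents; `shift_scale_normal`, the seed, and the target parameters are illustrative, not part of PySpark.

```python
import random
import statistics


def shift_scale_normal(v, mean, sigma):
    """Map a standard normal sample v to N(mean, sigma^2); sigma is the std dev."""
    return mean + sigma * v


# Stand-in for the samples a standard normal RDD would contain.
rng = random.Random(7)
standard = [rng.gauss(0.0, 1.0) for _ in range(10000)]

# Same lambda body as in the docstring, applied locally.
target_mean, target_sigma = 10.0, 2.0
transformed = [shift_scale_normal(v, target_mean, target_sigma) for v in standard]

# The sample mean and standard deviation should land near the targets.
print(statistics.mean(transformed), statistics.pstdev(transformed))
```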
@@ -84,7 +84,7 @@ class RandomRDDs:
@staticmethod
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of i.i.d samples from the Poisson
+ Generates an RDD comprised of i.i.d. samples from the Poisson
distribution with the input mean.
>>> mean = 100.0
@@ -105,8 +105,8 @@ class RandomRDDs:
@staticmethod
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
- from the uniform distribution on [0.0 1.0].
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the uniform distribution U(0.0, 1.0).
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
@@ -125,7 +125,7 @@ class RandomRDDs:
@staticmethod
def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the standard normal distribution.
>>> import numpy as np
@@ -145,7 +145,7 @@ class RandomRDDs:
@staticmethod
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the Poisson distribution with the input mean.
>>> import numpy as np