aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/mllib-guide.md2
-rw-r--r--docs/mllib-stats.md74
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala18
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala476
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java134
-rw-r--r--python/pyspark/mllib/random.py20
6 files changed, 418 insertions, 306 deletions
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 23d5a0c460..ca0a84a8c5 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -9,7 +9,7 @@ filtering, dimensionality reduction, as well as underlying optimization primitiv
* [Data types](mllib-basics.html)
* [Basic statistics](mllib-stats.html)
- * data generators
+ * random data generation
* stratified sampling
* summary statistics
* hypothesis testing
diff --git a/docs/mllib-stats.md b/docs/mllib-stats.md
index ca9ef46c15..f25dca746b 100644
--- a/docs/mllib-stats.md
+++ b/docs/mllib-stats.md
@@ -25,7 +25,79 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Statistics Functionality
\newcommand{\zero}{\mathbf{0}}
\]`
-## Data Generators
+## Random data generation
+
+Random data generation is useful for randomized algorithms, prototyping, and performance testing.
+MLlib supports generating random RDDs with i.i.d. values drawn from a given distribution:
+uniform, standard normal, or Poisson.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+[`RandomRDDs`](api/scala/index.html#org.apache.spark.mllib.random.RandomRDDs) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD, whose values follows the standard normal
+distribution `N(0, 1)`, and then map it to `N(1, 4)`.
+
+{% highlight scala %}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.random.RandomRDDs._
+
+val sc: SparkContext = ...
+
+// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+val u = normalRDD(sc, 1000000L, 10)
+// Apply a transform to get a random double RDD following `N(1, 4)`.
+val v = u.map(x => 1.0 + 2.0 * x)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+[`RandomRDDs`](api/java/index.html#org.apache.spark.mllib.random.RandomRDDs) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD, whose values follows the standard normal
+distribution `N(0, 1)`, and then map it to `N(1, 4)`.
+
+{% highlight java %}
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.JavaDoubleRDD;
+import static org.apache.spark.mllib.random.RandomRDDs.*;
+
+JavaSparkContext jsc = ...
+
+// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
+// Apply a transform to get a random double RDD following `N(1, 4)`.
+JavaDoubleRDD v = u.map(
+ new Function<Double, Double>() {
+ public Double call(Double x) {
+ return 1.0 + 2.0 * x;
+ }
+ });
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+[`RandomRDDs`](api/python/pyspark.mllib.random.RandomRDDs-class.html) provides factory
+methods to generate random double RDDs or vector RDDs.
+The following example generates a random double RDD, whose values follows the standard normal
+distribution `N(0, 1)`, and then map it to `N(1, 4)`.
+
+{% highlight python %}
+from pyspark.mllib.random import RandomRDDs
+
+sc = ... # SparkContext
+
+# Generate a random double RDD that contains 1 million i.i.d. values drawn from the
+# standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
+u = RandomRDDs.uniformRDD(sc, 1000000L, 10)
+# Apply a transform to get a random double RDD following `N(1, 4)`.
+v = u.map(lambda x: 1.0 + 2.0 * x)
+{% endhighlight %}
+</div>
+
+</div>
## Stratified Sampling
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
index 9cab49f6ed..28179fbc45 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
@@ -20,14 +20,14 @@ package org.apache.spark.mllib.random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Trait for random data generators that generate i.i.d. data.
*/
-@Experimental
+@DeveloperApi
trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
/**
@@ -43,10 +43,10 @@ trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from U[0.0, 1.0]
*/
-@Experimental
+@DeveloperApi
class UniformGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -62,10 +62,10 @@ class UniformGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the standard normal distribution.
*/
-@Experimental
+@DeveloperApi
class StandardNormalGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -81,12 +81,12 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the Poisson distribution with the given mean.
*
* @param mean mean for the Poisson distribution.
*/
-@Experimental
+@DeveloperApi
class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
private var rng = new Poisson(mean, new DRand)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index 3627036952..c5f4b08432 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -20,9 +20,10 @@ package org.apache.spark.mllib.random
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.mllib.rdd.{RandomRDD, RandomVectorRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -34,335 +35,279 @@ import org.apache.spark.util.Utils
object RandomRDDs {
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution `U(0.0, 1.0)`.
*
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
+ * To transform the distribution in the generated RDD from `U(0.0, 1.0)` to `U(a, b)`, use
+ * `RandomRDDs.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+ def uniformRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
val uniform = new UniformGenerator()
- randomRDD(sc, uniform, size, numPartitions, seed)
+ randomRDD(sc, uniform, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformRDD]].
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def uniformJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaRDD]] with the default seed.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
- uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#uniformJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
- val normal = new StandardNormalGenerator()
- randomRDD(sc, normal, size, numPartitions, seed)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
*
* To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`.
+ * `N(mean, sigma^2^)`, use `RandomRDDs.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def normalRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val normal = new StandardNormalGenerator()
+ randomRDD(sc, normal, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * Java-friendly version of [[RandomRDDs#normalRDD]].
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
- normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext,
- mean: Double,
- size: Long,
- numPartitions: Int,
- seed: Long): RDD[Double] = {
- val poisson = new PoissonGenerator(mean)
- randomRDD(sc, poisson, size, numPartitions, seed)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
- poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
- poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
+ def poissonRDD(
+ sc: SparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val poisson = new PoissonGenerator(mean)
+ randomRDD(sc, poisson, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * Java-friendly version of [[RandomRDDs#poissonRDD]].
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
numPartitions: Int,
- seed: Long): RDD[T] = {
- new RandomRDD[T](sc, size, numPartitions, generator, seed)
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaRDD]] with the default seed.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
- numPartitions: Int): RDD[T] = {
- randomRDD[T](sc, generator, size, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
+ * [[RandomRDDs#poissonJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def poissonJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size))
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples produced by generator.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
+ @DeveloperApi
+ def randomRDD[T: ClassTag](
+ sc: SparkContext,
generator: RandomDataGenerator[T],
- size: Long): RDD[T] = {
- randomRDD[T](sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[T] = {
+ new RandomRDD[T](sc, size, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
// TODO Generate RDD[Vector] from multivariate distributions.
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
+ * uniform distribution on `U(0.0, 1.0)`.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD.
* @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformVectorRDD(
+ sc: SparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val uniform = new UniformGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, uniform, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformVectorRDD]].
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default number of partitions and the default seed.
+ */
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+ */
+ def normalVectorRDD(
+ sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val normal = new StandardNormalGenerator()
+ randomVectorRDD(sc, normal, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#normalVectorRDD]].
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int,
- seed: Long): RDD[Vector] = {
- val uniform = new StandardNormalGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ seed: Long): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
@@ -370,124 +315,85 @@ object RandomRDDs {
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonVectorRDD(
+ sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
- randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, poisson, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * Java-friendly version of [[RandomRDDs#poissonVectorRDD]].
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
- numCols: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
numRows: Long,
- numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
- new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
+ numCols: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
+ * input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
- @Experimental
+ @DeveloperApi
def randomVectorRDD(sc: SparkContext,
generator: RandomDataGenerator[Double],
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ new RandomVectorRDD(
+ sc, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * Returns `numPartitions` if it is positive, or `sc.defaultParallelism` otherwise.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
- numRows: Long,
- numCols: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols,
- sc.defaultParallelism, Utils.random.nextLong)
+ private def numPartitionsOrDefault(sc: SparkContext, numPartitions: Int): Int = {
+ if (numPartitions > 0) numPartitions else sc.defaultMinPartitions
}
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
new file mode 100644
index 0000000000..a725736ca1
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vector;
+import static org.apache.spark.mllib.random.RandomRDDs.*;
+
+public class JavaRandomRDDsSuite {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaRandomRDDsSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Test
+ public void testUniformRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = uniformJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = uniformJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = uniformJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testNormalRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = normalJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = normalJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = normalJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testPoissonRDD() {
+ double mean = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = poissonJavaRDD(sc, mean, m);
+ JavaDoubleRDD rdd2 = poissonJavaRDD(sc, mean, m, p);
+ JavaDoubleRDD rdd3 = poissonJavaRDD(sc, mean, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUniformVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = uniformJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = uniformJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = uniformJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNormalVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = normalJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = normalJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = normalJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoissonVectorRDD() {
+ double mean = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = poissonJavaVectorRDD(sc, mean, m, n);
+ JavaRDD<Vector> rdd2 = poissonJavaVectorRDD(sc, mean, m, n, p);
+ JavaRDD<Vector> rdd3 = poissonJavaVectorRDD(sc, mean, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+}
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index 3f3b19053d..4dc1a4a912 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -35,10 +35,10 @@ class RandomRDDs:
def uniformRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the
- uniform distribution on [0.0, 1.0].
+ uniform distribution U(0.0, 1.0).
- To transform the distribution in the generated RDD from U[0.0, 1.0]
- to U[a, b], use
+ To transform the distribution in the generated RDD from U(0.0, 1.0)
+ to U(a, b), use
C{RandomRDDs.uniformRDD(sc, n, p, seed)\
.map(lambda v: a + (b - a) * v)}
@@ -60,11 +60,11 @@ class RandomRDDs:
@staticmethod
def normalRDD(sc, size, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of i.i.d samples from the standard normal
+ Generates an RDD comprised of i.i.d. samples from the standard normal
distribution.
To transform the distribution in the generated RDD from standard normal
- to some other normal N(mean, sigma), use
+ to some other normal N(mean, sigma^2), use
C{RandomRDDs.normal(sc, n, p, seed)\
.map(lambda v: mean + sigma * v)}
@@ -84,7 +84,7 @@ class RandomRDDs:
@staticmethod
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of i.i.d samples from the Poisson
+ Generates an RDD comprised of i.i.d. samples from the Poisson
distribution with the input mean.
>>> mean = 100.0
@@ -105,8 +105,8 @@ class RandomRDDs:
@staticmethod
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
- from the uniform distribution on [0.0 1.0].
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
+ from the uniform distribution U(0.0, 1.0).
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
@@ -125,7 +125,7 @@ class RandomRDDs:
@staticmethod
def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the standard normal distribution.
>>> import numpy as np
@@ -145,7 +145,7 @@ class RandomRDDs:
@staticmethod
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
- Generates an RDD comprised of vectors containing i.i.d samples drawn
+ Generates an RDD comprised of vectors containing i.i.d. samples drawn
from the Poisson distribution with the input mean.
>>> import numpy as np