From 825d4fe47b9c4d48de88622dd48dcf83beb8b80a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 19 Aug 2014 16:06:48 -0700 Subject: [SPARK-3136][MLLIB] Create Java-friendly methods in RandomRDDs Though we don't use default argument for methods in RandomRDDs, it is still not easy for Java users to use because the output type is either `RDD[Double]` or `RDD[Vector]`. Java users should expect `JavaDoubleRDD` and `JavaRDD[Vector]`, respectively. We should create dedicated methods for Java users, and allow default arguments in Scala methods in RandomRDDs, to make life easier for both Java and Scala users. This PR also contains documentation for random data generation. brkyvz Author: Xiangrui Meng Closes #2041 from mengxr/stat-doc and squashes the following commits: fc5eedf [Xiangrui Meng] add missing comma ffde810 [Xiangrui Meng] address comments aef6d07 [Xiangrui Meng] add doc for random data generation b99d94b [Xiangrui Meng] add java-friendly methods to RandomRDDs --- .../spark/mllib/random/RandomDataGenerator.scala | 18 +- .../org/apache/spark/mllib/random/RandomRDDs.scala | 476 +++++++++------------ .../spark/mllib/random/JavaRandomRDDsSuite.java | 134 ++++++ 3 files changed, 334 insertions(+), 294 deletions(-) create mode 100644 mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala index 9cab49f6ed..28179fbc45 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala @@ -20,14 +20,14 @@ package org.apache.spark.mllib.random import cern.jet.random.Poisson import cern.jet.random.engine.DRand -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} /** - * :: Experimental :: + * :: DeveloperApi :: * Trait for random data generators that generate i.i.d. data. */ -@Experimental +@DeveloperApi trait RandomDataGenerator[T] extends Pseudorandom with Serializable { /** @@ -43,10 +43,10 @@ trait RandomDataGenerator[T] extends Pseudorandom with Serializable { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from U[0.0, 1.0] */ -@Experimental +@DeveloperApi class UniformGenerator extends RandomDataGenerator[Double] { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -62,10 +62,10 @@ class UniformGenerator extends RandomDataGenerator[Double] { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from the standard normal distribution. */ -@Experimental +@DeveloperApi class StandardNormalGenerator extends RandomDataGenerator[Double] { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -81,12 +81,12 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from the Poisson distribution with the given mean. * * @param mean mean for the Poisson distribution. */ -@Experimental +@DeveloperApi class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] { private var rng = new Poisson(mean, new DRand) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala index 3627036952..c5f4b08432 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala @@ -20,9 +20,10 @@ package org.apache.spark.mllib.random import scala.reflect.ClassTag import org.apache.spark.SparkContext -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD, JavaSparkContext} import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.mllib.rdd.{RandomRDD, RandomVectorRDD} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -34,335 +35,279 @@ import org.apache.spark.util.Utils object RandomRDDs { /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution `U(0.0, 1.0)`. * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. + * To transform the distribution in the generated RDD from `U(0.0, 1.0)` to `U(a, b)`, use + * `RandomRDDs.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Double] comprised of i.i.d. samples ~ `U(0.0, 1.0)`. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + def uniformRDD( + sc: SparkContext, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { val uniform = new UniformGenerator() - randomRDD(sc, uniform, size, numPartitions, seed) + randomRDD(sc, uniform, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. - * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * Java-friendly version of [[RandomRDDs#uniformRDD]]. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { - uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + def uniformJavaRDD( + jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * [[RandomRDDs#uniformJavaRDD]] with the default seed. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { - uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + def uniformJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. - * - * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#uniformJavaRDD]] with the default number of partitions and the default seed. */ - @Experimental - def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { - val normal = new StandardNormalGenerator() - randomRDD(sc, normal, size, numPartitions, seed) + def uniformJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size)) } /** - * :: Experimental :: * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`. + * `N(mean, sigma^2^)`, use `RandomRDDs.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). */ - @Experimental - def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { - normalRDD(sc, size, numPartitions, Utils.random.nextLong) + def normalRDD( + sc: SparkContext, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val normal = new StandardNormalGenerator() + randomRDD(sc, normal, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + * Java-friendly version of [[RandomRDDs#normalRDD]]. */ - @Experimental - def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { - normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + def normalJavaRDD( + jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#normalJavaRDD]] with the default seed. */ - @Experimental - def poissonRDD(sc: SparkContext, - mean: Double, - size: Long, - numPartitions: Int, - seed: Long): RDD[Double] = { - val poisson = new PoissonGenerator(mean) - randomRDD(sc, poisson, size, numPartitions, seed) + def normalJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#normalJavaRDD]] with the default number of partitions and the default seed. */ - @Experimental - def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = { - poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong) + def normalJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size)) } /** - * :: Experimental :: * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). */ - @Experimental - def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = { - poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong) + def poissonRDD( + sc: SparkContext, + mean: Double, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val poisson = new PoissonGenerator(mean) + randomRDD(sc, poisson, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples produced by generator. + * Java-friendly version of [[RandomRDDs#poissonRDD]]. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, - generator: RandomDataGenerator[T], + def poissonJavaRDD( + jsc: JavaSparkContext, + mean: Double, size: Long, numPartitions: Int, - seed: Long): RDD[T] = { - new RandomRDD[T](sc, size, numPartitions, generator, seed) + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples produced by generator. + * [[RandomRDDs#poissonJavaRDD]] with the default seed. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, - generator: RandomDataGenerator[T], + def poissonJavaRDD( + jsc: JavaSparkContext, + mean: Double, size: Long, - numPartitions: Int): RDD[T] = { - randomRDD[T](sc, generator, size, numPartitions, Utils.random.nextLong) + numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * sc.defaultParallelism used for the number of partitions in the RDD. + * [[RandomRDDs#poissonJavaRDD]] with the default number of partitions and the default seed. + */ + def poissonJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size)) + } + + /** + * :: DeveloperApi :: + * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator. * * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. + * @param generator RandomDataGenerator used to populate the RDD. * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples produced by generator. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, + @DeveloperApi + def randomRDD[T: ClassTag]( + sc: SparkContext, generator: RandomDataGenerator[T], - size: Long): RDD[T] = { - randomRDD[T](sc, generator, size, sc.defaultParallelism, Utils.random.nextLong) + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[T] = { + new RandomRDD[T](sc, size, numPartitionsOrDefault(sc, numPartitions), generator, seed) } // TODO Generate RDD[Vector] from multivariate distributions. /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. + * uniform distribution on `U(0.0, 1.0)`. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d samples ~ `U(0.0, 1.0)`. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, + def uniformVectorRDD( + sc: SparkContext, numRows: Long, numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { val uniform = new UniformGenerator() - randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + randomVectorRDD(sc, uniform, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. + * Java-friendly version of [[RandomRDDs#uniformVectorRDD]]. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, + def uniformJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. + * [[RandomRDDs#uniformJavaVectorRDD]] with the default seed. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + def uniformJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD() + } + + /** + * [[RandomRDDs#uniformJavaVectorRDD]] with the default number of partitions and the default seed. + */ + def uniformJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`. + */ + def normalVectorRDD( + sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + val normal = new StandardNormalGenerator() + randomVectorRDD(sc, normal, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#normalVectorRDD]]. */ - @Experimental - def normalVectorRDD(sc: SparkContext, + def normalJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, numPartitions: Int, - seed: Long): RDD[Vector] = { - val uniform = new StandardNormalGenerator() - randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + seed: Long): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * standard normal distribution. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#normalJavaVectorRDD]] with the default seed. */ - @Experimental - def normalVectorRDD(sc: SparkContext, + def normalJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * standard normal distribution. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#normalJavaVectorRDD]] with the default number of partitions and the default seed. */ - @Experimental - def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { - normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + def normalJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * @@ -370,124 +315,85 @@ object RandomRDDs { * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`) + * @param seed Random seed (default: a random long integer). * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonVectorRDD( + sc: SparkContext, mean: Double, numRows: Long, numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { val poisson = new PoissonGenerator(mean) - randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed) + randomVectorRDD(sc, poisson, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). + * Java-friendly version of [[RandomRDDs#poissonVectorRDD]]. */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonJavaVectorRDD( + jsc: JavaSparkContext, mean: Double, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * Poisson distribution with the input mean. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#poissonJavaVectorRDD]] with the default seed. */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonJavaVectorRDD( + jsc: JavaSparkContext, mean: Double, numRows: Long, - numCols: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. + * [[RandomRDDs#poissonJavaVectorRDD]] with the default number of partitions and the default seed. */ - @Experimental - def randomVectorRDD(sc: SparkContext, - generator: RandomDataGenerator[Double], + def poissonJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, numRows: Long, - numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { - new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed) + numCols: Int): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. + * input RandomDataGenerator. * * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. + * @param generator RandomDataGenerator used to populate the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ - @Experimental + @DeveloperApi def randomVectorRDD(sc: SparkContext, generator: RandomDataGenerator[Double], numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + new RandomVectorRDD( + sc, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), generator, seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. + * Returns `numPartitions` if it is positive, or `sc.defaultParallelism` otherwise. */ - @Experimental - def randomVectorRDD(sc: SparkContext, - generator: RandomDataGenerator[Double], - numRows: Long, - numCols: Int): RDD[Vector] = { - randomVectorRDD(sc, generator, numRows, numCols, - sc.defaultParallelism, Utils.random.nextLong) + private def numPartitionsOrDefault(sc: SparkContext, numPartitions: Int): Int = { + if (numPartitions > 0) numPartitions else sc.defaultMinPartitions } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java new file mode 100644 index 0000000000..a725736ca1 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random; + +import com.google.common.collect.Lists; +import org.apache.spark.api.java.JavaRDD; +import org.junit.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.linalg.Vector; +import static org.apache.spark.mllib.random.RandomRDDs.*; + +public class JavaRandomRDDsSuite { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaRandomRDDsSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void testUniformRDD() { + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = uniformJavaRDD(sc, m); + JavaDoubleRDD rdd2 = uniformJavaRDD(sc, m, p); + JavaDoubleRDD rdd3 = uniformJavaRDD(sc, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + + @Test + public void testNormalRDD() { + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = normalJavaRDD(sc, m); + JavaDoubleRDD rdd2 = normalJavaRDD(sc, m, p); + JavaDoubleRDD rdd3 = normalJavaRDD(sc, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + + @Test + public void testPoissonRDD() { + double mean = 2.0; + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = poissonJavaRDD(sc, mean, m); + JavaDoubleRDD rdd2 = poissonJavaRDD(sc, mean, m, p); + JavaDoubleRDD rdd3 = poissonJavaRDD(sc, mean, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testUniformVectorRDD() { + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD rdd1 = uniformJavaVectorRDD(sc, m, n); + JavaRDD rdd2 = uniformJavaVectorRDD(sc, m, n, p); + JavaRDD rdd3 = uniformJavaVectorRDD(sc, m, n, p, seed); + for (JavaRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testNormalVectorRDD() { + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD rdd1 = normalJavaVectorRDD(sc, m, n); + JavaRDD rdd2 = normalJavaVectorRDD(sc, m, n, p); + JavaRDD rdd3 = normalJavaVectorRDD(sc, m, n, p, seed); + for (JavaRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testPoissonVectorRDD() { + double mean = 2.0; + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD rdd1 = poissonJavaVectorRDD(sc, mean, m, n); + JavaRDD rdd2 = poissonJavaVectorRDD(sc, mean, m, n, p); + JavaRDD rdd3 = poissonJavaVectorRDD(sc, mean, m, n, p, seed); + for (JavaRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } +} -- cgit v1.2.3