diff options
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala | 18 | ||||
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala | 476 |
2 files changed, 200 insertions, 294 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala index 9cab49f6ed..28179fbc45 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala @@ -20,14 +20,14 @@ package org.apache.spark.mllib.random import cern.jet.random.Poisson import cern.jet.random.engine.DRand -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} /** - * :: Experimental :: + * :: DeveloperApi :: * Trait for random data generators that generate i.i.d. data. */ -@Experimental +@DeveloperApi trait RandomDataGenerator[T] extends Pseudorandom with Serializable { /** @@ -43,10 +43,10 @@ trait RandomDataGenerator[T] extends Pseudorandom with Serializable { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from U[0.0, 1.0] */ -@Experimental +@DeveloperApi class UniformGenerator extends RandomDataGenerator[Double] { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -62,10 +62,10 @@ class UniformGenerator extends RandomDataGenerator[Double] { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from the standard normal distribution. */ -@Experimental +@DeveloperApi class StandardNormalGenerator extends RandomDataGenerator[Double] { // XORShiftRandom for better performance. Thread safety isn't necessary here. @@ -81,12 +81,12 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] { } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates i.i.d. samples from the Poisson distribution with the given mean. * * @param mean mean for the Poisson distribution. */ -@Experimental +@DeveloperApi class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] { private var rng = new Poisson(mean, new DRand) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala index 3627036952..c5f4b08432 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala @@ -20,9 +20,10 @@ package org.apache.spark.mllib.random import scala.reflect.ClassTag import org.apache.spark.SparkContext -import org.apache.spark.annotation.Experimental +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD, JavaSparkContext} import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.mllib.rdd.{RandomRDD, RandomVectorRDD} import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -34,335 +35,279 @@ import org.apache.spark.util.Utils object RandomRDDs { /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. + * Generates an RDD comprised of i.i.d. samples from the uniform distribution `U(0.0, 1.0)`. * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. + * To transform the distribution in the generated RDD from `U(0.0, 1.0)` to `U(a, b)`, use + * `RandomRDDs.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Double] comprised of i.i.d. samples ~ `U(0.0, 1.0)`. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { + def uniformRDD( + sc: SparkContext, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { val uniform = new UniformGenerator() - randomRDD(sc, uniform, size, numPartitions, seed) + randomRDD(sc, uniform, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. - * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * Java-friendly version of [[RandomRDDs#uniformRDD]]. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { - uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + def uniformJavaRDD( + jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0]. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use - * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + * [[RandomRDDs#uniformJavaRDD]] with the default seed. */ - @Experimental - def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { - uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + def uniformJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. - * - * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#uniformJavaRDD]] with the default number of partitions and the default seed. */ - @Experimental - def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { - val normal = new StandardNormalGenerator() - randomRDD(sc, normal, size, numPartitions, seed) + def uniformJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size)) } /** - * :: Experimental :: * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. * * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`. + * `N(mean, sigma^2^)`, use `RandomRDDs.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`. * * @param sc SparkContext used to create the RDD. * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). */ - @Experimental - def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { - normalRDD(sc, size, numPartitions, Utils.random.nextLong) + def normalRDD( + sc: SparkContext, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val normal = new StandardNormalGenerator() + randomRDD(sc, normal, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the standard normal distribution. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * To transform the distribution in the generated RDD from standard normal to some other normal - * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`. - * - * @param sc SparkContext used to create the RDD. - * @param size Size of the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + * Java-friendly version of [[RandomRDDs#normalRDD]]. */ - @Experimental - def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { - normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + def normalJavaRDD( + jsc: JavaSparkContext, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#normalJavaRDD]] with the default seed. */ - @Experimental - def poissonRDD(sc: SparkContext, - mean: Double, - size: Long, - numPartitions: Int, - seed: Long): RDD[Double] = { - val poisson = new PoissonGenerator(mean) - randomRDD(sc, poisson, size, numPartitions, seed) + def normalJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#normalJavaRDD]] with the default number of partitions and the default seed. */ - @Experimental - def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = { - poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong) + def normalJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size)) } /** - * :: Experimental :: * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean. - * sc.defaultParallelism used for the number of partitions in the RDD. * * @param sc SparkContext used to create the RDD. * @param mean Mean, or lambda, for the Poisson distribution. * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). */ - @Experimental - def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = { - poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong) + def poissonRDD( + sc: SparkContext, + mean: Double, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val poisson = new PoissonGenerator(mean) + randomRDD(sc, poisson, size, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Double] comprised of i.i.d. samples produced by generator. + * Java-friendly version of [[RandomRDDs#poissonRDD]]. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, - generator: RandomDataGenerator[T], + def poissonJavaRDD( + jsc: JavaSparkContext, + mean: Double, size: Long, numPartitions: Int, - seed: Long): RDD[T] = { - new RandomRDD[T](sc, size, numPartitions, generator, seed) + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions, seed)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param size Size of the RDD. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Double] comprised of i.i.d. samples produced by generator. + * [[RandomRDDs#poissonJavaRDD]] with the default seed. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, - generator: RandomDataGenerator[T], + def poissonJavaRDD( + jsc: JavaSparkContext, + mean: Double, size: Long, - numPartitions: Int): RDD[T] = { - randomRDD[T](sc, generator, size, numPartitions, Utils.random.nextLong) + numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions)) } /** - * :: Experimental :: - * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator. - * sc.defaultParallelism used for the number of partitions in the RDD. + * [[RandomRDDs#poissonJavaRDD]] with the default number of partitions and the default seed. + */ + def poissonJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size)) + } + + /** + * :: DeveloperApi :: + * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator. * * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. + * @param generator RandomDataGenerator used to populate the RDD. * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Double] comprised of i.i.d. samples produced by generator. */ - @Experimental - def randomRDD[T: ClassTag](sc: SparkContext, + @DeveloperApi + def randomRDD[T: ClassTag]( + sc: SparkContext, generator: RandomDataGenerator[T], - size: Long): RDD[T] = { - randomRDD[T](sc, generator, size, sc.defaultParallelism, Utils.random.nextLong) + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[T] = { + new RandomRDD[T](sc, size, numPartitionsOrDefault(sc, numPartitions), generator, seed) } // TODO Generate RDD[Vector] from multivariate distributions. /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. + * uniform distribution on `U(0.0, 1.0)`. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. * @param numPartitions Number of partitions in the RDD. * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0]. + * @return RDD[Vector] with vectors containing i.i.d samples ~ `U(0.0, 1.0)`. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, + def uniformVectorRDD( + sc: SparkContext, numRows: Long, numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { val uniform = new UniformGenerator() - randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + randomVectorRDD(sc, uniform, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. + * Java-friendly version of [[RandomRDDs#uniformVectorRDD]]. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, + def uniformJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * uniform distribution on [0.0 1.0]. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0]. + * [[RandomRDDs#uniformJavaVectorRDD]] with the default seed. */ - @Experimental - def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { - uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + def uniformJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD() + } + + /** + * [[RandomRDDs#uniformJavaVectorRDD]] with the default number of partitions and the default seed. + */ + def uniformJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + uniformVectorRDD(jsc.sc, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * standard normal distribution. * * @param sc SparkContext used to create the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`. + */ + def normalVectorRDD( + sc: SparkContext, + numRows: Long, + numCols: Int, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + val normal = new StandardNormalGenerator() + randomVectorRDD(sc, normal, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#normalVectorRDD]]. */ - @Experimental - def normalVectorRDD(sc: SparkContext, + def normalJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, numPartitions: Int, - seed: Long): RDD[Vector] = { - val uniform = new StandardNormalGenerator() - randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed) + seed: Long): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * standard normal distribution. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#normalJavaVectorRDD]] with the default seed. */ - @Experimental - def normalVectorRDD(sc: SparkContext, + def normalJavaVectorRDD( + jsc: JavaSparkContext, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * standard normal distribution. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0). + * [[RandomRDDs#normalJavaVectorRDD]] with the default number of partitions and the default seed. */ - @Experimental - def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = { - normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + def normalJavaVectorRDD( + jsc: JavaSparkContext, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. * @@ -370,124 +315,85 @@ object RandomRDDs { * @param mean Mean, or lambda, for the Poisson distribution. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`) + * @param seed Random seed (default: a random long integer). * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonVectorRDD( + sc: SparkContext, mean: Double, numRows: Long, numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { val poisson = new PoissonGenerator(mean) - randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed) + randomVectorRDD(sc, poisson, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * Poisson distribution with the input mean. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). + * Java-friendly version of [[RandomRDDs#poissonVectorRDD]]. */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonJavaVectorRDD( + jsc: JavaSparkContext, mean: Double, numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the - * Poisson distribution with the input mean. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param mean Mean, or lambda, for the Poisson distribution. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean). + * [[RandomRDDs#poissonJavaVectorRDD]] with the default seed. */ - @Experimental - def poissonVectorRDD(sc: SparkContext, + def poissonJavaVectorRDD( + jsc: JavaSparkContext, mean: Double, numRows: Long, - numCols: Int): RDD[Vector] = { - poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong) + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD() } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. - * @param seed Seed for the RNG that generates the seed for the generator in each partition. - * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. + * [[RandomRDDs#poissonJavaVectorRDD]] with the default number of partitions and the default seed. */ - @Experimental - def randomVectorRDD(sc: SparkContext, - generator: RandomDataGenerator[Double], + def poissonJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, numRows: Long, - numCols: Int, - numPartitions: Int, - seed: Long): RDD[Vector] = { - new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed) + numCols: Int): JavaRDD[Vector] = { + poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD() } /** - * :: Experimental :: + * :: DeveloperApi :: * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. + * input RandomDataGenerator. * * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. + * @param generator RandomDataGenerator used to populate the RDD. * @param numRows Number of Vectors in the RDD. * @param numCols Number of elements in each Vector. - * @param numPartitions Number of partitions in the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. */ - @Experimental + @DeveloperApi def randomVectorRDD(sc: SparkContext, generator: RandomDataGenerator[Double], numRows: Long, numCols: Int, - numPartitions: Int): RDD[Vector] = { - randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong) + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + new RandomVectorRDD( + sc, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), generator, seed) } /** - * :: Experimental :: - * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the - * input DistributionGenerator. - * sc.defaultParallelism used for the number of partitions in the RDD. - * - * @param sc SparkContext used to create the RDD. - * @param generator DistributionGenerator used to populate the RDD. - * @param numRows Number of Vectors in the RDD. - * @param numCols Number of elements in each Vector. - * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator. + * Returns `numPartitions` if it is positive, or `sc.defaultParallelism` otherwise. */ - @Experimental - def randomVectorRDD(sc: SparkContext, - generator: RandomDataGenerator[Double], - numRows: Long, - numCols: Int): RDD[Vector] = { - randomVectorRDD(sc, generator, numRows, numCols, - sc.defaultParallelism, Utils.random.nextLong) + private def numPartitionsOrDefault(sc: SparkContext, numPartitions: Int): Int = { + if (numPartitions > 0) numPartitions else sc.defaultMinPartitions } } |