aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
Diffstat (limited to 'mllib')
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala | 18
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala | 476
-rw-r--r-- mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java | 134
3 files changed, 334 insertions, 294 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
index 9cab49f6ed..28179fbc45 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
@@ -20,14 +20,14 @@ package org.apache.spark.mllib.random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Trait for random data generators that generate i.i.d. data.
*/
-@Experimental
+@DeveloperApi
trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
/**
@@ -43,10 +43,10 @@ trait RandomDataGenerator[T] extends Pseudorandom with Serializable {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from U[0.0, 1.0]
*/
-@Experimental
+@DeveloperApi
class UniformGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -62,10 +62,10 @@ class UniformGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the standard normal distribution.
*/
-@Experimental
+@DeveloperApi
class StandardNormalGenerator extends RandomDataGenerator[Double] {
// XORShiftRandom for better performance. Thread safety isn't necessary here.
@@ -81,12 +81,12 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates i.i.d. samples from the Poisson distribution with the given mean.
*
* @param mean mean for the Poisson distribution.
*/
-@Experimental
+@DeveloperApi
class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
private var rng = new Poisson(mean, new DRand)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index 3627036952..c5f4b08432 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -20,9 +20,10 @@ package org.apache.spark.mllib.random
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.mllib.rdd.{RandomRDD, RandomVectorRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
@@ -34,335 +35,279 @@ import org.apache.spark.util.Utils
object RandomRDDs {
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
+ * Generates an RDD comprised of i.i.d. samples from the uniform distribution `U(0.0, 1.0)`.
*
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
+ * To transform the distribution in the generated RDD from `U(0.0, 1.0)` to `U(a, b)`, use
+ * `RandomRDDs.uniformRDD(sc, n, p, seed).map(v => a + (b - a) * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+ def uniformRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
val uniform = new UniformGenerator()
- randomRDD(sc, uniform, size, numPartitions, seed)
+ randomRDD(sc, uniform, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n, p).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformRDD]].
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def uniformJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the uniform distribution on [0.0, 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from U[0.0, 1.0] to U[a, b], use
- * `RandomRDDGenerators.uniformRDD(sc, n).map(v => a + (b - a) * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaRDD]] with the default seed.
*/
- @Experimental
- def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
- uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#uniformJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
- val normal = new StandardNormalGenerator()
- randomRDD(sc, normal, size, numPartitions, seed)
+ def uniformJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(uniformRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
*
* To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n, p).map(v => mean + sigma * v)`.
+ * `N(mean, sigma^2^)`, use `RandomRDDs.normalRDD(sc, n, p, seed).map(v => mean + sigma * v)`.
*
* @param sc SparkContext used to create the RDD.
* @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
- normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+ def normalRDD(
+ sc: SparkContext,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val normal = new StandardNormalGenerator()
+ randomRDD(sc, normal, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * To transform the distribution in the generated RDD from standard normal to some other normal
- * N(mean, sigma), use `RandomRDDGenerators.normalRDD(sc, n).map(v => mean + sigma * v)`.
- *
- * @param sc SparkContext used to create the RDD.
- * @param size Size of the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ * Java-friendly version of [[RandomRDDs#normalRDD]].
*/
- @Experimental
- def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
- normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaRDD(
+ jsc: JavaSparkContext,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext,
- mean: Double,
- size: Long,
- numPartitions: Int,
- seed: Long): RDD[Double] = {
- val poisson = new PoissonGenerator(mean)
- randomRDD(sc, poisson, size, numPartitions, seed)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long, numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#normalJavaRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
- poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
+ def normalJavaRDD(jsc: JavaSparkContext, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(normalRDD(jsc.sc, size))
}
/**
- * :: Experimental ::
* Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
*
* @param sc SparkContext used to create the RDD.
* @param mean Mean, or lambda, for the Poisson distribution.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
- poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
+ def poissonRDD(
+ sc: SparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val poisson = new PoissonGenerator(mean)
+ randomRDD(sc, poisson, size, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * Java-friendly version of [[RandomRDDs#poissonRDD]].
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
numPartitions: Int,
- seed: Long): RDD[T] = {
- new RandomRDD[T](sc, size, numPartitions, generator, seed)
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions, seed))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param size Size of the RDD.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaRDD]] with the default seed.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
- generator: RandomDataGenerator[T],
+ def poissonJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
size: Long,
- numPartitions: Int): RDD[T] = {
- randomRDD[T](sc, generator, size, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size, numPartitions))
}
/**
- * :: Experimental ::
- * Generates an RDD comprised of i.i.d. samples produced by the input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
+ * [[RandomRDDs#poissonJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def poissonJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size))
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[T] comprised of i.i.d. samples produced by generator.
*/
- @Experimental
- def randomRDD[T: ClassTag](sc: SparkContext,
+ @DeveloperApi
+ def randomRDD[T: ClassTag](
+ sc: SparkContext,
generator: RandomDataGenerator[T],
- size: Long): RDD[T] = {
- randomRDD[T](sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[T] = {
+ new RandomRDD[T](sc, size, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
// TODO Generate RDD[Vector] from multivariate distributions.
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
+ * uniform distribution `U(0.0, 1.0)`.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
* @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
* @param seed Random seed (default: a random long integer).
- * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformVectorRDD(
+ sc: SparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val uniform = new UniformGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, uniform, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * Java-friendly version of [[RandomRDDs#uniformVectorRDD]].
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext,
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * uniform distribution on [0.0 1.0].
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ U[0.0, 1.0].
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#uniformJavaVectorRDD]] with the default number of partitions and the default seed.
+ */
+ def uniformJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ uniformVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* standard normal distribution.
*
* @param sc SparkContext used to create the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+ */
+ def normalVectorRDD(
+ sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val normal = new StandardNormalGenerator()
+ randomVectorRDD(sc, normal, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#normalVectorRDD]].
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
numPartitions: Int,
- seed: Long): RDD[Vector] = {
- val uniform = new StandardNormalGenerator()
- randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ seed: Long): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext,
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * standard normal distribution.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ N(0.0, 1.0).
+ * [[RandomRDDs#normalJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
- normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ def normalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
@@ -370,124 +315,85 @@ object RandomRDDs {
* @param mean Mean, or lambda, for the Poisson distribution.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonVectorRDD(
+ sc: SparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
val poisson = new PoissonGenerator(mean)
- randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
+ randomVectorRDD(sc, poisson, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * Java-friendly version of [[RandomRDDs#poissonVectorRDD]].
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
- * Poisson distribution with the input mean.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param mean Mean, or lambda, for the Poisson distribution.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples ~ Pois(mean).
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default seed.
*/
- @Experimental
- def poissonVectorRDD(sc: SparkContext,
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
mean: Double,
numRows: Long,
- numCols: Int): RDD[Vector] = {
- poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
- * @param seed Seed for the RNG that generates the seed for the generator in each partition.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * [[RandomRDDs#poissonJavaVectorRDD]] with the default number of partitions and the default seed.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
+ def poissonJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
numRows: Long,
- numCols: Int,
- numPartitions: Int,
- seed: Long): RDD[Vector] = {
- new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
+ numCols: Int): JavaRDD[Vector] = {
+ poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
}
/**
- * :: Experimental ::
+ * :: DeveloperApi ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
+ * input RandomDataGenerator.
*
* @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
+ * @param generator RandomDataGenerator used to populate the RDD.
* @param numRows Number of Vectors in the RDD.
* @param numCols Number of elements in each Vector.
- * @param numPartitions Number of partitions in the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
* @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
*/
- @Experimental
+ @DeveloperApi
def randomVectorRDD(sc: SparkContext,
generator: RandomDataGenerator[Double],
numRows: Long,
numCols: Int,
- numPartitions: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ new RandomVectorRDD(
+ sc, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), generator, seed)
}
/**
- * :: Experimental ::
- * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
- * input DistributionGenerator.
- * sc.defaultParallelism used for the number of partitions in the RDD.
- *
- * @param sc SparkContext used to create the RDD.
- * @param generator DistributionGenerator used to populate the RDD.
- * @param numRows Number of Vectors in the RDD.
- * @param numCols Number of elements in each Vector.
- * @return RDD[Vector] with vectors containing i.i.d. samples produced by generator.
+ * Returns `numPartitions` if it is positive, or `sc.defaultParallelism` otherwise.
*/
- @Experimental
- def randomVectorRDD(sc: SparkContext,
- generator: RandomDataGenerator[Double],
- numRows: Long,
- numCols: Int): RDD[Vector] = {
- randomVectorRDD(sc, generator, numRows, numCols,
- sc.defaultParallelism, Utils.random.nextLong)
+ private def numPartitionsOrDefault(sc: SparkContext, numPartitions: Int): Int = {
+ // A non-positive value (including the `0` default of the public methods) means "not specified".
+ // Fall back to `sc.defaultParallelism`, which is what the scaladoc of every public method in
+ // this object promises and what the previous explicit overloads used.
+ if (numPartitions > 0) numPartitions else sc.defaultParallelism
+ }
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
new file mode 100644
index 0000000000..a725736ca1
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vector;
+import static org.apache.spark.mllib.random.RandomRDDs.*;
+
+public class JavaRandomRDDsSuite {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaRandomRDDsSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Test
+ public void testUniformRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = uniformJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = uniformJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = uniformJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testNormalRDD() {
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = normalJavaRDD(sc, m);
+ JavaDoubleRDD rdd2 = normalJavaRDD(sc, m, p);
+ JavaDoubleRDD rdd3 = normalJavaRDD(sc, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testPoissonRDD() {
+ double mean = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = poissonJavaRDD(sc, mean, m);
+ JavaDoubleRDD rdd2 = poissonJavaRDD(sc, mean, m, p);
+ JavaDoubleRDD rdd3 = poissonJavaRDD(sc, mean, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testUniformVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = uniformJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = uniformJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = uniformJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNormalVectorRDD() {
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = normalJavaVectorRDD(sc, m, n);
+ JavaRDD<Vector> rdd2 = normalJavaVectorRDD(sc, m, n, p);
+ JavaRDD<Vector> rdd3 = normalJavaVectorRDD(sc, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoissonVectorRDD() {
+ double mean = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = poissonJavaVectorRDD(sc, mean, m, n);
+ JavaRDD<Vector> rdd2 = poissonJavaVectorRDD(sc, mean, m, n, p);
+ JavaRDD<Vector> rdd3 = poissonJavaVectorRDD(sc, mean, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+}