aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoris Xin <doris.s.xin@gmail.com>2014-07-27 16:16:39 -0700
committerXiangrui Meng <meng@databricks.com>2014-07-27 16:16:39 -0700
commit81fcdd22c8ef52889ed51b3ec5c2747708505fc2 (patch)
tree503582e9268d5abccc95b186b5fd59292094524b
parentecf30ee7e78ea59c462c54db0fde5328f997466c (diff)
downloadspark-81fcdd22c8ef52889ed51b3ec5c2747708505fc2.tar.gz
spark-81fcdd22c8ef52889ed51b3ec5c2747708505fc2.tar.bz2
spark-81fcdd22c8ef52889ed51b3ec5c2747708505fc2.zip
[SPARK-2514] [mllib] Random RDD generator
Utilities for generating random RDDs. RandomRDD and RandomVectorRDD are created instead of using `sc.parallelize(range:Range)` because `Range` objects in Scala can only have `size <= Int.MaxValue`. The object `RandomRDDGenerators` can be transformed into a generator class to reduce the number of auxiliary methods for optional arguments. Author: Doris Xin <doris.s.xin@gmail.com> Closes #1520 from dorx/randomRDD and squashes the following commits: 01121ac [Doris Xin] reviewer comments 6bf27d8 [Doris Xin] Merge branch 'master' into randomRDD a8ea92d [Doris Xin] Reviewer comments 063ea0b [Doris Xin] Merge branch 'master' into randomRDD aec68eb [Doris Xin] newline bc90234 [Doris Xin] units passed. d56cacb [Doris Xin] impl with RandomRDD 92d6f1c [Doris Xin] solution for Cloneable df5bcff [Doris Xin] Merge branch 'generator' into randomRDD f46d928 [Doris Xin] WIP 49ed20d [Doris Xin] alternative poisson distribution generator 7cb0e40 [Doris Xin] fix for data inconsistency 8881444 [Doris Xin] RandomRDDGenerator: initial design
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala101
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala473
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala118
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala90
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala158
5 files changed, 940 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
new file mode 100644
index 0000000000..7ecb409c4a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * :: Experimental ::
+ * Trait for random number generators that generate i.i.d. values from a distribution.
+ */
+@Experimental
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+ /**
+ * Returns an i.i.d. sample as a Double from an underlying distribution.
+ */
+ def nextValue(): Double
+
+ /**
+ * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the
+ * class when applicable for non-locking concurrent usage.
+ */
+ def copy(): DistributionGenerator
+}
+
+/**
+ * :: Experimental ::
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+@Experimental
+class UniformGenerator extends DistributionGenerator {
+
+ // XORShiftRandom for better performance. Thread safety isn't necessary here.
+ private val random = new XORShiftRandom()
+
+ override def nextValue(): Double = {
+ random.nextDouble()
+ }
+
+ override def setSeed(seed: Long) = random.setSeed(seed)
+
+ override def copy(): UniformGenerator = new UniformGenerator()
+}
+
+/**
+ * :: Experimental ::
+ * Generates i.i.d. samples from the standard normal distribution.
+ */
+@Experimental
+class StandardNormalGenerator extends DistributionGenerator {
+
+ // XORShiftRandom for better performance. Thread safety isn't necessary here.
+ private val random = new XORShiftRandom()
+
+ override def nextValue(): Double = {
+ random.nextGaussian()
+ }
+
+ override def setSeed(seed: Long) = random.setSeed(seed)
+
+ override def copy(): StandardNormalGenerator = new StandardNormalGenerator()
+}
+
+/**
+ * :: Experimental ::
+ * Generates i.i.d. samples from the Poisson distribution with the given mean.
+ *
+ * @param mean mean for the Poisson distribution.
+ */
+@Experimental
+class PoissonGenerator(val mean: Double) extends DistributionGenerator {
+
+ private var rng = new Poisson(mean, new DRand)
+
+ override def nextValue(): Double = rng.nextDouble()
+
+ override def setSeed(seed: Long) {
+ rng = new Poisson(mean, new DRand(seed.toInt))
+ }
+
+ override def copy(): PoissonGenerator = new PoissonGenerator(mean)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
new file mode 100644
index 0000000000..d7ee2d3f46
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+/**
+ * :: Experimental ::
+ * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
+ */
+@Experimental
+object RandomRDDGenerators {
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+ val uniform = new UniformGenerator()
+ randomRDD(sc, uniform, size, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+ uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+ uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+ val normal = new StandardNormalGenerator()
+ randomRDD(sc, normal, size, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+ normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param size Size of the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+ normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonRDD(sc: SparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): RDD[Double] = {
+ val poisson = new PoissonGenerator(mean)
+ randomRDD(sc, poisson, size, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
+ poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param size Size of the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
+ poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ */
+ @Experimental
+ def randomRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): RDD[Double] = {
+ new RandomRDD(sc, size, numPartitions, generator, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ */
+ @Experimental
+ def randomRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ size: Long,
+ numPartitions: Int): RDD[Double] = {
+ randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param size Size of the RDD.
+ * @return RDD[Double] comprised of i.i.d. samples produced by generator.
+ */
+ @Experimental
+ def randomRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ size: Long): RDD[Double] = {
+ randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ // TODO Generate RDD[Vector] from multivariate distributions.
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * uniform distribution on [0.0 1.0].
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformVectorRDD(sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): RDD[Vector] = {
+ val uniform = new UniformGenerator()
+ randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * uniform distribution on [0.0 1.0].
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformVectorRDD(sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): RDD[Vector] = {
+ uniformVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * uniform distribution on [0.0 1.0].
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
+ */
+ @Experimental
+ def uniformVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
+ uniformVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * standard normal distribution.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalVectorRDD(sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): RDD[Vector] = {
+ val uniform = new StandardNormalGenerator()
+ randomVectorRDD(sc, uniform, numRows, numCols, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * standard normal distribution.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalVectorRDD(sc: SparkContext,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): RDD[Vector] = {
+ normalVectorRDD(sc, numRows, numCols, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * standard normal distribution.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ N(0.0, 1.0).
+ */
+ @Experimental
+ def normalVectorRDD(sc: SparkContext, numRows: Long, numCols: Int): RDD[Vector] = {
+ normalVectorRDD(sc, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Poisson distribution with the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonVectorRDD(sc: SparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): RDD[Vector] = {
+ val poisson = new PoissonGenerator(mean)
+ randomVectorRDD(sc, poisson, numRows, numCols, numPartitions, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Poisson distribution with the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonVectorRDD(sc: SparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): RDD[Vector] = {
+ poissonVectorRDD(sc, mean, numRows, numCols, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
+ * Poisson distribution with the input mean.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or lambda, for the Poisson distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @return RDD[Vector] with vectors containing i.i.d samples ~ Pois(mean).
+ */
+ @Experimental
+ def poissonVectorRDD(sc: SparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int): RDD[Vector] = {
+ poissonVectorRDD(sc, mean, numRows, numCols, sc.defaultParallelism, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * input DistributionGenerator.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+ * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ */
+ @Experimental
+ def randomVectorRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): RDD[Vector] = {
+ new RandomVectorRDD(sc, numRows, numCols, numPartitions, generator, seed)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * input DistributionGenerator.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD.
+ * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ */
+ @Experimental
+ def randomVectorRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): RDD[Vector] = {
+ randomVectorRDD(sc, generator, numRows, numCols, numPartitions, Utils.random.nextLong)
+ }
+
+ /**
+ * :: Experimental ::
+ * Generates an RDD[Vector] with vectors containing i.i.d samples produced by the
+ * input DistributionGenerator.
+ * sc.defaultParallelism used for the number of partitions in the RDD.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param generator DistributionGenerator used to populate the RDD.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @return RDD[Vector] with vectors containing i.i.d samples produced by generator.
+ */
+ @Experimental
+ def randomVectorRDD(sc: SparkContext,
+ generator: DistributionGenerator,
+ numRows: Long,
+ numCols: Int): RDD[Vector] = {
+ randomVectorRDD(sc, generator, numRows, numCols,
+ sc.defaultParallelism, Utils.random.nextLong)
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
new file mode 100644
index 0000000000..f13282d07f
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+ val size: Int,
+ val generator: DistributionGenerator,
+ val seed: Long) extends Partition {
+
+ require(size >= 0, "Non-negative partition size required.")
+}
+
+// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient sc: SparkContext,
+ size: Long,
+ numPartitions: Int,
+ @transient rng: DistributionGenerator,
+ @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
+
+ require(size > 0, "Positive RDD size required.")
+ require(numPartitions > 0, "Positive number of partitions required")
+ require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
+ "Partition size cannot exceed Int.MaxValue")
+
+ override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
+ val split = splitIn.asInstanceOf[RandomRDDPartition]
+ RandomRDD.getPointIterator(split)
+ }
+
+ override def getPartitions: Array[Partition] = {
+ RandomRDD.getPartitions(size, numPartitions, rng, seed)
+ }
+}
+
+private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
+ size: Long,
+ vectorSize: Int,
+ numPartitions: Int,
+ @transient rng: DistributionGenerator,
+ @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
+
+ require(size > 0, "Positive RDD size required.")
+ require(numPartitions > 0, "Positive number of partitions required")
+ require(vectorSize > 0, "Positive vector size required.")
+ require(math.ceil(size.toDouble / numPartitions) <= Int.MaxValue,
+ "Partition size cannot exceed Int.MaxValue")
+
+ override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
+ val split = splitIn.asInstanceOf[RandomRDDPartition]
+ RandomRDD.getVectorIterator(split, vectorSize)
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ RandomRDD.getPartitions(size, numPartitions, rng, seed)
+ }
+}
+
+private[mllib] object RandomRDD {
+
+ def getPartitions(size: Long,
+ numPartitions: Int,
+ rng: DistributionGenerator,
+ seed: Long): Array[Partition] = {
+
+ val partitions = new Array[RandomRDDPartition](numPartitions)
+ var i = 0
+ var start: Long = 0
+ var end: Long = 0
+ val random = new Random(seed)
+ while (i < numPartitions) {
+ end = ((i + 1) * size) / numPartitions
+ partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
+ start = end
+ i += 1
+ }
+ partitions.asInstanceOf[Array[Partition]]
+ }
+
+ // The RNG has to be reset every time the iterator is requested to guarantee same data
+ // every time the content of the RDD is examined.
+ def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
+ val generator = partition.generator.copy()
+ generator.setSeed(partition.seed)
+ Array.fill(partition.size)(generator.nextValue()).toIterator
+ }
+
+ // The RNG has to be reset every time the iterator is requested to guarantee same data
+ // every time the content of the RDD is examined.
+ def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = {
+ val generator = partition.generator.copy()
+ generator.setSeed(partition.seed)
+ Array.fill(partition.size)(new DenseVector(
+ (0 until vectorSize).map { _ => generator.nextValue() }.toArray)).toIterator
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
new file mode 100644
index 0000000000..974dec4c0b
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.StatCounter
+
+// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+class DistributionGeneratorSuite extends FunSuite {
+
+ def apiChecks(gen: DistributionGenerator) {
+
+ // resetting seed should generate the same sequence of random numbers
+ gen.setSeed(42L)
+ val array1 = (0 until 1000).map(_ => gen.nextValue())
+ gen.setSeed(42L)
+ val array2 = (0 until 1000).map(_ => gen.nextValue())
+ assert(array1.equals(array2))
+
+ // newInstance should contain a difference instance of the rng
+ // i.e. setting difference seeds for difference instances produces different sequences of
+ // random numbers.
+ val gen2 = gen.copy()
+ gen.setSeed(0L)
+ val array3 = (0 until 1000).map(_ => gen.nextValue())
+ gen2.setSeed(1L)
+ val array4 = (0 until 1000).map(_ => gen2.nextValue())
+ // Compare arrays instead of elements since individual elements can coincide by chance but the
+ // sequences should differ given two different seeds.
+ assert(!array3.equals(array4))
+
+ // test that setting the same seed in the copied instance produces the same sequence of numbers
+ gen.setSeed(0L)
+ val array5 = (0 until 1000).map(_ => gen.nextValue())
+ gen2.setSeed(0L)
+ val array6 = (0 until 1000).map(_ => gen2.nextValue())
+ assert(array5.equals(array6))
+ }
+
+ def distributionChecks(gen: DistributionGenerator,
+ mean: Double = 0.0,
+ stddev: Double = 1.0,
+ epsilon: Double = 0.01) {
+ for (seed <- 0 until 5) {
+ gen.setSeed(seed.toLong)
+ val sample = (0 until 100000).map { _ => gen.nextValue()}
+ val stats = new StatCounter(sample)
+ assert(math.abs(stats.mean - mean) < epsilon)
+ assert(math.abs(stats.stdev - stddev) < epsilon)
+ }
+ }
+
+ test("UniformGenerator") {
+ val uniform = new UniformGenerator()
+ apiChecks(uniform)
+ // Stddev of uniform distribution = (ub - lb) / math.sqrt(12)
+ distributionChecks(uniform, 0.5, 1 / math.sqrt(12))
+ }
+
+ test("StandardNormalGenerator") {
+ val normal = new StandardNormalGenerator()
+ apiChecks(normal)
+ distributionChecks(normal, 0.0, 1.0)
+ }
+
+ test("PoissonGenerator") {
+ // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+ for (mean <- List(1.0, 5.0, 100.0)) {
+ val poisson = new PoissonGenerator(mean)
+ apiChecks(poisson)
+ distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
+ }
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
new file mode 100644
index 0000000000..6aa4f803df
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/*
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+ def testGeneratedRDD(rdd: RDD[Double],
+ expectedSize: Long,
+ expectedNumPartitions: Int,
+ expectedMean: Double,
+ expectedStddev: Double,
+ epsilon: Double = 0.01) {
+ val stats = rdd.stats()
+ assert(expectedSize === stats.count)
+ assert(expectedNumPartitions === rdd.partitions.size)
+ assert(math.abs(stats.mean - expectedMean) < epsilon)
+ assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+ }
+
+ // assume test RDDs are small
+ def testGeneratedVectorRDD(rdd: RDD[Vector],
+ expectedRows: Long,
+ expectedColumns: Int,
+ expectedNumPartitions: Int,
+ expectedMean: Double,
+ expectedStddev: Double,
+ epsilon: Double = 0.01) {
+ assert(expectedNumPartitions === rdd.partitions.size)
+ val values = new ArrayBuffer[Double]()
+ rdd.collect.foreach { vector => {
+ assert(vector.size === expectedColumns)
+ values ++= vector.toArray
+ }}
+ assert(expectedRows === values.size / expectedColumns)
+ val stats = new StatCounter(values)
+ assert(math.abs(stats.mean - expectedMean) < epsilon)
+ assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+ }
+
+ test("RandomRDD sizes") {
+
+ // some cases where size % numParts != 0 to test getPartitions behaves correctly
+ for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
+ val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+ assert(rdd.count() === size)
+ assert(rdd.partitions.size === numPartitions)
+
+ // check that partition sizes are balanced
+ val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+ val partStats = new StatCounter(partSizes)
+ assert(partStats.max - partStats.min <= 1)
+ }
+
+ // size > Int.MaxValue
+ val size = Int.MaxValue.toLong * 100L
+ val numPartitions = 101
+ val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+ assert(rdd.partitions.size === numPartitions)
+ val count = rdd.partitions.foldLeft(0L) { (count, part) =>
+ count + part.asInstanceOf[RandomRDDPartition].size
+ }
+ assert(count === size)
+
+ // size needs to be positive
+ intercept[IllegalArgumentException] { new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) }
+
+ // numPartitions needs to be positive
+ intercept[IllegalArgumentException] { new RandomRDD(sc, 100, 0, new UniformGenerator, 0L) }
+
+ // partition size needs to be <= Int.MaxValue
+ intercept[IllegalArgumentException] {
+ new RandomRDD(sc, Int.MaxValue.toLong * 100L, 99, new UniformGenerator, 0L)
+ }
+ }
+
+ test("randomRDD for different distributions") {
+ val size = 100000L
+ val numPartitions = 10
+ val poissonMean = 100.0
+
+ for (seed <- 0 until 5) {
+ val uniform = RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed)
+ testGeneratedRDD(uniform, size, numPartitions, 0.5, 1 / math.sqrt(12))
+
+ val normal = RandomRDDGenerators.normalRDD(sc, size, numPartitions, seed)
+ testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
+
+ val poisson = RandomRDDGenerators.poissonRDD(sc, poissonMean, size, numPartitions, seed)
+ testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
+ }
+
+ // mock distribution to check that partitions have unique seeds
+ val random = RandomRDDGenerators.randomRDD(sc, new MockDistro(), 1000L, 1000, 0L)
+ assert(random.collect.size === random.collect.distinct.size)
+ }
+
+ test("randomVectorRDD for different distributions") {
+ val rows = 1000L
+ val cols = 100
+ val parts = 10
+ val poissonMean = 100.0
+
+ for (seed <- 0 until 5) {
+ val uniform = RandomRDDGenerators.uniformVectorRDD(sc, rows, cols, parts, seed)
+ testGeneratedVectorRDD(uniform, rows, cols, parts, 0.5, 1 / math.sqrt(12))
+
+ val normal = RandomRDDGenerators.normalVectorRDD(sc, rows, cols, parts, seed)
+ testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
+
+ val poisson = RandomRDDGenerators.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
+ testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
+ }
+ }
+}
+
+private[random] class MockDistro extends DistributionGenerator {
+
+ var seed = 0L
+
+ // This allows us to check that each partition has a different seed
+ override def nextValue(): Double = seed.toDouble
+
+ override def setSeed(seed: Long) = this.seed = seed
+
+ override def copy(): MockDistro = new MockDistro
+}