aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorRJ Nowling <rnowling@gmail.com>2014-12-18 21:00:49 -0800
committerXiangrui Meng <meng@databricks.com>2014-12-18 21:00:49 -0800
commitee1fb97a97d5ac18cd2ad8028e84ecbd988fb811 (patch)
tree37772c73f752612bc4f1cb24d245b16cf5764f4a /mllib
parentc3d91da5ea8b85ca75444ec606f2e1eae376c4b2 (diff)
downloadspark-ee1fb97a97d5ac18cd2ad8028e84ecbd988fb811.tar.gz
spark-ee1fb97a97d5ac18cd2ad8028e84ecbd988fb811.tar.bz2
spark-ee1fb97a97d5ac18cd2ad8028e84ecbd988fb811.zip
[SPARK-4728][MLLib] Add exponential, gamma, and log normal sampling to MLlib da...
...ta generators This patch adds: * Exponential, gamma, and log normal generators that wrap Apache Commons math3 to the private API * Functions for generating exponential, gamma, and log normal RDDs and vector RDDs * Tests for the above Author: RJ Nowling <rnowling@gmail.com> Closes #3680 from rnowling/spark4728 and squashes the following commits: 455f50a [RJ Nowling] Add tests for exponential, gamma, and log normal samplers to JavaRandomRDDsSuite 3e1134a [RJ Nowling] Fix val/var, unncessary creation of Distribution objects when setting seeds, and import line longer than line wrap limits 58f5b97 [RJ Nowling] Fix bounds in tests so they scale with variance, not stdev 84fd98d [RJ Nowling] Add more values for testing distributions. 9f96232 [RJ Nowling] [SPARK-4728] Add exponential, gamma, and log normal sampling to MLlib data generators
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala69
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala363
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java99
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala52
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala43
5 files changed, 622 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
index 51f9b8657c..405bae62ee 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
@@ -17,7 +17,8 @@
package org.apache.spark.mllib.random
-import org.apache.commons.math3.distribution.PoissonDistribution
+import org.apache.commons.math3.distribution.{ExponentialDistribution,
+ GammaDistribution, LogNormalDistribution, PoissonDistribution}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
@@ -88,14 +89,76 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
@DeveloperApi
class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
- private var rng = new PoissonDistribution(mean)
+ private val rng = new PoissonDistribution(mean)
override def nextValue(): Double = rng.sample()
override def setSeed(seed: Long) {
- rng = new PoissonDistribution(mean)
rng.reseedRandomGenerator(seed)
}
override def copy(): PoissonGenerator = new PoissonGenerator(mean)
}
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the exponential distribution with the given mean.
+ *
+ * @param mean mean for the exponential distribution.
+ */
+@DeveloperApi
+class ExponentialGenerator(val mean: Double) extends RandomDataGenerator[Double] {
+
+ private val rng = new ExponentialDistribution(mean)
+
+ override def nextValue(): Double = rng.sample()
+
+ override def setSeed(seed: Long) {
+ rng.reseedRandomGenerator(seed)
+ }
+
+ override def copy(): ExponentialGenerator = new ExponentialGenerator(mean)
+}
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the gamma distribution with the given shape and scale.
+ *
+ * @param shape shape for the gamma distribution.
+ * @param scale scale for the gamma distribution
+ */
+@DeveloperApi
+class GammaGenerator(val shape: Double, val scale: Double) extends RandomDataGenerator[Double] {
+
+ private val rng = new GammaDistribution(shape, scale)
+
+ override def nextValue(): Double = rng.sample()
+
+ override def setSeed(seed: Long) {
+ rng.reseedRandomGenerator(seed)
+ }
+
+ override def copy(): GammaGenerator = new GammaGenerator(shape, scale)
+}
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the log normal distribution with the
+ * given mean and standard deviation.
+ *
+ * @param mean mean for the log normal distribution.
+ * @param std standard deviation for the log normal distribution
+ */
+@DeveloperApi
+class LogNormalGenerator(val mean: Double, val std: Double) extends RandomDataGenerator[Double] {
+
+ private val rng = new LogNormalDistribution(mean, std)
+
+ override def nextValue(): Double = rng.sample()
+
+ override def setSeed(seed: Long) {
+ rng.reseedRandomGenerator(seed)
+ }
+
+ override def copy(): LogNormalGenerator = new LogNormalGenerator(mean, std)
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index c5f4b08432..955c593a08 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -177,6 +177,176 @@ object RandomRDDs {
}
/**
+ * Generates an RDD comprised of i.i.d. samples from the exponential distribution with
+ * the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or 1 / lambda, for the exponential distribution.
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ Exp(mean).
+ */
+ def exponentialRDD(
+ sc: SparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val exponential = new ExponentialGenerator(mean)
+ randomRDD(sc, exponential, size, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#exponentialRDD]].
+ */
+ def exponentialJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions, seed))
+ }
+
+ /**
+ * [[RandomRDDs#exponentialJavaRDD]] with the default seed.
+ */
+ def exponentialJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ size: Long,
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions))
+ }
+
+ /**
+ * [[RandomRDDs#exponentialJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def exponentialJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size))
+ }
+
+ /**
+ * Generates an RDD comprised of i.i.d. samples from the gamma distribution with the input
+ * shape and scale.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param shape shape parameter (> 0) for the gamma distribution
+ * @param scale scale parameter (> 0) for the gamma distribution
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ Gamma(shape, scale).
+ */
+ def gammaRDD(
+ sc: SparkContext,
+ shape: Double,
+ scale: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val gamma = new GammaGenerator(shape, scale)
+ randomRDD(sc, gamma, size, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#gammaRDD]].
+ */
+ def gammaJavaRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions, seed))
+ }
+
+ /**
+ * [[RandomRDDs#gammaJavaRDD]] with the default seed.
+ */
+ def gammaJavaRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ size: Long,
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions))
+ }
+
+ /**
+ * [[RandomRDDs#gammaJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def gammaJavaRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size))
+ }
+
+ /**
+ * Generates an RDD comprised of i.i.d. samples from the log normal distribution with the input
+ * mean and standard deviation
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean mean for the log normal distribution
+ * @param std standard deviation for the log normal distribution
+ * @param size Size of the RDD.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Double] comprised of i.i.d. samples ~ LogNormal(mean, std).
+ */
+ def logNormalRDD(
+ sc: SparkContext,
+ mean: Double,
+ std: Double,
+ size: Long,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Double] = {
+ val logNormal = new LogNormalGenerator(mean, std)
+ randomRDD(sc, logNormal, size, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#logNormalRDD]].
+ */
+ def logNormalJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ size: Long,
+ numPartitions: Int,
+ seed: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions, seed))
+ }
+
+ /**
+ * [[RandomRDDs#logNormalJavaRDD]] with the default seed.
+ */
+ def logNormalJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ size: Long,
+ numPartitions: Int): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions))
+ }
+
+ /**
+ * [[RandomRDDs#logNormalJavaRDD]] with the default number of partitions and the default seed.
+ */
+ def logNormalJavaRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ size: Long): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size))
+ }
+
+
+ /**
* :: DeveloperApi ::
* Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
*
@@ -308,6 +478,72 @@ object RandomRDDs {
}
/**
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from a
+ * log normal distribution.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean of the log normal distribution.
+ * @param std Standard deviation of the log normal distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples.
+ */
+ def logNormalVectorRDD(
+ sc: SparkContext,
+ mean: Double,
+ std: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val logNormal = new LogNormalGenerator(mean, std)
+ randomVectorRDD(sc, logNormal, numRows, numCols,
+ numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#logNormalVectorRDD]].
+ */
+ def logNormalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions, seed).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#logNormalJavaVectorRDD]] with the default seed.
+ */
+ def logNormalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#logNormalJavaVectorRDD]] with the default number of partitions and
+ * the default seed.
+ */
+ def logNormalJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ std: Double,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols).toJavaRDD()
+ }
+
+ /**
* Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
* Poisson distribution with the input mean.
*
@@ -367,6 +603,133 @@ object RandomRDDs {
}
/**
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
+ * exponential distribution with the input mean.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param mean Mean, or 1 / lambda, for the Exponential distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean).
+ */
+ def exponentialVectorRDD(
+ sc: SparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val exponential = new ExponentialGenerator(mean)
+ randomVectorRDD(sc, exponential, numRows, numCols,
+ numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#exponentialVectorRDD]].
+ */
+ def exponentialJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#exponentialJavaVectorRDD]] with the default seed.
+ */
+ def exponentialJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#exponentialJavaVectorRDD]] with the default number of partitions
+ * and the default seed.
+ */
+ def exponentialJavaVectorRDD(
+ jsc: JavaSparkContext,
+ mean: Double,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ exponentialVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
+ }
+
+
+ /**
+ * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
+ * gamma distribution with the input shape and scale.
+ *
+ * @param sc SparkContext used to create the RDD.
+ * @param shape shape parameter (> 0) for the gamma distribution.
+ * @param scale scale parameter (> 0) for the gamma distribution.
+ * @param numRows Number of Vectors in the RDD.
+ * @param numCols Number of elements in each Vector.
+ * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
+ * @param seed Random seed (default: a random long integer).
+ * @return RDD[Vector] with vectors containing i.i.d. samples ~ Gamma(shape, scale).
+ */
+ def gammaVectorRDD(
+ sc: SparkContext,
+ shape: Double,
+ scale: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int = 0,
+ seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+ val gamma = new GammaGenerator(shape, scale)
+ randomVectorRDD(sc, gamma, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
+ }
+
+ /**
+ * Java-friendly version of [[RandomRDDs#gammaVectorRDD]].
+ */
+ def gammaJavaVectorRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int,
+ seed: Long): JavaRDD[Vector] = {
+ gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions, seed).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#gammaJavaVectorRDD]] with the default seed.
+ */
+ def gammaJavaVectorRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ numRows: Long,
+ numCols: Int,
+ numPartitions: Int): JavaRDD[Vector] = {
+ gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions).toJavaRDD()
+ }
+
+ /**
+ * [[RandomRDDs#gammaJavaVectorRDD]] with the default number of partitions and the default seed.
+ */
+ def gammaJavaVectorRDD(
+ jsc: JavaSparkContext,
+ shape: Double,
+ scale: Double,
+ numRows: Long,
+ numCols: Int): JavaRDD[Vector] = {
+ gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols).toJavaRDD()
+ }
+
+
+ /**
* :: DeveloperApi ::
* Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
* input RandomDataGenerator.
diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
index a725736ca1..fcc13c00cb 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
@@ -70,6 +70,21 @@ public class JavaRandomRDDsSuite {
}
@Test
+ public void testLNormalRDD() {
+ double mean = 4.0;
+ double std = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = logNormalJavaRDD(sc, mean, std, m);
+ JavaDoubleRDD rdd2 = logNormalJavaRDD(sc, mean, std, m, p);
+ JavaDoubleRDD rdd3 = logNormalJavaRDD(sc, mean, std, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
public void testPoissonRDD() {
double mean = 2.0;
long m = 1000L;
@@ -84,6 +99,36 @@ public class JavaRandomRDDsSuite {
}
@Test
+ public void testExponentialRDD() {
+ double mean = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = exponentialJavaRDD(sc, mean, m);
+ JavaDoubleRDD rdd2 = exponentialJavaRDD(sc, mean, m, p);
+ JavaDoubleRDD rdd3 = exponentialJavaRDD(sc, mean, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+ @Test
+ public void testGammaRDD() {
+ double shape = 1.0;
+ double scale = 2.0;
+ long m = 1000L;
+ int p = 2;
+ long seed = 1L;
+ JavaDoubleRDD rdd1 = gammaJavaRDD(sc, shape, scale, m);
+ JavaDoubleRDD rdd2 = gammaJavaRDD(sc, shape, scale, m, p);
+ JavaDoubleRDD rdd3 = gammaJavaRDD(sc, shape, scale, m, p, seed);
+ for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ }
+ }
+
+
+ @Test
@SuppressWarnings("unchecked")
public void testUniformVectorRDD() {
long m = 100L;
@@ -117,6 +162,24 @@ public class JavaRandomRDDsSuite {
@Test
@SuppressWarnings("unchecked")
+ public void testLogNormalVectorRDD() {
+ double mean = 4.0;
+ double std = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = logNormalJavaVectorRDD(sc, mean, std, m, n);
+ JavaRDD<Vector> rdd2 = logNormalJavaVectorRDD(sc, mean, std, m, n, p);
+ JavaRDD<Vector> rdd3 = logNormalJavaVectorRDD(sc, mean, std, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void testPoissonVectorRDD() {
double mean = 2.0;
long m = 100L;
@@ -131,4 +194,40 @@ public class JavaRandomRDDsSuite {
Assert.assertEquals(n, rdd.first().size());
}
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testExponentialVectorRDD() {
+ double mean = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = exponentialJavaVectorRDD(sc, mean, m, n);
+ JavaRDD<Vector> rdd2 = exponentialJavaVectorRDD(sc, mean, m, n, p);
+ JavaRDD<Vector> rdd3 = exponentialJavaVectorRDD(sc, mean, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGammaVectorRDD() {
+ double shape = 1.0;
+ double scale = 2.0;
+ long m = 100L;
+ int n = 10;
+ int p = 2;
+ long seed = 1L;
+ JavaRDD<Vector> rdd1 = gammaJavaVectorRDD(sc, shape, scale, m, n);
+ JavaRDD<Vector> rdd2 = gammaJavaVectorRDD(sc, shape, scale, m, n, p);
+ JavaRDD<Vector> rdd3 = gammaJavaVectorRDD(sc, shape, scale, m, n, p, seed);
+ for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+ Assert.assertEquals(m, rdd.count());
+ Assert.assertEquals(n, rdd.first().size());
+ }
+ }
+
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
index 3df7c128af..b792d819fd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.mllib.random
+import scala.math
+
import org.scalatest.FunSuite
import org.apache.spark.util.StatCounter
@@ -25,7 +27,6 @@ import org.apache.spark.util.StatCounter
class RandomDataGeneratorSuite extends FunSuite {
def apiChecks(gen: RandomDataGenerator[Double]) {
-
// resetting seed should generate the same sequence of random numbers
gen.setSeed(42L)
val array1 = (0 until 1000).map(_ => gen.nextValue())
@@ -79,6 +80,26 @@ class RandomDataGeneratorSuite extends FunSuite {
distributionChecks(normal, 0.0, 1.0)
}
+ test("LogNormalGenerator") {
+ List((0.0, 1.0), (0.0, 2.0), (2.0, 1.0), (2.0, 2.0)).map {
+ case (mean: Double, vari: Double) =>
+ val normal = new LogNormalGenerator(mean, math.sqrt(vari))
+ apiChecks(normal)
+
+ // mean of log normal = e^(mean + var / 2)
+ val expectedMean = math.exp(mean + 0.5 * vari)
+
+ // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+ val expectedStd = math.sqrt((math.exp(vari) - 1.0) * math.exp(2.0 * mean + vari))
+
+ // since sampling error increases with variance, let's set
+ // the absolute tolerance as a percentage
+ val epsilon = 0.05 * expectedStd * expectedStd
+
+ distributionChecks(normal, expectedMean, expectedStd, epsilon)
+ }
+ }
+
test("PoissonGenerator") {
// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
for (mean <- List(1.0, 5.0, 100.0)) {
@@ -87,4 +108,33 @@ class RandomDataGeneratorSuite extends FunSuite {
distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
}
}
+
+ test("ExponentialGenerator") {
+ // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+ for (mean <- List(2.0, 5.0, 10.0, 50.0, 100.0)) {
+ val exponential = new ExponentialGenerator(mean)
+ apiChecks(exponential)
+ // var of exp = lambda^-2 = (1.0 / mean)^-2 = mean^2
+
+ // since sampling error increases with variance, let's set
+ // the absolute tolerance as a percentage
+ val epsilon = 0.05 * mean * mean
+
+ distributionChecks(exponential, mean, mean, epsilon)
+ }
+ }
+
+ test("GammaGenerator") {
+ // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+ List((1.0, 2.0), (2.0, 2.0), (3.0, 2.0), (5.0, 1.0), (9.0, 0.5)).map {
+ case (shape: Double, scale: Double) =>
+ val gamma = new GammaGenerator(shape, scale)
+ apiChecks(gamma)
+ // mean of gamma = shape * scale
+ val expectedMean = shape * scale
+ // var of gamma = shape * scale^2
+ val expectedStd = math.sqrt(shape * scale * scale)
+ distributionChecks(gamma, expectedMean, expectedStd, 0.1)
+ }
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
index ea5889b3ec..6395188a08 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
@@ -110,7 +110,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
test("randomRDD for different distributions") {
val size = 100000L
val numPartitions = 10
+
+ // mean of log normal = e^(mean + var / 2)
+ val logNormalMean = math.exp(0.5)
+ // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+ val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
+ val gammaScale = 1.0
+ val gammaShape = 2.0
+ // mean of gamma = shape * scale
+ val gammaMean = gammaShape * gammaScale
+ // var of gamma = shape * scale^2
+ val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
val poissonMean = 100.0
+ val exponentialMean = 1.0
for (seed <- 0 until 5) {
val uniform = RandomRDDs.uniformRDD(sc, size, numPartitions, seed)
@@ -119,8 +131,18 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val normal = RandomRDDs.normalRDD(sc, size, numPartitions, seed)
testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
+ val logNormal = RandomRDDs.logNormalRDD(sc, 0.0, 1.0, size, numPartitions, seed)
+ testGeneratedRDD(logNormal, size, numPartitions, logNormalMean, logNormalStd, 0.1)
+
val poisson = RandomRDDs.poissonRDD(sc, poissonMean, size, numPartitions, seed)
testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
+
+ val exponential = RandomRDDs.exponentialRDD(sc, exponentialMean, size, numPartitions, seed)
+ testGeneratedRDD(exponential, size, numPartitions, exponentialMean, exponentialMean, 0.1)
+
+ val gamma = RandomRDDs.gammaRDD(sc, gammaShape, gammaScale, size, numPartitions, seed)
+ testGeneratedRDD(gamma, size, numPartitions, gammaMean, gammaStd, 0.1)
+
}
// mock distribution to check that partitions have unique seeds
@@ -132,7 +154,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val rows = 1000L
val cols = 100
val parts = 10
+
+ // mean of log normal = e^(mean + var / 2)
+ val logNormalMean = math.exp(0.5)
+ // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+ val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
+ val gammaScale = 1.0
+ val gammaShape = 2.0
+ // mean of gamma = shape * scale
+ val gammaMean = gammaShape * gammaScale
+ // var of gamma = shape * scale^2
+ val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
val poissonMean = 100.0
+ val exponentialMean = 1.0
for (seed <- 0 until 5) {
val uniform = RandomRDDs.uniformVectorRDD(sc, rows, cols, parts, seed)
@@ -141,8 +175,17 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
val normal = RandomRDDs.normalVectorRDD(sc, rows, cols, parts, seed)
testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
+ val logNormal = RandomRDDs.logNormalVectorRDD(sc, 0.0, 1.0, rows, cols, parts, seed)
+ testGeneratedVectorRDD(logNormal, rows, cols, parts, logNormalMean, logNormalStd, 0.1)
+
val poisson = RandomRDDs.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
+
+ val exponential = RandomRDDs.exponentialVectorRDD(sc, exponentialMean, rows, cols, parts, seed)
+ testGeneratedVectorRDD(exponential, rows, cols, parts, exponentialMean, exponentialMean, 0.1)
+
+ val gamma = RandomRDDs.gammaVectorRDD(sc, gammaShape, gammaScale, rows, cols, parts, seed)
+ testGeneratedVectorRDD(gamma, rows, cols, parts, gammaMean, gammaStd, 0.1)
}
}
}