author     Ankur Dave <ankurdave@gmail.com>  2014-01-08 21:19:08 -0800
committer  Ankur Dave <ankurdave@gmail.com>  2014-01-08 21:19:08 -0800
commit     91227566bc9d8aabaec3f2a37a09a17afa20989c
tree       80d19aac29217005b3f1cb08ca95fa08bbb9d946 /mllib
parent     7210257ba3038d5e22d4b60fe9c3113dc45c3dff
parent     04d83fc37f9eef89c20331c85291a0a169f75e6d
Merge remote-tracking branch 'spark-upstream/master' into HEAD
Conflicts:
README.md
core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
pom.xml
project/SparkBuild.scala
repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
Diffstat (limited to 'mllib')
12 files changed, 690 insertions, 23 deletions
diff --git a/mllib/pom.xml b/mllib/pom.xml
index f472082ad1..dda3900afe 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -26,7 +26,7 @@
   </parent>
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-mllib_2.9.3</artifactId>
+  <artifactId>spark-mllib_2.10</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project ML Library</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.9.3</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -48,12 +48,12 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.9.3</artifactId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_2.9.3</artifactId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -63,8 +63,8 @@
     </dependency>
   </dependencies>
   <build>
-    <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
       <plugin>
         <groupId>org.scalatest</groupId>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
new file mode 100644
index 0000000000..2d8623392e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
+import org.apache.spark.rdd.RDD
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
+class PythonMLLibAPI extends Serializable {
+  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+    val packetLength = bytes.length
+    if (packetLength < 16) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 1) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val length = bb.getLong()
+    if (packetLength != 16 + 8 * length) {
+      throw new IllegalArgumentException("Length " + length + " is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Double](length.toInt)
+    db.get(ans)
+    return ans
+  }
+
+  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+    val len = doubles.length
+    val bytes = new Array[Byte](16 + 8 * len)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(1)
+    bb.putLong(len)
+    val db = bb.asDoubleBuffer()
+    db.put(doubles)
+    return bytes
+  }
+
+  private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+    val packetLength = bytes.length
+    if (packetLength < 24) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 2) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val rows = bb.getLong()
+    val cols = bb.getLong()
+    if (packetLength != 24 + 8 * rows * cols) {
+      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Array[Double]](rows.toInt)
+    var i = 0
+    for (i <- 0 until rows.toInt) {
+      ans(i) = new Array[Double](cols.toInt)
+      db.get(ans(i))
+    }
+    return ans
+  }
+
+  private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+    val rows = doubles.length
+    var cols = 0
+    if (rows > 0) {
+      cols = doubles(0).length
+    }
+    val bytes = new Array[Byte](24 + 8 * rows * cols)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(2)
+    bb.putLong(rows)
+    bb.putLong(cols)
+    val db = bb.asDoubleBuffer()
+    var i = 0
+    for (i <- 0 until rows) {
+      db.put(doubles(i))
+    }
+    return bytes
+  }
+
+  private def trainRegressionModel(
+      trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => {
+      val x = deserializeDoubleVector(xBytes)
+      LabeledPoint(x(0), x.slice(1, x.length))
+    })
+    val initialWeights = deserializeDoubleVector(initialWeightsBA)
+    val model = trainFunc(data, initialWeights)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.weights))
+    ret.add(model.intercept: java.lang.Double)
+    return ret
+  }
+
+  /**
+   * Java stub for Python mllib LinearRegressionWithSGD.train()
+   */
+  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LinearRegressionWithSGD.train(data, numIterations, stepSize,
+                                      miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib LassoWithSGD.train()
+   */
+  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LassoWithSGD.train(data, numIterations, stepSize, regParam,
+                           miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib RidgeRegressionWithSGD.train()
+   */
+  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib SVMWithSGD.train()
+   */
+  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        SVMWithSGD.train(data, numIterations, stepSize, regParam,
+                         miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib LogisticRegressionWithSGD.train()
+   */
+  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
+                                        miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib KMeans.train()
+   */
+  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+      maxIterations: Int, runs: Int, initializationMode: String):
+      java.util.List[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleMatrix(model.clusterCenters))
+    return ret
+  }
+
+  /** Unpack a Rating object from an array of bytes */
+  private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+    val bb = ByteBuffer.wrap(ratingBytes)
+    bb.order(ByteOrder.nativeOrder())
+    val user = bb.getInt()
+    val product = bb.getInt()
+    val rating = bb.getDouble()
+    return new Rating(user, product, rating)
+  }
+
+  /** Unpack a tuple of Ints from an array of bytes */
+  private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
+    val bb = ByteBuffer.wrap(tupleBytes)
+    bb.order(ByteOrder.nativeOrder())
+    val v1 = bb.getInt()
+    val v2 = bb.getInt()
+    (v1, v2)
+  }
+
+  /**
+   * Serialize a Rating object into an array of bytes.
+   * It can be deserialized using RatingDeserializer().
+   *
+   * @param rate
+   * @return
+   */
+  private[spark] def serializeRating(rate: Rating): Array[Byte] = {
+    val len = 3
+    val bytes = new Array[Byte](4 + 8 * len)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putInt(len)
+    val db = bb.asDoubleBuffer()
+    db.put(rate.user.toDouble)
+    db.put(rate.product.toDouble)
+    db.put(rate.rating)
+    bytes
+  }
+
+  /**
+   * Java stub for Python mllib ALS.train().  This stub returns a handle
+   * to the Java object instead of the content of the Java object.  Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see
+   * the Py4J documentation.
+   */
+  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.train(ratings, rank, iterations, lambda, blocks)
+  }
+
+  /**
+   * Java stub for Python mllib ALS.trainImplicit().  This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+  }
+}
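For readers wiring up the Python side, the stubs above fix a simple packet layout for dense vectors: an 8-byte magic of 1, an 8-byte element count, then the raw doubles, all in native byte order (matrices use magic 2 followed by row and column counts). A standalone Scala sketch, not part of the commit and with illustrative names, that round-trips the vector layout:

    import java.nio.{ByteBuffer, ByteOrder}

    // Illustrative only: mirrors the layout serializeDoubleVector /
    // deserializeDoubleVector above expect.
    object DoubleVectorPacket {
      def pack(doubles: Array[Double]): Array[Byte] = {
        val bytes = new Array[Byte](16 + 8 * doubles.length)
        val bb = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder())
        bb.putLong(1L)                     // magic: dense double vector
        bb.putLong(doubles.length.toLong)  // element count
        bb.asDoubleBuffer().put(doubles)   // payload
        bytes
      }

      def unpack(bytes: Array[Byte]): Array[Double] = {
        val bb = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder())
        require(bb.getLong() == 1L, "bad magic")
        val out = new Array[Double](bb.getLong().toInt)
        bb.asDoubleBuffer().get(out)
        out
      }
    }

Note that the regression stubs interpret a deserialized vector as [label, feature0, feature1, ...] (see trainRegressionModel's use of x(0) and x.slice(1, x.length)), so the Python caller prepends the label before packing.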
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
new file mode 100644
index 0000000000..524300d6ae
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.collection.mutable
+
+import org.jblas.DoubleMatrix
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Model for Naive Bayes Classifiers.
+ *
+ * @param pi Log of class priors, whose dimension is C.
+ * @param theta Log of class conditional probabilities, whose dimension is C-by-D.
+ */
+class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+  extends ClassificationModel with Serializable {
+
+  // Create a column vector that can be used for predictions
+  private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
+  private val _theta = new DoubleMatrix(theta)
+
+  def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+
+  def predict(testData: Array[Double]): Double = {
+    val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
+    val result = _pi.add(_theta.mmul(dataMatrix))
+    result.argmax()
+  }
+}
+
+/**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * @param lambda The smoothing parameter
+ */
+class NaiveBayes private (val lambda: Double = 1.0)
+  extends Serializable with Logging {
+
+  /**
+   * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
+   *
+   * @param data RDD of (label, array of features) pairs.
+   */
+  def run(data: RDD[LabeledPoint]) = {
+    // Aggregates all sample points to driver side to get sample count and summed feature vector
+    // for each label.  The shape of `zeroCombiner` & `aggregated` is:
+    //
+    //    label: Int -> (count: Int, featuresSum: DoubleMatrix)
+    val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
+    val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
+      point match {
+        case LabeledPoint(label, features) =>
+          val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
+          val fs = new DoubleMatrix(features.length, 1, features: _*)
+          combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
+      }
+    }, { (lhs, rhs) =>
+      for ((label, (c, fs)) <- rhs) {
+        val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
+        lhs(label) = (count + c, featuresSum.addi(fs))
+      }
+      lhs
+    })
+
+    // Kinds of label
+    val C = aggregated.size
+    // Total sample count
+    val N = aggregated.values.map(_._1).sum
+
+    val pi = new Array[Double](C)
+    val theta = new Array[Array[Double]](C)
+    val piLogDenom = math.log(N + C * lambda)
+
+    for ((label, (count, fs)) <- aggregated) {
+      val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
+      pi(label) = math.log(count + lambda) - piLogDenom
+      theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+    }
+
+    new NaiveBayesModel(pi, theta)
+  }
+}
+
+object NaiveBayes {
+  /**
+   * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+   *
+   * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+   * discrete data.  For example, by converting documents into TF-IDF vectors, it can be used for
+   * document classification.  By making every vector a 0-1 vector, it can also be used as
+   * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+   *
+   * @param input RDD of `(label, array of features)` pairs.  Every vector should be a frequency
+   *              vector or a count vector.
+   * @param lambda The smoothing parameter
+   */
+  def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+    new NaiveBayes(lambda).run(input)
+  }
+}
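As a quick orientation for the new API, a hypothetical usage sketch (not from the commit): train on an RDD of LabeledPoint count vectors, then predict a label index for a new vector. Since pi and theta are stored in log space, prediction is one matrix-vector product plus an argmax.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.regression.LabeledPoint

    // Hypothetical two-class example with word-count features.
    val sc = new SparkContext("local", "NaiveBayesExample")
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Array(3.0, 0.0, 1.0)),
      LabeledPoint(1.0, Array(0.0, 2.0, 1.0))))
    val model = NaiveBayes.train(training, lambda = 1.0)
    println(model.predict(Array(0.0, 1.0, 1.0)))  // most likely label index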
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 749e7364f4..c590492e7a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -50,8 +50,8 @@
     val gradient = data.mul(gradientMultiplier)
     val loss =
-      if (margin > 0) {
-        math.log(1 + math.exp(0 - margin))
+      if (label > 0) {
+        math.log(1 + math.exp(margin))
       } else {
         math.log(1 + math.exp(margin)) - margin
       }
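The corrected code branches on the label, where the old code branched on the sign of the margin, conflating the score's sign with the class. Assuming margin is the negated linear score (its definition sits outside this hunk), the positive-label loss is \log(1 + e^{m}) and the negative-label loss is \log(1 + e^{-m}); the latter is computed through the identity

    \log(1 + e^{-m}) = \log\frac{1 + e^{m}}{e^{m}} = \log(1 + e^{m}) - m

which stays finite when m is strongly negative and e^{-m} would overflow.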
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 36853acab5..8b27ecf82c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -578,14 +578,13 @@
     val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
     val alpha = if (args.length >= 8) args(7).toDouble else 1
     val blocks = if (args.length == 9) args(8).toInt else -1
-
-    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
-    System.setProperty("spark.kryo.referenceTracking", "false")
-    System.setProperty("spark.kryoserializer.buffer.mb", "8")
-    System.setProperty("spark.locality.wait", "10000")
-
     val sc = new SparkContext(master, "ALS")
+    sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+    sc.conf.set("spark.kryo.referenceTracking", "false")
+    sc.conf.set("spark.kryoserializer.buffer.mb", "8")
+    sc.conf.set("spark.locality.wait", "10000")
+
     val ratings = sc.textFile(ratingsFile).map { line =>
       val fields = line.split(',')
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index af43d89c70..443fc5de5b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,8 +19,11 @@ package org.apache.spark.mllib.recommendation
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.api.python.PythonMLLibAPI
 
 import org.jblas._
+import org.apache.spark.api.java.JavaRDD
+
 
 /**
  * Model representing the result of matrix factorization.
@@ -44,6 +47,39 @@ class MatrixFactorizationModel(
     userVector.dot(productVector)
   }
 
-  // TODO: Figure out what good bulk prediction methods would look like.
+  /**
+   * Predict the rating of many users for many products.
+   * The output RDD has an element per each element in the input RDD (including all duplicates)
+   * unless a user or product is missing in the training set.
+   *
+   * @param usersProducts  RDD of (user, product) pairs.
+   * @return RDD of Ratings.
+   */
+  def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
+    val users = userFeatures.join(usersProducts).map{
+      case (user, (uFeatures, product)) => (product, (user, uFeatures))
+    }
+    users.join(productFeatures).map {
+      case (product, ((user, uFeatures), pFeatures)) =>
+        val userVector = new DoubleMatrix(uFeatures)
+        val productVector = new DoubleMatrix(pFeatures)
+        Rating(user, product, userVector.dot(productVector))
+    }
+  }
+
+  /**
+   * Predict the rating of many users for many products.
+   * This is a Java stub for python predictAll()
+   *
+   * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
+   * @return JavaRDD of serialized Rating objects.
+   */
+  def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
+    val pythonAPI = new PythonMLLibAPI()
+    val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
+    predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
+  }
+
+  // TODO: Figure out what other good bulk prediction methods would look like.
   // Probably want a way to get the top users for a product or vice-versa.
 }
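The new RDD-based predict scores many pairs with two joins instead of one driver-side dot product per call. A hypothetical helper (not in the commit) that scores a full user-product grid:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

    // Hypothetical: score every (user, product) pair in bulk.
    def scoreAll(sc: SparkContext, model: MatrixFactorizationModel,
                 users: Int, products: Int): RDD[Rating] = {
      val pairs = for (u <- 0 until users; p <- 0 until products) yield (u, p)
      model.predict(sc.parallelize(pairs))
    }

The new "bulk" tests added to ALSSuite below exercise exactly this pattern.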
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index 5aec867257..d5f3f6b8db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -83,7 +83,7 @@ object MFDataGenerator{
       scala.math.round(.99 * m * n)).toInt
     val rand = new Random()
     val mn = m * n
-    val shuffled = rand.shuffle(1 to mn toIterable)
+    val shuffled = rand.shuffle(1 to mn toList)
 
     val omega = shuffled.slice(0, sampSize)
     val ordered = omega.sortWith(_ < _).toArray
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
index 32d3934ac1..33b99f4bd3 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
@@ -77,7 +77,7 @@ public class JavaKMeansSuite implements Serializable {
 
   @Test
   public void runKMeansUsingStaticMethods() {
-    List<double[]> points = new ArrayList();
+    List<double[]> points = new ArrayList<double[]>();
     points.add(new double[]{1.0, 2.0, 6.0});
     points.add(new double[]{1.0, 3.0, 0.0});
     points.add(new double[]{1.0, 4.0, 6.0});
@@ -94,7 +94,7 @@ public class JavaKMeansSuite implements Serializable {
 
   @Test
   public void runKMeansUsingConstructor() {
-    List<double[]> points = new ArrayList();
+    List<double[]> points = new ArrayList<double[]>();
     points.add(new double[]{1.0, 2.0, 6.0});
     points.add(new double[]{1.0, 3.0, 0.0});
     points.add(new double[]{1.0, 4.0, 6.0});
diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index eafee060cd..b40f552e0d 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -21,8 +21,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.lang.Math;
 
-import scala.Tuple2;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
new file mode 100644
index 0000000000..b615f76e66
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.SparkContext
+
+object NaiveBayesSuite {
+
+  private def calcLabel(p: Double, pi: Array[Double]): Int = {
+    var sum = 0.0
+    for (j <- 0 until pi.length) {
+      sum += pi(j)
+      if (p < sum) return j
+    }
+    -1
+  }
+
+  // Generate input of the form Y = (theta * x).argmax()
+  def generateNaiveBayesInput(
+      pi: Array[Double],            // 1 x C
+      theta: Array[Array[Double]],  // C x D
+      nPoints: Int,
+      seed: Int): Seq[LabeledPoint] = {
+    val D = theta(0).length
+    val rnd = new Random(seed)
+
+    val _pi = pi.map(math.pow(math.E, _))
+    val _theta = theta.map(row => row.map(math.pow(math.E, _)))
+
+    for (i <- 0 until nPoints) yield {
+      val y = calcLabel(rnd.nextDouble(), _pi)
+      val xi = Array.tabulate[Double](D) { j =>
+        if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
+      }
+
+      LabeledPoint(y, xi)
+    }
+  }
+}
+
+class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll {
+  @transient private var sc: SparkContext = _
+
+  override def beforeAll() {
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll() {
+    sc.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
+  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+    val numOfPredictions = predictions.zip(input).count {
+      case (prediction, expected) =>
+        prediction != expected.label
+    }
+    // At least 80% of the predictions should be correct.
+    assert(numOfPredictions < input.length / 5)
+  }
+
+  test("Naive Bayes") {
+    val nPoints = 10000
+
+    val pi = Array(0.5, 0.3, 0.2).map(math.log)
+    val theta = Array(
+      Array(0.91, 0.03, 0.03, 0.03), // label 0
+      Array(0.03, 0.91, 0.03, 0.03), // label 1
+      Array(0.03, 0.03, 0.91, 0.03)  // label 2
+    ).map(_.map(math.log))
+
+    val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val model = NaiveBayes.train(testRDD)
+
+    val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
new file mode 100644
index 0000000000..a6028a1e98
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+import scala.collection.JavaConversions._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.regression._
+
+object GradientDescentSuite {
+
+  def generateLogisticInputAsList(
+      offset: Double,
+      scale: Double,
+      nPoints: Int,
+      seed: Int): java.util.List[LabeledPoint] = {
+    seqAsJavaList(generateGDInput(offset, scale, nPoints, seed))
+  }
+
+  // Generate input of the form Y = logistic(offset + scale * X)
+  def generateGDInput(
+      offset: Double,
+      scale: Double,
+      nPoints: Int,
+      seed: Int): Seq[LabeledPoint] = {
+    val rnd = new Random(seed)
+    val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
+
+    val unifRand = new scala.util.Random(45)
+    val rLogis = (0 until nPoints).map { i =>
+      val u = unifRand.nextDouble()
+      math.log(u) - math.log(1.0 - u)
+    }
+
+    val y: Seq[Int] = (0 until nPoints).map { i =>
+      val yVal = offset + scale * x1(i) + rLogis(i)
+      if (yVal > 0) 1 else 0
+    }
+
+    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+    testData
+  }
+}
+
+class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
+  @transient private var sc: SparkContext = _
+
+  override def beforeAll() {
+    sc = new SparkContext("local", "test")
+  }
+
+  override def afterAll() {
+    sc.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
+  test("Assert the loss is decreasing.") {
+    val nPoints = 10000
+    val A = 2.0
+    val B = -1.5
+
+    val initialB = -1.0
+    val initialWeights = Array(initialB)
+
+    val gradient = new LogisticGradient()
+    val updater = new SimpleUpdater()
+    val stepSize = 1.0
+    val numIterations = 10
+    val regParam = 0
+    val miniBatchFrac = 1.0
+
+    // Add an extra variable consisting of all 1.0's for the intercept.
+    val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
+    val data = testData.map { case LabeledPoint(label, features) =>
+      label -> Array(1.0, features: _*)
+    }
+
+    val dataRDD = sc.parallelize(data, 2).cache()
+    val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+
+    val (_, loss) = GradientDescent.runMiniBatchSGD(
+      dataRDD,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFrac,
+      initialWeightsWithIntercept)
+
+    assert(loss.last - loss.head < 0, "loss isn't decreasing.")
+
+    val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs }
+    assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8)
+  }
+}
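A side note on generateGDInput above (a standard fact, not from the commit): rLogis draws standard logistic noise by inverse-CDF sampling. For u uniform on (0, 1),

    F(x) = \frac{1}{1 + e^{-x}} \quad\Rightarrow\quad F^{-1}(u) = \log\frac{u}{1 - u} = \log u - \log(1 - u)

so thresholding offset + scale * x + rLogis(i) at zero yields labels with P(y = 1 | x) equal to the logistic of offset + scale * x, which is exactly the model whose loss the SGD test then drives down.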
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index fafc5ec5f2..e683a90f57 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -90,18 +90,34 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
     testALS(50, 100, 1, 15, 0.7, 0.3)
   }
 
+  test("rank-1 matrices bulk") {
+    testALS(50, 100, 1, 15, 0.7, 0.3, false, true)
+  }
+
   test("rank-2 matrices") {
     testALS(100, 200, 2, 15, 0.7, 0.3)
   }
 
+  test("rank-2 matrices bulk") {
+    testALS(100, 200, 2, 15, 0.7, 0.3, false, true)
+  }
+
   test("rank-1 matrices implicit") {
     testALS(80, 160, 1, 15, 0.7, 0.4, true)
   }
 
+  test("rank-1 matrices implicit bulk") {
+    testALS(80, 160, 1, 15, 0.7, 0.4, true, true)
+  }
+
   test("rank-2 matrices implicit") {
     testALS(100, 200, 2, 15, 0.7, 0.4, true)
   }
 
+  test("rank-2 matrices implicit bulk") {
+    testALS(100, 200, 2, 15, 0.7, 0.4, true, true)
+  }
+
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
@@ -111,9 +127,12 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
    * @param iterations number of iterations to run
    * @param samplingRate what fraction of the user-product pairs are known
    * @param matchThreshold max difference allowed to consider a predicted rating correct
+   * @param implicitPrefs flag to test implicit feedback
+   * @param bulkPredict flag to test bulk prediction
    */
   def testALS(users: Int, products: Int, features: Int, iterations: Int,
-    samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false)
+    samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
+    bulkPredict: Boolean = false)
   {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
       features, samplingRate, implicitPrefs)
@@ -130,7 +149,17 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
     for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
       predictedP.put(p, i, vec(i))
     }
-    val predictedRatings = predictedU.mmul(predictedP.transpose)
+    val predictedRatings = bulkPredict match {
+      case false => predictedU.mmul(predictedP.transpose)
+      case true =>
+        val allRatings = new DoubleMatrix(users, products)
+        val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p)
+        val userProductsRDD = sc.parallelize(usersProducts)
+        model.predict(userProductsRDD).collect().foreach { elem =>
+          allRatings.put(elem.user, elem.product, elem.rating)
+        }
+        allRatings
+    }
     if (!implicitPrefs) {
       for (u <- 0 until users; p <- 0 until products) {