Diffstat (limited to 'mllib')
-rw-r--r--  mllib/pom.xml                                                                             |  12
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala               | 262
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala               | 119
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala                   |   4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala                      |  13
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala |  38
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala                    |   2
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java                |   4
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java               |   2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala          | 108
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala       | 116
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala                 |  33
12 files changed, 690 insertions(+), 23 deletions(-)
diff --git a/mllib/pom.xml b/mllib/pom.xml
index f472082ad1..dda3900afe 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.9.3</artifactId>
+ <artifactId>spark-mllib_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project ML Library</name>
<url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -48,12 +48,12 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -63,8 +63,8 @@
</dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
new file mode 100644
index 0000000000..2d8623392e
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
+import org.apache.spark.rdd.RDD
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
+class PythonMLLibAPI extends Serializable {
+ private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+ val packetLength = bytes.length
+ if (packetLength < 16) {
+ throw new IllegalArgumentException("Byte array too short.")
+ }
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.getLong()
+ if (magic != 1) {
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ }
+ val length = bb.getLong()
+ if (packetLength != 16 + 8 * length) {
+ throw new IllegalArgumentException("Length " + length + " is wrong.")
+ }
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Double](length.toInt)
+ db.get(ans)
+ return ans
+ }
+
+ private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+ val len = doubles.length
+ val bytes = new Array[Byte](16 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putLong(1)
+ bb.putLong(len)
+ val db = bb.asDoubleBuffer()
+ db.put(doubles)
+ return bytes
+ }
+
+ private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+ val packetLength = bytes.length
+ if (packetLength < 24) {
+ throw new IllegalArgumentException("Byte array too short.")
+ }
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.getLong()
+ if (magic != 2) {
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ }
+ val rows = bb.getLong()
+ val cols = bb.getLong()
+ if (packetLength != 24 + 8 * rows * cols) {
+ throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+ }
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Array[Double]](rows.toInt)
+ var i = 0
+ for (i <- 0 until rows.toInt) {
+ ans(i) = new Array[Double](cols.toInt)
+ db.get(ans(i))
+ }
+ return ans
+ }
+
+ private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+ val rows = doubles.length
+ var cols = 0
+ if (rows > 0) {
+ cols = doubles(0).length
+ }
+ val bytes = new Array[Byte](24 + 8 * rows * cols)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putLong(2)
+ bb.putLong(rows)
+ bb.putLong(cols)
+ val db = bb.asDoubleBuffer()
+ var i = 0
+ for (i <- 0 until rows) {
+ db.put(doubles(i))
+ }
+ return bytes
+ }
+
+ private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
+ java.util.LinkedList[java.lang.Object] = {
+ val data = dataBytesJRDD.rdd.map(xBytes => {
+ val x = deserializeDoubleVector(xBytes)
+ LabeledPoint(x(0), x.slice(1, x.length))
+ })
+ val initialWeights = deserializeDoubleVector(initialWeightsBA)
+ val model = trainFunc(data, initialWeights)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleVector(model.weights))
+ ret.add(model.intercept: java.lang.Double)
+ return ret
+ }
+
+ /**
+ * Java stub for Python mllib LinearRegressionWithSGD.train()
+ */
+ def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LinearRegressionWithSGD.train(data, numIterations, stepSize,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib LassoWithSGD.train()
+ */
+ def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LassoWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib RidgeRegressionWithSGD.train()
+ */
+ def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib SVMWithSGD.train()
+ */
+ def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ SVMWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib LogisticRegressionWithSGD.train()
+ */
+ def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LogisticRegressionWithSGD.train(data, numIterations, stepSize,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib KMeans.train()
+ */
+ def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+ maxIterations: Int, runs: Int, initializationMode: String):
+ java.util.List[java.lang.Object] = {
+ val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+ val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleMatrix(model.clusterCenters))
+ return ret
+ }
+
+ /** Unpack a Rating object from an array of bytes */
+ private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+ val bb = ByteBuffer.wrap(ratingBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val user = bb.getInt()
+ val product = bb.getInt()
+ val rating = bb.getDouble()
+ return new Rating(user, product, rating)
+ }
+
+ /** Unpack a tuple of Ints from an array of bytes */
+ private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
+ val bb = ByteBuffer.wrap(tupleBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val v1 = bb.getInt()
+ val v2 = bb.getInt()
+ (v1, v2)
+ }
+
+ /**
+ * Serialize a Rating object into an array of bytes.
+ * It can be deserialized using RatingDeserializer().
+ *
+ * @param rate the Rating to serialize.
+ * @return the packed bytes, readable by RatingDeserializer.
+ */
+ private[spark] def serializeRating(rate: Rating): Array[Byte] = {
+ val len = 3
+ val bytes = new Array[Byte](4 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putInt(len)
+ val db = bb.asDoubleBuffer()
+ db.put(rate.user.toDouble)
+ db.put(rate.product.toDouble)
+ db.put(rate.rating)
+ bytes
+ }
+
+ /**
+ * Java stub for Python mllib ALS.train(). This stub returns a handle
+ * to the Java object instead of the content of the Java object. Extra care
+ * needs to be taken in the Python code to ensure it gets freed on exit; see
+ * the Py4J documentation.
+ */
+ def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.train(ratings, rank, iterations, lambda, blocks)
+ }
+
+ /**
+ * Java stub for Python mllib ALS.trainImplicit(). This stub returns a
+ * handle to the Java object instead of the content of the Java object.
+ * Extra care needs to be taken in the Python code to ensure it gets freed on
+ * exit; see the Py4J documentation.
+ */
+ def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+ }
+}
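
For reference, the rating packet the ALS stubs read (see unpackRating above) is two
4-byte ints followed by an 8-byte double, in native byte order. A minimal sketch of
the matching sender side, written to these conventions and not part of the patch:

    import java.nio.{ByteBuffer, ByteOrder}

    // Pack a (user, product, rating) triple in the layout unpackRating expects:
    // int user, int product, double rating -- 16 bytes, native byte order.
    def packRating(user: Int, product: Int, rating: Double): Array[Byte] = {
      val bb = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
      bb.putInt(user).putInt(product).putDouble(rating)
      bb.array()
    }

The double-vector and matrix packets follow the same pattern, with a leading 8-byte
magic (1 for vectors, 2 for matrices) and the lengths, as checked in
deserializeDoubleVector and deserializeDoubleMatrix.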
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
new file mode 100644
index 0000000000..524300d6ae
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.collection.mutable
+
+import org.jblas.DoubleMatrix
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Model for Naive Bayes Classifiers.
+ *
+ * @param pi Log of class priors, whose dimension is C, the number of classes.
+ * @param theta Log of class conditional probabilities, whose dimension is C-by-D.
+ */
+class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+ extends ClassificationModel with Serializable {
+
+ // Create a column vector that can be used for predictions
+ private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
+ private val _theta = new DoubleMatrix(theta)
+
+ def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+
+ def predict(testData: Array[Double]): Double = {
+ val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
+ val result = _pi.add(_theta.mmul(dataMatrix))
+ result.argmax()
+ }
+}
+
+/**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * @param lambda The smoothing parameter
+ */
+class NaiveBayes private (val lambda: Double = 1.0)
+ extends Serializable with Logging {
+
+ /**
+ * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
+ *
+ * @param data RDD of (label, array of features) pairs.
+ */
+ def run(data: RDD[LabeledPoint]) = {
+ // Aggregates all sample points on the driver side to get the sample count and summed feature vector
+ // for each label. The shape of `zeroCombiner` & `aggregated` is:
+ //
+ // label: Int -> (count: Int, featuresSum: DoubleMatrix)
+ val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
+ val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
+ point match {
+ case LabeledPoint(label, features) =>
+ val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
+ val fs = new DoubleMatrix(features.length, 1, features: _*)
+ combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
+ }
+ }, { (lhs, rhs) =>
+ for ((label, (c, fs)) <- rhs) {
+ val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
+ lhs(label) = (count + c, featuresSum.addi(fs))
+ }
+ lhs
+ })
+
+ // Number of distinct labels
+ val C = aggregated.size
+ // Total sample count
+ val N = aggregated.values.map(_._1).sum
+
+ val pi = new Array[Double](C)
+ val theta = new Array[Array[Double]](C)
+ val piLogDenom = math.log(N + C * lambda)
+
+ for ((label, (count, fs)) <- aggregated) {
+ val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
+ pi(label) = math.log(count + lambda) - piLogDenom
+ theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+ }
+
+ new NaiveBayesModel(pi, theta)
+ }
+}
+
+object NaiveBayes {
+ /**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+ * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+ * document classification. By making every vector a 0-1 vector, it can also be used as
+ * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+ *
+ * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
+ * vector or a count vector.
+ * @param lambda The smoothing parameter
+ */
+ def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+ new NaiveBayes(lambda).run(input)
+ }
+}
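
A minimal usage sketch for the API added above (hand-built count vectors, assuming
an existing SparkContext named sc; illustrative only, not part of the patch):

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.regression.LabeledPoint

    // Tiny training set: each label has one dominant feature.
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Array(9.0, 1.0, 0.0)),
      LabeledPoint(1.0, Array(0.0, 8.0, 2.0)),
      LabeledPoint(1.0, Array(1.0, 7.0, 1.0))))
    val model = NaiveBayes.train(data, lambda = 1.0)
    model.predict(Array(0.0, 5.0, 1.0))  // feature 1 dominates, so expect 1.0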
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 749e7364f4..c590492e7a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -50,8 +50,8 @@ class LogisticGradient extends Gradient {
val gradient = data.mul(gradientMultiplier)
val loss =
- if (margin > 0) {
- math.log(1 + math.exp(0 - margin))
+ if (label > 0) {
+ math.log(1 + math.exp(margin))
} else {
math.log(1 + math.exp(margin)) - margin
}
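
The fix selects the loss branch by the label rather than by the sign of the margin:
the new branches are log(1 + e^margin) for positive labels and
log(1 + e^margin) - margin otherwise. Both still call math.exp(margin) directly,
which overflows for large margins; a common numerically stable helper for
log(1 + e^x) looks like this (a sketch, not part of this patch):

    // For large positive x, exp(x) overflows to Infinity, so rewrite
    // log(1 + exp(x)) as x + log(1 + exp(-x)) in that regime.
    def log1pExp(x: Double): Double =
      if (x > 0) x + math.log1p(math.exp(-x))
      else math.log1p(math.exp(x))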
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 36853acab5..8b27ecf82c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -578,14 +578,13 @@ object ALS {
val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
val alpha = if (args.length >= 8) args(7).toDouble else 1
val blocks = if (args.length == 9) args(8).toInt else -1
-
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
- System.setProperty("spark.kryo.referenceTracking", "false")
- System.setProperty("spark.kryoserializer.buffer.mb", "8")
- System.setProperty("spark.locality.wait", "10000")
-
val sc = new SparkContext(master, "ALS")
+ sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sc.conf.set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ sc.conf.set("spark.kryo.referenceTracking", "false")
+ sc.conf.set("spark.kryoserializer.buffer.mb", "8")
+ sc.conf.set("spark.locality.wait", "10000")
+
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
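
The patch moves these settings off global JVM system properties and onto the
context's configuration. An equivalent sketch that builds the configuration before
the context is created, using the master and ALSRegistrator already in scope here
(hypothetical driver code, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    // Configure Kryo up front so the settings are in place when the context starts.
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName("ALS")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
    val sc = new SparkContext(conf)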
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index af43d89c70..443fc5de5b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,8 +19,11 @@ package org.apache.spark.mllib.recommendation
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.api.python.PythonMLLibAPI
import org.jblas._
+import org.apache.spark.api.java.JavaRDD
+
/**
* Model representing the result of matrix factorization.
@@ -44,6 +47,39 @@ class MatrixFactorizationModel(
userVector.dot(productVector)
}
- // TODO: Figure out what good bulk prediction methods would look like.
+ /**
+ * Predict the rating of many users for many products.
+ * The output RDD contains one element for each element of the input RDD
+ * (including duplicates), unless a user or product is missing from the training set.
+ *
+ * @param usersProducts RDD of (user, product) pairs.
+ * @return RDD of Ratings.
+ */
+ def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
+ val users = userFeatures.join(usersProducts).map{
+ case (user, (uFeatures, product)) => (product, (user, uFeatures))
+ }
+ users.join(productFeatures).map {
+ case (product, ((user, uFeatures), pFeatures)) =>
+ val userVector = new DoubleMatrix(uFeatures)
+ val productVector = new DoubleMatrix(pFeatures)
+ Rating(user, product, userVector.dot(productVector))
+ }
+ }
+
+ /**
+ * Predict the rating of many users for many products.
+ * This is a Java stub for the Python predictAll().
+ *
+ * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
+ * @return JavaRDD of serialized Rating objects.
+ */
+ def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
+ val pythonAPI = new PythonMLLibAPI()
+ val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
+ predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
+ }
+
+ // TODO: Figure out what other good bulk prediction methods would look like.
// Probably want a way to get the top users for a product or vice-versa.
}
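
A usage sketch for the bulk predict added above, assuming a trained
MatrixFactorizationModel named model and a SparkContext named sc (illustrative,
not part of the patch):

    // Score a small grid of (user, product) pairs in one bulk call.
    val pairs = sc.parallelize(for (u <- 0 until 3; p <- 0 until 4) yield (u, p))
    val predictions = model.predict(pairs)  // RDD[Rating]
    predictions.collect().foreach { r =>
      println(r.user + " x " + r.product + " -> " + r.rating)
    }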
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index 5aec867257..d5f3f6b8db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -83,7 +83,7 @@ object MFDataGenerator{
scala.math.round(.99 * m * n)).toInt
val rand = new Random()
val mn = m * n
- val shuffled = rand.shuffle(1 to mn toIterable)
+ val shuffled = rand.shuffle(1 to mn toList)
val omega = shuffled.slice(0, sampSize)
val ordered = omega.sortWith(_ < _).toArray
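
The only change here is handing Random.shuffle a strict List instead of an
Iterable, presumably to satisfy its implicit CanBuildFrom under Scala 2.10. The
same call with explicit parentheses instead of postfix notation (a sketch, not
part of the patch):

    val rand = new scala.util.Random()
    val shuffled: List[Int] = rand.shuffle((1 to mn).toList)  // random permutation of 1..mn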
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
index 32d3934ac1..33b99f4bd3 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java
@@ -77,7 +77,7 @@ public class JavaKMeansSuite implements Serializable {
@Test
public void runKMeansUsingStaticMethods() {
- List<double[]> points = new ArrayList();
+ List<double[]> points = new ArrayList<double[]>();
points.add(new double[]{1.0, 2.0, 6.0});
points.add(new double[]{1.0, 3.0, 0.0});
points.add(new double[]{1.0, 4.0, 6.0});
@@ -94,7 +94,7 @@ public class JavaKMeansSuite implements Serializable {
@Test
public void runKMeansUsingConstructor() {
- List<double[]> points = new ArrayList();
+ List<double[]> points = new ArrayList<double[]>();
points.add(new double[]{1.0, 2.0, 6.0});
points.add(new double[]{1.0, 3.0, 0.0});
points.add(new double[]{1.0, 4.0, 6.0});
diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index eafee060cd..b40f552e0d 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -21,8 +21,6 @@ import java.io.Serializable;
import java.util.List;
import java.lang.Math;
-import scala.Tuple2;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
new file mode 100644
index 0000000000..b615f76e66
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.SparkContext
+
+object NaiveBayesSuite {
+
+ private def calcLabel(p: Double, pi: Array[Double]): Int = {
+ var sum = 0.0
+ for (j <- 0 until pi.length) {
+ sum += pi(j)
+ if (p < sum) return j
+ }
+ -1
+ }
+
+ // Generate input of the form Y = (theta * x).argmax()
+ def generateNaiveBayesInput(
+ pi: Array[Double], // 1 x C
+ theta: Array[Array[Double]], // C x D
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val D = theta(0).length
+ val rnd = new Random(seed)
+
+ val _pi = pi.map(math.pow(math.E, _))
+ val _theta = theta.map(row => row.map(math.pow(math.E, _)))
+
+ for (i <- 0 until nPoints) yield {
+ val y = calcLabel(rnd.nextDouble(), _pi)
+ val xi = Array.tabulate[Double](D) { j =>
+ if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
+ }
+
+ LabeledPoint(y, xi)
+ }
+ }
+}
+
+class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOfPredictions = predictions.zip(input).count {
+ case (prediction, expected) =>
+ prediction != expected.label
+ }
+ // At least 80% of the predictions should be correct.
+ assert(numOfPredictions < input.length / 5)
+ }
+
+ test("Naive Bayes") {
+ val nPoints = 10000
+
+ val pi = Array(0.5, 0.3, 0.2).map(math.log)
+ val theta = Array(
+ Array(0.91, 0.03, 0.03, 0.03), // label 0
+ Array(0.03, 0.91, 0.03, 0.03), // label 1
+ Array(0.03, 0.03, 0.91, 0.03) // label 2
+ ).map(_.map(math.log))
+
+ val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val model = NaiveBayes.train(testRDD)
+
+ val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
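
The generator above draws each label by walking the cumulative sum of the class
priors (calcLabel). A standalone sketch of that categorical-sampling idea (not
part of the patch):

    import scala.util.Random

    // Sample an index from a categorical distribution whose probabilities sum to 1.
    def sampleCategorical(probs: Array[Double], rnd: Random): Int = {
      val p = rnd.nextDouble()
      var cum = 0.0
      for (j <- probs.indices) {
        cum += probs(j)
        if (p < cum) return j
      }
      probs.length - 1  // guard against floating-point round-off
    }

    val rnd = new Random(42)
    val draws = Seq.fill(100000)(sampleCategorical(Array(0.5, 0.3, 0.2), rnd))
    // Empirical frequencies of 0, 1, 2 should approach 0.5, 0.3, 0.2.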
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
new file mode 100644
index 0000000000..a6028a1e98
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+import scala.collection.JavaConversions._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.regression._
+
+object GradientDescentSuite {
+
+ def generateLogisticInputAsList(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateGDInput(offset, scale, nPoints, seed))
+ }
+
+ // Generate input of the form Y = logistic(offset + scale * X)
+ def generateGDInput(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
+
+ val unifRand = new scala.util.Random(45)
+ val rLogis = (0 until nPoints).map { i =>
+ val u = unifRand.nextDouble()
+ math.log(u) - math.log(1.0-u)
+ }
+
+ val y: Seq[Int] = (0 until nPoints).map { i =>
+ val yVal = offset + scale * x1(i) + rLogis(i)
+ if (yVal > 0) 1 else 0
+ }
+
+ val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+ testData
+ }
+}
+
+class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ test("Assert the loss is decreasing.") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val initialB = -1.0
+ val initialWeights = Array(initialB)
+
+ val gradient = new LogisticGradient()
+ val updater = new SimpleUpdater()
+ val stepSize = 1.0
+ val numIterations = 10
+ val regParam = 0
+ val miniBatchFrac = 1.0
+
+ // Add an extra variable consisting of all 1.0's for the intercept.
+ val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
+ val data = testData.map { case LabeledPoint(label, features) =>
+ label -> Array(1.0, features: _*)
+ }
+
+ val dataRDD = sc.parallelize(data, 2).cache()
+ val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+
+ val (_, loss) = GradientDescent.runMiniBatchSGD(
+ dataRDD,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFrac,
+ initialWeightsWithIntercept)
+
+ assert(loss.last - loss.head < 0, "loss isn't decreasing.")
+
+ val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs }
+ assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8)
+ }
+}
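
The test's data generator uses inverse-CDF sampling for the noise term: if u is
Uniform(0,1), then log(u) - log(1 - u) has the standard logistic distribution, so
P(y = 1 | x) = 1 / (1 + e^-(offset + scale * x)). A compact statement of that
quantile/CDF pair (a sketch, not part of the patch):

    // Quantile (inverse CDF) of the standard logistic distribution.
    def logisticQuantile(u: Double): Double = math.log(u) - math.log1p(-u)

    // CDF; logisticCdf(logisticQuantile(u)) == u up to floating-point round-off.
    def logisticCdf(x: Double): Double = 1.0 / (1.0 + math.exp(-x))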
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index fafc5ec5f2..e683a90f57 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -90,18 +90,34 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
testALS(50, 100, 1, 15, 0.7, 0.3)
}
+ test("rank-1 matrices bulk") {
+ testALS(50, 100, 1, 15, 0.7, 0.3, false, true)
+ }
+
test("rank-2 matrices") {
testALS(100, 200, 2, 15, 0.7, 0.3)
}
+ test("rank-2 matrices bulk") {
+ testALS(100, 200, 2, 15, 0.7, 0.3, false, true)
+ }
+
test("rank-1 matrices implicit") {
testALS(80, 160, 1, 15, 0.7, 0.4, true)
}
+ test("rank-1 matrices implicit bulk") {
+ testALS(80, 160, 1, 15, 0.7, 0.4, true, true)
+ }
+
test("rank-2 matrices implicit") {
testALS(100, 200, 2, 15, 0.7, 0.4, true)
}
+ test("rank-2 matrices implicit bulk") {
+ testALS(100, 200, 2, 15, 0.7, 0.4, true, true)
+ }
+
/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
@@ -111,9 +127,12 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
* @param iterations number of iterations to run
* @param samplingRate what fraction of the user-product pairs are known
* @param matchThreshold max difference allowed to consider a predicted rating correct
+ * @param implicitPrefs flag to test implicit feedback
+ * @param bulkPredict flag to test bulk prediction
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
- samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false)
+ samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
+ bulkPredict: Boolean = false)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs)
@@ -130,7 +149,17 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
predictedP.put(p, i, vec(i))
}
- val predictedRatings = predictedU.mmul(predictedP.transpose)
+ val predictedRatings = bulkPredict match {
+ case false => predictedU.mmul(predictedP.transpose)
+ case true =>
+ val allRatings = new DoubleMatrix(users, products)
+ val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p)
+ val userProductsRDD = sc.parallelize(usersProducts)
+ model.predict(userProductsRDD).collect().foreach { elem =>
+ allRatings.put(elem.user, elem.product, elem.rating)
+ }
+ allRatings
+ }
if (!implicitPrefs) {
for (u <- 0 until users; p <- 0 until products) {