diff options
author | Reza Zadeh <rizlar@gmail.com> | 2014-01-09 22:45:32 -0800 |
---|---|---|
committer | Reza Zadeh <rizlar@gmail.com> | 2014-01-09 22:45:32 -0800 |
commit | 21c8a54c08354f8934fd8ec58b43879c1686ccad (patch) | |
tree | 51426328d9f0eafdeec7fb46ef99c86f27f86dd2 /mllib | |
parent | cf5bd4ab2e9db72d3d9164053523e9e872d85b94 (diff) | |
parent | 300eaa994c399a0c991c1e39b4dd864a7aa4bdc6 (diff) | |
download | spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.tar.gz spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.tar.bz2 spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.zip |
Merge remote-tracking branch 'upstream/master' into sparsesvd
Conflicts:
docs/mllib-guide.md
Diffstat (limited to 'mllib')
7 files changed, 443 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8247c1ebc5..2d8623392e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -197,6 +197,7 @@ class PythonMLLibAPI extends Serializable { return ret } + /** Unpack a Rating object from an array of bytes */ private def unpackRating(ratingBytes: Array[Byte]): Rating = { val bb = ByteBuffer.wrap(ratingBytes) bb.order(ByteOrder.nativeOrder()) @@ -206,6 +207,35 @@ class PythonMLLibAPI extends Serializable { return new Rating(user, product, rating) } + /** Unpack a tuple of Ints from an array of bytes */ + private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = { + val bb = ByteBuffer.wrap(tupleBytes) + bb.order(ByteOrder.nativeOrder()) + val v1 = bb.getInt() + val v2 = bb.getInt() + (v1, v2) + } + + /** + * Serialize a Rating object into an array of bytes. + * It can be deserialized using RatingDeserializer(). + * + * @param rate + * @return + */ + private[spark] def serializeRating(rate: Rating): Array[Byte] = { + val len = 3 + val bytes = new Array[Byte](4 + 8 * len) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.putInt(len) + val db = bb.asDoubleBuffer() + db.put(rate.user.toDouble) + db.put(rate.product.toDouble) + db.put(rate.rating) + bytes + } + /** * Java stub for Python mllib ALS.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala new file mode 100644 index 0000000000..524300d6ae --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.collection.mutable + +import org.jblas.DoubleMatrix + +import org.apache.spark.Logging +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +/** + * Model for Naive Bayes Classifiers. + * + * @param pi Log of class priors, whose dimension is C. + * @param theta Log of class conditional probabilities, whose dimension is CXD. + */ +class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) + extends ClassificationModel with Serializable { + + // Create a column vector that can be used for predictions + private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) + private val _theta = new DoubleMatrix(theta) + + def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) + + def predict(testData: Array[Double]): Double = { + val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) + val result = _pi.add(_theta.mmul(dataMatrix)) + result.argmax() + } +} + +/** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * @param lambda The smooth parameter + */ +class NaiveBayes private (val lambda: Double = 1.0) + extends Serializable with Logging { + + /** + * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. + * + * @param data RDD of (label, array of features) pairs. + */ + def run(data: RDD[LabeledPoint]) = { + // Aggregates all sample points to driver side to get sample count and summed feature vector + // for each label. The shape of `zeroCombiner` & `aggregated` is: + // + // label: Int -> (count: Int, featuresSum: DoubleMatrix) + val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] + val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) => + point match { + case LabeledPoint(label, features) => + val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) + val fs = new DoubleMatrix(features.length, 1, features: _*) + combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) + } + }, { (lhs, rhs) => + for ((label, (c, fs)) <- rhs) { + val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) + lhs(label) = (count + c, featuresSum.addi(fs)) + } + lhs + }) + + // Kinds of label + val C = aggregated.size + // Total sample count + val N = aggregated.values.map(_._1).sum + + val pi = new Array[Double](C) + val theta = new Array[Array[Double]](C) + val piLogDenom = math.log(N + C * lambda) + + for ((label, (count, fs)) <- aggregated) { + val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) + pi(label) = math.log(count + lambda) - piLogDenom + theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) + } + + new NaiveBayesModel(pi, theta) + } +} + +object NaiveBayes { + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector. it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + * @param lambda The smooth parameter + */ + def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(input) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 749e7364f4..c590492e7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -50,8 +50,8 @@ class LogisticGradient extends Gradient { val gradient = data.mul(gradientMultiplier) val loss = - if (margin > 0) { - math.log(1 + math.exp(0 - margin)) + if (label > 0) { + math.log(1 + math.exp(margin)) } else { math.log(1 + math.exp(margin)) - margin } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index af43d89c70..443fc5de5b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -19,8 +19,11 @@ package org.apache.spark.mllib.recommendation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.api.python.PythonMLLibAPI import org.jblas._ +import org.apache.spark.api.java.JavaRDD + /** * Model representing the result of matrix factorization. @@ -44,6 +47,39 @@ class MatrixFactorizationModel( userVector.dot(productVector) } - // TODO: Figure out what good bulk prediction methods would look like. + /** + * Predict the rating of many users for many products. + * The output RDD has an element per each element in the input RDD (including all duplicates) + * unless a user or product is missing in the training set. + * + * @param usersProducts RDD of (user, product) pairs. + * @return RDD of Ratings. + */ + def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { + val users = userFeatures.join(usersProducts).map{ + case (user, (uFeatures, product)) => (product, (user, uFeatures)) + } + users.join(productFeatures).map { + case (product, ((user, uFeatures), pFeatures)) => + val userVector = new DoubleMatrix(uFeatures) + val productVector = new DoubleMatrix(pFeatures) + Rating(user, product, userVector.dot(productVector)) + } + } + + /** + * Predict the rating of many users for many products. + * This is a Java stub for python predictAll() + * + * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) + * @return JavaRDD of serialized Rating objects. + */ + def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { + val pythonAPI = new PythonMLLibAPI() + val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) + predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) + } + + // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala new file mode 100644 index 0000000000..b615f76e66 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.SparkContext + +object NaiveBayesSuite { + + private def calcLabel(p: Double, pi: Array[Double]): Int = { + var sum = 0.0 + for (j <- 0 until pi.length) { + sum += pi(j) + if (p < sum) return j + } + -1 + } + + // Generate input of the form Y = (theta * x).argmax() + def generateNaiveBayesInput( + pi: Array[Double], // 1XC + theta: Array[Array[Double]], // CXD + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val D = theta(0).length + val rnd = new Random(seed) + + val _pi = pi.map(math.pow(math.E, _)) + val _theta = theta.map(row => row.map(math.pow(math.E, _))) + + for (i <- 0 until nPoints) yield { + val y = calcLabel(rnd.nextDouble(), _pi) + val xi = Array.tabulate[Double](D) { j => + if (rnd.nextDouble() < _theta(y)(j)) 1 else 0 + } + + LabeledPoint(y, xi) + } + } +} + +class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOfPredictions = predictions.zip(input).count { + case (prediction, expected) => + prediction != expected.label + } + // At least 80% of the predictions should be on. + assert(numOfPredictions < input.length / 5) + } + + test("Naive Bayes") { + val nPoints = 10000 + + val pi = Array(0.5, 0.3, 0.2).map(math.log) + val theta = Array( + Array(0.91, 0.03, 0.03, 0.03), // label 0 + Array(0.03, 0.91, 0.03, 0.03), // label 1 + Array(0.03, 0.03, 0.91, 0.03) // label 2 + ).map(_.map(math.log)) + + val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val model = NaiveBayes.train(testRDD) + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala new file mode 100644 index 0000000000..a6028a1e98 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.util.Random +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.regression._ + +object GradientDescentSuite { + + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateGDInput(offset, scale, nPoints, seed)) + } + + // Generate input of the form Y = logistic(offset + scale * X) + def generateGDInput( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) + + val unifRand = new scala.util.Random(45) + val rLogis = (0 until nPoints).map { i => + val u = unifRand.nextDouble() + math.log(u) - math.log(1.0-u) + } + + val y: Seq[Int] = (0 until nPoints).map { i => + val yVal = offset + scale * x1(i) + rLogis(i) + if (yVal > 0) 1 else 0 + } + + val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i)))) + testData + } +} + +class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + test("Assert the loss is decreasing.") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new LogisticGradient() + val updater = new SimpleUpdater() + val stepSize = 1.0 + val numIterations = 10 + val regParam = 0 + val miniBatchFrac = 1.0 + + // Add a extra variable consisting of all 1.0's for the intercept. + val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Array(1.0, features: _*) + } + + val dataRDD = sc.parallelize(data, 2).cache() + val initialWeightsWithIntercept = Array(1.0, initialWeights: _*) + + val (_, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(loss.last - loss.head < 0, "loss isn't decreasing.") + + val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs } + assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index fafc5ec5f2..e683a90f57 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -90,18 +90,34 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { testALS(50, 100, 1, 15, 0.7, 0.3) } + test("rank-1 matrices bulk") { + testALS(50, 100, 1, 15, 0.7, 0.3, false, true) + } + test("rank-2 matrices") { testALS(100, 200, 2, 15, 0.7, 0.3) } + test("rank-2 matrices bulk") { + testALS(100, 200, 2, 15, 0.7, 0.3, false, true) + } + test("rank-1 matrices implicit") { testALS(80, 160, 1, 15, 0.7, 0.4, true) } + test("rank-1 matrices implicit bulk") { + testALS(80, 160, 1, 15, 0.7, 0.4, true, true) + } + test("rank-2 matrices implicit") { testALS(100, 200, 2, 15, 0.7, 0.4, true) } + test("rank-2 matrices implicit bulk") { + testALS(100, 200, 2, 15, 0.7, 0.4, true, true) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * @@ -111,9 +127,12 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { * @param iterations number of iterations to run * @param samplingRate what fraction of the user-product pairs are known * @param matchThreshold max difference allowed to consider a predicted rating correct + * @param implicitPrefs flag to test implicit feedback + * @param bulkPredict flag to test bulk prediciton */ def testALS(users: Int, products: Int, features: Int, iterations: Int, - samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false) + samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, + bulkPredict: Boolean = false) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs) @@ -130,7 +149,17 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) { predictedP.put(p, i, vec(i)) } - val predictedRatings = predictedU.mmul(predictedP.transpose) + val predictedRatings = bulkPredict match { + case false => predictedU.mmul(predictedP.transpose) + case true => + val allRatings = new DoubleMatrix(users, products) + val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p) + val userProductsRDD = sc.parallelize(usersProducts) + model.predict(userProductsRDD).collect().foreach { elem => + allRatings.put(elem.user, elem.product, elem.rating) + } + allRatings + } if (!implicitPrefs) { for (u <- 0 until users; p <- 0 until products) { |