diff options
author | Reza Zadeh <rizlar@gmail.com> | 2014-01-09 22:45:32 -0800 |
---|---|---|
committer | Reza Zadeh <rizlar@gmail.com> | 2014-01-09 22:45:32 -0800 |
commit | 21c8a54c08354f8934fd8ec58b43879c1686ccad (patch) | |
tree | 51426328d9f0eafdeec7fb46ef99c86f27f86dd2 /mllib/src/main | |
parent | cf5bd4ab2e9db72d3d9164053523e9e872d85b94 (diff) | |
parent | 300eaa994c399a0c991c1e39b4dd864a7aa4bdc6 (diff) | |
download | spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.tar.gz spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.tar.bz2 spark-21c8a54c08354f8934fd8ec58b43879c1686ccad.zip |
Merge remote-tracking branch 'upstream/master' into sparsesvd
Conflicts:
docs/mllib-guide.md
Diffstat (limited to 'mllib/src/main')
4 files changed, 188 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8247c1ebc5..2d8623392e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -197,6 +197,7 @@ class PythonMLLibAPI extends Serializable { return ret } + /** Unpack a Rating object from an array of bytes */ private def unpackRating(ratingBytes: Array[Byte]): Rating = { val bb = ByteBuffer.wrap(ratingBytes) bb.order(ByteOrder.nativeOrder()) @@ -206,6 +207,35 @@ class PythonMLLibAPI extends Serializable { return new Rating(user, product, rating) } + /** Unpack a tuple of Ints from an array of bytes */ + private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = { + val bb = ByteBuffer.wrap(tupleBytes) + bb.order(ByteOrder.nativeOrder()) + val v1 = bb.getInt() + val v2 = bb.getInt() + (v1, v2) + } + + /** + * Serialize a Rating object into an array of bytes. + * It can be deserialized using RatingDeserializer(). + * + * @param rate + * @return + */ + private[spark] def serializeRating(rate: Rating): Array[Byte] = { + val len = 3 + val bytes = new Array[Byte](4 + 8 * len) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.putInt(len) + val db = bb.asDoubleBuffer() + db.put(rate.user.toDouble) + db.put(rate.product.toDouble) + db.put(rate.rating) + bytes + } + /** * Java stub for Python mllib ALS.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala new file mode 100644 index 0000000000..524300d6ae --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.collection.mutable + +import org.jblas.DoubleMatrix + +import org.apache.spark.Logging +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD + +/** + * Model for Naive Bayes Classifiers. + * + * @param pi Log of class priors, whose dimension is C. + * @param theta Log of class conditional probabilities, whose dimension is CXD. + */ +class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) + extends ClassificationModel with Serializable { + + // Create a column vector that can be used for predictions + private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) + private val _theta = new DoubleMatrix(theta) + + def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) + + def predict(testData: Array[Double]): Double = { + val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) + val result = _pi.add(_theta.mmul(dataMatrix)) + result.argmax() + } +} + +/** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * @param lambda The smooth parameter + */ +class NaiveBayes private (val lambda: Double = 1.0) + extends Serializable with Logging { + + /** + * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. + * + * @param data RDD of (label, array of features) pairs. + */ + def run(data: RDD[LabeledPoint]) = { + // Aggregates all sample points to driver side to get sample count and summed feature vector + // for each label. The shape of `zeroCombiner` & `aggregated` is: + // + // label: Int -> (count: Int, featuresSum: DoubleMatrix) + val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] + val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) => + point match { + case LabeledPoint(label, features) => + val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) + val fs = new DoubleMatrix(features.length, 1, features: _*) + combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) + } + }, { (lhs, rhs) => + for ((label, (c, fs)) <- rhs) { + val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) + lhs(label) = (count + c, featuresSum.addi(fs)) + } + lhs + }) + + // Kinds of label + val C = aggregated.size + // Total sample count + val N = aggregated.values.map(_._1).sum + + val pi = new Array[Double](C) + val theta = new Array[Array[Double]](C) + val piLogDenom = math.log(N + C * lambda) + + for ((label, (count, fs)) <- aggregated) { + val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) + pi(label) = math.log(count + lambda) - piLogDenom + theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) + } + + new NaiveBayesModel(pi, theta) + } +} + +object NaiveBayes { + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector. it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + * @param lambda The smooth parameter + */ + def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(input) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 749e7364f4..c590492e7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -50,8 +50,8 @@ class LogisticGradient extends Gradient { val gradient = data.mul(gradientMultiplier) val loss = - if (margin > 0) { - math.log(1 + math.exp(0 - margin)) + if (label > 0) { + math.log(1 + math.exp(margin)) } else { math.log(1 + math.exp(margin)) - margin } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index af43d89c70..443fc5de5b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -19,8 +19,11 @@ package org.apache.spark.mllib.recommendation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.api.python.PythonMLLibAPI import org.jblas._ +import org.apache.spark.api.java.JavaRDD + /** * Model representing the result of matrix factorization. @@ -44,6 +47,39 @@ class MatrixFactorizationModel( userVector.dot(productVector) } - // TODO: Figure out what good bulk prediction methods would look like. + /** + * Predict the rating of many users for many products. + * The output RDD has an element per each element in the input RDD (including all duplicates) + * unless a user or product is missing in the training set. + * + * @param usersProducts RDD of (user, product) pairs. + * @return RDD of Ratings. + */ + def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { + val users = userFeatures.join(usersProducts).map{ + case (user, (uFeatures, product)) => (product, (user, uFeatures)) + } + users.join(productFeatures).map { + case (product, ((user, uFeatures), pFeatures)) => + val userVector = new DoubleMatrix(uFeatures) + val productVector = new DoubleMatrix(pFeatures) + Rating(user, product, userVector.dot(productVector)) + } + } + + /** + * Predict the rating of many users for many products. + * This is a Java stub for python predictAll() + * + * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) + * @return JavaRDD of serialized Rating objects. + */ + def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { + val pythonAPI = new PythonMLLibAPI() + val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) + predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) + } + + // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. } |