From 3dc655aa19f678219e5d999fe97ab769567ffb1c Mon Sep 17 00:00:00 2001 From: Frank Dai Date: Wed, 25 Dec 2013 16:50:42 +0800 Subject: standard Naive Bayes classifier --- .../spark/mllib/classification/NaiveBayes.scala | 103 +++++++++++++++++++++ .../mllib/classification/NaiveBayesSuite.scala | 92 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala new file mode 100644 index 0000000000..f1b0e6ee6a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.collection.mutable + +import org.apache.spark.Logging +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.jblas.DoubleMatrix + +/** + * Model for Naive Bayes Classifiers. + * + * @param weightPerLabel Weights computed for every label, which's dimension is C. + * @param weightMatrix Weights computed for every label and feature, which's dimension is CXD + */ +class NaiveBayesModel(val weightPerLabel: Array[Double], + val weightMatrix: Array[Array[Double]]) + extends ClassificationModel with Serializable { + + // Create a column vector that can be used for predictions + private val _weightPerLabel = new DoubleMatrix(weightPerLabel.length, 1, weightPerLabel:_*) + private val _weightMatrix = new DoubleMatrix(weightMatrix) + + def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) + + def predict(testData: Array[Double]): Double = { + val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) + val result = _weightPerLabel.add(_weightMatrix.mmul(dataMatrix)) + result.argmax() + } +} + + + +class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter + extends Serializable with Logging { + + /** + * Run the algorithm with the configured parameters on an input + * RDD of LabeledPoint entries. + * + * @param C kind of labels, labels are continuous integers and the maximal label is C-1 + * @param D dimension of feature vectors + * @param data RDD of (label, array of features) pairs. + */ + def run(C: Int, D: Int, data: RDD[LabeledPoint]): NaiveBayesModel = { + val groupedData = data.map(p => p.label.toInt -> p.features).groupByKey() + + val countPerLabel = groupedData.mapValues(_.size) + val logDenominator = math.log(data.count() + C * lambda) + val weightPerLabel = countPerLabel.mapValues { + count => math.log(count + lambda) - logDenominator + } + + val summedObservations = groupedData.mapValues(_.reduce { + (lhs, rhs) => lhs.zip(rhs).map(pair => pair._1 + pair._2) + }) + + val weightsMatrix = summedObservations.mapValues { weights => + val sum = weights.sum + val logDenom = math.log(sum + D * lambda) + weights.map(w => math.log(w + lambda) - logDenom) + } + + val labelWeights = weightPerLabel.collect().sorted.map(_._2) + val weightsMat = weightsMatrix.collect().sortBy(_._1).map(_._2) + + new NaiveBayesModel(labelWeights, weightsMat) + } +} + +object NaiveBayes { + /** + * Train a naive bayes model given an RDD of (label, features) pairs. + * + * @param C kind of labels, the maximal label is C-1 + * @param D dimension of feature vectors + * @param input RDD of (label, array of features) pairs. + * @param lambda smooth parameter + */ + def train(C: Int, D: Int, input: RDD[LabeledPoint], + lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(C, D, input) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala new file mode 100644 index 0000000000..d871ed3672 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -0,0 +1,92 @@ +package org.apache.spark.mllib.classification + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.SparkContext + +object NaiveBayesSuite { + + private def calcLabel(p: Double, weightPerLabel: Array[Double]): Int = { + var sum = 0.0 + for (j <- 0 until weightPerLabel.length) { + sum += weightPerLabel(j) + if (p < sum) return j + } + -1 + } + + // Generate input of the form Y = (weightMatrix*x).argmax() + def generateNaiveBayesInput( + weightPerLabel: Array[Double], // 1XC + weightsMatrix: Array[Array[Double]], // CXD + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val D = weightsMatrix(0).length + val rnd = new Random(seed) + + val _weightPerLabel = weightPerLabel.map(math.pow(math.E, _)) + val _weightMatrix = weightsMatrix.map(row => row.map(math.pow(math.E, _))) + + for (i <- 0 until nPoints) yield { + val y = calcLabel(rnd.nextDouble(), _weightPerLabel) + val xi = Array.tabulate[Double](D) { j => + if (rnd.nextDouble() < _weightMatrix(y)(j)) 1 else 0 + } + + LabeledPoint(y, xi) + } + } +} + +class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { + case (prediction, expected) => + prediction != expected.label + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("Naive Bayes") { + val nPoints = 10000 + + val weightPerLabel = Array(math.log(0.5), math.log(0.3), math.log(0.2)) + val weightsMatrix = Array( + Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 + Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 + Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 + ) + + val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val model = NaiveBayes.train(3, 4, testRDD) + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} -- cgit v1.2.3 From 3bb714eaa3bdb7b7c33f6e5263c683f4c4beeddc Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Wed, 25 Dec 2013 17:15:38 +0800 Subject: Refactored NaiveBayes * Minimized shuffle output with mapPartitions. * Reduced RDD actions from 3 to 1. --- .../spark/mllib/classification/NaiveBayes.scala | 60 +++++++++++++--------- .../mllib/classification/NaiveBayesSuite.scala | 9 ++-- 2 files changed, 41 insertions(+), 28 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index f1b0e6ee6a..edea5ed3e6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -48,11 +48,12 @@ class NaiveBayesModel(val weightPerLabel: Array[Double], } } - - class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { + private[this] def vectorAdd(v1: Array[Double], v2: Array[Double]) = + v1.zip(v2).map(pair => pair._1 + pair._2) + /** * Run the algorithm with the configured parameters on an input * RDD of LabeledPoint entries. @@ -61,29 +62,42 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter * @param D dimension of feature vectors * @param data RDD of (label, array of features) pairs. */ - def run(C: Int, D: Int, data: RDD[LabeledPoint]): NaiveBayesModel = { - val groupedData = data.map(p => p.label.toInt -> p.features).groupByKey() - - val countPerLabel = groupedData.mapValues(_.size) - val logDenominator = math.log(data.count() + C * lambda) - val weightPerLabel = countPerLabel.mapValues { - count => math.log(count + lambda) - logDenominator + def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { + val locallyReduced = data.mapPartitions { iterator => + val localLabelCounts = mutable.Map.empty[Int, Int].withDefaultValue(0) + val localSummedObservations = + mutable.Map.empty[Int, Array[Double]].withDefaultValue(Array.fill(D)(0.0)) + + for (LabeledPoint(label, features) <- iterator; i = label.toInt) { + localLabelCounts(i) += 1 + localSummedObservations(i) = vectorAdd(localSummedObservations(i), features) + } + + for ((label, count) <- localLabelCounts.toIterator) yield { + label -> (count, localSummedObservations(label)) + } + } + + val reduced = locallyReduced.reduceByKey { (lhs, rhs) => + (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } - - val summedObservations = groupedData.mapValues(_.reduce { - (lhs, rhs) => lhs.zip(rhs).map(pair => pair._1 + pair._2) - }) - - val weightsMatrix = summedObservations.mapValues { weights => - val sum = weights.sum - val logDenom = math.log(sum + D * lambda) - weights.map(w => math.log(w + lambda) - logDenom) + + val collected = reduced.mapValues { case (count, summed) => + val labelWeight = math.log(count + lambda) + val logDenom = math.log(summed.sum + D * lambda) + val weights = summed.map(w => math.log(w + lambda) - logDenom) + (count, labelWeight, weights) + }.collectAsMap() + + val weightPerLabel = { + val N = collected.values.map(_._1).sum + val logDenom = math.log(N + C * lambda) + collected.mapValues(_._2 - logDenom).toArray.sortBy(_._1).map(_._2) } - - val labelWeights = weightPerLabel.collect().sorted.map(_._2) - val weightsMat = weightsMatrix.collect().sortBy(_._1).map(_._2) - - new NaiveBayesModel(labelWeights, weightsMat) + + val weightMatrix = collected.mapValues(_._3).toArray.sortBy(_._1).map(_._2) + + new NaiveBayesModel(weightPerLabel, weightMatrix) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index d871ed3672..cc8d48a42b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -1,6 +1,5 @@ package org.apache.spark.mllib.classification -import scala.collection.JavaConversions._ import scala.util.Random import org.scalatest.BeforeAndAfterAll @@ -56,12 +55,12 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { } def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { - val numOffPredictions = predictions.zip(input).count { + val numOfPredictions = predictions.zip(input).count { case (prediction, expected) => prediction != expected.label } // At least 80% of the predictions should be on. - assert(numOffPredictions < input.length / 5) + assert(numOfPredictions < input.length / 5) } test("Naive Bayes") { @@ -71,8 +70,8 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { val weightsMatrix = Array( Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 - Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 - ) + Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 + ) val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) val testRDD = sc.parallelize(testData, 2) -- cgit v1.2.3 From c0337c5bbfd5126c64964a9fdefd2bef11727d87 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Wed, 25 Dec 2013 22:45:57 +0800 Subject: Let reduceByKey to take care of local combine Also refactored some heavy FP code to improve readability and reduce memory footprint. --- .../spark/mllib/classification/NaiveBayes.scala | 43 ++++++++-------------- 1 file changed, 16 insertions(+), 27 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index edea5ed3e6..4c96b241eb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.classification -import scala.collection.mutable - import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -63,39 +61,30 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter * @param data RDD of (label, array of features) pairs. */ def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { - val locallyReduced = data.mapPartitions { iterator => - val localLabelCounts = mutable.Map.empty[Int, Int].withDefaultValue(0) - val localSummedObservations = - mutable.Map.empty[Int, Array[Double]].withDefaultValue(Array.fill(D)(0.0)) - - for (LabeledPoint(label, features) <- iterator; i = label.toInt) { - localLabelCounts(i) += 1 - localSummedObservations(i) = vectorAdd(localSummedObservations(i), features) - } - - for ((label, count) <- localLabelCounts.toIterator) yield { - label -> (count, localSummedObservations(label)) - } - } - - val reduced = locallyReduced.reduceByKey { (lhs, rhs) => + val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => + label.toInt ->(1, features) + }.reduceByKey { (lhs, rhs) => (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } - val collected = reduced.mapValues { case (count, summed) => + val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => val labelWeight = math.log(count + lambda) - val logDenom = math.log(summed.sum + D * lambda) - val weights = summed.map(w => math.log(w + lambda) - logDenom) + val logDenom = math.log(summedFeatureVector.sum + D * lambda) + val weights = summedFeatureVector.map(w => math.log(w + lambda) - logDenom) (count, labelWeight, weights) }.collectAsMap() - val weightPerLabel = { - val N = collected.values.map(_._1).sum - val logDenom = math.log(N + C * lambda) - collected.mapValues(_._2 - logDenom).toArray.sortBy(_._1).map(_._2) - } + // We can simply call `data.count` to get `N`, but that triggers another RDD action, which is + // considerably expensive. + val N = collected.values.map(_._1).sum + val logDenom = math.log(N + C * lambda) + val weightPerLabel = Array.fill[Double](C)(0) + val weightMatrix = Array.fill[Array[Double]](C)(null) - val weightMatrix = collected.mapValues(_._3).toArray.sortBy(_._1).map(_._2) + for ((label, (_, labelWeight, weights)) <- collected) { + weightPerLabel(label) = labelWeight - logDenom + weightMatrix(label) = weights + } new NaiveBayesModel(weightPerLabel, weightMatrix) } -- cgit v1.2.3 From 654f42174aa912fec7355d779e4e02731c535c94 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Fri, 27 Dec 2013 04:45:04 +0800 Subject: Reformatted some lines commented by Matei --- .../scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 4c96b241eb..2bc4c5afc0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -49,8 +49,9 @@ class NaiveBayesModel(val weightPerLabel: Array[Double], class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { - private[this] def vectorAdd(v1: Array[Double], v2: Array[Double]) = + private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { v1.zip(v2).map(pair => pair._1 + pair._2) + } /** * Run the algorithm with the configured parameters on an input @@ -62,7 +63,7 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter */ def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => - label.toInt ->(1, features) + label.toInt -> (1, features) }.reduceByKey { (lhs, rhs) => (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } -- cgit v1.2.3 From d7086dc28a856ec8856278be108310ec8264a115 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Fri, 27 Dec 2013 08:20:41 +0800 Subject: Added Apache license header to NaiveBayesSuite --- .../spark/mllib/classification/NaiveBayesSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'mllib') diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index cc8d48a42b..a2821347a7 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.classification import scala.util.Random -- cgit v1.2.3 From f150b6e76c56ed6f604e6dbda7bce6b6278929fb Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Sun, 29 Dec 2013 17:13:01 +0800 Subject: Response to Reynold's comments --- .../spark/mllib/classification/NaiveBayes.scala | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 2bc4c5afc0..d0f3a368e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,20 +17,22 @@ package org.apache.spark.mllib.classification +import org.jblas.DoubleMatrix + import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.jblas.DoubleMatrix /** * Model for Naive Bayes Classifiers. * - * @param weightPerLabel Weights computed for every label, which's dimension is C. - * @param weightMatrix Weights computed for every label and feature, which's dimension is CXD + * @param weightPerLabel Weights computed for every label, whose dimension is C. + * @param weightMatrix Weights computed for every label and feature, whose dimension is CXD */ -class NaiveBayesModel(val weightPerLabel: Array[Double], - val weightMatrix: Array[Array[Double]]) +class NaiveBayesModel( + @transient val weightPerLabel: Array[Double], + @transient val weightMatrix: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions @@ -50,7 +52,12 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { - v1.zip(v2).map(pair => pair._1 + pair._2) + var i = 0 + while (i < v1.length) { + v1(i) += v2(i) + i += 1 + } + v1 } /** @@ -79,8 +86,8 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter // considerably expensive. val N = collected.values.map(_._1).sum val logDenom = math.log(N + C * lambda) - val weightPerLabel = Array.fill[Double](C)(0) - val weightMatrix = Array.fill[Array[Double]](C)(null) + val weightPerLabel = new Array[Double](C) + val weightMatrix = new Array[Array[Double]](C) for ((label, (_, labelWeight, weights)) <- collected) { weightPerLabel(label) = labelWeight - logDenom @@ -100,8 +107,7 @@ object NaiveBayes { * @param input RDD of (label, array of features) pairs. * @param lambda smooth parameter */ - def train(C: Int, D: Int, input: RDD[LabeledPoint], - lambda: Double = 1.0): NaiveBayesModel = { + def train(C: Int, D: Int, input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { new NaiveBayes(lambda).run(C, D, input) } } -- cgit v1.2.3 From 6d0e2e86dfbca88abc847d3babac2d1f82d61aaf Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Mon, 30 Dec 2013 22:46:32 +0800 Subject: Response to comments from Reynold, Ameet and Evan * Arguments renamed according to Ameet's suggestion * Using DoubleMatrix instead of Array[Double] in computation * Removed arguments C (kinds of label) and D (dimension of feature vector) from NaiveBayes.train() * Replaced reduceByKey with foldByKey to avoid modifying original input data --- .../spark/mllib/classification/NaiveBayes.scala | 120 +++++++++++++-------- .../mllib/classification/NaiveBayesSuite.scala | 32 +++--- 2 files changed, 90 insertions(+), 62 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index d0f3a368e8..9fd1adddb0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -27,87 +27,115 @@ import org.apache.spark.SparkContext._ /** * Model for Naive Bayes Classifiers. * - * @param weightPerLabel Weights computed for every label, whose dimension is C. - * @param weightMatrix Weights computed for every label and feature, whose dimension is CXD + * @param pi Log of class priors, whose dimension is C. + * @param theta Log of class conditional probabilities, whose dimension is CXD. */ -class NaiveBayesModel( - @transient val weightPerLabel: Array[Double], - @transient val weightMatrix: Array[Array[Double]]) +class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions - private val _weightPerLabel = new DoubleMatrix(weightPerLabel.length, 1, weightPerLabel:_*) - private val _weightMatrix = new DoubleMatrix(weightMatrix) + private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) + private val _theta = new DoubleMatrix(theta) def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) def predict(testData: Array[Double]): Double = { val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) - val result = _weightPerLabel.add(_weightMatrix.mmul(dataMatrix)) + val result = _pi.add(_theta.mmul(dataMatrix)) result.argmax() } } -class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter +/** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * @param lambda The smooth parameter + */ +class NaiveBayes private (val lambda: Double = 1.0) extends Serializable with Logging { - private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { - var i = 0 - while (i < v1.length) { - v1(i) += v2(i) - i += 1 - } - v1 - } - /** - * Run the algorithm with the configured parameters on an input - * RDD of LabeledPoint entries. + * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * - * @param C kind of labels, labels are continuous integers and the maximal label is C-1 - * @param D dimension of feature vectors * @param data RDD of (label, array of features) pairs. */ - def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { - val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => - label.toInt -> (1, features) - }.reduceByKey { (lhs, rhs) => - (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) + def run(data: RDD[LabeledPoint]) = { + // Prepares input data, the shape of resulted RDD is: + // + // label: Int -> (count: Int, features: DoubleMatrix) + // + // The added count field is initialized to 1 to enable the following `foldByKey` transformation. + val mappedData = data.map { case LabeledPoint(label, features) => + label.toInt -> (1, new DoubleMatrix(features.length, 1, features: _*)) + } + + // Gets a map from labels to their corresponding sample point counts and summed feature vectors. + // Shape of resulted RDD is: + // + // label: Int -> (count: Int, summedFeatureVector: DoubleMatrix) + // + // Two tricky parts worth explaining: + // + // 1. Feature vectors are summed with the inplace jblas matrix addition operation, thus we + // chose `foldByKey` instead of `reduceByKey` to avoid modifying original input data. + // + // 2. The zero value passed to `foldByKey` contains a `null` rather than a zero vector because + // the dimension of the feature vector is unknown. Calling `data.first.length` to get the + // dimension is not preferable since it requires an expensive RDD action. + val countsAndSummedFeatures = mappedData.foldByKey((0, null)) { (lhs, rhs) => + if (lhs._1 == 0) { + (rhs._1, new DoubleMatrix().copy(rhs._2)) + } else { + (lhs._1 + rhs._1, lhs._2.addi(rhs._2)) + } } val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => - val labelWeight = math.log(count + lambda) - val logDenom = math.log(summedFeatureVector.sum + D * lambda) - val weights = summedFeatureVector.map(w => math.log(w + lambda) - logDenom) - (count, labelWeight, weights) + val p = math.log(count + lambda) + val logDenom = math.log(summedFeatureVector.sum + summedFeatureVector.length * lambda) + val t = summedFeatureVector + var i = 0 + while (i < t.length) { + t.put(i, math.log(t.get(i) + lambda) - logDenom) + i += 1 + } + (count, p, t) }.collectAsMap() - // We can simply call `data.count` to get `N`, but that triggers another RDD action, which is - // considerably expensive. + // Total sample count. Calling `data.count` to get `N` is not preferable since it triggers + // an expensive RDD action val N = collected.values.map(_._1).sum + + // Kinds of label. + val C = collected.size + val logDenom = math.log(N + C * lambda) - val weightPerLabel = new Array[Double](C) - val weightMatrix = new Array[Array[Double]](C) + val pi = new Array[Double](C) + val theta = new Array[Array[Double]](C) - for ((label, (_, labelWeight, weights)) <- collected) { - weightPerLabel(label) = labelWeight - logDenom - weightMatrix(label) = weights + for ((label, (_, p, t)) <- collected) { + pi(label) = p - logDenom + theta(label) = t.toArray } - new NaiveBayesModel(weightPerLabel, weightMatrix) + new NaiveBayesModel(pi, theta) } } object NaiveBayes { /** - * Train a naive bayes model given an RDD of (label, features) pairs. + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector. it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * - * @param C kind of labels, the maximal label is C-1 - * @param D dimension of feature vectors - * @param input RDD of (label, array of features) pairs. - * @param lambda smooth parameter + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + * @param lambda The smooth parameter */ - def train(C: Int, D: Int, input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { - new NaiveBayes(lambda).run(C, D, input) + def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(input) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index a2821347a7..18575f410c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -38,20 +38,20 @@ object NaiveBayesSuite { // Generate input of the form Y = (weightMatrix*x).argmax() def generateNaiveBayesInput( - weightPerLabel: Array[Double], // 1XC - weightsMatrix: Array[Array[Double]], // CXD + pi: Array[Double], // 1XC + theta: Array[Array[Double]], // CXD nPoints: Int, seed: Int): Seq[LabeledPoint] = { - val D = weightsMatrix(0).length + val D = theta(0).length val rnd = new Random(seed) - val _weightPerLabel = weightPerLabel.map(math.pow(math.E, _)) - val _weightMatrix = weightsMatrix.map(row => row.map(math.pow(math.E, _))) + val _pi = pi.map(math.pow(math.E, _)) + val _theta = theta.map(row => row.map(math.pow(math.E, _))) for (i <- 0 until nPoints) yield { - val y = calcLabel(rnd.nextDouble(), _weightPerLabel) + val y = calcLabel(rnd.nextDouble(), _pi) val xi = Array.tabulate[Double](D) { j => - if (rnd.nextDouble() < _weightMatrix(y)(j)) 1 else 0 + if (rnd.nextDouble() < _theta(y)(j)) 1 else 0 } LabeledPoint(y, xi) @@ -83,20 +83,20 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { test("Naive Bayes") { val nPoints = 10000 - val weightPerLabel = Array(math.log(0.5), math.log(0.3), math.log(0.2)) - val weightsMatrix = Array( - Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 - Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 - Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 - ) + val pi = Array(0.5, 0.3, 0.2).map(math.log) + val theta = Array( + Array(0.91, 0.03, 0.03, 0.03), // label 0 + Array(0.03, 0.91, 0.03, 0.03), // label 1 + Array(0.03, 0.03, 0.91, 0.03) // label 2 + ).map(_.map(math.log)) - val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) + val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val model = NaiveBayes.train(3, 4, testRDD) + val model = NaiveBayes.train(testRDD) - val validationData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 17) + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. -- cgit v1.2.3 From dd6033e6853e32e9de2c910797c7fbc0072e7491 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Thu, 2 Jan 2014 01:38:24 +0800 Subject: Aggregated all sample points to driver without any shuffle --- .../spark/mllib/classification/NaiveBayes.scala | 76 ++++++++-------------- .../mllib/classification/NaiveBayesSuite.scala | 8 +-- 2 files changed, 31 insertions(+), 53 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 9fd1adddb0..524300d6ae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,12 +17,13 @@ package org.apache.spark.mllib.classification +import scala.collection.mutable + import org.jblas.DoubleMatrix import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ /** * Model for Naive Bayes Classifiers. @@ -60,62 +61,39 @@ class NaiveBayes private (val lambda: Double = 1.0) * @param data RDD of (label, array of features) pairs. */ def run(data: RDD[LabeledPoint]) = { - // Prepares input data, the shape of resulted RDD is: - // - // label: Int -> (count: Int, features: DoubleMatrix) - // - // The added count field is initialized to 1 to enable the following `foldByKey` transformation. - val mappedData = data.map { case LabeledPoint(label, features) => - label.toInt -> (1, new DoubleMatrix(features.length, 1, features: _*)) - } - - // Gets a map from labels to their corresponding sample point counts and summed feature vectors. - // Shape of resulted RDD is: - // - // label: Int -> (count: Int, summedFeatureVector: DoubleMatrix) + // Aggregates all sample points to driver side to get sample count and summed feature vector + // for each label. The shape of `zeroCombiner` & `aggregated` is: // - // Two tricky parts worth explaining: - // - // 1. Feature vectors are summed with the inplace jblas matrix addition operation, thus we - // chose `foldByKey` instead of `reduceByKey` to avoid modifying original input data. - // - // 2. The zero value passed to `foldByKey` contains a `null` rather than a zero vector because - // the dimension of the feature vector is unknown. Calling `data.first.length` to get the - // dimension is not preferable since it requires an expensive RDD action. - val countsAndSummedFeatures = mappedData.foldByKey((0, null)) { (lhs, rhs) => - if (lhs._1 == 0) { - (rhs._1, new DoubleMatrix().copy(rhs._2)) - } else { - (lhs._1 + rhs._1, lhs._2.addi(rhs._2)) + // label: Int -> (count: Int, featuresSum: DoubleMatrix) + val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] + val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) => + point match { + case LabeledPoint(label, features) => + val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) + val fs = new DoubleMatrix(features.length, 1, features: _*) + combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) } - } - - val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => - val p = math.log(count + lambda) - val logDenom = math.log(summedFeatureVector.sum + summedFeatureVector.length * lambda) - val t = summedFeatureVector - var i = 0 - while (i < t.length) { - t.put(i, math.log(t.get(i) + lambda) - logDenom) - i += 1 + }, { (lhs, rhs) => + for ((label, (c, fs)) <- rhs) { + val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) + lhs(label) = (count + c, featuresSum.addi(fs)) } - (count, p, t) - }.collectAsMap() - - // Total sample count. Calling `data.count` to get `N` is not preferable since it triggers - // an expensive RDD action - val N = collected.values.map(_._1).sum + lhs + }) - // Kinds of label. - val C = collected.size + // Kinds of label + val C = aggregated.size + // Total sample count + val N = aggregated.values.map(_._1).sum - val logDenom = math.log(N + C * lambda) val pi = new Array[Double](C) val theta = new Array[Array[Double]](C) + val piLogDenom = math.log(N + C * lambda) - for ((label, (_, p, t)) <- collected) { - pi(label) = p - logDenom - theta(label) = t.toArray + for ((label, (count, fs)) <- aggregated) { + val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) + pi(label) = math.log(count + lambda) - piLogDenom + theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) } new NaiveBayesModel(pi, theta) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 18575f410c..b615f76e66 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -27,16 +27,16 @@ import org.apache.spark.SparkContext object NaiveBayesSuite { - private def calcLabel(p: Double, weightPerLabel: Array[Double]): Int = { + private def calcLabel(p: Double, pi: Array[Double]): Int = { var sum = 0.0 - for (j <- 0 until weightPerLabel.length) { - sum += weightPerLabel(j) + for (j <- 0 until pi.length) { + sum += pi(j) if (p < sum) return j } -1 } - // Generate input of the form Y = (weightMatrix*x).argmax() + // Generate input of the form Y = (theta * x).argmax() def generateNaiveBayesInput( pi: Array[Double], // 1XC theta: Array[Array[Double]], // CXD -- cgit v1.2.3 From 67f937ec222c5a7db5286c0af0ec6f9c482d2af6 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 3 Jan 2014 15:34:16 -0800 Subject: Added a method to enable bulk prediction --- .../recommendation/MatrixFactorizationModel.scala | 24 +++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index af43d89c70..bc13a66dbe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -20,7 +20,9 @@ package org.apache.spark.mllib.recommendation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ + import org.jblas._ +import java.nio.{ByteOrder, ByteBuffer} /** * Model representing the result of matrix factorization. @@ -44,6 +46,26 @@ class MatrixFactorizationModel( userVector.dot(productVector) } - // TODO: Figure out what good bulk prediction methods would look like. + /** + * Predict the rating of many users for many products. + * The output RDD has an element per each element in the input RDD (including all duplicates) + * unless a user or product is missing in the training set. + * + * @param usersProducts RDD of (user, product) pairs. + * @return RDD of Ratings. + */ + def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating] = { + val users = userFeatures.join(usersProducts).map{ + case (user, (uFeatures, product)) => (product, (user, uFeatures)) + } + users.join(productFeatures).map { + case (product, ((user, uFeatures), pFeatures)) => + val userVector = new DoubleMatrix(uFeatures) + val productVector = new DoubleMatrix(pFeatures) + Rating(user, product, userVector.dot(productVector)) + } + } + + // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. } -- cgit v1.2.3 From 2c1cba851c2954bacf10006c0d5dad67aba77ab5 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 3 Jan 2014 15:35:20 -0800 Subject: Added unit tests for bulk prediction in MatrixFactorizationModel --- .../spark/mllib/recommendation/ALSSuite.scala | 33 ++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) (limited to 'mllib') diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index fafc5ec5f2..e683a90f57 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -90,18 +90,34 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { testALS(50, 100, 1, 15, 0.7, 0.3) } + test("rank-1 matrices bulk") { + testALS(50, 100, 1, 15, 0.7, 0.3, false, true) + } + test("rank-2 matrices") { testALS(100, 200, 2, 15, 0.7, 0.3) } + test("rank-2 matrices bulk") { + testALS(100, 200, 2, 15, 0.7, 0.3, false, true) + } + test("rank-1 matrices implicit") { testALS(80, 160, 1, 15, 0.7, 0.4, true) } + test("rank-1 matrices implicit bulk") { + testALS(80, 160, 1, 15, 0.7, 0.4, true, true) + } + test("rank-2 matrices implicit") { testALS(100, 200, 2, 15, 0.7, 0.4, true) } + test("rank-2 matrices implicit bulk") { + testALS(100, 200, 2, 15, 0.7, 0.4, true, true) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * @@ -111,9 +127,12 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { * @param iterations number of iterations to run * @param samplingRate what fraction of the user-product pairs are known * @param matchThreshold max difference allowed to consider a predicted rating correct + * @param implicitPrefs flag to test implicit feedback + * @param bulkPredict flag to test bulk prediciton */ def testALS(users: Int, products: Int, features: Int, iterations: Int, - samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false) + samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, + bulkPredict: Boolean = false) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs) @@ -130,7 +149,17 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) { predictedP.put(p, i, vec(i)) } - val predictedRatings = predictedU.mmul(predictedP.transpose) + val predictedRatings = bulkPredict match { + case false => predictedU.mmul(predictedP.transpose) + case true => + val allRatings = new DoubleMatrix(users, products) + val usersProducts = for (u <- 0 until users; p <- 0 until products) yield (u, p) + val userProductsRDD = sc.parallelize(usersProducts) + model.predict(userProductsRDD).collect().foreach { elem => + allRatings.put(elem.user, elem.product, elem.rating) + } + allRatings + } if (!implicitPrefs) { for (u <- 0 until users; p <- 0 until products) { -- cgit v1.2.3 From dfe57fa84cea9d8bbca9a89a293efcaa95eae9e7 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Fri, 3 Jan 2014 15:40:53 -0800 Subject: Removed unnecessary blank line --- .../org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala | 1 - 1 file changed, 1 deletion(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index bc13a66dbe..8caecf0fa1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -20,7 +20,6 @@ package org.apache.spark.mllib.recommendation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ - import org.jblas._ import java.nio.{ByteOrder, ByteBuffer} -- cgit v1.2.3 From 8d0c2f7399ebf7a38346a60cf84d7020c0b1dba1 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Sat, 4 Jan 2014 16:23:17 -0800 Subject: Added python binding for bulk recommendation --- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 18 ++++++++++++++++++ .../recommendation/MatrixFactorizationModel.scala | 10 +++++++++- python/pyspark/mllib/_common.py | 10 ++++++++++ python/pyspark/mllib/recommendation.py | 10 +++++++++- 4 files changed, 46 insertions(+), 2 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8247c1ebc5..be2628fac5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -206,6 +206,24 @@ class PythonMLLibAPI extends Serializable { return new Rating(user, product, rating) } + private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = { + val bb = ByteBuffer.wrap(tupleBytes) + bb.order(ByteOrder.nativeOrder()) + val v1 = bb.getInt() + val v2 = bb.getInt() + (v1, v2) + } + + private[spark] def serializeRating(rate: Rating): Array[Byte] = { + val bytes = new Array[Byte](24) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.putDouble(rate.user.toDouble) + bb.putDouble(rate.product.toDouble) + bb.putDouble(rate.rating) + bytes + } + /** * Java stub for Python mllib ALS.train(). This stub returns a handle * to the Java object instead of the content of the Java object. Extra care diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 8caecf0fa1..2c3e828300 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -19,9 +19,11 @@ package org.apache.spark.mllib.recommendation import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.api.python.PythonMLLibAPI import org.jblas._ -import java.nio.{ByteOrder, ByteBuffer} +import org.apache.spark.api.java.JavaRDD + /** * Model representing the result of matrix factorization. @@ -65,6 +67,12 @@ class MatrixFactorizationModel( } } + def predictJavaRDD(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { + val pythonAPI = new PythonMLLibAPI() + val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) + predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) + } + // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. } diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e74ba0fabc..c818fc4d97 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -213,6 +213,16 @@ def _serialize_rating(r): intpart[0], intpart[1], doublepart[0] = r return ba +def _deserialize_rating(ba): + ar = ndarray(shape=(3, ), buffer=ba, dtype="float64", order='C') + return ar.copy() + +def _serialize_tuple(t): + ba = bytearray(8) + intpart = ndarray(shape=[2], buffer=ba, dtype=int32) + intpart[0], intpart[1] = t + return ba + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 14d06cba21..c81b482a87 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -20,7 +20,10 @@ from pyspark.mllib._common import \ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \ _serialize_double_matrix, _deserialize_double_matrix, \ _serialize_double_vector, _deserialize_double_vector, \ - _get_initial_weights, _serialize_rating, _regression_train_wrapper + _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ + _serialize_tuple, _deserialize_rating +from pyspark.serializers import BatchedSerializer +from pyspark.rdd import RDD class MatrixFactorizationModel(object): """A matrix factorisation model trained by regularized alternating @@ -45,6 +48,11 @@ class MatrixFactorizationModel(object): def predict(self, user, product): return self._java_model.predict(user, product) + def predictAll(self, usersProducts): + usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) + return RDD(self._java_model.predictJavaRDD(usersProductsJRDD._jrdd), + self._context, BatchedSerializer(_deserialize_rating, self._context._batchSize)) + class ALS(object): @classmethod def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): -- cgit v1.2.3 From a72107284ae4d8b6c7c47ded31c6784732028603 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 6 Jan 2014 12:30:17 +0800 Subject: fix logistic loss bug --- .../src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 749e7364f4..c590492e7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -50,8 +50,8 @@ class LogisticGradient extends Gradient { val gradient = data.mul(gradientMultiplier) val loss = - if (margin > 0) { - math.log(1 + math.exp(0 - margin)) + if (label > 0) { + math.log(1 + math.exp(margin)) } else { math.log(1 + math.exp(margin)) - margin } -- cgit v1.2.3 From 05e6d5b454b74b222b4131af24c8f750c30a05fb Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 6 Jan 2014 16:54:00 +0800 Subject: Added GradientDescentSuite --- .../mllib/optimization/GradientDescentSuite.scala | 116 +++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala (limited to 'mllib') diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala new file mode 100644 index 0000000000..a6028a1e98 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.util.Random +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.regression._ + +object GradientDescentSuite { + + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateGDInput(offset, scale, nPoints, seed)) + } + + // Generate input of the form Y = logistic(offset + scale * X) + def generateGDInput( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) + + val unifRand = new scala.util.Random(45) + val rLogis = (0 until nPoints).map { i => + val u = unifRand.nextDouble() + math.log(u) - math.log(1.0-u) + } + + val y: Seq[Int] = (0 until nPoints).map { i => + val yVal = offset + scale * x1(i) + rLogis(i) + if (yVal > 0) 1 else 0 + } + + val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i)))) + testData + } +} + +class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + test("Assert the loss is decreasing.") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new LogisticGradient() + val updater = new SimpleUpdater() + val stepSize = 1.0 + val numIterations = 10 + val regParam = 0 + val miniBatchFrac = 1.0 + + // Add a extra variable consisting of all 1.0's for the intercept. + val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Array(1.0, features: _*) + } + + val dataRDD = sc.parallelize(data, 2).cache() + val initialWeightsWithIntercept = Array(1.0, initialWeights: _*) + + val (_, loss) = GradientDescent.runMiniBatchSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(loss.last - loss.head < 0, "loss isn't decreasing.") + + val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs } + assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8) + } +} -- cgit v1.2.3 From 11a93fb5a8fafa940db27b652e4c21f6713ed8d1 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 6 Jan 2014 12:18:03 -0800 Subject: Added serializing method for Rating object --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index be2628fac5..2d8623392e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -197,6 +197,7 @@ class PythonMLLibAPI extends Serializable { return ret } + /** Unpack a Rating object from an array of bytes */ private def unpackRating(ratingBytes: Array[Byte]): Rating = { val bb = ByteBuffer.wrap(ratingBytes) bb.order(ByteOrder.nativeOrder()) @@ -206,6 +207,7 @@ class PythonMLLibAPI extends Serializable { return new Rating(user, product, rating) } + /** Unpack a tuple of Ints from an array of bytes */ private[spark] def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = { val bb = ByteBuffer.wrap(tupleBytes) bb.order(ByteOrder.nativeOrder()) @@ -214,13 +216,23 @@ class PythonMLLibAPI extends Serializable { (v1, v2) } + /** + * Serialize a Rating object into an array of bytes. + * It can be deserialized using RatingDeserializer(). + * + * @param rate + * @return + */ private[spark] def serializeRating(rate: Rating): Array[Byte] = { - val bytes = new Array[Byte](24) + val len = 3 + val bytes = new Array[Byte](4 + 8 * len) val bb = ByteBuffer.wrap(bytes) bb.order(ByteOrder.nativeOrder()) - bb.putDouble(rate.user.toDouble) - bb.putDouble(rate.product.toDouble) - bb.putDouble(rate.rating) + bb.putInt(len) + val db = bb.asDoubleBuffer() + db.put(rate.user.toDouble) + db.put(rate.product.toDouble) + db.put(rate.rating) bytes } -- cgit v1.2.3 From 04132ea9b20a95cd68482605d4022f692bb556e5 Mon Sep 17 00:00:00 2001 From: Hossein Falaki Date: Mon, 6 Jan 2014 12:19:08 -0800 Subject: Added Rating deserializer --- .../recommendation/MatrixFactorizationModel.scala | 9 ++++++++- python/pyspark/mllib/_common.py | 21 ++++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 2c3e828300..443fc5de5b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -67,7 +67,14 @@ class MatrixFactorizationModel( } } - def predictJavaRDD(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { + /** + * Predict the rating of many users for many products. + * This is a Java stub for python predictAll() + * + * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) + * @return JavaRDD of serialized Rating objects. + */ + def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { val pythonAPI = new PythonMLLibAPI() val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index c818fc4d97..769d88dfb9 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -18,6 +18,9 @@ from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape from pyspark import SparkContext +from pyspark.serializers import Serializer +import struct + # Double vector format: # # [8-byte 1] [8-byte length] [length*8 bytes of data] @@ -213,9 +216,21 @@ def _serialize_rating(r): intpart[0], intpart[1], doublepart[0] = r return ba -def _deserialize_rating(ba): - ar = ndarray(shape=(3, ), buffer=ba, dtype="float64", order='C') - return ar.copy() +class RatingDeserializer(Serializer): + def loads(self, stream): + length = struct.unpack("!i", stream.read(4))[0] + ba = stream.read(length) + res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4) + return int(res[0]), int(res[1]), res[2] + + def load_stream(self, stream): + while True: + try: + yield self.loads(stream) + except struct.error: + return + except EOFError: + return def _serialize_tuple(t): ba = bytearray(8) -- cgit v1.2.3