From 3dc655aa19f678219e5d999fe97ab769567ffb1c Mon Sep 17 00:00:00 2001 From: Frank Dai Date: Wed, 25 Dec 2013 16:50:42 +0800 Subject: standard Naive Bayes classifier --- .../spark/mllib/classification/NaiveBayes.scala | 103 +++++++++++++++++++++ .../mllib/classification/NaiveBayesSuite.scala | 92 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala new file mode 100644 index 0000000000..f1b0e6ee6a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.collection.mutable + +import org.apache.spark.Logging +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.jblas.DoubleMatrix + +/** + * Model for Naive Bayes Classifiers. + * + * @param weightPerLabel Weights computed for every label, which's dimension is C. + * @param weightMatrix Weights computed for every label and feature, which's dimension is CXD + */ +class NaiveBayesModel(val weightPerLabel: Array[Double], + val weightMatrix: Array[Array[Double]]) + extends ClassificationModel with Serializable { + + // Create a column vector that can be used for predictions + private val _weightPerLabel = new DoubleMatrix(weightPerLabel.length, 1, weightPerLabel:_*) + private val _weightMatrix = new DoubleMatrix(weightMatrix) + + def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) + + def predict(testData: Array[Double]): Double = { + val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) + val result = _weightPerLabel.add(_weightMatrix.mmul(dataMatrix)) + result.argmax() + } +} + + + +class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter + extends Serializable with Logging { + + /** + * Run the algorithm with the configured parameters on an input + * RDD of LabeledPoint entries. + * + * @param C kind of labels, labels are continuous integers and the maximal label is C-1 + * @param D dimension of feature vectors + * @param data RDD of (label, array of features) pairs. + */ + def run(C: Int, D: Int, data: RDD[LabeledPoint]): NaiveBayesModel = { + val groupedData = data.map(p => p.label.toInt -> p.features).groupByKey() + + val countPerLabel = groupedData.mapValues(_.size) + val logDenominator = math.log(data.count() + C * lambda) + val weightPerLabel = countPerLabel.mapValues { + count => math.log(count + lambda) - logDenominator + } + + val summedObservations = groupedData.mapValues(_.reduce { + (lhs, rhs) => lhs.zip(rhs).map(pair => pair._1 + pair._2) + }) + + val weightsMatrix = summedObservations.mapValues { weights => + val sum = weights.sum + val logDenom = math.log(sum + D * lambda) + weights.map(w => math.log(w + lambda) - logDenom) + } + + val labelWeights = weightPerLabel.collect().sorted.map(_._2) + val weightsMat = weightsMatrix.collect().sortBy(_._1).map(_._2) + + new NaiveBayesModel(labelWeights, weightsMat) + } +} + +object NaiveBayes { + /** + * Train a naive bayes model given an RDD of (label, features) pairs. + * + * @param C kind of labels, the maximal label is C-1 + * @param D dimension of feature vectors + * @param input RDD of (label, array of features) pairs. + * @param lambda smooth parameter + */ + def train(C: Int, D: Int, input: RDD[LabeledPoint], + lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(C, D, input) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala new file mode 100644 index 0000000000..d871ed3672 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -0,0 +1,92 @@ +package org.apache.spark.mllib.classification + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.SparkContext + +object NaiveBayesSuite { + + private def calcLabel(p: Double, weightPerLabel: Array[Double]): Int = { + var sum = 0.0 + for (j <- 0 until weightPerLabel.length) { + sum += weightPerLabel(j) + if (p < sum) return j + } + -1 + } + + // Generate input of the form Y = (weightMatrix*x).argmax() + def generateNaiveBayesInput( + weightPerLabel: Array[Double], // 1XC + weightsMatrix: Array[Array[Double]], // CXD + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val D = weightsMatrix(0).length + val rnd = new Random(seed) + + val _weightPerLabel = weightPerLabel.map(math.pow(math.E, _)) + val _weightMatrix = weightsMatrix.map(row => row.map(math.pow(math.E, _))) + + for (i <- 0 until nPoints) yield { + val y = calcLabel(rnd.nextDouble(), _weightPerLabel) + val xi = Array.tabulate[Double](D) { j => + if (rnd.nextDouble() < _weightMatrix(y)(j)) 1 else 0 + } + + LabeledPoint(y, xi) + } + } +} + +class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { + case (prediction, expected) => + prediction != expected.label + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("Naive Bayes") { + val nPoints = 10000 + + val weightPerLabel = Array(math.log(0.5), math.log(0.3), math.log(0.2)) + val weightsMatrix = Array( + Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 + Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 + Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 + ) + + val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val model = NaiveBayes.train(3, 4, testRDD) + + val validationData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} -- cgit v1.2.3 From 3bb714eaa3bdb7b7c33f6e5263c683f4c4beeddc Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Wed, 25 Dec 2013 17:15:38 +0800 Subject: Refactored NaiveBayes * Minimized shuffle output with mapPartitions. * Reduced RDD actions from 3 to 1. --- .../spark/mllib/classification/NaiveBayes.scala | 60 +++++++++++++--------- .../mllib/classification/NaiveBayesSuite.scala | 9 ++-- 2 files changed, 41 insertions(+), 28 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index f1b0e6ee6a..edea5ed3e6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -48,11 +48,12 @@ class NaiveBayesModel(val weightPerLabel: Array[Double], } } - - class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { + private[this] def vectorAdd(v1: Array[Double], v2: Array[Double]) = + v1.zip(v2).map(pair => pair._1 + pair._2) + /** * Run the algorithm with the configured parameters on an input * RDD of LabeledPoint entries. @@ -61,29 +62,42 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter * @param D dimension of feature vectors * @param data RDD of (label, array of features) pairs. */ - def run(C: Int, D: Int, data: RDD[LabeledPoint]): NaiveBayesModel = { - val groupedData = data.map(p => p.label.toInt -> p.features).groupByKey() - - val countPerLabel = groupedData.mapValues(_.size) - val logDenominator = math.log(data.count() + C * lambda) - val weightPerLabel = countPerLabel.mapValues { - count => math.log(count + lambda) - logDenominator + def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { + val locallyReduced = data.mapPartitions { iterator => + val localLabelCounts = mutable.Map.empty[Int, Int].withDefaultValue(0) + val localSummedObservations = + mutable.Map.empty[Int, Array[Double]].withDefaultValue(Array.fill(D)(0.0)) + + for (LabeledPoint(label, features) <- iterator; i = label.toInt) { + localLabelCounts(i) += 1 + localSummedObservations(i) = vectorAdd(localSummedObservations(i), features) + } + + for ((label, count) <- localLabelCounts.toIterator) yield { + label -> (count, localSummedObservations(label)) + } + } + + val reduced = locallyReduced.reduceByKey { (lhs, rhs) => + (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } - - val summedObservations = groupedData.mapValues(_.reduce { - (lhs, rhs) => lhs.zip(rhs).map(pair => pair._1 + pair._2) - }) - - val weightsMatrix = summedObservations.mapValues { weights => - val sum = weights.sum - val logDenom = math.log(sum + D * lambda) - weights.map(w => math.log(w + lambda) - logDenom) + + val collected = reduced.mapValues { case (count, summed) => + val labelWeight = math.log(count + lambda) + val logDenom = math.log(summed.sum + D * lambda) + val weights = summed.map(w => math.log(w + lambda) - logDenom) + (count, labelWeight, weights) + }.collectAsMap() + + val weightPerLabel = { + val N = collected.values.map(_._1).sum + val logDenom = math.log(N + C * lambda) + collected.mapValues(_._2 - logDenom).toArray.sortBy(_._1).map(_._2) } - - val labelWeights = weightPerLabel.collect().sorted.map(_._2) - val weightsMat = weightsMatrix.collect().sortBy(_._1).map(_._2) - - new NaiveBayesModel(labelWeights, weightsMat) + + val weightMatrix = collected.mapValues(_._3).toArray.sortBy(_._1).map(_._2) + + new NaiveBayesModel(weightPerLabel, weightMatrix) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index d871ed3672..cc8d48a42b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -1,6 +1,5 @@ package org.apache.spark.mllib.classification -import scala.collection.JavaConversions._ import scala.util.Random import org.scalatest.BeforeAndAfterAll @@ -56,12 +55,12 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { } def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { - val numOffPredictions = predictions.zip(input).count { + val numOfPredictions = predictions.zip(input).count { case (prediction, expected) => prediction != expected.label } // At least 80% of the predictions should be on. - assert(numOffPredictions < input.length / 5) + assert(numOfPredictions < input.length / 5) } test("Naive Bayes") { @@ -71,8 +70,8 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { val weightsMatrix = Array( Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 - Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 - ) + Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 + ) val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) val testRDD = sc.parallelize(testData, 2) -- cgit v1.2.3 From c0337c5bbfd5126c64964a9fdefd2bef11727d87 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Wed, 25 Dec 2013 22:45:57 +0800 Subject: Let reduceByKey to take care of local combine Also refactored some heavy FP code to improve readability and reduce memory footprint. --- .../spark/mllib/classification/NaiveBayes.scala | 43 ++++++++-------------- 1 file changed, 16 insertions(+), 27 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index edea5ed3e6..4c96b241eb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.classification -import scala.collection.mutable - import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -63,39 +61,30 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter * @param data RDD of (label, array of features) pairs. */ def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { - val locallyReduced = data.mapPartitions { iterator => - val localLabelCounts = mutable.Map.empty[Int, Int].withDefaultValue(0) - val localSummedObservations = - mutable.Map.empty[Int, Array[Double]].withDefaultValue(Array.fill(D)(0.0)) - - for (LabeledPoint(label, features) <- iterator; i = label.toInt) { - localLabelCounts(i) += 1 - localSummedObservations(i) = vectorAdd(localSummedObservations(i), features) - } - - for ((label, count) <- localLabelCounts.toIterator) yield { - label -> (count, localSummedObservations(label)) - } - } - - val reduced = locallyReduced.reduceByKey { (lhs, rhs) => + val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => + label.toInt ->(1, features) + }.reduceByKey { (lhs, rhs) => (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } - val collected = reduced.mapValues { case (count, summed) => + val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => val labelWeight = math.log(count + lambda) - val logDenom = math.log(summed.sum + D * lambda) - val weights = summed.map(w => math.log(w + lambda) - logDenom) + val logDenom = math.log(summedFeatureVector.sum + D * lambda) + val weights = summedFeatureVector.map(w => math.log(w + lambda) - logDenom) (count, labelWeight, weights) }.collectAsMap() - val weightPerLabel = { - val N = collected.values.map(_._1).sum - val logDenom = math.log(N + C * lambda) - collected.mapValues(_._2 - logDenom).toArray.sortBy(_._1).map(_._2) - } + // We can simply call `data.count` to get `N`, but that triggers another RDD action, which is + // considerably expensive. + val N = collected.values.map(_._1).sum + val logDenom = math.log(N + C * lambda) + val weightPerLabel = Array.fill[Double](C)(0) + val weightMatrix = Array.fill[Array[Double]](C)(null) - val weightMatrix = collected.mapValues(_._3).toArray.sortBy(_._1).map(_._2) + for ((label, (_, labelWeight, weights)) <- collected) { + weightPerLabel(label) = labelWeight - logDenom + weightMatrix(label) = weights + } new NaiveBayesModel(weightPerLabel, weightMatrix) } -- cgit v1.2.3 From 654f42174aa912fec7355d779e4e02731c535c94 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Fri, 27 Dec 2013 04:45:04 +0800 Subject: Reformatted some lines commented by Matei --- .../scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 4c96b241eb..2bc4c5afc0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -49,8 +49,9 @@ class NaiveBayesModel(val weightPerLabel: Array[Double], class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { - private[this] def vectorAdd(v1: Array[Double], v2: Array[Double]) = + private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { v1.zip(v2).map(pair => pair._1 + pair._2) + } /** * Run the algorithm with the configured parameters on an input @@ -62,7 +63,7 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter */ def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => - label.toInt ->(1, features) + label.toInt -> (1, features) }.reduceByKey { (lhs, rhs) => (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) } -- cgit v1.2.3 From d7086dc28a856ec8856278be108310ec8264a115 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Fri, 27 Dec 2013 08:20:41 +0800 Subject: Added Apache license header to NaiveBayesSuite --- .../spark/mllib/classification/NaiveBayesSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'mllib') diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index cc8d48a42b..a2821347a7 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.classification import scala.util.Random -- cgit v1.2.3 From f150b6e76c56ed6f604e6dbda7bce6b6278929fb Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Sun, 29 Dec 2013 17:13:01 +0800 Subject: Response to Reynold's comments --- .../spark/mllib/classification/NaiveBayes.scala | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 2bc4c5afc0..d0f3a368e8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,20 +17,22 @@ package org.apache.spark.mllib.classification +import org.jblas.DoubleMatrix + import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.jblas.DoubleMatrix /** * Model for Naive Bayes Classifiers. * - * @param weightPerLabel Weights computed for every label, which's dimension is C. - * @param weightMatrix Weights computed for every label and feature, which's dimension is CXD + * @param weightPerLabel Weights computed for every label, whose dimension is C. + * @param weightMatrix Weights computed for every label and feature, whose dimension is CXD */ -class NaiveBayesModel(val weightPerLabel: Array[Double], - val weightMatrix: Array[Array[Double]]) +class NaiveBayesModel( + @transient val weightPerLabel: Array[Double], + @transient val weightMatrix: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions @@ -50,7 +52,12 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter extends Serializable with Logging { private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { - v1.zip(v2).map(pair => pair._1 + pair._2) + var i = 0 + while (i < v1.length) { + v1(i) += v2(i) + i += 1 + } + v1 } /** @@ -79,8 +86,8 @@ class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter // considerably expensive. val N = collected.values.map(_._1).sum val logDenom = math.log(N + C * lambda) - val weightPerLabel = Array.fill[Double](C)(0) - val weightMatrix = Array.fill[Array[Double]](C)(null) + val weightPerLabel = new Array[Double](C) + val weightMatrix = new Array[Array[Double]](C) for ((label, (_, labelWeight, weights)) <- collected) { weightPerLabel(label) = labelWeight - logDenom @@ -100,8 +107,7 @@ object NaiveBayes { * @param input RDD of (label, array of features) pairs. * @param lambda smooth parameter */ - def train(C: Int, D: Int, input: RDD[LabeledPoint], - lambda: Double = 1.0): NaiveBayesModel = { + def train(C: Int, D: Int, input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { new NaiveBayes(lambda).run(C, D, input) } } -- cgit v1.2.3 From 6d0e2e86dfbca88abc847d3babac2d1f82d61aaf Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Mon, 30 Dec 2013 22:46:32 +0800 Subject: Response to comments from Reynold, Ameet and Evan * Arguments renamed according to Ameet's suggestion * Using DoubleMatrix instead of Array[Double] in computation * Removed arguments C (kinds of label) and D (dimension of feature vector) from NaiveBayes.train() * Replaced reduceByKey with foldByKey to avoid modifying original input data --- .../spark/mllib/classification/NaiveBayes.scala | 120 +++++++++++++-------- .../mllib/classification/NaiveBayesSuite.scala | 32 +++--- 2 files changed, 90 insertions(+), 62 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index d0f3a368e8..9fd1adddb0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -27,87 +27,115 @@ import org.apache.spark.SparkContext._ /** * Model for Naive Bayes Classifiers. * - * @param weightPerLabel Weights computed for every label, whose dimension is C. - * @param weightMatrix Weights computed for every label and feature, whose dimension is CXD + * @param pi Log of class priors, whose dimension is C. + * @param theta Log of class conditional probabilities, whose dimension is CXD. */ -class NaiveBayesModel( - @transient val weightPerLabel: Array[Double], - @transient val weightMatrix: Array[Array[Double]]) +class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions - private val _weightPerLabel = new DoubleMatrix(weightPerLabel.length, 1, weightPerLabel:_*) - private val _weightMatrix = new DoubleMatrix(weightMatrix) + private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) + private val _theta = new DoubleMatrix(theta) def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) def predict(testData: Array[Double]): Double = { val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) - val result = _weightPerLabel.add(_weightMatrix.mmul(dataMatrix)) + val result = _pi.add(_theta.mmul(dataMatrix)) result.argmax() } } -class NaiveBayes private (val lambda: Double = 1.0) // smoothing parameter +/** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * @param lambda The smooth parameter + */ +class NaiveBayes private (val lambda: Double = 1.0) extends Serializable with Logging { - private def vectorAdd(v1: Array[Double], v2: Array[Double]) = { - var i = 0 - while (i < v1.length) { - v1(i) += v2(i) - i += 1 - } - v1 - } - /** - * Run the algorithm with the configured parameters on an input - * RDD of LabeledPoint entries. + * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. * - * @param C kind of labels, labels are continuous integers and the maximal label is C-1 - * @param D dimension of feature vectors * @param data RDD of (label, array of features) pairs. */ - def run(C: Int, D: Int, data: RDD[LabeledPoint]) = { - val countsAndSummedFeatures = data.map { case LabeledPoint(label, features) => - label.toInt -> (1, features) - }.reduceByKey { (lhs, rhs) => - (lhs._1 + rhs._1, vectorAdd(lhs._2, rhs._2)) + def run(data: RDD[LabeledPoint]) = { + // Prepares input data, the shape of resulted RDD is: + // + // label: Int -> (count: Int, features: DoubleMatrix) + // + // The added count field is initialized to 1 to enable the following `foldByKey` transformation. + val mappedData = data.map { case LabeledPoint(label, features) => + label.toInt -> (1, new DoubleMatrix(features.length, 1, features: _*)) + } + + // Gets a map from labels to their corresponding sample point counts and summed feature vectors. + // Shape of resulted RDD is: + // + // label: Int -> (count: Int, summedFeatureVector: DoubleMatrix) + // + // Two tricky parts worth explaining: + // + // 1. Feature vectors are summed with the inplace jblas matrix addition operation, thus we + // chose `foldByKey` instead of `reduceByKey` to avoid modifying original input data. + // + // 2. The zero value passed to `foldByKey` contains a `null` rather than a zero vector because + // the dimension of the feature vector is unknown. Calling `data.first.length` to get the + // dimension is not preferable since it requires an expensive RDD action. + val countsAndSummedFeatures = mappedData.foldByKey((0, null)) { (lhs, rhs) => + if (lhs._1 == 0) { + (rhs._1, new DoubleMatrix().copy(rhs._2)) + } else { + (lhs._1 + rhs._1, lhs._2.addi(rhs._2)) + } } val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => - val labelWeight = math.log(count + lambda) - val logDenom = math.log(summedFeatureVector.sum + D * lambda) - val weights = summedFeatureVector.map(w => math.log(w + lambda) - logDenom) - (count, labelWeight, weights) + val p = math.log(count + lambda) + val logDenom = math.log(summedFeatureVector.sum + summedFeatureVector.length * lambda) + val t = summedFeatureVector + var i = 0 + while (i < t.length) { + t.put(i, math.log(t.get(i) + lambda) - logDenom) + i += 1 + } + (count, p, t) }.collectAsMap() - // We can simply call `data.count` to get `N`, but that triggers another RDD action, which is - // considerably expensive. + // Total sample count. Calling `data.count` to get `N` is not preferable since it triggers + // an expensive RDD action val N = collected.values.map(_._1).sum + + // Kinds of label. + val C = collected.size + val logDenom = math.log(N + C * lambda) - val weightPerLabel = new Array[Double](C) - val weightMatrix = new Array[Array[Double]](C) + val pi = new Array[Double](C) + val theta = new Array[Array[Double]](C) - for ((label, (_, labelWeight, weights)) <- collected) { - weightPerLabel(label) = labelWeight - logDenom - weightMatrix(label) = weights + for ((label, (_, p, t)) <- collected) { + pi(label) = p - logDenom + theta(label) = t.toArray } - new NaiveBayesModel(weightPerLabel, weightMatrix) + new NaiveBayesModel(pi, theta) } } object NaiveBayes { /** - * Train a naive bayes model given an RDD of (label, features) pairs. + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector. it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * - * @param C kind of labels, the maximal label is C-1 - * @param D dimension of feature vectors - * @param input RDD of (label, array of features) pairs. - * @param lambda smooth parameter + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + * @param lambda The smooth parameter */ - def train(C: Int, D: Int, input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { - new NaiveBayes(lambda).run(C, D, input) + def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + new NaiveBayes(lambda).run(input) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index a2821347a7..18575f410c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -38,20 +38,20 @@ object NaiveBayesSuite { // Generate input of the form Y = (weightMatrix*x).argmax() def generateNaiveBayesInput( - weightPerLabel: Array[Double], // 1XC - weightsMatrix: Array[Array[Double]], // CXD + pi: Array[Double], // 1XC + theta: Array[Array[Double]], // CXD nPoints: Int, seed: Int): Seq[LabeledPoint] = { - val D = weightsMatrix(0).length + val D = theta(0).length val rnd = new Random(seed) - val _weightPerLabel = weightPerLabel.map(math.pow(math.E, _)) - val _weightMatrix = weightsMatrix.map(row => row.map(math.pow(math.E, _))) + val _pi = pi.map(math.pow(math.E, _)) + val _theta = theta.map(row => row.map(math.pow(math.E, _))) for (i <- 0 until nPoints) yield { - val y = calcLabel(rnd.nextDouble(), _weightPerLabel) + val y = calcLabel(rnd.nextDouble(), _pi) val xi = Array.tabulate[Double](D) { j => - if (rnd.nextDouble() < _weightMatrix(y)(j)) 1 else 0 + if (rnd.nextDouble() < _theta(y)(j)) 1 else 0 } LabeledPoint(y, xi) @@ -83,20 +83,20 @@ class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { test("Naive Bayes") { val nPoints = 10000 - val weightPerLabel = Array(math.log(0.5), math.log(0.3), math.log(0.2)) - val weightsMatrix = Array( - Array(math.log(0.91), math.log(0.03), math.log(0.03), math.log(0.03)), // label 0 - Array(math.log(0.03), math.log(0.91), math.log(0.03), math.log(0.03)), // label 1 - Array(math.log(0.03), math.log(0.03), math.log(0.91), math.log(0.03)) // label 2 - ) + val pi = Array(0.5, 0.3, 0.2).map(math.log) + val theta = Array( + Array(0.91, 0.03, 0.03, 0.03), // label 0 + Array(0.03, 0.91, 0.03, 0.03), // label 1 + Array(0.03, 0.03, 0.91, 0.03) // label 2 + ).map(_.map(math.log)) - val testData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 42) + val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val model = NaiveBayes.train(3, 4, testRDD) + val model = NaiveBayes.train(testRDD) - val validationData = NaiveBayesSuite.generateNaiveBayesInput(weightPerLabel, weightsMatrix, nPoints, 17) + val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. -- cgit v1.2.3 From dd6033e6853e32e9de2c910797c7fbc0072e7491 Mon Sep 17 00:00:00 2001 From: "Lian, Cheng" Date: Thu, 2 Jan 2014 01:38:24 +0800 Subject: Aggregated all sample points to driver without any shuffle --- .../spark/mllib/classification/NaiveBayes.scala | 76 ++++++++-------------- .../mllib/classification/NaiveBayesSuite.scala | 8 +-- 2 files changed, 31 insertions(+), 53 deletions(-) (limited to 'mllib') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 9fd1adddb0..524300d6ae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -17,12 +17,13 @@ package org.apache.spark.mllib.classification +import scala.collection.mutable + import org.jblas.DoubleMatrix import org.apache.spark.Logging import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ /** * Model for Naive Bayes Classifiers. @@ -60,62 +61,39 @@ class NaiveBayes private (val lambda: Double = 1.0) * @param data RDD of (label, array of features) pairs. */ def run(data: RDD[LabeledPoint]) = { - // Prepares input data, the shape of resulted RDD is: - // - // label: Int -> (count: Int, features: DoubleMatrix) - // - // The added count field is initialized to 1 to enable the following `foldByKey` transformation. - val mappedData = data.map { case LabeledPoint(label, features) => - label.toInt -> (1, new DoubleMatrix(features.length, 1, features: _*)) - } - - // Gets a map from labels to their corresponding sample point counts and summed feature vectors. - // Shape of resulted RDD is: - // - // label: Int -> (count: Int, summedFeatureVector: DoubleMatrix) + // Aggregates all sample points to driver side to get sample count and summed feature vector + // for each label. The shape of `zeroCombiner` & `aggregated` is: // - // Two tricky parts worth explaining: - // - // 1. Feature vectors are summed with the inplace jblas matrix addition operation, thus we - // chose `foldByKey` instead of `reduceByKey` to avoid modifying original input data. - // - // 2. The zero value passed to `foldByKey` contains a `null` rather than a zero vector because - // the dimension of the feature vector is unknown. Calling `data.first.length` to get the - // dimension is not preferable since it requires an expensive RDD action. - val countsAndSummedFeatures = mappedData.foldByKey((0, null)) { (lhs, rhs) => - if (lhs._1 == 0) { - (rhs._1, new DoubleMatrix().copy(rhs._2)) - } else { - (lhs._1 + rhs._1, lhs._2.addi(rhs._2)) + // label: Int -> (count: Int, featuresSum: DoubleMatrix) + val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] + val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) => + point match { + case LabeledPoint(label, features) => + val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) + val fs = new DoubleMatrix(features.length, 1, features: _*) + combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) } - } - - val collected = countsAndSummedFeatures.mapValues { case (count, summedFeatureVector) => - val p = math.log(count + lambda) - val logDenom = math.log(summedFeatureVector.sum + summedFeatureVector.length * lambda) - val t = summedFeatureVector - var i = 0 - while (i < t.length) { - t.put(i, math.log(t.get(i) + lambda) - logDenom) - i += 1 + }, { (lhs, rhs) => + for ((label, (c, fs)) <- rhs) { + val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) + lhs(label) = (count + c, featuresSum.addi(fs)) } - (count, p, t) - }.collectAsMap() - - // Total sample count. Calling `data.count` to get `N` is not preferable since it triggers - // an expensive RDD action - val N = collected.values.map(_._1).sum + lhs + }) - // Kinds of label. - val C = collected.size + // Kinds of label + val C = aggregated.size + // Total sample count + val N = aggregated.values.map(_._1).sum - val logDenom = math.log(N + C * lambda) val pi = new Array[Double](C) val theta = new Array[Array[Double]](C) + val piLogDenom = math.log(N + C * lambda) - for ((label, (_, p, t)) <- collected) { - pi(label) = p - logDenom - theta(label) = t.toArray + for ((label, (count, fs)) <- aggregated) { + val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) + pi(label) = math.log(count + lambda) - piLogDenom + theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) } new NaiveBayesModel(pi, theta) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 18575f410c..b615f76e66 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -27,16 +27,16 @@ import org.apache.spark.SparkContext object NaiveBayesSuite { - private def calcLabel(p: Double, weightPerLabel: Array[Double]): Int = { + private def calcLabel(p: Double, pi: Array[Double]): Int = { var sum = 0.0 - for (j <- 0 until weightPerLabel.length) { - sum += weightPerLabel(j) + for (j <- 0 until pi.length) { + sum += pi(j) if (p < sum) return j } -1 } - // Generate input of the form Y = (weightMatrix*x).argmax() + // Generate input of the form Y = (theta * x).argmax() def generateNaiveBayesInput( pi: Array[Double], // 1XC theta: Array[Array[Double]], // CXD -- cgit v1.2.3