aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorHossein Falaki <falaki@gmail.com>2014-01-07 15:22:42 -0800
committerHossein Falaki <falaki@gmail.com>2014-01-07 15:22:42 -0800
commit3a8beb46cb53cf6807f39cca54b1efdbbc303f41 (patch)
tree609e141e3b34212c5d171e593dab803aaafdd7aa /mllib
parent754f5300a1e0a214b62cbd6db2398dea4dfbceb4 (diff)
parent7d5fa175ca9cd2260c7bcd18c201bc087d4f62c3 (diff)
downloadspark-3a8beb46cb53cf6807f39cca54b1efdbbc303f41.tar.gz
spark-3a8beb46cb53cf6807f39cca54b1efdbbc303f41.tar.bz2
spark-3a8beb46cb53cf6807f39cca54b1efdbbc303f41.zip
Merge branch 'master' into MatrixFactorizationModel-fix
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala119
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala108
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala116
4 files changed, 345 insertions, 2 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
new file mode 100644
index 0000000000..524300d6ae
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.collection.mutable
+
+import org.jblas.DoubleMatrix
+
+import org.apache.spark.Logging
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+/**
+ * Model for Naive Bayes Classifiers.
+ *
+ * @param pi Log of class priors, whose dimension is C.
+ * @param theta Log of class conditional probabilities, whose dimension is CXD.
+ */
+class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+ extends ClassificationModel with Serializable {
+
+ // Create a column vector that can be used for predictions
+ private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
+ private val _theta = new DoubleMatrix(theta)
+
+ def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+
+ def predict(testData: Array[Double]): Double = {
+ val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
+ val result = _pi.add(_theta.mmul(dataMatrix))
+ result.argmax()
+ }
+}
+
+/**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * @param lambda The smooth parameter
+ */
+class NaiveBayes private (val lambda: Double = 1.0)
+ extends Serializable with Logging {
+
+ /**
+ * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
+ *
+ * @param data RDD of (label, array of features) pairs.
+ */
+ def run(data: RDD[LabeledPoint]) = {
+ // Aggregates all sample points to driver side to get sample count and summed feature vector
+ // for each label. The shape of `zeroCombiner` & `aggregated` is:
+ //
+ // label: Int -> (count: Int, featuresSum: DoubleMatrix)
+ val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
+ val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
+ point match {
+ case LabeledPoint(label, features) =>
+ val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
+ val fs = new DoubleMatrix(features.length, 1, features: _*)
+ combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
+ }
+ }, { (lhs, rhs) =>
+ for ((label, (c, fs)) <- rhs) {
+ val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
+ lhs(label) = (count + c, featuresSum.addi(fs))
+ }
+ lhs
+ })
+
+ // Kinds of label
+ val C = aggregated.size
+ // Total sample count
+ val N = aggregated.values.map(_._1).sum
+
+ val pi = new Array[Double](C)
+ val theta = new Array[Array[Double]](C)
+ val piLogDenom = math.log(N + C * lambda)
+
+ for ((label, (count, fs)) <- aggregated) {
+ val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
+ pi(label) = math.log(count + lambda) - piLogDenom
+ theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+ }
+
+ new NaiveBayesModel(pi, theta)
+ }
+}
+
+object NaiveBayes {
+ /**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+ * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+ * document classification. By making every vector a 0-1 vector. it can also be used as
+ * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+ *
+ * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
+ * vector or a count vector.
+ * @param lambda The smooth parameter
+ */
+ def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+ new NaiveBayes(lambda).run(input)
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 749e7364f4..c590492e7a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -50,8 +50,8 @@ class LogisticGradient extends Gradient {
val gradient = data.mul(gradientMultiplier)
val loss =
- if (margin > 0) {
- math.log(1 + math.exp(0 - margin))
+ if (label > 0) {
+ math.log(1 + math.exp(margin))
} else {
math.log(1 + math.exp(margin)) - margin
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
new file mode 100644
index 0000000000..b615f76e66
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.SparkContext
+
+object NaiveBayesSuite {
+
+ private def calcLabel(p: Double, pi: Array[Double]): Int = {
+ var sum = 0.0
+ for (j <- 0 until pi.length) {
+ sum += pi(j)
+ if (p < sum) return j
+ }
+ -1
+ }
+
+ // Generate input of the form Y = (theta * x).argmax()
+ def generateNaiveBayesInput(
+ pi: Array[Double], // 1XC
+ theta: Array[Array[Double]], // CXD
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val D = theta(0).length
+ val rnd = new Random(seed)
+
+ val _pi = pi.map(math.pow(math.E, _))
+ val _theta = theta.map(row => row.map(math.pow(math.E, _)))
+
+ for (i <- 0 until nPoints) yield {
+ val y = calcLabel(rnd.nextDouble(), _pi)
+ val xi = Array.tabulate[Double](D) { j =>
+ if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
+ }
+
+ LabeledPoint(y, xi)
+ }
+ }
+}
+
+class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOfPredictions = predictions.zip(input).count {
+ case (prediction, expected) =>
+ prediction != expected.label
+ }
+ // At least 80% of the predictions should be on.
+ assert(numOfPredictions < input.length / 5)
+ }
+
+ test("Naive Bayes") {
+ val nPoints = 10000
+
+ val pi = Array(0.5, 0.3, 0.2).map(math.log)
+ val theta = Array(
+ Array(0.91, 0.03, 0.03, 0.03), // label 0
+ Array(0.03, 0.91, 0.03, 0.03), // label 1
+ Array(0.03, 0.03, 0.91, 0.03) // label 2
+ ).map(_.map(math.log))
+
+ val testData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 42)
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val model = NaiveBayes.train(testRDD)
+
+ val validationData = NaiveBayesSuite.generateNaiveBayesInput(pi, theta, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
new file mode 100644
index 0000000000..a6028a1e98
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+import scala.collection.JavaConversions._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.regression._
+
+object GradientDescentSuite {
+
+ def generateLogisticInputAsList(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateGDInput(offset, scale, nPoints, seed))
+ }
+
+ // Generate input of the form Y = logistic(offset + scale * X)
+ def generateGDInput(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
+
+ val unifRand = new scala.util.Random(45)
+ val rLogis = (0 until nPoints).map { i =>
+ val u = unifRand.nextDouble()
+ math.log(u) - math.log(1.0-u)
+ }
+
+ val y: Seq[Int] = (0 until nPoints).map { i =>
+ val yVal = offset + scale * x1(i) + rLogis(i)
+ if (yVal > 0) 1 else 0
+ }
+
+ val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+ testData
+ }
+}
+
+class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ test("Assert the loss is decreasing.") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val initialB = -1.0
+ val initialWeights = Array(initialB)
+
+ val gradient = new LogisticGradient()
+ val updater = new SimpleUpdater()
+ val stepSize = 1.0
+ val numIterations = 10
+ val regParam = 0
+ val miniBatchFrac = 1.0
+
+ // Add a extra variable consisting of all 1.0's for the intercept.
+ val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
+ val data = testData.map { case LabeledPoint(label, features) =>
+ label -> Array(1.0, features: _*)
+ }
+
+ val dataRDD = sc.parallelize(data, 2).cache()
+ val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+
+ val (_, loss) = GradientDescent.runMiniBatchSGD(
+ dataRDD,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFrac,
+ initialWeightsWithIntercept)
+
+ assert(loss.last - loss.head < 0, "loss isn't decreasing.")
+
+ val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs }
+ assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8)
+ }
+}