aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/test/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'mllib/src/test/scala/org')
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala150
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala169
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala173
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala125
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala121
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala72
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala90
7 files changed, 900 insertions, 0 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
new file mode 100644
index 0000000000..34c67294e9
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.util.Random
+import scala.collection.JavaConversions._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.regression._
+
+object LogisticRegressionSuite {
+
+ def generateLogisticInputAsList(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed))
+ }
+
+ // Generate input of the form Y = logistic(offset + scale*X)
+ def generateLogisticInput(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
+
+ // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
+ val unifRand = new scala.util.Random(45)
+ val rLogis = (0 until nPoints).map { i =>
+ val u = unifRand.nextDouble()
+ math.log(u) - math.log(1.0-u)
+ }
+
+ // y <- A + B*x + rLogis()
+ // y <- as.numeric(y > 0)
+ val y: Seq[Int] = (0 until nPoints).map { i =>
+ val yVal = offset + scale * x1(i) + rLogis(i)
+ if (yVal > 0) 1 else 0
+ }
+
+ val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+ testData
+ }
+
+}
+
+class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ (prediction != expected.label)
+ }.size
+ // At least 83% of the predictions should be on.
+ ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83
+ }
+
+ // Test if we can correctly learn A, B where Y = logistic(A + B*X)
+ test("logistic regression") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+ val lr = new LogisticRegressionWithSGD()
+ lr.optimizer.setStepSize(10.0).setNumIterations(20)
+
+ val model = lr.run(testRDD)
+
+ // Test the weights
+ val weight0 = model.weights(0)
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ test("logistic regression with initial weights") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
+
+ val initialB = -1.0
+ val initialWeights = Array(initialB)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ // Use half as many iterations as the previous test.
+ val lr = new LogisticRegressionWithSGD()
+ lr.optimizer.setStepSize(10.0).setNumIterations(10)
+
+ val model = lr.run(testRDD, initialWeights)
+
+ val weight0 = model.weights(0)
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
new file mode 100644
index 0000000000..6a957e3ddc
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import scala.util.Random
+import scala.math.signum
+import scala.collection.JavaConversions._
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.jblas.DoubleMatrix
+
+import org.apache.spark.{SparkException, SparkContext}
+import org.apache.spark.mllib.regression._
+
+object SVMSuite {
+
+ def generateSVMInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed))
+ }
+
+ // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
+ def generateSVMInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0))
+ val y = x.map { xi =>
+ val yD = (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
+ intercept + 0.01 * rnd.nextGaussian()
+ if (yD < 0) 0.0 else 1.0
+ }
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ }
+
+}
+
+class SVMSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ (prediction != expected.label)
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+
+ test("SVM using local random SGD") {
+ val nPoints = 10000
+
+ // NOTE: Intercept should be small for generating equal 0s and 1s
+ val A = 0.01
+ val B = -1.5
+ val C = 1.0
+
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val svm = new SVMWithSGD()
+ svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+ val model = svm.run(testRDD)
+
+ val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ test("SVM local random SGD with initial weights") {
+ val nPoints = 10000
+
+ // NOTE: Intercept should be small for generating equal 0s and 1s
+ val A = 0.01
+ val B = -1.5
+ val C = 1.0
+
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+
+ val initialB = -1.0
+ val initialC = -1.0
+ val initialWeights = Array(initialB,initialC)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val svm = new SVMWithSGD()
+ svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+ val model = svm.run(testRDD, initialWeights)
+
+ val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ test("SVM with invalid labels") {
+ val nPoints = 10000
+
+ // NOTE: Intercept should be small for generating equal 0s and 1s
+ val A = 0.01
+ val B = -1.5
+ val C = 1.0
+
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+ val testRDD = sc.parallelize(testData, 2)
+
+ val testRDDInvalid = testRDD.map { lp =>
+ if (lp.label == 0.0) {
+ LabeledPoint(-1.0, lp.features)
+ } else {
+ lp
+ }
+ }
+
+ intercept[SparkException] {
+ val model = SVMWithSGD.train(testRDDInvalid, 100)
+ }
+
+ // Turning off data validation should not throw an exception
+ val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
new file mode 100644
index 0000000000..94245f6027
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+import org.jblas._
+
+class KMeansSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-4
+
+ import KMeans.{RANDOM, K_MEANS_PARALLEL}
+
+ def prettyPrint(point: Array[Double]): String = point.mkString("(", ", ", ")")
+
+ def prettyPrint(points: Array[Array[Double]]): String = {
+ points.map(prettyPrint).mkString("(", "; ", ")")
+ }
+
+ // L1 distance between two points
+ def distance1(v1: Array[Double], v2: Array[Double]): Double = {
+ v1.zip(v2).map{ case (a, b) => math.abs(a-b) }.max
+ }
+
+ // Assert that two vectors are equal within tolerance EPSILON
+ def assertEqual(v1: Array[Double], v2: Array[Double]) {
+ def errorMessage = prettyPrint(v1) + " did not equal " + prettyPrint(v2)
+ assert(v1.length == v2.length, errorMessage)
+ assert(distance1(v1, v2) <= EPSILON, errorMessage)
+ }
+
+ // Assert that two sets of points are equal, within EPSILON tolerance
+ def assertSetsEqual(set1: Array[Array[Double]], set2: Array[Array[Double]]) {
+ def errorMessage = prettyPrint(set1) + " did not equal " + prettyPrint(set2)
+ assert(set1.length == set2.length, errorMessage)
+ for (v <- set1) {
+ val closestDistance = set2.map(w => distance1(v, w)).min
+ if (closestDistance > EPSILON) {
+ fail(errorMessage)
+ }
+ }
+ for (v <- set2) {
+ val closestDistance = set1.map(w => distance1(v, w)).min
+ if (closestDistance > EPSILON) {
+ fail(errorMessage)
+ }
+ }
+ }
+
+ test("single cluster") {
+ val data = sc.parallelize(Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0)
+ ))
+
+ // No matter how many runs or iterations we use, we should get one cluster,
+ // centered at the mean of the points
+
+ var model = KMeans.train(data, k=1, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=2)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(
+ data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+ }
+
+ test("single cluster with big dataset") {
+ val smallData = Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0)
+ )
+ val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
+
+ // No matter how many runs or iterations we use, we should get one cluster,
+ // centered at the mean of the points
+
+ var model = KMeans.train(data, k=1, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=2)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+ assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
+ }
+
+ test("k-means|| initialization") {
+ val points = Array(
+ Array(1.0, 2.0, 6.0),
+ Array(1.0, 3.0, 0.0),
+ Array(1.0, 4.0, 6.0),
+ Array(1.0, 0.0, 1.0),
+ Array(1.0, 1.0, 1.0)
+ )
+ val rdd = sc.parallelize(points)
+
+ // K-means|| initialization should place all clusters into distinct centers because
+ // it will make at least five passes, and it will give non-zero probability to each
+ // unselected point as long as it hasn't yet selected all of them
+
+ var model = KMeans.train(rdd, k=5, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, points)
+
+ // Iterations of Lloyd's should not change the answer either
+ model = KMeans.train(rdd, k=5, maxIterations=10)
+ assertSetsEqual(model.clusterCenters, points)
+
+ // Neither should more runs
+ model = KMeans.train(rdd, k=5, maxIterations=10, runs=5)
+ assertSetsEqual(model.clusterCenters, points)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
new file mode 100644
index 0000000000..347ef238f4
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+import org.jblas._
+
+object ALSSuite {
+
+ def generateRatingsAsJavaList(
+ users: Int,
+ products: Int,
+ features: Int,
+ samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = {
+ val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate)
+ (seqAsJavaList(sampledRatings), trueRatings)
+ }
+
+ def generateRatings(
+ users: Int,
+ products: Int,
+ features: Int,
+ samplingRate: Double): (Seq[Rating], DoubleMatrix) = {
+ val rand = new Random(42)
+
+ // Create a random matrix with uniform values from -1 to 1
+ def randomMatrix(m: Int, n: Int) =
+ new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+
+ val userMatrix = randomMatrix(users, features)
+ val productMatrix = randomMatrix(features, products)
+ val trueRatings = userMatrix.mmul(productMatrix)
+
+ val sampledRatings = {
+ for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate)
+ yield Rating(u, p, trueRatings.get(u, p))
+ }
+
+ (sampledRatings, trueRatings)
+ }
+
+}
+
+
+class ALSSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ test("rank-1 matrices") {
+ testALS(10, 20, 1, 15, 0.7, 0.3)
+ }
+
+ test("rank-2 matrices") {
+ testALS(20, 30, 2, 15, 0.7, 0.3)
+ }
+
+ /**
+ * Test if we can correctly factorize R = U * P where U and P are of known rank.
+ *
+ * @param users number of users
+ * @param products number of products
+ * @param features number of features (rank of problem)
+ * @param iterations number of iterations to run
+ * @param samplingRate what fraction of the user-product pairs are known
+ * @param matchThreshold max difference allowed to consider a predicted rating correct
+ */
+ def testALS(users: Int, products: Int, features: Int, iterations: Int,
+ samplingRate: Double, matchThreshold: Double)
+ {
+ val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products,
+ features, samplingRate)
+ val model = ALS.train(sc.parallelize(sampledRatings), features, iterations)
+
+ val predictedU = new DoubleMatrix(users, features)
+ for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
+ predictedU.put(u, i, vec(i))
+ }
+ val predictedP = new DoubleMatrix(products, features)
+ for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) {
+ predictedP.put(p, i, vec(i))
+ }
+ val predictedRatings = predictedU.mmul(predictedP.transpose)
+
+ for (u <- 0 until users; p <- 0 until products) {
+ val prediction = predictedRatings.get(u, p)
+ val correct = trueRatings.get(u, p)
+ if (math.abs(prediction - correct) > matchThreshold) {
+ fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
+ u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
+ }
+ }
+ }
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
new file mode 100644
index 0000000000..db980c7bae
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.util.LinearDataGenerator
+
+
+class LassoSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ math.abs(prediction - expected.label) > 0.5
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ test("Lasso local random SGD") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0e-2
+
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val ls = new LassoWithSGD()
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.run(testRDD)
+
+ val weight0 = model.weights(0)
+ val weight1 = model.weights(1)
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+
+ val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+
+ test("Lasso local random SGD with initial weights") {
+ val nPoints = 10000
+
+ val A = 2.0
+ val B = -1.5
+ val C = 1.0e-2
+
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+
+ val initialB = -1.0
+ val initialC = -1.0
+ val initialWeights = Array(initialB,initialC)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ val ls = new LassoWithSGD()
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.run(testRDD, initialWeights)
+
+ val weight0 = model.weights(0)
+ val weight1 = model.weights(1)
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+
+ val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData,2)
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
new file mode 100644
index 0000000000..ef500c704c
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.util.LinearDataGenerator
+
+class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ math.abs(prediction - expected.label) > 0.5
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2
+ test("linear regression") {
+ val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 42), 2).cache()
+ val linReg = new LinearRegressionWithSGD()
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(testRDD)
+
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+ assert(model.weights.length === 2)
+ assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
+ assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(
+ 3.0, Array(10.0, 10.0), 100, 17)
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
new file mode 100644
index 0000000000..c18092d804
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.util.LinearDataGenerator
+
+class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {
+ predictions.zip(input).map { case (prediction, expected) =>
+ (prediction - expected.label) * (prediction - expected.label)
+ }.reduceLeft(_ + _) / predictions.size
+ }
+
+ test("regularization with skewed weights") {
+ val nexamples = 200
+ val nfeatures = 20
+ val eps = 10
+
+ org.jblas.util.Random.seed(42)
+ // Pick weights as random values distributed uniformly in [-0.5, 0.5]
+ val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ // Set first two weights to eps
+ w.put(0, 0, eps)
+ w.put(1, 0, eps)
+
+ // Use half of data for training and other half for validation
+ val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps)
+ val testData = data.take(nexamples)
+ val validationData = data.takeRight(nexamples)
+
+ val testRDD = sc.parallelize(testData, 2).cache()
+ val validationRDD = sc.parallelize(validationData, 2).cache()
+
+ // First run without regularization.
+ val linearReg = new LinearRegressionWithSGD()
+ linearReg.optimizer.setNumIterations(200)
+ .setStepSize(1.0)
+
+ val linearModel = linearReg.run(testRDD)
+ val linearErr = predictionError(
+ linearModel.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ val ridgeReg = new RidgeRegressionWithSGD()
+ ridgeReg.optimizer.setNumIterations(200)
+ .setRegParam(0.1)
+ .setStepSize(1.0)
+ val ridgeModel = ridgeReg.run(testRDD)
+ val ridgeErr = predictionError(
+ ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData)
+
+ // Ridge CV-error should be lower than linear regression
+ assert(ridgeErr < linearErr,
+ "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
+ }
+}