diff options
Diffstat (limited to 'mllib/src/test/scala/org')
7 files changed, 900 insertions, 0 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala new file mode 100644 index 0000000000..34c67294e9 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.util.Random +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.regression._ + +object LogisticRegressionSuite { + + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed)) + } + + // Generate input of the form Y = logistic(offset + scale*X) + def generateLogisticInput( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) + + // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1) + val unifRand = new scala.util.Random(45) + val rLogis = (0 until nPoints).map { i => + val u = unifRand.nextDouble() + math.log(u) - math.log(1.0-u) + } + + // y <- A + B*x + rLogis() + // y <- as.numeric(y > 0) + val y: Seq[Int] = (0 until nPoints).map { i => + val yVal = offset + scale * x1(i) + rLogis(i) + if (yVal > 0) 1 else 0 + } + + val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i)))) + testData + } + +} + +class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) + }.size + // At least 83% of the predictions should be on. + ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83 + } + + // Test if we can correctly learn A, B where Y = logistic(A + B*X) + test("logistic regression") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(20) + + val model = lr.run(testRDD) + + // Test the weights + val weight0 = model.weights(0) + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("logistic regression with initial weights") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + // Use half as many iterations as the previous test. + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(10) + + val model = lr.run(testRDD, initialWeights) + + val weight0 = model.weights(0) + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala new file mode 100644 index 0000000000..6a957e3ddc --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.util.Random +import scala.math.signum +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.jblas.DoubleMatrix + +import org.apache.spark.{SparkException, SparkContext} +import org.apache.spark.mllib.regression._ + +object SVMSuite { + + def generateSVMInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed)) + } + + // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) + def generateSVMInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0)) + val y = x.map { xi => + val yD = (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + + intercept + 0.01 * rnd.nextGaussian() + if (yD < 0) 0.0 else 1.0 + } + y.zip(x).map(p => LabeledPoint(p._1, p._2)) + } + +} + +class SVMSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + + test("SVM using local random SGD") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.run(testRDD) + + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("SVM local random SGD with initial weights") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.run(testRDD, initialWeights) + + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("SVM with invalid labels") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + val testRDD = sc.parallelize(testData, 2) + + val testRDDInvalid = testRDD.map { lp => + if (lp.label == 0.0) { + LabeledPoint(-1.0, lp.features) + } else { + lp + } + } + + intercept[SparkException] { + val model = SVMWithSGD.train(testRDDInvalid, 100) + } + + // Turning off data validation should not throw an exception + val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala new file mode 100644 index 0000000000..94245f6027 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ + +import org.jblas._ + +class KMeansSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + val EPSILON = 1e-4 + + import KMeans.{RANDOM, K_MEANS_PARALLEL} + + def prettyPrint(point: Array[Double]): String = point.mkString("(", ", ", ")") + + def prettyPrint(points: Array[Array[Double]]): String = { + points.map(prettyPrint).mkString("(", "; ", ")") + } + + // L1 distance between two points + def distance1(v1: Array[Double], v2: Array[Double]): Double = { + v1.zip(v2).map{ case (a, b) => math.abs(a-b) }.max + } + + // Assert that two vectors are equal within tolerance EPSILON + def assertEqual(v1: Array[Double], v2: Array[Double]) { + def errorMessage = prettyPrint(v1) + " did not equal " + prettyPrint(v2) + assert(v1.length == v2.length, errorMessage) + assert(distance1(v1, v2) <= EPSILON, errorMessage) + } + + // Assert that two sets of points are equal, within EPSILON tolerance + def assertSetsEqual(set1: Array[Array[Double]], set2: Array[Array[Double]]) { + def errorMessage = prettyPrint(set1) + " did not equal " + prettyPrint(set2) + assert(set1.length == set2.length, errorMessage) + for (v <- set1) { + val closestDistance = set2.map(w => distance1(v, w)).min + if (closestDistance > EPSILON) { + fail(errorMessage) + } + } + for (v <- set2) { + val closestDistance = set1.map(w => distance1(v, w)).min + if (closestDistance > EPSILON) { + fail(errorMessage) + } + } + } + + test("single cluster") { + val data = sc.parallelize(Array( + Array(1.0, 2.0, 6.0), + Array(1.0, 3.0, 0.0), + Array(1.0, 4.0, 6.0) + )) + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + var model = KMeans.train(data, k=1, maxIterations=1) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=2) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train( + data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + } + + test("single cluster with big dataset") { + val smallData = Array( + Array(1.0, 2.0, 6.0), + Array(1.0, 3.0, 0.0), + Array(1.0, 4.0, 6.0) + ) + val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4) + + // No matter how many runs or iterations we use, we should get one cluster, + // centered at the mean of the points + + var model = KMeans.train(data, k=1, maxIterations=1) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=2) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=5) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + + model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL) + assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0))) + } + + test("k-means|| initialization") { + val points = Array( + Array(1.0, 2.0, 6.0), + Array(1.0, 3.0, 0.0), + Array(1.0, 4.0, 6.0), + Array(1.0, 0.0, 1.0), + Array(1.0, 1.0, 1.0) + ) + val rdd = sc.parallelize(points) + + // K-means|| initialization should place all clusters into distinct centers because + // it will make at least five passes, and it will give non-zero probability to each + // unselected point as long as it hasn't yet selected all of them + + var model = KMeans.train(rdd, k=5, maxIterations=1) + assertSetsEqual(model.clusterCenters, points) + + // Iterations of Lloyd's should not change the answer either + model = KMeans.train(rdd, k=5, maxIterations=10) + assertSetsEqual(model.clusterCenters, points) + + // Neither should more runs + model = KMeans.train(rdd, k=5, maxIterations=10, runs=5) + assertSetsEqual(model.clusterCenters, points) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala new file mode 100644 index 0000000000..347ef238f4 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ + +import org.jblas._ + +object ALSSuite { + + def generateRatingsAsJavaList( + users: Int, + products: Int, + features: Int, + samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = { + val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate) + (seqAsJavaList(sampledRatings), trueRatings) + } + + def generateRatings( + users: Int, + products: Int, + features: Int, + samplingRate: Double): (Seq[Rating], DoubleMatrix) = { + val rand = new Random(42) + + // Create a random matrix with uniform values from -1 to 1 + def randomMatrix(m: Int, n: Int) = + new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*) + + val userMatrix = randomMatrix(users, features) + val productMatrix = randomMatrix(features, products) + val trueRatings = userMatrix.mmul(productMatrix) + + val sampledRatings = { + for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate) + yield Rating(u, p, trueRatings.get(u, p)) + } + + (sampledRatings, trueRatings) + } + +} + + +class ALSSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + test("rank-1 matrices") { + testALS(10, 20, 1, 15, 0.7, 0.3) + } + + test("rank-2 matrices") { + testALS(20, 30, 2, 15, 0.7, 0.3) + } + + /** + * Test if we can correctly factorize R = U * P where U and P are of known rank. + * + * @param users number of users + * @param products number of products + * @param features number of features (rank of problem) + * @param iterations number of iterations to run + * @param samplingRate what fraction of the user-product pairs are known + * @param matchThreshold max difference allowed to consider a predicted rating correct + */ + def testALS(users: Int, products: Int, features: Int, iterations: Int, + samplingRate: Double, matchThreshold: Double) + { + val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products, + features, samplingRate) + val model = ALS.train(sc.parallelize(sampledRatings), features, iterations) + + val predictedU = new DoubleMatrix(users, features) + for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) { + predictedU.put(u, i, vec(i)) + } + val predictedP = new DoubleMatrix(products, features) + for ((p, vec) <- model.productFeatures.collect(); i <- 0 until features) { + predictedP.put(p, i, vec(i)) + } + val predictedRatings = predictedU.mmul(predictedP.transpose) + + for (u <- 0 until users; p <- 0 until products) { + val prediction = predictedRatings.get(u, p) + val correct = trueRatings.get(u, p) + if (math.abs(prediction - correct) > matchThreshold) { + fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( + u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP)) + } + } + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala new file mode 100644 index 0000000000..db980c7bae --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.util.LinearDataGenerator + + +class LassoSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("Lasso local random SGD") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.run(testRDD) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("Lasso local random SGD with initial weights") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.run(testRDD, initialWeights) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala new file mode 100644 index 0000000000..ef500c704c --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LinearDataGenerator + +class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("linear regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new LinearRegressionWithSGD() + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + assert(model.weights.length === 2) + assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) + assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala new file mode 100644 index 0000000000..c18092d804 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.jblas.DoubleMatrix +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LinearDataGenerator + +class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = { + predictions.zip(input).map { case (prediction, expected) => + (prediction - expected.label) * (prediction - expected.label) + }.reduceLeft(_ + _) / predictions.size + } + + test("regularization with skewed weights") { + val nexamples = 200 + val nfeatures = 20 + val eps = 10 + + org.jblas.util.Random.seed(42) + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) + // Set first two weights to eps + w.put(0, 0, eps) + w.put(1, 0, eps) + + // Use half of data for training and other half for validation + val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps) + val testData = data.take(nexamples) + val validationData = data.takeRight(nexamples) + + val testRDD = sc.parallelize(testData, 2).cache() + val validationRDD = sc.parallelize(validationData, 2).cache() + + // First run without regularization. + val linearReg = new LinearRegressionWithSGD() + linearReg.optimizer.setNumIterations(200) + .setStepSize(1.0) + + val linearModel = linearReg.run(testRDD) + val linearErr = predictionError( + linearModel.predict(validationRDD.map(_.features)).collect(), validationData) + + val ridgeReg = new RidgeRegressionWithSGD() + ridgeReg.optimizer.setNumIterations(200) + .setRegParam(0.1) + .setStepSize(1.0) + val ridgeModel = ridgeReg.run(testRDD) + val ridgeErr = predictionError( + ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData) + + // Ridge CV-error should be lower than linear regression + assert(ridgeErr < linearErr, + "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")") + } +} |