diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
commit | 4106ae9fbf8a582697deba2198b3b966dec00bfe (patch) | |
tree | 7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /mllib/src | |
parent | e0dd24dc858777904335218f3001a24bffe73b27 (diff) | |
parent | 5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff) | |
download | spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.gz spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.bz2 spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.zip |
Merged with master
Diffstat (limited to 'mllib/src')
53 files changed, 3913 insertions, 909 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala new file mode 100644 index 0000000000..391f5b9b7a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import org.apache.spark.rdd.RDD + +trait ClassificationModel extends Serializable { + /** + * Predict values for the given data set using the model trained. + * + * @param testData RDD representing data points to be predicted + * @return RDD[Int] where each entry contains the corresponding prediction + */ + def predict(testData: RDD[Array[Double]]): RDD[Double] + + /** + * Predict values for a single data point using the model trained. + * + * @param testData array representing a single data point + * @return Int prediction from the trained model + */ + def predict(testData: Array[Double]): Double +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala new file mode 100644 index 0000000000..50aede9c07 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.math.round + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.util.DataValidators + +import org.jblas.DoubleMatrix + +/** + * Classification model trained using Logistic Regression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class LogisticRegressionModel( + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) + with ClassificationModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept + round(1.0/ (1.0 + math.exp(margin * -1))) + } +} + +/** + * Train a classification model for Logistic Regression using Stochastic Gradient Descent. + * NOTE: Labels used in Logistic Regression should be {0, 1} + */ +class LogisticRegressionWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[LogisticRegressionModel] + with Serializable { + + val gradient = new LogisticGradient() + val updater = new SimpleUpdater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) + override val validators = List(DataValidators.classificationLabels) + + /** + * Construct a LogisticRegression object with default parameters + */ + def this() = this(1.0, 100, 0.0, 1.0) + + def createModel(weights: Array[Double], intercept: Double) = { + new LogisticRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling Logistic Regression. + * NOTE: Labels used in Logistic Regression should be {0, 1} + */ +object LogisticRegressionWithSGD { + // NOTE(shivaram): We use multiple train methods instead of default arguments to support + // Java programs. + + /** + * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * NOTE: Labels used in Logistic Regression should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : LogisticRegressionModel = + { + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run( + input, initialWeights) + } + + /** + * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * NOTE: Labels used in Logistic Regression should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double) + : LogisticRegressionModel = + { + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run( + input) + } + + /** + * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. We use the entire data + * set to update the gradient in each iteration. + * NOTE: Labels used in Logistic Regression should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + + * @param numIterations Number of iterations of gradient descent to run. + * @return a LogisticRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double) + : LogisticRegressionModel = + { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using a step size of 1.0. We use the entire data set + * to update the gradient in each iteration. + * NOTE: Labels used in Logistic Regression should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LogisticRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int) + : LogisticRegressionModel = + { + train(input, numIterations, 1.0, 1.0) + } + + def main(args: Array[String]) { + if (args.length != 4) { + println("Usage: LogisticRegression <master> <input_dir> <step_size> " + + "<niters>") + System.exit(1) + } + val sc = new SparkContext(args(0), "LogisticRegression") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala new file mode 100644 index 0000000000..3511e24bce --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.math.signum + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.util.DataValidators + +import org.jblas.DoubleMatrix + +/** + * Model built using SVM. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class SVMModel( + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) + with ClassificationModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + val margin = dataMatrix.dot(weightMatrix) + intercept + if (margin < 0) 0.0 else 1.0 + } +} + +/** + * Train an SVM using Stochastic Gradient Descent. + * NOTE: Labels used in SVM should be {0, 1} + */ +class SVMWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[SVMModel] with Serializable { + + val gradient = new HingeGradient() + val updater = new SquaredL2Updater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) + + override val validators = List(DataValidators.classificationLabels) + + /** + * Construct a SVM object with default parameters + */ + def this() = this(1.0, 100, 1.0, 1.0) + + def createModel(weights: Array[Double], intercept: Double) = { + new SVMModel(weights, intercept) + } +} + +/** + * Top-level methods for calling SVM. NOTE: Labels used in SVM should be {0, 1} + */ +object SVMWithSGD { + + /** + * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * NOTE: Labels used in SVM should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : SVMModel = + { + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, + initialWeights) + } + + /** + * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * NOTE: Labels used in SVM should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double) + : SVMModel = + { + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input) + } + + /** + * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. + * NOTE: Labels used in SVM should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param regParam Regularization parameter. + * @param numIterations Number of iterations of gradient descent to run. + * @return a SVMModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double) + : SVMModel = + { + train(input, numIterations, stepSize, regParam, 1.0) + } + + /** + * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * update the gradient in each iteration. + * NOTE: Labels used in SVM should be {0, 1} + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @return a SVMModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int) + : SVMModel = + { + train(input, numIterations, 1.0, 1.0, 1.0) + } + + def main(args: Array[String]) { + if (args.length != 5) { + println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>") + System.exit(1) + } + val sc = new SparkContext(args(0), "SVM") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index b0e141ff32..edbf77dbcc 100644 --- a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -1,12 +1,30 @@ -package spark.mllib.clustering +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer import scala.util.Random -import spark.{SparkContext, RDD} -import spark.SparkContext._ -import spark.Logging -import spark.mllib.util.MLUtils +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.Logging +import org.apache.spark.mllib.util.MLUtils import org.jblas.DoubleMatrix @@ -95,7 +113,7 @@ class KMeans private ( * Train a K-means model on the given set of points; `data` should be cached for high * performance, because this is an iterative algorithm. */ - def train(data: RDD[Array[Double]]): KMeansModel = { + def run(data: RDD[Array[Double]]): KMeansModel = { // TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable val sc = data.sparkContext @@ -177,8 +195,8 @@ class KMeans private ( */ private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = { // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new Random().nextInt()) - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k)) + val sample = data.takeSample(true, runs * k, new Random().nextInt()).toSeq + Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray) } /** @@ -193,7 +211,7 @@ class KMeans private ( private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = { // Initialize each run's center to a random point val seed = new Random().nextInt() - val sample = data.takeSample(true, runs, seed) + val sample = data.takeSample(true, runs, seed).toSeq val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r))) // On each step, sample 2 * k points on average for each run with probability proportional @@ -254,7 +272,7 @@ object KMeans { .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) - .train(data) + .run(data) } def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = { @@ -298,14 +316,15 @@ object KMeans { } def main(args: Array[String]) { - if (args.length != 4) { - println("Usage: KMeans <master> <input_file> <k> <max_iterations>") + if (args.length < 4) { + println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]") System.exit(1) } val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt) + val runs = if (args.length >= 5) args(4).toInt else 1 val sc = new SparkContext(master, "KMeans") - val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)) - val model = KMeans.train(data, k, iters) + val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache() + val model = KMeans.train(data, k, iters, runs) val cost = model.computeCost(data) println("Cluster centers:") for (c <- model.clusterCenters) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala new file mode 100644 index 0000000000..cfc81c985a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.MLUtils + + +/** + * A clustering model for K-means. Each point belongs to the cluster with the closest center. + */ +class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable { + /** Total number of clusters. */ + def k: Int = clusterCenters.length + + /** Return the cluster index that a given point belongs to. */ + def predict(point: Array[Double]): Int = { + KMeans.findClosest(clusterCenters, point)._1 + } + + /** + * Return the K-means cost (sum of squared distances of points to their nearest center) for this + * model on the given data. + */ + def computeCost(data: RDD[Array[Double]]): Double = { + data.map(p => KMeans.pointCost(clusterCenters, p)).sum + } +} diff --git a/mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala index e12b3be251..baf8251d8f 100644 --- a/mllib/src/main/scala/spark/mllib/clustering/LocalKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala @@ -1,4 +1,21 @@ -package spark.mllib.clustering +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering import scala.util.Random diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala new file mode 100644 index 0000000000..749e7364f4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.jblas.DoubleMatrix + +/** + * Class used to compute the gradient for a loss function, given a single data point. + */ +abstract class Gradient extends Serializable { + /** + * Compute the gradient and loss given features of a single data point. + * + * @param data - Feature values for one data point. Column matrix of size nx1 + * where n is the number of features. + * @param label - Label for this data item. + * @param weights - Column matrix containing weights for every feature. + * + * @return A tuple of 2 elements. The first element is a column matrix containing the computed + * gradient and the second element is the loss computed at this data point. + * + */ + def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): + (DoubleMatrix, Double) +} + +/** + * Compute gradient and loss for a logistic loss function. + */ +class LogisticGradient extends Gradient { + override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): + (DoubleMatrix, Double) = { + val margin: Double = -1.0 * data.dot(weights) + val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label + + val gradient = data.mul(gradientMultiplier) + val loss = + if (margin > 0) { + math.log(1 + math.exp(0 - margin)) + } else { + math.log(1 + math.exp(margin)) - margin + } + + (gradient, loss) + } +} + +/** + * Compute gradient and loss for a Least-squared loss function. + */ +class SquaredGradient extends Gradient { + override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): + (DoubleMatrix, Double) = { + val diff: Double = data.dot(weights) - label + + val loss = 0.5 * diff * diff + val gradient = data.mul(diff) + + (gradient, loss) + } +} + +/** + * Compute gradient and loss for a Hinge loss function. + * NOTE: This assumes that the labels are {0,1} + */ +class HingeGradient extends Gradient { + override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): + (DoubleMatrix, Double) = { + + val dotProduct = data.dot(weights) + + // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = 2 * label - 1.0 + + if (1.0 > labelScaled * dotProduct) { + (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct) + } else { + (DoubleMatrix.zeros(1, weights.length), 0.0) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala new file mode 100644 index 0000000000..b77364e08d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + +import org.jblas.DoubleMatrix + +import scala.collection.mutable.ArrayBuffer + +/** + * Class used to solve an optimization problem using Gradient Descent. + * @param gradient Gradient function to be used. + * @param updater Updater to be used to update weights after every iteration. + */ +class GradientDescent(var gradient: Gradient, var updater: Updater) + extends Optimizer with Logging +{ + private var stepSize: Double = 1.0 + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + private var miniBatchFraction: Double = 1.0 + + /** + * Set the step size per-iteration of SGD. Default 1.0. + */ + def setStepSize(step: Double): this.type = { + this.stepSize = step + this + } + + /** + * Set fraction of data to be used for each SGD iteration. Default 1.0. + */ + def setMiniBatchFraction(fraction: Double): this.type = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter used for SGD. Default 0.0. + */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } + + /** + * Set the gradient function to be used for SGD. + */ + def setGradient(gradient: Gradient): this.type = { + this.gradient = gradient + this + } + + + /** + * Set the updater function to be used for SGD. + */ + def setUpdater(updater: Updater): this.type = { + this.updater = updater + this + } + + def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]) + : Array[Double] = { + + val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD( + data, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFraction, + initialWeights) + weights + } + +} + +// Top-level method to run gradient descent. +object GradientDescent extends Logging { + /** + * Run gradient descent in parallel using mini batches. + * + * @param data - Input data for SGD. RDD of form (label, [feature values]). + * @param gradient - Gradient object that will be used to compute the gradient. + * @param updater - Updater object that will be used to update the model. + * @param stepSize - stepSize to be used during update. + * @param numIterations - number of iterations that SGD should be run. + * @param regParam - regularization parameter + * @param miniBatchFraction - fraction of the input data set that should be used for + * one iteration of SGD. Default value 1.0. + * + * @return A tuple containing two elements. The first element is a column matrix containing + * weights for every feature, and the second element is an array containing the stochastic + * loss computed for every iteration. + */ + def runMiniBatchSGD( + data: RDD[(Double, Array[Double])], + gradient: Gradient, + updater: Updater, + stepSize: Double, + numIterations: Int, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { + + val stochasticLossHistory = new ArrayBuffer[Double](numIterations) + + val nexamples: Long = data.count() + val miniBatchSize = nexamples * miniBatchFraction + + // Initialize weights as a column vector + var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) + var regVal = 0.0 + + for (i <- 1 to numIterations) { + val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { + case (y, features) => + val featuresCol = new DoubleMatrix(features.length, 1, features:_*) + val (grad, loss) = gradient.compute(featuresCol, y, weights) + (grad, loss) + }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) + + /** + * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration + * and regVal is the regularization value computed in the previous iteration as well. + */ + stochasticLossHistory.append(lossSum / miniBatchSize + regVal) + val update = updater.compute( + weights, gradientSum.div(miniBatchSize), stepSize, i, regParam) + weights = update._1 + regVal = update._2 + } + + logInfo("GradientDescent finished. Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + + (weights.toArray, stochasticLossHistory.toArray) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala new file mode 100644 index 0000000000..94d30b56f2 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.apache.spark.rdd.RDD + +trait Optimizer { + + /** + * Solve the provided convex optimization problem. + */ + def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double] + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala new file mode 100644 index 0000000000..4c51f4f881 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.math._ +import org.jblas.DoubleMatrix + +/** + * Class used to update weights used in Gradient Descent. + */ +abstract class Updater extends Serializable { + /** + * Compute an updated value for weights given the gradient, stepSize, iteration number and + * regularization parameter. Also returns the regularization value computed using the + * *updated* weights. + * + * @param weightsOld - Column matrix of size nx1 where n is the number of features. + * @param gradient - Column matrix of size nx1 where n is the number of features. + * @param stepSize - step size across iterations + * @param iter - Iteration number + * @param regParam - Regularization parameter + * + * @return A tuple of 2 elements. The first element is a column matrix containing updated weights, + * and the second element is the regularization value computed using updated weights. + */ + def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, + regParam: Double): (DoubleMatrix, Double) +} + +/** + * A simple updater that adaptively adjusts the learning rate the + * square root of the number of iterations. Does not perform any regularization. + */ +class SimpleUpdater extends Updater { + override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, + stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { + val thisIterStepSize = stepSize / math.sqrt(iter) + val normGradient = gradient.mul(thisIterStepSize) + (weightsOld.sub(normGradient), 0) + } +} + +/** + * Updater that adjusts learning rate and performs L1 regularization. + * + * The corresponding proximal operator used is the soft-thresholding function. + * That is, each weight component is shrunk towards 0 by shrinkageVal. + * + * If w > shrinkageVal, set weight component to w-shrinkageVal. + * If w < -shrinkageVal, set weight component to w+shrinkageVal. + * If -shrinkageVal < w < shrinkageVal, set weight component to 0. + * + * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal) + */ +class L1Updater extends Updater { + override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, + stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { + val thisIterStepSize = stepSize / math.sqrt(iter) + val normGradient = gradient.mul(thisIterStepSize) + // Take gradient step + val newWeights = weightsOld.sub(normGradient) + // Soft thresholding + val shrinkageVal = regParam * thisIterStepSize + (0 until newWeights.length).foreach { i => + val wi = newWeights.get(i) + newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal)) + } + (newWeights, newWeights.norm1 * regParam) + } +} + +/** + * Updater that adjusts the learning rate and performs L2 regularization + */ +class SquaredL2Updater extends Updater { + override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, + stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { + val thisIterStepSize = stepSize / math.sqrt(iter) + val normGradient = gradient.mul(thisIterStepSize) + val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0) + (newWeights, pow(newWeights.norm2, 2.0) * regParam) + } +} + diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 4c18cbdc6b..be002d02bc 100644 --- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -1,13 +1,31 @@ -package spark.mllib.recommendation +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation import scala.collection.mutable.{ArrayBuffer, BitSet} import scala.util.Random import scala.util.Sorting -import spark.{HashPartitioner, Partitioner, SparkContext, RDD} -import spark.storage.StorageLevel -import spark.KryoRegistrator -import spark.SparkContext._ +import org.apache.spark.{HashPartitioner, Partitioner, SparkContext} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.KryoRegistrator +import org.apache.spark.SparkContext._ import com.esotericsoftware.kryo.Kryo import org.jblas.{DoubleMatrix, SimpleBlas, Solve} @@ -18,8 +36,7 @@ import org.jblas.{DoubleMatrix, SimpleBlas, Solve} * of the elements within this block, and the list of destination blocks that each user or * product will need to send its feature vector to. */ -private[recommendation] case class OutLinkBlock( - elementIds: Array[Int], shouldSend: Array[BitSet]) +private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet]) /** @@ -39,8 +56,7 @@ private[recommendation] case class InLinkBlock( /** * A more compact class to represent a rating than Tuple3[Int, Int, Double]. */ -private[recommendation] case class Rating(user: Int, product: Int, rating: Double) - +case class Rating(val user: Int, val product: Int, val rating: Double) /** * Alternating Least Squares matrix factorization. @@ -88,10 +104,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } /** - * Run ALS with the configured parmeters on an input RDD of (user, product, rating) triples. + * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. */ - def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = { + def run(ratings: RDD[Rating]): MatrixFactorizationModel = { val numBlocks = if (this.numBlocks == -1) { math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2) } else { @@ -100,16 +116,36 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l val partitioner = new HashPartitioner(numBlocks) - val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, Rating(u, p, r)) } - val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, Rating(p, u, r)) } + val ratingsByUserBlock = ratings.map{ rating => (rating.user % numBlocks, rating) } + val ratingsByProductBlock = ratings.map{ rating => + (rating.product % numBlocks, Rating(rating.product, rating.user, rating.rating)) + } val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock) val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock) - // Initialize user and product factors randomly - val seed = new Random().nextInt() - var users = userOutLinks.mapValues(_.elementIds.map(u => randomFactor(rank, seed ^ u))) - var products = productOutLinks.mapValues(_.elementIds.map(p => randomFactor(rank, seed ^ ~p))) + // Initialize user and product factors randomly, but use a deterministic seed for each partition + // so that fault recovery works + val seedGen = new Random() + val seed1 = seedGen.nextInt() + val seed2 = seedGen.nextInt() + // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable + def hash(x: Int): Int = { + val r = x ^ (x >>> 20) ^ (x >>> 12) + r ^ (r >>> 7) ^ (r >>> 4) + } + var users = userOutLinks.mapPartitionsWithIndex { (index, itr) => + val rand = new Random(hash(seed1 ^ index)) + itr.map { case (x, y) => + (x, y.elementIds.map(_ => randomFactor(rank, rand))) + } + } + var products = productOutLinks.mapPartitionsWithIndex { (index, itr) => + val rand = new Random(hash(seed2 ^ index)) + itr.map { case (x, y) => + (x, y.elementIds.map(_ => randomFactor(rank, rand))) + } + } for (iter <- 0 until iterations) { // perform ALS update @@ -196,11 +232,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } /** - * Make a random factor vector with the given seed. - * TODO: Initialize things using mapPartitionsWithIndex to make it faster? + * Make a random factor vector with the given random. */ - private def randomFactor(rank: Int, seed: Int): Array[Double] = { - val rand = new Random(seed) + private def randomFactor(rank: Int, rand: Random): Array[Double] = { Array.fill(rank)(rand.nextDouble) } @@ -340,14 +374,14 @@ object ALS { * @param blocks level of parallelism to split computation into */ def train( - ratings: RDD[(Int, Int, Double)], + ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int) : MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda).train(ratings) + new ALS(blocks, rank, iterations, lambda).run(ratings) } /** @@ -362,7 +396,7 @@ object ALS { * @param iterations number of iterations of ALS (recommended: 10-20) * @param lambda regularization factor (recommended: 0.01) */ - def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int, lambda: Double) + def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double) : MatrixFactorizationModel = { train(ratings, rank, iterations, lambda, -1) @@ -379,7 +413,7 @@ object ALS { * @param rank number of features to use * @param iterations number of iterations of ALS (recommended: 10-20) */ - def train(ratings: RDD[(Int, Int, Double)], rank: Int, iterations: Int) + def train(ratings: RDD[Rating], rank: Int, iterations: Int) : MatrixFactorizationModel = { train(ratings, rank, iterations, 0.01, -1) @@ -399,14 +433,15 @@ object ALS { val (master, ratingsFile, rank, iters, outputDir) = (args(0), args(1), args(2).toInt, args(3).toInt, args(4)) val blocks = if (args.length == 6) args(5).toInt else -1 - System.setProperty("spark.serializer", "spark.KryoSerializer") + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName) System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "8") System.setProperty("spark.locality.wait", "10000") val sc = new SparkContext(master, "ALS") val ratings = sc.textFile(ratingsFile).map { line => val fields = line.split(',') - (fields(0).toInt, fields(1).toInt, fields(2).toDouble) + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) } val model = ALS.train(ratings, rank, iters, 0.01, blocks) model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala new file mode 100644 index 0000000000..af43d89c70 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + +import org.jblas._ + +/** + * Model representing the result of matrix factorization. + * + * @param rank Rank for the features in this model. + * @param userFeatures RDD of tuples where each tuple represents the userId and + * the features computed for this user. + * @param productFeatures RDD of tuples where each tuple represents the productId + * and the features computed for this product. + */ +class MatrixFactorizationModel( + val rank: Int, + val userFeatures: RDD[(Int, Array[Double])], + val productFeatures: RDD[(Int, Array[Double])]) + extends Serializable +{ + /** Predict the rating of one user for one product. */ + def predict(user: Int, product: Int): Double = { + val userVector = new DoubleMatrix(userFeatures.lookup(user).head) + val productVector = new DoubleMatrix(productFeatures.lookup(product).head) + userVector.dot(productVector) + } + + // TODO: Figure out what good bulk prediction methods would look like. + // Probably want a way to get the top users for a product or vice-versa. +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala new file mode 100644 index 0000000000..f98b0b536d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ + +import org.jblas.DoubleMatrix + +/** + * GeneralizedLinearModel (GLM) represents a model trained using + * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and + * an intercept. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double) + extends Serializable { + + // Create a column vector that can be used for predictions + private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) + + /** + * Predict the result given a data point and the weights learned. + * + * @param dataMatrix Row vector containing the features for this data point + * @param weightMatrix Column vector containing the weights of the model + * @param intercept Intercept of the model. + */ + def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double): Double + + /** + * Predict values for the given data set using the model trained. + * + * @param testData RDD representing data points to be predicted + * @return RDD[Double] where each entry contains the corresponding prediction + */ + def predict(testData: RDD[Array[Double]]): RDD[Double] = { + // A small optimization to avoid serializing the entire model. Only the weightsMatrix + // and intercept is needed. + val localWeights = weightsMatrix + val localIntercept = intercept + + testData.map { x => + val dataMatrix = new DoubleMatrix(1, x.length, x:_*) + predictPoint(dataMatrix, localWeights, localIntercept) + } + } + + /** + * Predict values for a single data point using the model trained. + * + * @param testData array representing a single data point + * @return Double prediction from the trained model + */ + def predict(testData: Array[Double]): Double = { + val dataMat = new DoubleMatrix(1, testData.length, testData:_*) + predictPoint(dataMat, weightsMatrix, intercept) + } +} + +/** + * GeneralizedLinearAlgorithm implements methods to train a Genearalized Linear Model (GLM). + * This class should be extended with an Optimizer to create a new GLM. + */ +abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] + extends Logging with Serializable { + + protected val validators: Seq[RDD[LabeledPoint] => Boolean] = List() + + val optimizer: Optimizer + + protected var addIntercept: Boolean = true + + protected var validateData: Boolean = true + + /** + * Create a model given the weights and intercept + */ + protected def createModel(weights: Array[Double], intercept: Double): M + + /** + * Set if the algorithm should add an intercept. Default true. + */ + def setIntercept(addIntercept: Boolean): this.type = { + this.addIntercept = addIntercept + this + } + + /** + * Set if the algorithm should validate data before training. Default true. + */ + def setValidateData(validateData: Boolean): this.type = { + this.validateData = validateData + this + } + + /** + * Run the algorithm with the configured parameters on an input + * RDD of LabeledPoint entries. + */ + def run(input: RDD[LabeledPoint]) : M = { + val nfeatures: Int = input.first().features.length + val initialWeights = Array.fill(nfeatures)(1.0) + run(input, initialWeights) + } + + /** + * Run the algorithm with the configured parameters on an input RDD + * of LabeledPoint entries starting from the initial weights provided. + */ + def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = { + + // Check the data properties before running the optimizer + if (validateData && !validators.forall(func => func(input))) { + throw new SparkException("Input validation failed.") + } + + // Add a extra variable consisting of all 1.0's for the intercept. + val data = if (addIntercept) { + input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*))) + } else { + input.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) + } + + val initialWeightsWithIntercept = if (addIntercept) { + Array(1.0, initialWeights:_*) + } else { + initialWeights + } + + val weights = optimizer.optimize(data, initialWeightsWithIntercept) + val intercept = weights(0) + val weightsScaled = weights.tail + + val model = createModel(weightsScaled, intercept) + + logInfo("Final model weights " + model.weights.mkString(",")) + logInfo("Final model intercept " + model.intercept) + model + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala new file mode 100644 index 0000000000..63240e24dc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +/** + * Class that represents the features and labels of a data point. + * + * @param label Label for this data point. + * @param features List of features for this data point. + */ +case class LabeledPoint(val label: Double, val features: Array[Double]) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala new file mode 100644 index 0000000000..d959695325 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.util.MLUtils + +import org.jblas.DoubleMatrix + +/** + * Regression model trained using Lasso. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class LassoModel( + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) + with RegressionModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + dataMatrix.dot(weightMatrix) + intercept + } +} + +/** + * Train a regression model with L1-regularization using Stochastic Gradient Descent. + */ +class LassoWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[LassoModel] + with Serializable { + + val gradient = new SquaredGradient() + val updater = new L1Updater() + @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) + + // We don't want to penalize the intercept, so set this to false. + setIntercept(false) + + var yMean = 0.0 + var xColMean: DoubleMatrix = _ + var xColSd: DoubleMatrix = _ + + /** + * Construct a Lasso object with default parameters + */ + def this() = this(1.0, 100, 1.0, 1.0) + + def createModel(weights: Array[Double], intercept: Double) = { + val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*) + val weightsScaled = weightsMat.div(xColSd) + val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)) + + new LassoModel(weightsScaled.data, interceptScaled) + } + + override def run( + input: RDD[LabeledPoint], + initialWeights: Array[Double]) + : LassoModel = + { + val nfeatures: Int = input.first.features.length + val nexamples: Long = input.count() + + // To avoid penalizing the intercept, we center and scale the data. + val stats = MLUtils.computeStats(input, nfeatures, nexamples) + yMean = stats._1 + xColMean = stats._2 + xColSd = stats._3 + + val normalizedData = input.map { point => + val yNormalized = point.label - yMean + val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*) + val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) + LabeledPoint(yNormalized, featuresNormalized.toArray) + } + + super.run(normalizedData, initialWeights) + } +} + +/** + * Top-level methods for calling Lasso. + */ +object LassoWithSGD { + + /** + * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : LassoModel = + { + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, + initialWeights) + } + + /** + * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double) + : LassoModel = + { + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input) + } + + /** + * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param regParam Regularization parameter. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LassoModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double) + : LassoModel = + { + train(input, numIterations, stepSize, regParam, 1.0) + } + + /** + * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LassoModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int) + : LassoModel = + { + train(input, numIterations, 1.0, 1.0, 1.0) + } + + def main(args: Array[String]) { + if (args.length != 5) { + println("Usage: Lasso <master> <input_dir> <step_size> <regularization_parameter> <niters>") + System.exit(1) + } + val sc = new SparkContext(args(0), "Lasso") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala new file mode 100644 index 0000000000..ae95ea24fc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.util.MLUtils + +import org.jblas.DoubleMatrix + +/** + * Regression model trained using LinearRegression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class LinearRegressionModel( + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) + with RegressionModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + dataMatrix.dot(weightMatrix) + intercept + } +} + +/** + * Train a regression model with no regularization using Stochastic Gradient Descent. + */ +class LinearRegressionWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[LinearRegressionModel] + with Serializable { + + val gradient = new SquaredGradient() + val updater = new SimpleUpdater() + val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a LinearRegression object with default parameters + */ + def this() = this(1.0, 100, 1.0) + + def createModel(weights: Array[Double], intercept: Double) = { + new LinearRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling LinearRegression. + */ +object LinearRegressionWithSGD { + + /** + * Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : LinearRegressionModel = + { + new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input, + initialWeights) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double) + : LinearRegressionModel = + { + new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LinearRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double) + : LinearRegressionModel = + { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LinearRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int) + : LinearRegressionModel = + { + train(input, numIterations, 1.0, 1.0) + } + + def main(args: Array[String]) { + if (args.length != 5) { + println("Usage: LinearRegression <master> <input_dir> <step_size> <niters>") + System.exit(1) + } + val sc = new SparkContext(args(0), "LinearRegression") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala new file mode 100644 index 0000000000..423afc32d6 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.rdd.RDD + +trait RegressionModel extends Serializable { + /** + * Predict values for the given data set using the model trained. + * + * @param testData RDD representing data points to be predicted + * @return RDD[Double] where each entry contains the corresponding prediction + */ + def predict(testData: RDD[Array[Double]]): RDD[Double] + + /** + * Predict values for a single data point using the model trained. + * + * @param testData array representing a single data point + * @return Double prediction from the trained model + */ + def predict(testData: Array[Double]): Double +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala new file mode 100644 index 0000000000..b29508d2b9 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.optimization._ +import org.apache.spark.mllib.util.MLUtils + +import org.jblas.DoubleMatrix + +/** + * Regression model trained using RidgeRegression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class RidgeRegressionModel( + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) + with RegressionModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + dataMatrix.dot(weightMatrix) + intercept + } +} + +/** + * Train a regression model with L2-regularization using Stochastic Gradient Descent. + */ +class RidgeRegressionWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[RidgeRegressionModel] + with Serializable { + + val gradient = new SquaredGradient() + val updater = new SquaredL2Updater() + + @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) + + // We don't want to penalize the intercept in RidgeRegression, so set this to false. + setIntercept(false) + + var yMean = 0.0 + var xColMean: DoubleMatrix = _ + var xColSd: DoubleMatrix = _ + + /** + * Construct a RidgeRegression object with default parameters + */ + def this() = this(1.0, 100, 1.0, 1.0) + + def createModel(weights: Array[Double], intercept: Double) = { + val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*) + val weightsScaled = weightsMat.div(xColSd) + val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)) + + new RidgeRegressionModel(weightsScaled.data, interceptScaled) + } + + override def run( + input: RDD[LabeledPoint], + initialWeights: Array[Double]) + : RidgeRegressionModel = + { + val nfeatures: Int = input.first.features.length + val nexamples: Long = input.count() + + // To avoid penalizing the intercept, we center and scale the data. + val stats = MLUtils.computeStats(input, nfeatures, nexamples) + yMean = stats._1 + xColMean = stats._2 + xColSd = stats._3 + + val normalizedData = input.map { point => + val yNormalized = point.label - yMean + val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*) + val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) + LabeledPoint(yNormalized, featuresNormalized.toArray) + } + + super.run(normalizedData, initialWeights) + } +} + +/** + * Top-level methods for calling RidgeRegression. + */ +object RidgeRegressionWithSGD { + + /** + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : RidgeRegressionModel = + { + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run( + input, initialWeights) + } + + /** + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double) + : RidgeRegressionModel = + { + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input) + } + + /** + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param regParam Regularization parameter. + * @param numIterations Number of iterations of gradient descent to run. + * @return a RidgeRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double) + : RidgeRegressionModel = + { + train(input, numIterations, stepSize, regParam, 1.0) + } + + /** + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @return a RidgeRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int) + : RidgeRegressionModel = + { + train(input, numIterations, 1.0, 1.0, 1.0) + } + + def main(args: Array[String]) { + if (args.length != 5) { + println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter>" + + " <niters>") + System.exit(1) + } + val sc = new SparkContext(args(0), "RidgeRegression") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, + args(3).toDouble) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala new file mode 100644 index 0000000000..8b55bce7c4 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * A collection of methods used to validate data before applying ML algorithms. + */ +object DataValidators extends Logging { + + /** + * Function to check if labels used for classification are either zero or one. + * + * @param data - input data set that needs to be checked + * + * @return True if labels are all zero or one, false otherwise. + */ + val classificationLabels: RDD[LabeledPoint] => Boolean = { data => + val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count() + if (numInvalid != 0) { + logError("Classification labels should be 0 or 1. Found " + numInvalid + " invalid labels") + } + numInvalid == 0 + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala new file mode 100644 index 0000000000..9109189dff --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** + * Generate test data for KMeans. This class first chooses k cluster centers + * from a d-dimensional Gaussian distribution scaled by factor r and then creates a Gaussian + * cluster with scale 1 around each center. + */ + +object KMeansDataGenerator { + + /** + * Generate an RDD containing test data for KMeans. + * + * @param sc SparkContext to use for creating the RDD + * @param numPoints Number of points that will be contained in the RDD + * @param k Number of clusters + * @param d Number of dimensions + * @param r Scaling factor for the distribution of the initial centers + * @param numPartitions Number of partitions of the generated RDD; default 2 + */ + def generateKMeansRDD( + sc: SparkContext, + numPoints: Int, + k: Int, + d: Int, + r: Double, + numPartitions: Int = 2) + : RDD[Array[Double]] = + { + // First, generate some centers + val rand = new Random(42) + val centers = Array.fill(k)(Array.fill(d)(rand.nextGaussian() * r)) + // Then generate points around each center + sc.parallelize(0 until numPoints, numPartitions).map { idx => + val center = centers(idx % k) + val rand2 = new Random(42 + idx) + Array.tabulate(d)(i => center(i) + rand2.nextGaussian()) + } + } + + def main(args: Array[String]) { + if (args.length < 6) { + println("Usage: KMeansGenerator " + + "<master> <output_dir> <num_points> <k> <d> <r> [<num_partitions>]") + System.exit(1) + } + + val sparkMaster = args(0) + val outputPath = args(1) + val numPoints = args(2).toInt + val k = args(3).toInt + val d = args(4).toInt + val r = args(5).toDouble + val parts = if (args.length >= 7) args(6).toInt else 2 + + val sc = new SparkContext(sparkMaster, "KMeansDataGenerator") + val data = generateKMeansRDD(sc, numPoints, k, d, r, parts) + data.map(_.mkString(" ")).saveAsTextFile(outputPath) + + System.exit(0) + } +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala new file mode 100644 index 0000000000..bc5045fb05 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.jblas.DoubleMatrix + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * Generate sample data used for Linear Data. This class generates + * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the + * response variable `Y`. + */ +object LinearDataGenerator { + + /** + * Return a Java List of synthetic data randomly generated according to a multi + * collinear model. + * @param intercept Data intercept + * @param weights Weights to be applied. + * @param nPoints Number of points in sample. + * @param seed Random seed + * @return Java List of input. + */ + def generateLinearInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int, + eps: Double): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed, eps)) + } + + /** + * + * @param intercept Data intercept + * @param weights Weights to be applied. + * @param nPoints Number of points in sample. + * @param seed Random seed + * @param eps Epsilon scaling factor. + * @return + */ + def generateLinearInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int, + eps: Double = 0.1): Seq[LabeledPoint] = { + + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0)) + val y = x.map { xi => + (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + eps * rnd.nextGaussian() + } + y.zip(x).map(p => LabeledPoint(p._1, p._2)) + } + + /** + * Generate an RDD containing sample data for Linear Regression models - including Ridge, Lasso, + * and uregularized variants. + * + * @param sc SparkContext to be used for generating the RDD. + * @param nexamples Number of examples that will be contained in the RDD. + * @param nfeatures Number of features to generate for each example. + * @param eps Epsilon factor by which examples are scaled. + * @param weights Weights associated with the first weights.length features. + * @param nparts Number of partitions in the RDD. Default value is 2. + * + * @return RDD of LabeledPoint containing sample data. + */ + def generateLinearRDD( + sc: SparkContext, + nexamples: Int, + nfeatures: Int, + eps: Double, + nparts: Int = 2, + intercept: Double = 0.0) : RDD[LabeledPoint] = { + org.jblas.util.Random.seed(42) + // Random values distributed uniformly in [-0.5, 0.5] + val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) + + val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p => + val seed = 42 + p + val examplesInPartition = nexamples / nparts + generateLinearInput(intercept, w.toArray, examplesInPartition, seed, eps) + } + data + } + + def main(args: Array[String]) { + if (args.length < 2) { + println("Usage: LinearDataGenerator " + + "<master> <output_dir> [num_examples] [num_features] [num_partitions]") + System.exit(1) + } + + val sparkMaster: String = args(0) + val outputPath: String = args(1) + val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 + val nfeatures: Int = if (args.length > 3) args(3).toInt else 100 + val parts: Int = if (args.length > 4) args(4).toInt else 2 + val eps = 10 + + val sc = new SparkContext(sparkMaster, "LinearDataGenerator") + val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts) + + MLUtils.saveLabeledData(data, outputPath) + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala new file mode 100644 index 0000000000..52c4a71d62 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * Generate test data for LogisticRegression. This class chooses positive labels + * with probability `probOne` and scales features for positive examples by `eps`. + */ + +object LogisticRegressionDataGenerator { + + /** + * Generate an RDD containing test data for LogisticRegression. + * + * @param sc SparkContext to use for creating the RDD. + * @param nexamples Number of examples that will be contained in the RDD. + * @param nfeatures Number of features to generate for each example. + * @param eps Epsilon factor by which positive examples are scaled. + * @param nparts Number of partitions of the generated RDD. Default value is 2. + * @param probOne Probability that a label is 1 (and not 0). Default value is 0.5. + */ + def generateLogisticRDD( + sc: SparkContext, + nexamples: Int, + nfeatures: Int, + eps: Double, + nparts: Int = 2, + probOne: Double = 0.5): RDD[LabeledPoint] = { + val data = sc.parallelize(0 until nexamples, nparts).map { idx => + val rnd = new Random(42 + idx) + + val y = if (idx % 2 == 0) 0.0 else 1.0 + val x = Array.fill[Double](nfeatures) { + rnd.nextGaussian() + (y * eps) + } + LabeledPoint(y, x) + } + data + } + + def main(args: Array[String]) { + if (args.length != 5) { + println("Usage: LogisticRegressionGenerator " + + "<master> <output_dir> <num_examples> <num_features> <num_partitions>") + System.exit(1) + } + + val sparkMaster: String = args(0) + val outputPath: String = args(1) + val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 + val nfeatures: Int = if (args.length > 3) args(3).toInt else 2 + val parts: Int = if (args.length > 4) args(4).toInt else 2 + val eps = 3 + + val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator") + val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts) + + MLUtils.saveLabeledData(data, outputPath) + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala new file mode 100644 index 0000000000..d5f3f6b8db --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation + +import scala.util.Random + +import org.jblas.DoubleMatrix + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.util.MLUtils + +/** +* Generate RDD(s) containing data for Matrix Factorization. +* +* This method samples training entries according to the oversampling factor +* 'trainSampFact', which is a multiplicative factor of the number of +* degrees of freedom of the matrix: rank*(m+n-rank). +* +* It optionally samples entries for a testing matrix using +* 'testSampFact', the percentage of the number of training entries +* to use for testing. +* +* This method takes the following inputs: +* sparkMaster (String) The master URL. +* outputPath (String) Directory to save output. +* m (Int) Number of rows in data matrix. +* n (Int) Number of columns in data matrix. +* rank (Int) Underlying rank of data matrix. +* trainSampFact (Double) Oversampling factor. +* noise (Boolean) Whether to add gaussian noise to training data. +* sigma (Double) Standard deviation of added gaussian noise. +* test (Boolean) Whether to create testing RDD. +* testSampFact (Double) Percentage of training data to use as test data. +*/ + +object MFDataGenerator{ + + def main(args: Array[String]) { + if (args.length < 2) { + println("Usage: MFDataGenerator " + + "<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]") + System.exit(1) + } + + val sparkMaster: String = args(0) + val outputPath: String = args(1) + val m: Int = if (args.length > 2) args(2).toInt else 100 + val n: Int = if (args.length > 3) args(3).toInt else 100 + val rank: Int = if (args.length > 4) args(4).toInt else 10 + val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0 + val noise: Boolean = if (args.length > 6) args(6).toBoolean else false + val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1 + val test: Boolean = if (args.length > 8) args(8).toBoolean else false + val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1 + + val sc = new SparkContext(sparkMaster, "MFDataGenerator") + + val A = DoubleMatrix.randn(m, rank) + val B = DoubleMatrix.randn(rank, n) + val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank))) + A.mmuli(z) + B.mmuli(z) + val fullData = A.mmul(B) + + val df = rank * (m + n - rank) + val sampSize = scala.math.min(scala.math.round(trainSampFact * df), + scala.math.round(.99 * m * n)).toInt + val rand = new Random() + val mn = m * n + val shuffled = rand.shuffle(1 to mn toList) + + val omega = shuffled.slice(0, sampSize) + val ordered = omega.sortWith(_ < _).toArray + val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered) + .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1))) + + // optionally add gaussian noise + if (noise) { + trainData.map(x => (x._1, x._2, x._3 + rand.nextGaussian * sigma)) + } + + trainData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath) + + // optionally generate testing data + if (test) { + val testSampSize = scala.math + .min(scala.math.round(sampSize * testSampFact),scala.math.round(mn - sampSize)).toInt + val testOmega = shuffled.slice(sampSize, sampSize + testSampSize) + val testOrdered = testOmega.sortWith(_ < _).toArray + val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered) + .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1))) + testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath) + } + + sc.stop() + + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala new file mode 100644 index 0000000000..d91b74c3ac --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + +import org.jblas.DoubleMatrix +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * Helper methods to load, save and pre-process data used in ML Lib. + */ +object MLUtils { + + /** + * Load labeled data from a file. The data format used here is + * <L>, <f1> <f2> ... + * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double. + * + * @param sc SparkContext + * @param dir Directory to the input data files. + * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is + * the label, and the second element represents the feature values (an array of Double). + */ + def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { + sc.textFile(dir).map { line => + val parts = line.split(',') + val label = parts(0).toDouble + val features = parts(1).trim().split(' ').map(_.toDouble) + LabeledPoint(label, features) + } + } + + /** + * Save labeled data to a file. The data format used here is + * <L>, <f1> <f2> ... + * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double. + * + * @param data An RDD of LabeledPoints containing data to be saved. + * @param dir Directory to save the data. + */ + def saveLabeledData(data: RDD[LabeledPoint], dir: String) { + val dataStr = data.map(x => x.label + "," + x.features.mkString(" ")) + dataStr.saveAsTextFile(dir) + } + + /** + * Utility function to compute mean and standard deviation on a given dataset. + * + * @param data - input data set whose statistics are computed + * @param nfeatures - number of features + * @param nexamples - number of examples in input dataset + * + * @return (yMean, xColMean, xColSd) - Tuple consisting of + * yMean - mean of the labels + * xColMean - Row vector with mean for every column (or feature) of the input data + * xColSd - Row vector standard deviation for every column (or feature) of the input data. + */ + def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long): + (Double, DoubleMatrix, DoubleMatrix) = { + val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples + + // NOTE: We shuffle X by column here to compute column sum and sum of squares. + val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint => + val nCols = labeledPoint.features.length + // Traverse over every column and emit (col, value, value^2) + Iterator.tabulate(nCols) { i => + (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i))) + } + }.reduceByKey { case(x1, x2) => + (x1._1 + x2._1, x1._2 + x2._2) + } + val xColSumsMap = xColSumSq.collectAsMap() + + val xColMean = DoubleMatrix.zeros(nfeatures, 1) + val xColSd = DoubleMatrix.zeros(nfeatures, 1) + + // Compute mean and unbiased variance using column sums + var col = 0 + while (col < nfeatures) { + xColMean.put(col, xColSumsMap(col)._1 / nexamples) + val variance = + (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / (nexamples) + xColSd.put(col, math.sqrt(variance)) + col += 1 + } + + (yMean, xColMean, xColSd) + } + + /** + * Return the squared Euclidean distance between two vectors. + */ + def squaredDistance(v1: Array[Double], v2: Array[Double]): Double = { + if (v1.length != v2.length) { + throw new IllegalArgumentException("Vector sizes don't match") + } + var i = 0 + var sum = 0.0 + while (i < v1.length) { + sum += (v1(i) - v2(i)) * (v1(i) - v2(i)) + i += 1 + } + sum + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala new file mode 100644 index 0000000000..07022093f3 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.util + +import scala.util.Random + +import org.jblas.DoubleMatrix + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * Generate sample data used for SVM. This class generates uniform random values + * for the features and adds Gaussian noise with weight 0.1 to generate labels. + */ +object SVMDataGenerator { + + def main(args: Array[String]) { + if (args.length < 2) { + println("Usage: SVMGenerator " + + "<master> <output_dir> [num_examples] [num_features] [num_partitions]") + System.exit(1) + } + + val sparkMaster: String = args(0) + val outputPath: String = args(1) + val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 + val nfeatures: Int = if (args.length > 3) args(3).toInt else 2 + val parts: Int = if (args.length > 4) args(4).toInt else 2 + + val sc = new SparkContext(sparkMaster, "SVMGenerator") + + val globalRnd = new Random(94720) + val trueWeights = new DoubleMatrix(1, nfeatures + 1, + Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian()):_*) + + val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx => + val rnd = new Random(42 + idx) + + val x = Array.fill[Double](nfeatures) { + rnd.nextDouble() * 2.0 - 1.0 + } + val yD = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1 + val y = if (yD < 0) 0.0 else 1.0 + LabeledPoint(y, x) + } + + MLUtils.saveLabeledData(data, outputPath) + + sc.stop() + } +} diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala deleted file mode 100644 index 4fd0646160..0000000000 --- a/mllib/src/main/scala/spark/mllib/clustering/KMeansModel.scala +++ /dev/null @@ -1,27 +0,0 @@ -package spark.mllib.clustering - -import spark.RDD -import spark.SparkContext._ -import spark.mllib.util.MLUtils - - -/** - * A clustering model for K-means. Each point belongs to the cluster with the closest center. - */ -class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable { - /** Total number of clusters. */ - def k: Int = clusterCenters.length - - /** Return the cluster index that a given point belongs to. */ - def predict(point: Array[Double]): Int = { - KMeans.findClosest(clusterCenters, point)._1 - } - - /** - * Return the K-means cost (sum of squared distances of points to their nearest center) for this - * model on the given data. - */ - def computeCost(data: RDD[Array[Double]]): Double = { - data.map(p => KMeans.pointCost(clusterCenters, p)).sum - } -} diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala deleted file mode 100644 index 90b0999a5e..0000000000 --- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala +++ /dev/null @@ -1,33 +0,0 @@ -package spark.mllib.optimization - -import org.jblas.DoubleMatrix - -abstract class Gradient extends Serializable { - /** - * Compute the gradient for a given row of data. - * - * @param data - One row of data. Row matrix of size 1xn where n is the number of features. - * @param label - Label for this data item. - * @param weights - Column matrix containing weights for every feature. - */ - def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): - (DoubleMatrix, Double) -} - -class LogisticGradient extends Gradient { - override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): - (DoubleMatrix, Double) = { - val margin: Double = -1.0 * data.dot(weights) - val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label - - val gradient = data.mul(gradientMultiplier) - val loss = - if (margin > 0) { - math.log(1 + math.exp(0 - margin)) - } else { - math.log(1 + math.exp(margin)) - margin - } - - (gradient, loss) - } -} diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala deleted file mode 100644 index eff853f379..0000000000 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ /dev/null @@ -1,62 +0,0 @@ -package spark.mllib.optimization - -import spark.{Logging, RDD, SparkContext} -import spark.SparkContext._ - -import org.jblas.DoubleMatrix - -import scala.collection.mutable.ArrayBuffer - - -object GradientDescent { - - /** - * Run gradient descent in parallel using mini batches. - * Based on Matlab code written by John Duchi. - * - * @param data - Input data for SGD. RDD of form (label, [feature values]). - * @param gradient - Gradient object that will be used to compute the gradient. - * @param updater - Updater object that will be used to update the model. - * @param stepSize - stepSize to be used during update. - * @param numIters - number of iterations that SGD should be run. - * @param miniBatchFraction - fraction of the input data set that should be used for - * one iteration of SGD. Default value 1.0. - * - * @return weights - Column matrix containing weights for every feature. - * @return lossHistory - Array containing the loss computed for every iteration. - */ - def runMiniBatchSGD( - data: RDD[(Double, Array[Double])], - gradient: Gradient, - updater: Updater, - stepSize: Double, - numIters: Int, - miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = { - - val lossHistory = new ArrayBuffer[Double](numIters) - - val nfeatures: Int = data.take(1)(0)._2.length - val nexamples: Long = data.count() - val miniBatchSize = nexamples * miniBatchFraction - - // Initialize weights as a column matrix - var weights = DoubleMatrix.ones(nfeatures) - var reg_val = 0.0 - - for (i <- 1 to numIters) { - val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { - case (y, features) => - val featuresRow = new DoubleMatrix(features.length, 1, features:_*) - val (grad, loss) = gradient.compute(featuresRow, y, weights) - (grad, loss) - }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) - - lossHistory.append(lossSum / miniBatchSize + reg_val) - val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i) - weights = update._1 - reg_val = update._2 - } - - (weights, lossHistory.toArray) - } -} diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala deleted file mode 100644 index ea80bfcbfd..0000000000 --- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala +++ /dev/null @@ -1,27 +0,0 @@ -package spark.mllib.optimization - -import org.jblas.DoubleMatrix - -abstract class Updater extends Serializable { - /** - * Compute an updated value for weights given the gradient, stepSize and iteration number. - * - * @param weightsOld - Column matrix of size nx1 where n is the number of features. - * @param gradient - Column matrix of size nx1 where n is the number of features. - * @param stepSize - step size across iterations - * @param iter - Iteration number - * - * @return weightsNew - Column matrix containing updated weights - * @return reg_val - regularization value - */ - def compute(weightsOlds: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int): - (DoubleMatrix, Double) -} - -class SimpleUpdater extends Updater { - override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, - stepSize: Double, iter: Int): (DoubleMatrix, Double) = { - val normGradient = gradient.mul(stepSize / math.sqrt(iter)) - (weightsOld.sub(normGradient), 0) - } -} diff --git a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala deleted file mode 100644 index fb812a6dbe..0000000000 --- a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ /dev/null @@ -1,23 +0,0 @@ -package spark.mllib.recommendation - -import spark.RDD -import spark.SparkContext._ - -import org.jblas._ - -class MatrixFactorizationModel( - val rank: Int, - val userFeatures: RDD[(Int, Array[Double])], - val productFeatures: RDD[(Int, Array[Double])]) - extends Serializable -{ - /** Predict the rating of one user for one product. */ - def predict(user: Int, product: Int): Double = { - val userVector = new DoubleMatrix(userFeatures.lookup(user).head) - val productVector = new DoubleMatrix(productFeatures.lookup(product).head) - userVector.dot(productVector) - } - - // TODO: Figure out what good bulk prediction methods would look like. - // Probably want a way to get the top users for a product or vice-versa. -} diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala deleted file mode 100644 index e4db7bb9b7..0000000000 --- a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala +++ /dev/null @@ -1,158 +0,0 @@ -package spark.mllib.regression - -import spark.{Logging, RDD, SparkContext} -import spark.mllib.optimization._ -import spark.mllib.util.MLUtils - -import org.jblas.DoubleMatrix - -/** - * Logistic Regression using Stochastic Gradient Descent. - * Based on Matlab code written by John Duchi. - */ -class LogisticRegressionModel( - val weights: DoubleMatrix, - val intercept: Double, - val losses: Array[Double]) extends RegressionModel { - - override def predict(testData: spark.RDD[Array[Double]]) = { - testData.map { x => - val margin = new DoubleMatrix(1, x.length, x:_*).mmul(this.weights).get(0) + this.intercept - 1.0/ (1.0 + math.exp(margin * -1)) - } - } - - override def predict(testData: Array[Double]): Double = { - val dataMat = new DoubleMatrix(1, testData.length, testData:_*) - val margin = dataMat.mmul(this.weights).get(0) + this.intercept - 1.0/ (1.0 + math.exp(margin * -1)) - } -} - -class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double, - var numIters: Int) - extends Logging { - - /** - * Construct a LogisticRegression object with default parameters - */ - def this() = this(1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } - - def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = { - // Add a extra variable consisting of all 1.0's for the intercept. - val data = input.map { case (y, features) => - (y, Array(1.0, features:_*)) - } - - val (weights, losses) = GradientDescent.runMiniBatchSGD( - data, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters, miniBatchFraction) - - val weightsScaled = weights.getRange(1, weights.length) - val intercept = weights.get(0) - - val model = new LogisticRegressionModel(weightsScaled, intercept, losses) - - logInfo("Final model weights " + model.weights) - logInfo("Final model intercept " + model.intercept) - logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", ")) - model - } -} - -/** - * Top-level methods for calling Logistic Regression. - */ -object LogisticRegression { - - /** - * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. Each iteration uses - * `miniBatchFraction` fraction of the data to calculate the gradient. - * - * @param input RDD of (label, array of features) pairs. - * @param numIterations Number of iterations of gradient descent to run. - * @param stepSize Step size to be used for each iteration of gradient descent. - * @param miniBatchFraction Fraction of data to be used per iteration. - */ - def train( - input: RDD[(Double, Array[Double])], - numIterations: Int, - stepSize: Double, - miniBatchFraction: Double) - : LogisticRegressionModel = - { - new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input) - } - - /** - * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. We use the entire data set to update - * the gradient in each iteration. - * - * @param input RDD of (label, array of features) pairs. - * @param stepSize Step size to be used for each iteration of Gradient Descent. - * @param numIterations Number of iterations of gradient descent to run. - * @return a LogisticRegressionModel which has the weights and offset from training. - */ - def train( - input: RDD[(Double, Array[Double])], - numIterations: Int, - stepSize: Double) - : LogisticRegressionModel = - { - train(input, numIterations, stepSize, 1.0) - } - - /** - * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update - * the gradient in each iteration. - * - * @param input RDD of (label, array of features) pairs. - * @param numIterations Number of iterations of gradient descent to run. - * @return a LogisticRegressionModel which has the weights and offset from training. - */ - def train( - input: RDD[(Double, Array[Double])], - numIterations: Int) - : LogisticRegressionModel = - { - train(input, numIterations, 1.0, 1.0) - } - - def main(args: Array[String]) { - if (args.length != 4) { - println("Usage: LogisticRegression <master> <input_dir> <step_size> <niters>") - System.exit(1) - } - val sc = new SparkContext(args(0), "LogisticRegression") - val data = MLUtils.loadLabeledData(sc, args(1)) - val model = LogisticRegression.train(data, args(3).toInt, args(2).toDouble) - - sc.stop() - } -} diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala b/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala deleted file mode 100644 index 6e7c023bac..0000000000 --- a/mllib/src/main/scala/spark/mllib/regression/LogisticRegressionGenerator.scala +++ /dev/null @@ -1,41 +0,0 @@ -package spark.mllib.regression - -import scala.util.Random - -import org.jblas.DoubleMatrix - -import spark.{RDD, SparkContext} -import spark.mllib.util.MLUtils - -object LogisticRegressionGenerator { - - def main(args: Array[String]) { - if (args.length != 5) { - println("Usage: LogisticRegressionGenerator " + - "<master> <output_dir> <num_examples> <num_features> <num_partitions>") - System.exit(1) - } - - val sparkMaster: String = args(0) - val outputPath: String = args(1) - val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 - val nfeatures: Int = if (args.length > 3) args(3).toInt else 2 - val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 3 - - val sc = new SparkContext(sparkMaster, "LogisticRegressionGenerator") - - val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx => - val rnd = new Random(42 + idx) - - val y = if (idx % 2 == 0) 0 else 1 - val x = Array.fill[Double](nfeatures) { - rnd.nextGaussian() + (y * eps) - } - (y, x) - } - - MLUtils.saveLabeledData(data, outputPath) - sc.stop() - } -} diff --git a/mllib/src/main/scala/spark/mllib/regression/Regression.scala b/mllib/src/main/scala/spark/mllib/regression/Regression.scala deleted file mode 100644 index f79974c191..0000000000 --- a/mllib/src/main/scala/spark/mllib/regression/Regression.scala +++ /dev/null @@ -1,21 +0,0 @@ -package spark.mllib.regression - -import spark.RDD - -trait RegressionModel { - /** - * Predict values for the given data set using the model trained. - * - * @param testData RDD representing data points to be predicted - * @return RDD[Double] where each entry contains the corresponding prediction - */ - def predict(testData: RDD[Array[Double]]): RDD[Double] - - /** - * Predict values for a single data point using the model trained. - * - * @param testData array representing a single data point - * @return Double prediction from the trained model - */ - def predict(testData: Array[Double]): Double -} diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala deleted file mode 100644 index 5f813df402..0000000000 --- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala +++ /dev/null @@ -1,194 +0,0 @@ -package spark.mllib.regression - -import spark.{Logging, RDD, SparkContext} -import spark.mllib.util.MLUtils - -import org.jblas.DoubleMatrix -import org.jblas.Solve - -import scala.annotation.tailrec -import scala.collection.mutable - -/** - * Ridge Regression from Joseph Gonzalez's implementation in MLBase - */ -class RidgeRegressionModel( - val weights: DoubleMatrix, - val intercept: Double, - val lambdaOpt: Double, - val lambdas: Seq[(Double, Double, DoubleMatrix)]) - extends RegressionModel { - - override def predict(testData: RDD[Array[Double]]): RDD[Double] = { - testData.map { x => - (new DoubleMatrix(1, x.length, x:_*).mmul(this.weights)).get(0) + this.intercept - } - } - - override def predict(testData: Array[Double]): Double = { - (new DoubleMatrix(1, testData.length, testData:_*).mmul(this.weights)).get(0) + this.intercept - } -} - -class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double) - extends Logging { - - def this() = this(0.0, 100.0) - - /** - * Set the lower bound on binary search for lambda's. Default is 0. - */ - def setLowLambda(low: Double) = { - this.lambdaLow = low - this - } - - /** - * Set the upper bound on binary search for lambda's. Default is 100.0. - */ - def setHighLambda(hi: Double) = { - this.lambdaHigh = hi - this - } - - def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = { - val nfeatures: Int = input.take(1)(0)._2.length - val nexamples: Long = input.count() - - val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples) - - val data = input.map { case(y, features) => - val yNormalized = y - yMean - val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*) - val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) - (yNormalized, featuresNormalized.toArray) - } - - // Compute XtX - Size of XtX is nfeatures by nfeatures - val XtX: DoubleMatrix = data.map { case (y, features) => - val x = new DoubleMatrix(1, features.length, features:_*) - x.transpose().mmul(x) - }.reduce(_.addi(_)) - - // Compute Xt*y - Size of Xty is nfeatures by 1 - val Xty: DoubleMatrix = data.map { case (y, features) => - new DoubleMatrix(features.length, 1, features:_*).mul(y) - }.reduce(_.addi(_)) - - // Define a function to compute the leave one out cross validation error - // for a single example - def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = { - // Compute the MLE ridge regression parameter value - - // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty - val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX) - val w = Solve.solveSymmetric(XtXlambda, Xty) - - val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures)) - - // compute the generalized cross validation score - val cverror = data.map { - case (y, features) => - val x = new DoubleMatrix(features.length, 1, features:_*) - val yhat = w.transpose().mmul(x).get(0) - val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0) - val residual = (y - yhat) / (1.0 - H_ii) - residual * residual - }.reduce(_ + _) / nexamples - - (lambda, cverror, w) - } - - // Binary search for the best assignment to lambda. - def binSearch(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = { - val buffer = mutable.ListBuffer.empty[(Double, Double, DoubleMatrix)] - - @tailrec - def loop(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = { - val mid = (high - low) / 2 + low - val lowValue = crossValidate((mid - low) / 2 + low) - val highValue = crossValidate((high - mid) / 2 + mid) - val (newLow, newHigh) = if (lowValue._2 < highValue._2) { - (low, mid + (high-low)/4) - } else { - (mid - (high-low)/4, high) - } - if (newHigh - newLow > 1.0E-7) { - buffer += lowValue += highValue - loop(newLow, newHigh) - } else { - buffer += lowValue += highValue - buffer.result() - } - } - - loop(low, high) - } - - // Actually compute the best lambda - val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1) - - // Find the best parameter set by taking the lowest cverror. - val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b) - - // Return the model which contains the solution - val weightsScaled = weights.div(xColSd) - val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0)) - val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas) - - logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt) - logInfo("RidgeRegression: optimal weights " + model.weights) - logInfo("RidgeRegression: optimal intercept " + model.intercept) - logInfo("RidgeRegression: cross-validation error " + cverror) - - model - } -} - -/** - * Top-level methods for calling Ridge Regression. - */ -object RidgeRegression { - - /** - * Train a ridge regression model given an RDD of (response, features) pairs. - * We use the closed form solution to compute the cross-validation score for - * a given lambda. The optimal lambda is computed by performing binary search - * between the provided bounds of lambda. - * - * @param input RDD of (response, array of features) pairs. - * @param lambdaLow lower bound used in binary search for lambda - * @param lambdaHigh upper bound used in binary search for lambda - */ - def train( - input: RDD[(Double, Array[Double])], - lambdaLow: Double, - lambdaHigh: Double) - : RidgeRegressionModel = - { - new RidgeRegression(lambdaLow, lambdaHigh).train(input) - } - - /** - * Train a ridge regression model given an RDD of (response, features) pairs. - * We use the closed form solution to compute the cross-validation score for - * a given lambda. The optimal lambda is computed by performing binary search - * between lambda values of 0 and 100. - * - * @param input RDD of (response, array of features) pairs. - */ - def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = { - train(input, 0.0, 100.0) - } - - def main(args: Array[String]) { - if (args.length != 2) { - println("Usage: RidgeRegression <master> <input_dir>") - System.exit(1) - } - val sc = new SparkContext(args(0), "RidgeRegression") - val data = MLUtils.loadLabeledData(sc, args(1)) - val model = RidgeRegression.train(data, 0, 1000) - sc.stop() - } -} diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala deleted file mode 100644 index b83f505d8e..0000000000 --- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegressionGenerator.scala +++ /dev/null @@ -1,55 +0,0 @@ -package spark.mllib.regression - -import scala.util.Random - -import org.jblas.DoubleMatrix - -import spark.{RDD, SparkContext} -import spark.mllib.util.MLUtils - - -object RidgeRegressionGenerator { - - def main(args: Array[String]) { - if (args.length != 5) { - println("Usage: RidgeRegressionGenerator " + - "<master> <output_dir> <num_examples> <num_features> <num_partitions>") - System.exit(1) - } - - val sparkMaster: String = args(0) - val outputPath: String = args(1) - val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 - val nfeatures: Int = if (args.length > 3) args(3).toInt else 100 - val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 10 - - org.jblas.util.Random.seed(42) - val sc = new SparkContext(sparkMaster, "RidgeRegressionGenerator") - - // Random values distributed uniformly in [-0.5, 0.5] - val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) - w.put(0, 0, 10) - w.put(1, 0, 10) - - val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until parts, parts).flatMap { p => - org.jblas.util.Random.seed(42 + p) - val examplesInPartition = nexamples / parts - - val X = DoubleMatrix.rand(examplesInPartition, nfeatures) - val y = X.mmul(w) - - val rnd = new Random(42 + p) - - val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps) - val yObs = new DoubleMatrix(normalValues).addi(y) - - Iterator.tabulate(examplesInPartition) { i => - (yObs.get(i, 0), X.getRow(i).toArray) - } - } - - MLUtils.saveLabeledData(data, outputPath) - sc.stop() - } -} diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala deleted file mode 100644 index 08a031dded..0000000000 --- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala +++ /dev/null @@ -1,95 +0,0 @@ -package spark.mllib.util - -import spark.{RDD, SparkContext} -import spark.SparkContext._ - -import org.jblas.DoubleMatrix - -/** - * Helper methods to load and save data - * Data format: - * <l>, <f1> <f2> ... - * where <f1>, <f2> are feature values in Double and <l> is the corresponding label as Double. - */ -object MLUtils { - - /** - * @param sc SparkContext - * @param dir Directory to the input data files. - * @return An RDD of tuples. For each tuple, the first element is the label, and the second - * element represents the feature values (an array of Double). - */ - def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = { - sc.textFile(dir).map { line => - val parts = line.split(",") - val label = parts(0).toDouble - val features = parts(1).trim().split(" ").map(_.toDouble) - (label, features) - } - } - - def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) { - val dataStr = data.map(x => x._1 + "," + x._2.mkString(" ")) - dataStr.saveAsTextFile(dir) - } - - /** - * Utility function to compute mean and standard deviation on a given dataset. - * - * @param data - input data set whose statistics are computed - * @param nfeatures - number of features - * @param nexamples - number of examples in input dataset - * - * @return (yMean, xColMean, xColSd) - Tuple consisting of - * yMean - mean of the labels - * xColMean - Row vector with mean for every column (or feature) of the input data - * xColSd - Row vector standard deviation for every column (or feature) of the input data. - */ - def computeStats(data: RDD[(Double, Array[Double])], nfeatures: Int, nexamples: Long): - (Double, DoubleMatrix, DoubleMatrix) = { - val yMean: Double = data.map { case (y, features) => y }.reduce(_ + _) / nexamples - - // NOTE: We shuffle X by column here to compute column sum and sum of squares. - val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { case(y, features) => - val nCols = features.length - // Traverse over every column and emit (col, value, value^2) - Iterator.tabulate(nCols) { i => - (i, (features(i), features(i)*features(i))) - } - }.reduceByKey { case(x1, x2) => - (x1._1 + x2._1, x1._2 + x2._2) - } - val xColSumsMap = xColSumSq.collectAsMap() - - val xColMean = DoubleMatrix.zeros(nfeatures, 1) - val xColSd = DoubleMatrix.zeros(nfeatures, 1) - - // Compute mean and unbiased variance using column sums - var col = 0 - while (col < nfeatures) { - xColMean.put(col, xColSumsMap(col)._1 / nexamples) - val variance = - (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / (nexamples) - xColSd.put(col, math.sqrt(variance)) - col += 1 - } - - (yMean, xColMean, xColSd) - } - - /** - * Return the squared Euclidean distance between two vectors. - */ - def squaredDistance(v1: Array[Double], v2: Array[Double]): Double = { - if (v1.length != v2.length) { - throw new IllegalArgumentException("Vector sizes don't match") - } - var i = 0 - var sum = 0.0 - while (i < v1.length) { - sum += (v1(i) - v2(i)) * (v1(i) - v2(i)) - i += 1 - } - sum - } -} diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java new file mode 100644 index 0000000000..e18e3bc6a8 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.mllib.regression.LabeledPoint; + +public class JavaLogisticRegressionSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLogisticRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List<LabeledPoint> validationData, LogisticRegressionModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLRUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List<LabeledPoint> validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD(); + lrImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + LogisticRegressionModel model = lrImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLRUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List<LabeledPoint> validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionModel model = LogisticRegressionWithSGD.train( + testRDD.rdd(), 100, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java new file mode 100644 index 0000000000..117e5eaa8b --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification; + + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.apache.spark.mllib.regression.LabeledPoint; + +public class JavaSVMSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaSVMSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List<LabeledPoint> validationData, SVMModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runSVMUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List<LabeledPoint> validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMWithSGD svmSGDImpl = new SVMWithSGD(); + svmSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + SVMModel model = svmSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runSVMUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List<LabeledPoint> validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMModel model = SVMWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java new file mode 100644 index 0000000000..32d3934ac1 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +public class JavaKMeansSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaKMeans"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + // L1 distance between two points + double distance1(double[] v1, double[] v2) { + double distance = 0.0; + for (int i = 0; i < v1.length; ++i) { + distance = Math.max(distance, Math.abs(v1[i] - v2[i])); + } + return distance; + } + + // Assert that two sets of points are equal, within EPSILON tolerance + void assertSetsEqual(double[][] v1, double[][] v2) { + double EPSILON = 1e-4; + Assert.assertTrue(v1.length == v2.length); + for (int i = 0; i < v1.length; ++i) { + double minDistance = Double.MAX_VALUE; + for (int j = 0; j < v2.length; ++j) { + minDistance = Math.min(minDistance, distance1(v1[i], v2[j])); + } + Assert.assertTrue(minDistance <= EPSILON); + } + + for (int i = 0; i < v2.length; ++i) { + double minDistance = Double.MAX_VALUE; + for (int j = 0; j < v1.length; ++j) { + minDistance = Math.min(minDistance, distance1(v2[i], v1[j])); + } + Assert.assertTrue(minDistance <= EPSILON); + } + } + + + @Test + public void runKMeansUsingStaticMethods() { + List<double[]> points = new ArrayList(); + points.add(new double[]{1.0, 2.0, 6.0}); + points.add(new double[]{1.0, 3.0, 0.0}); + points.add(new double[]{1.0, 4.0, 6.0}); + + double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + + JavaRDD<double[]> data = sc.parallelize(points, 2); + KMeansModel model = KMeans.train(data.rdd(), 1, 1); + assertSetsEqual(model.clusterCenters(), expectedCenter); + + model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + } + + @Test + public void runKMeansUsingConstructor() { + List<double[]> points = new ArrayList(); + points.add(new double[]{1.0, 2.0, 6.0}); + points.add(new double[]{1.0, 3.0, 0.0}); + points.add(new double[]{1.0, 4.0, 6.0}); + + double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + + JavaRDD<double[]> data = sc.parallelize(points, 2); + KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + + model = new KMeans().setK(1) + .setMaxIterations(1) + .setRuns(1) + .setInitializationMode(KMeans.RANDOM()) + .run(data.rdd()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + } +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java new file mode 100644 index 0000000000..3323f6cee2 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation; + +import java.io.Serializable; +import java.util.List; + +import scala.Tuple2; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.jblas.DoubleMatrix; + +public class JavaALSSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaALS"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, + DoubleMatrix trueRatings, double matchThreshold) { + DoubleMatrix predictedU = new DoubleMatrix(users, features); + List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect(); + for (int i = 0; i < features; ++i) { + for (scala.Tuple2<Object, double[]> userFeature : userFeatures) { + predictedU.put((Integer)userFeature._1(), i, userFeature._2()[i]); + } + } + DoubleMatrix predictedP = new DoubleMatrix(products, features); + + List<scala.Tuple2<Object, double[]>> productFeatures = + model.productFeatures().toJavaRDD().collect(); + for (int i = 0; i < features; ++i) { + for (scala.Tuple2<Object, double[]> productFeature : productFeatures) { + predictedP.put((Integer)productFeature._1(), i, productFeature._2()[i]); + } + } + + DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose()); + + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double correct = trueRatings.get(u, p); + Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold); + } + } + } + + @Test + public void runALSUsingStaticMethods() { + int features = 1; + int iterations = 15; + int users = 10; + int products = 10; + scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); + validatePrediction(model, users, products, features, testData._2(), 0.3); + } + + @Test + public void runALSUsingConstructor() { + int features = 2; + int iterations = 15; + int users = 20; + int products = 30; + scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .run(data.rdd()); + validatePrediction(model, users, products, features, testData._2(), 0.3); + } +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java new file mode 100644 index 0000000000..f44b25cd44 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.util.LinearDataGenerator; + +public class JavaLassoSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLassoSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List<LabeledPoint> validationData, LassoModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + // A prediction is off if the prediction is more than 0.5 away from expected value. + if (Math.abs(prediction - point.label()) <= 0.5) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLassoUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, + weights, nPoints, 42, 0.1), 2).cache(); + List<LabeledPoint> validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LassoWithSGD lassoSGDImpl = new LassoWithSGD(); + lassoSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(0.01) + .setNumIterations(20); + LassoModel model = lassoSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLassoUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, + weights, nPoints, 42, 0.1), 2).cache(); + List<LabeledPoint> validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java new file mode 100644 index 0000000000..5a4410a632 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.util.LinearDataGenerator; + +public class JavaLinearRegressionSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLinearRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + // A prediction is off if the prediction is more than 0.5 away from expected value. + if (Math.abs(prediction - point.label()) <= 0.5) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLinearRegressionUsingConstructor() { + int nPoints = 100; + double A = 3.0; + double[] weights = {10, 10}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize( + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache(); + List<LabeledPoint> validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); + LinearRegressionModel model = linSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLinearRegressionUsingStaticMethods() { + int nPoints = 100; + double A = 3.0; + double[] weights = {10, 10}; + + JavaRDD<LabeledPoint> testRDD = sc.parallelize( + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache(); + List<LabeledPoint> validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java new file mode 100644 index 0000000000..2fdd5fc8fd --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.jblas.DoubleMatrix; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.util.LinearDataGenerator; + +public class JavaRidgeRegressionSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + double predictionError(List<LabeledPoint> validationData, RidgeRegressionModel model) { + double errorSum = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + errorSum += (prediction - point.label()) * (prediction - point.label()); + } + return errorSum / validationData.size(); + } + + List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double eps) { + org.jblas.util.Random.seed(42); + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5); + // Set first two weights to eps + w.put(0, 0, eps); + w.put(1, 0, eps); + return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps); + } + + @Test + public void runRidgeRegressionUsingConstructor() { + int nexamples = 200; + int nfeatures = 20; + double eps = 10.0; + List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps); + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples)); + List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples); + + RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD(); + ridgeSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(0.0) + .setNumIterations(200); + RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd()); + double unRegularizedErr = predictionError(validationData, model); + + ridgeSGDImpl.optimizer().setRegParam(0.1); + model = ridgeSGDImpl.run(testRDD.rdd()); + double regularizedErr = predictionError(validationData, model); + + Assert.assertTrue(regularizedErr < unRegularizedErr); + } + + @Test + public void runRidgeRegressionUsingStaticMethods() { + int nexamples = 200; + int nfeatures = 20; + double eps = 10.0; + List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps); + + JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples)); + List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples); + + RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0); + double unRegularizedErr = predictionError(validationData, model); + + model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.1); + double regularizedErr = predictionError(validationData, model); + + Assert.assertTrue(regularizedErr < unRegularizedErr); + } +} diff --git a/mllib/src/test/resources/log4j.properties b/mllib/src/test/resources/log4j.properties index 390c92763c..4265ba6e5d 100644 --- a/mllib/src/test/resources/log4j.properties +++ b/mllib/src/test/resources/log4j.properties @@ -1,8 +1,25 @@ -# Set everything to be logged to the file core/target/unit-tests.log +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file core/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=ml/target/unit-tests.log +log4j.appender.file.file=mllib/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala new file mode 100644 index 0000000000..34c67294e9 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.util.Random +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.regression._ + +object LogisticRegressionSuite { + + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed)) + } + + // Generate input of the form Y = logistic(offset + scale*X) + def generateLogisticInput( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) + + // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1) + val unifRand = new scala.util.Random(45) + val rLogis = (0 until nPoints).map { i => + val u = unifRand.nextDouble() + math.log(u) - math.log(1.0-u) + } + + // y <- A + B*x + rLogis() + // y <- as.numeric(y > 0) + val y: Seq[Int] = (0 until nPoints).map { i => + val yVal = offset + scale * x1(i) + rLogis(i) + if (yVal > 0) 1 else 0 + } + + val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i)))) + testData + } + +} + +class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) + }.size + // At least 83% of the predictions should be on. + ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83 + } + + // Test if we can correctly learn A, B where Y = logistic(A + B*X) + test("logistic regression") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(20) + + val model = lr.run(testRDD) + + // Test the weights + val weight0 = model.weights(0) + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("logistic regression with initial weights") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + // Use half as many iterations as the previous test. + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(10) + + val model = lr.run(testRDD, initialWeights) + + val weight0 = model.weights(0) + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala new file mode 100644 index 0000000000..6a957e3ddc --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import scala.util.Random +import scala.math.signum +import scala.collection.JavaConversions._ + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.jblas.DoubleMatrix + +import org.apache.spark.{SparkException, SparkContext} +import org.apache.spark.mllib.regression._ + +object SVMSuite { + + def generateSVMInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed)) + } + + // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) + def generateSVMInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextDouble() * 2.0 - 1.0)) + val y = x.map { xi => + val yD = (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + + intercept + 0.01 * rnd.nextGaussian() + if (yD < 0) 0.0 else 1.0 + } + y.zip(x).map(p => LabeledPoint(p._1, p._2)) + } + +} + +class SVMSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + + test("SVM using local random SGD") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.run(testRDD) + + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("SVM local random SGD with initial weights") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.run(testRDD, initialWeights) + + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("SVM with invalid labels") { + val nPoints = 10000 + + // NOTE: Intercept should be small for generating equal 0s and 1s + val A = 0.01 + val B = -1.5 + val C = 1.0 + + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) + val testRDD = sc.parallelize(testData, 2) + + val testRDDInvalid = testRDD.map { lp => + if (lp.label == 0.0) { + LabeledPoint(-1.0, lp.features) + } else { + lp + } + } + + intercept[SparkException] { + val model = SVMWithSGD.train(testRDDInvalid, 100) + } + + // Turning off data validation should not throw an exception + val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid) + } +} diff --git a/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index cb096f39a9..94245f6027 100644 --- a/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -1,18 +1,38 @@ -package spark.mllib.clustering +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import spark.SparkContext -import spark.SparkContext._ +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ import org.jblas._ - class KMeansSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } override def afterAll() { sc.stop() diff --git a/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 2ada9ae76b..347ef238f4 100644 --- a/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -1,18 +1,76 @@ -package spark.mllib.recommendation - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation + +import scala.collection.JavaConversions._ import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import spark.SparkContext -import spark.SparkContext._ +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ import org.jblas._ +object ALSSuite { + + def generateRatingsAsJavaList( + users: Int, + products: Int, + features: Int, + samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = { + val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate) + (seqAsJavaList(sampledRatings), trueRatings) + } + + def generateRatings( + users: Int, + products: Int, + features: Int, + samplingRate: Double): (Seq[Rating], DoubleMatrix) = { + val rand = new Random(42) + + // Create a random matrix with uniform values from -1 to 1 + def randomMatrix(m: Int, n: Int) = + new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*) + + val userMatrix = randomMatrix(users, features) + val productMatrix = randomMatrix(features, products) + val trueRatings = userMatrix.mmul(productMatrix) + + val sampledRatings = { + for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate) + yield Rating(u, p, trueRatings.get(u, p)) + } + + (sampledRatings, trueRatings) + } + +} + class ALSSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } override def afterAll() { sc.stop() @@ -40,21 +98,8 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll { def testALS(users: Int, products: Int, features: Int, iterations: Int, samplingRate: Double, matchThreshold: Double) { - val rand = new Random(42) - - // Create a random matrix with uniform values from -1 to 1 - def randomMatrix(m: Int, n: Int) = - new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*) - - val userMatrix = randomMatrix(users, features) - val productMatrix = randomMatrix(features, products) - val trueRatings = userMatrix.mmul(productMatrix) - - val sampledRatings = { - for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate) - yield (u, p, trueRatings.get(u, p)) - } - + val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products, + features, samplingRate) val model = ALS.train(sc.parallelize(sampledRatings), features, iterations) val predictedU = new DoubleMatrix(users, features) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala new file mode 100644 index 0000000000..db980c7bae --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.util.LinearDataGenerator + + +class LassoSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("Lasso local random SGD") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.run(testRDD) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + test("Lasso local random SGD with initial weights") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.run(testRDD, initialWeights) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala new file mode 100644 index 0000000000..ef500c704c --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LinearDataGenerator + +class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("linear regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new LinearRegressionWithSGD() + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + assert(model.weights.length === 2) + assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) + assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala new file mode 100644 index 0000000000..c18092d804 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.jblas.DoubleMatrix +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.util.LinearDataGenerator + +class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = { + predictions.zip(input).map { case (prediction, expected) => + (prediction - expected.label) * (prediction - expected.label) + }.reduceLeft(_ + _) / predictions.size + } + + test("regularization with skewed weights") { + val nexamples = 200 + val nfeatures = 20 + val eps = 10 + + org.jblas.util.Random.seed(42) + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) + // Set first two weights to eps + w.put(0, 0, eps) + w.put(1, 0, eps) + + // Use half of data for training and other half for validation + val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps) + val testData = data.take(nexamples) + val validationData = data.takeRight(nexamples) + + val testRDD = sc.parallelize(testData, 2).cache() + val validationRDD = sc.parallelize(validationData, 2).cache() + + // First run without regularization. + val linearReg = new LinearRegressionWithSGD() + linearReg.optimizer.setNumIterations(200) + .setStepSize(1.0) + + val linearModel = linearReg.run(testRDD) + val linearErr = predictionError( + linearModel.predict(validationRDD.map(_.features)).collect(), validationData) + + val ridgeReg = new RidgeRegressionWithSGD() + ridgeReg.optimizer.setNumIterations(200) + .setRegParam(0.1) + .setStepSize(1.0) + val ridgeModel = ridgeReg.run(testRDD) + val ridgeErr = predictionError( + ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData) + + // Ridge CV-error should be lower than linear regression + assert(ridgeErr < linearErr, + "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")") + } +} diff --git a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala deleted file mode 100644 index 04d3400cb4..0000000000 --- a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -package spark.mllib.regression - -import scala.util.Random - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite - -import spark.SparkContext -import spark.SparkContext._ - - -class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - - // Test if we can correctly learn A, B where Y = logistic(A + B*X) - test("logistic regression") { - val nPoints = 10000 - val rnd = new Random(42) - - val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) - - val A = 2.0 - val B = -1.5 - - // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1) - val unifRand = new scala.util.Random(45) - val rLogis = (0 until nPoints).map { i => - val u = unifRand.nextDouble() - math.log(u) - math.log(1.0-u) - } - - // y <- A + B*x + rlogis(100) - // y <- as.numeric(y > 0) - val y = (0 until nPoints).map { i => - val yVal = A + B * x1(i) + rLogis(i) - if (yVal > 0) 1.0 else 0.0 - } - - val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i)))).toArray - - val testRDD = sc.parallelize(testData, 2) - testRDD.cache() - val lr = new LogisticRegression().setStepSize(10.0) - .setNumIterations(20) - - val model = lr.train(testRDD) - - val weight0 = model.weights.get(0) - assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") - assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") - } -} diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala deleted file mode 100644 index df41dbbdff..0000000000 --- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala +++ /dev/null @@ -1,47 +0,0 @@ -package spark.mllib.regression - -import scala.util.Random - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite - -import spark.SparkContext -import spark.SparkContext._ - - -class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - - // Test if we can correctly learn Y = 3 + X1 + X2 when - // X1 and X2 are collinear. - test("multi-collinear variables") { - val rnd = new Random(43) - val x1 = Array.fill[Double](20)(rnd.nextGaussian()) - - // Pick a mean close to mean of x1 - val rnd1 = new Random(42) //new NormalDistribution(0.1, 0.01) - val x2 = Array.fill[Double](20)(0.1 + rnd1.nextGaussian() * 0.01) - - val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray - - val y = xMat.map(i => 3 + i(0) + i(1)) - val testData = (0 until 20).map(i => (y(i), xMat(i))).toArray - - val testRDD = sc.parallelize(testData, 2) - testRDD.cache() - val ridgeReg = new RidgeRegression().setLowLambda(0) - .setHighLambda(10) - - val model = ridgeReg.train(testRDD) - - assert(model.intercept >= 2.9 && model.intercept <= 3.1) - assert(model.weights.length === 2) - assert(model.weights.get(0) >= 0.9 && model.weights.get(0) <= 1.1) - assert(model.weights.get(1) >= 0.9 && model.weights.get(1) <= 1.1) - } -} |