From 4935a2558b18965f3ec6bc3963b6ce95e9fa3ef3 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 11 Aug 2013 19:02:43 -0700 Subject: Clean up scaladoc in ML Lib. Also build and copy ML Lib scaladoc in Spark docs build. Some more minor cleanup with respect to naming, test locations etc. --- docs/_plugins/copy_api_dirs.rb | 2 +- .../mllib/classification/LogisticRegression.scala | 15 ++- .../scala/spark/mllib/classification/SVM.scala | 8 +- .../scala/spark/mllib/optimization/Gradient.scala | 23 ++++- .../spark/mllib/optimization/GradientDescent.scala | 19 ++-- .../scala/spark/mllib/optimization/Updater.scala | 23 ++++- .../recommendation/MatrixFactorizationModel.scala | 9 ++ .../regression/GeneralizedLinearAlgorithm.scala | 34 +++++- .../main/scala/spark/mllib/regression/Lasso.scala | 10 +- .../spark/mllib/regression/RidgeRegression.scala | 4 +- .../spark/mllib/util/KMeansDataGenerator.scala | 10 +- .../spark/mllib/util/LassoDataGenerator.scala | 15 +-- .../util/LogisticRegressionDataGenerator.scala | 8 +- .../src/main/scala/spark/mllib/util/MLUtils.scala | 21 ++-- .../mllib/util/RidgeRegressionDataGenerator.scala | 16 ++- .../scala/spark/mllib/util/SVMDataGenerator.scala | 16 +-- .../spark/mllib/clustering/JavaKMeansSuite.java | 115 +++++++++++++++++++++ .../spark/mllib/recommendation/JavaALSSuite.java | 110 ++++++++++++++++++++ .../spark/mllib/clustering/JavaKMeansSuite.java | 115 --------------------- .../spark/mllib/recommendation/JavaALSSuite.java | 110 -------------------- 20 files changed, 397 insertions(+), 286 deletions(-) create mode 100644 mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java create mode 100644 mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java delete mode 100644 mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java delete mode 100644 mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index 45ef4bba82..217254c59f 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -20,7 +20,7 @@ include FileUtils if ENV['SKIP_API'] != '1' # Build Scaladoc for Java/Scala - projects = ["core", "examples", "repl", "bagel", "streaming"] + projects = ["core", "examples", "repl", "bagel", "streaming", "mllib"] puts "Moving to project root and building scaladoc." curr_dir = pwd diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index 73949b0103..30ee0ab0ff 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -27,8 +27,10 @@ import scala.math.round import org.jblas.DoubleMatrix /** - * Logistic Regression using Stochastic Gradient Descent. - * Based on Matlab code written by John Duchi. + * Classification model trained using Logistic Regression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. */ class LogisticRegressionModel( override val weights: Array[Double], @@ -43,7 +45,10 @@ class LogisticRegressionModel( } } -class LogisticRegressionWithSGD ( +/** + * Train a classification model for Logistic Regression using Stochastic Gradient Descent. 
+ */ +class LogisticRegressionWithSGD private ( var stepSize: Double, var numIterations: Int, var regParam: Double, @@ -70,10 +75,10 @@ class LogisticRegressionWithSGD ( /** * Top-level methods for calling Logistic Regression. - * NOTE(shivaram): We use multiple train methods instead of default arguments to support - * Java programs. */ object LogisticRegressionWithSGD { + // NOTE(shivaram): We use multiple train methods instead of default arguments to support + // Java programs. /** * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala index fa9d5a9471..f799cb2829 100644 --- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala @@ -26,7 +26,10 @@ import spark.mllib.util.MLUtils import org.jblas.DoubleMatrix /** - * SVM using Stochastic Gradient Descent. + * Model built using SVM. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. */ class SVMModel( override val weights: Array[Double], @@ -40,6 +43,9 @@ class SVMModel( } } +/** + * Train an SVM using Stochastic Gradient Descent. + */ class SVMWithSGD private ( var stepSize: Double, var numIterations: Int, diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala index 22b2ec5ed6..e72b8b3a92 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala @@ -19,18 +19,29 @@ package spark.mllib.optimization import org.jblas.DoubleMatrix +/** + * Class used to compute the gradient for a loss function, given a single data point. + */ abstract class Gradient extends Serializable { /** - * Compute the gradient for a given row of data. + * Compute the gradient and loss given features of a single data point. * - * @param data - One row of data. Row matrix of size 1xn where n is the number of features. + * @param data - Feature values for one data point. Column matrix of size nx1 + * where n is the number of features. * @param label - Label for this data item. * @param weights - Column matrix containing weights for every feature. + * + * @return A tuple of 2 elements. The first element is a column matrix containing the computed + * gradient and the second element is the loss computed at this data point. + * */ def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): (DoubleMatrix, Double) } +/** + * Compute gradient and loss for a logistic loss function. + */ class LogisticGradient extends Gradient { override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): (DoubleMatrix, Double) = { @@ -49,7 +60,9 @@ class LogisticGradient extends Gradient { } } - +/** + * Compute gradient and loss for a Least-squared loss function. + */ class SquaredGradient extends Gradient { override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): (DoubleMatrix, Double) = { @@ -62,7 +75,9 @@ class SquaredGradient extends Gradient { } } - +/** + * Compute gradient and loss for a Hinge loss function. 
+ */ class HingeGradient extends Gradient { override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): (DoubleMatrix, Double) = { diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index 1f04398d0c..31917df7e8 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,12 +24,17 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer +/** + * Class used to solve an optimization problem using Gradient Descent. + * @param gradient Gradient function to be used. + * @param updater Updater to be used to update weights after every iteration. + */ class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer { - var stepSize: Double = 1.0 - var numIterations: Int = 100 - var regParam: Double = 0.0 - var miniBatchFraction: Double = 1.0 + private var stepSize: Double = 1.0 + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + private var miniBatchFraction: Double = 1.0 /** * Set the step size per-iteration of SGD. Default 1.0. @@ -97,10 +102,10 @@ class GradientDescent(var gradient: Gradient, var updater: Updater) extends Opti } +// Top-level method to run gradient descent. object GradientDescent extends Logging { /** * Run gradient descent in parallel using mini batches. - * Based on Matlab code written by John Duchi. * * @param data - Input data for SGD. RDD of form (label, [feature values]). * @param gradient - Gradient object that will be used to compute the gradient. @@ -137,8 +142,8 @@ object GradientDescent extends Logging { for (i <- 1 to numIterations) { val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { case (y, features) => - val featuresRow = new DoubleMatrix(features.length, 1, features:_*) - val (grad, loss) = gradient.compute(featuresRow, y, weights) + val featuresCol = new DoubleMatrix(features.length, 1, features:_*) + val (grad, loss) = gradient.compute(featuresCol, y, weights) (grad, loss) }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala index 3ebc1409b6..db67d6b0bc 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala @@ -20,10 +20,14 @@ package spark.mllib.optimization import scala.math._ import org.jblas.DoubleMatrix +/** + * Class used to update weights used in Gradient Descent. + */ abstract class Updater extends Serializable { /** - * Compute an updated value for weights given the gradient, stepSize and iteration number. - * Also returns the regularization value computed using the *updated* weights. + * Compute an updated value for weights given the gradient, stepSize, iteration number and + * regularization parameter. Also returns the regularization value computed using the + * *updated* weights. * * @param weightsOld - Column matrix of size nx1 where n is the number of features. * @param gradient - Column matrix of size nx1 where n is the number of features. @@ -38,6 +42,10 @@ abstract class Updater extends Serializable { regParam: Double): (DoubleMatrix, Double) } +/** + * A simple updater that adaptively adjusts the learning rate the + * square root of the number of iterations. Does not perform any regularization. 
+ */ class SimpleUpdater extends Updater { override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { @@ -48,11 +56,15 @@ class SimpleUpdater extends Updater { } /** - * L1 regularization -- corresponding proximal operator is the soft-thresholding function - * That is, each weight component is shrunk towards 0 by shrinkageVal + * Updater that adjusts learning rate and performs L1 regularization. + * + * The corresponding proximal operator used is the soft-thresholding function. + * That is, each weight component is shrunk towards 0 by shrinkageVal. + * * If w > shrinkageVal, set weight component to w-shrinkageVal. * If w < -shrinkageVal, set weight component to w+shrinkageVal. * If -shrinkageVal < w < shrinkageVal, set weight component to 0. + * * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal) */ class L1Updater extends Updater { @@ -72,6 +84,9 @@ class L1Updater extends Updater { } } +/** + * Updater that adjusts the learning rate and performs L2 regularization + */ class SquaredL2Updater extends Updater { override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = { diff --git a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala index 38637b3dd1..5e21717da5 100644 --- a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -22,6 +22,15 @@ import spark.SparkContext._ import org.jblas._ +/** + * Model representing the result of matrix factorization. + * + * @param rank Rank for the features in this model. + * @param userFeatures RDD of tuples where each tuple represents the userId and + * the features computed for this user. + * @param productFeatures RDD of tuples where each tuple represents the productId + * and the features computed for this product. + */ class MatrixFactorizationModel( val rank: Int, val userFeatures: RDD[(Int, Array[Double])], diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 8ea823b307..4ecafff08b 100644 --- a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -24,8 +24,11 @@ import org.jblas.DoubleMatrix /** * GeneralizedLinearModel (GLM) represents a model trained using - * GeneralizedLinearAlgorithm. GLMs consist of a weight vector, + * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and * an intercept. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. */ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double) extends Serializable { @@ -43,6 +46,12 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, intercept: Double): Double + /** + * Predict values for the given data set using the model trained. 
+ * + * @param testData RDD representing data points to be predicted + * @return RDD[Double] where each entry contains the corresponding prediction + */ def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = { // A small optimization to avoid serializing the entire model. Only the weightsMatrix // and intercept is needed. @@ -55,6 +64,12 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: } } + /** + * Predict values for a single data point using the model trained. + * + * @param testData array representing a single data point + * @return Double prediction from the trained model + */ def predict(testData: Array[Double]): Double = { val dataMat = new DoubleMatrix(1, testData.length, testData:_*) predictPoint(dataMat, weightsMatrix, intercept) @@ -62,7 +77,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: } /** - * GeneralizedLinearAlgorithm abstracts out the training for all GLMs. + * GeneralizedLinearAlgorithm implements methods to train a Genearalized Linear Model (GLM). * This class should be extended with an Optimizer to create a new GLM. */ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] @@ -70,9 +85,12 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] val optimizer: Optimizer - def createModel(weights: Array[Double], intercept: Double): M + /** + * Create a model given the weights and intercept + */ + protected def createModel(weights: Array[Double], intercept: Double): M - var addIntercept: Boolean + protected var addIntercept: Boolean /** * Set if the algorithm should add an intercept. Default true. @@ -82,12 +100,20 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] this } + /** + * Run the algorithm with the configured parameters on an input + * RDD of LabeledPoint entries. + */ def run(input: RDD[LabeledPoint]) : M = { val nfeatures: Int = input.first().features.length val initialWeights = Array.fill(nfeatures)(1.0) run(input, initialWeights) } + /** + * Run the algorithm with the configured parameters on an input RDD + * of LabeledPoint entries starting from the initial weights provided. + */ def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = { // Add a extra variable consisting of all 1.0's for the intercept. diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index 989e5ded58..6bbc990a5a 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -24,8 +24,10 @@ import spark.mllib.util.MLUtils import org.jblas.DoubleMatrix /** - * Lasso using Stochastic Gradient Descent. + * Regression model trained using Lasso. * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. */ class LassoModel( override val weights: Array[Double], @@ -39,8 +41,10 @@ class LassoModel( } } - -class LassoWithSGD ( +/** + * Train a regression model with L1-regularization using Stochastic Gradient Descent. 
+ */ +class LassoWithSGD private ( var stepSize: Double, var numIterations: Int, var regParam: Double, diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala index de790dde51..b42d94af41 100644 --- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala @@ -168,10 +168,10 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double) /** * Top-level methods for calling Ridge Regression. - * NOTE(shivaram): We use multiple train methods instead of default arguments to support - * Java programs. */ object RidgeRegression { + // NOTE(shivaram): We use multiple train methods instead of default arguments to support + // Java programs. /** * Train a ridge regression model given an RDD of (response, features) pairs. diff --git a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala index c89e5dd738..672b63f65a 100644 --- a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala @@ -21,12 +21,16 @@ import scala.util.Random import spark.{RDD, SparkContext} +/** + * Generate test data for KMeans. This class first chooses k cluster centers + * from a d-dimensional Gaussian distribution scaled by factor r and then creates a Gaussian + * cluster with scale 1 around each center. + */ + object KMeansDataGenerator { /** - * Generate an RDD containing test data for KMeans. This function chooses k cluster centers - * from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian - * cluster with scale 1 around each center. + * Generate an RDD containing test data for KMeans. * * @param sc SparkContext to use for creating the RDD * @param numPoints Number of points that will be contained in the RDD diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala index 1f185c9de7..eeb14fc4e3 100644 --- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala @@ -1,18 +1,22 @@ -package spark.mllib.regression +package spark.mllib.util import scala.util.Random import org.jblas.DoubleMatrix import spark.{RDD, SparkContext} -import spark.mllib.util.MLUtils +import spark.mllib.regression.LabeledPoint -object LassoGenerator { +/** + * Generate sample data used for Lasso Regression. This class generates uniform random values + * for the features and adds Gaussian noise with weight 0.1 to generate response variables. 
+ */ +object LassoDataGenerator { def main(args: Array[String]) { - if (args.length != 5) { + if (args.length < 2) { println("Usage: LassoGenerator " + - " ") + " [num_examples] [num_features] [num_partitions]") System.exit(1) } @@ -21,7 +25,6 @@ object LassoGenerator { val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 val nfeatures: Int = if (args.length > 3) args(3).toInt else 2 val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 3 val sc = new SparkContext(sparkMaster, "LassoGenerator") diff --git a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala index 4fa19c3c23..d6402f23e2 100644 --- a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala @@ -22,11 +22,15 @@ import scala.util.Random import spark.{RDD, SparkContext} import spark.mllib.regression.LabeledPoint +/** + * Generate test data for LogisticRegression. This class chooses positive labels + * with probability `probOne` and scales features for positive examples by `eps`. + */ + object LogisticRegressionDataGenerator { /** - * Generate an RDD containing test data for LogisticRegression. This function chooses - * positive labels with probability `probOne` and scales positive examples by `eps`. + * Generate an RDD containing test data for LogisticRegression. * * @param sc SparkContext to use for creating the RDD. * @param nexamples Number of examples that will be contained in the RDD. diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala index 9174e8cea7..4e030a81b4 100644 --- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala @@ -24,18 +24,19 @@ import org.jblas.DoubleMatrix import spark.mllib.regression.LabeledPoint /** - * Helper methods to load and save data - * Data format: - * , ... - * where , are feature values in Double and is the corresponding label as Double. + * Helper methods to load, save and pre-process data used in ML Lib. */ object MLUtils { /** + * Load labeled data from a file. The data format used here is + * , ... + * where , are feature values in Double and is the corresponding label as Double. + * * @param sc SparkContext * @param dir Directory to the input data files. - * @return An RDD of tuples. For each tuple, the first element is the label, and the second - * element represents the feature values (an array of Double). + * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is + * the label, and the second element represents the feature values (an array of Double). */ def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => @@ -46,6 +47,14 @@ object MLUtils { } } + /** + * Save labeled data to a file. The data format used here is + * , ... + * where , are feature values in Double and is the corresponding label as Double. + * + * @param data An RDD of LabeledPoints containing data to be saved. + * @param dir Directory to save the data. 
+ */ def saveLabeledData(data: RDD[LabeledPoint], dir: String) { val dataStr = data.map(x => x.label + "," + x.features.mkString(" ")) dataStr.saveAsTextFile(dir) diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala index c4d65c3f9a..4d329168be 100644 --- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala @@ -24,18 +24,24 @@ import org.jblas.DoubleMatrix import spark.{RDD, SparkContext} import spark.mllib.regression.LabeledPoint +/** + * Generate sample data used for RidgeRegression. This class generates + * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the + * response variable `Y`. + * + */ object RidgeRegressionDataGenerator { /** - * Generate an RDD containing test data used for RidgeRegression. This function generates - * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the - * response variable `Y`. + * Generate an RDD containing sample data for RidgeRegression. * * @param sc SparkContext to be used for generating the RDD. * @param nexamples Number of examples that will be contained in the RDD. * @param nfeatures Number of features to generate for each example. * @param eps Epsilon factor by which examples are scaled. * @param nparts Number of partitions in the RDD. Default value is 2. + * + * @return RDD of LabeledPoint containing sample data. */ def generateRidgeRDD( sc: SparkContext, @@ -69,9 +75,9 @@ object RidgeRegressionDataGenerator { } def main(args: Array[String]) { - if (args.length != 5) { + if (args.length < 2) { println("Usage: RidgeRegressionGenerator " + - " ") + " [num_examples] [num_features] [num_partitions]") System.exit(1) } diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala index a37f6eb3b3..e02bd190f6 100644 --- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala @@ -1,22 +1,23 @@ -package spark.mllib.classification +package spark.mllib.util import scala.util.Random import scala.math.signum -import org.jblas.DoubleMatrix - import spark.{RDD, SparkContext} -import spark.mllib.util.MLUtils import org.jblas.DoubleMatrix import spark.mllib.regression.LabeledPoint -object SVMGenerator { +/** + * Generate sample data used for SVM. This class generates uniform random values + * for the features and adds Gaussian noise with weight 0.1 to generate labels. 
+ */ +object SVMDataGenerator { def main(args: Array[String]) { - if (args.length != 5) { + if (args.length < 2) { println("Usage: SVMGenerator " + - " ") + " [num_examples] [num_features] [num_partitions]") System.exit(1) } @@ -25,7 +26,6 @@ object SVMGenerator { val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 val nfeatures: Int = if (args.length > 3) args(3).toInt else 2 val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 3 val sc = new SparkContext(sparkMaster, "SVMGenerator") diff --git a/mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java new file mode 100644 index 0000000000..3f2d82bfb4 --- /dev/null +++ b/mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.clustering; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +public class JavaKMeansSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaKMeans"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + // L1 distance between two points + double distance1(double[] v1, double[] v2) { + double distance = 0.0; + for (int i = 0; i < v1.length; ++i) { + distance = Math.max(distance, Math.abs(v1[i] - v2[i])); + } + return distance; + } + + // Assert that two sets of points are equal, within EPSILON tolerance + void assertSetsEqual(double[][] v1, double[][] v2) { + double EPSILON = 1e-4; + Assert.assertTrue(v1.length == v2.length); + for (int i = 0; i < v1.length; ++i) { + double minDistance = Double.MAX_VALUE; + for (int j = 0; j < v2.length; ++j) { + minDistance = Math.min(minDistance, distance1(v1[i], v2[j])); + } + Assert.assertTrue(minDistance <= EPSILON); + } + + for (int i = 0; i < v2.length; ++i) { + double minDistance = Double.MAX_VALUE; + for (int j = 0; j < v1.length; ++j) { + minDistance = Math.min(minDistance, distance1(v2[i], v1[j])); + } + Assert.assertTrue(minDistance <= EPSILON); + } + } + + + @Test + public void runKMeansUsingStaticMethods() { + List points = new ArrayList(); + points.add(new double[]{1.0, 2.0, 6.0}); + points.add(new double[]{1.0, 3.0, 0.0}); + points.add(new double[]{1.0, 4.0, 6.0}); + + double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + + JavaRDD data = sc.parallelize(points, 2); + KMeansModel model = KMeans.train(data.rdd(), 1, 1); + 
assertSetsEqual(model.clusterCenters(), expectedCenter); + + model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + } + + @Test + public void runKMeansUsingConstructor() { + List points = new ArrayList(); + points.add(new double[]{1.0, 2.0, 6.0}); + points.add(new double[]{1.0, 3.0, 0.0}); + points.add(new double[]{1.0, 4.0, 6.0}); + + double[][] expectedCenter = { {1.0, 3.0, 4.0} }; + + JavaRDD data = sc.parallelize(points, 2); + KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + + model = new KMeans().setK(1) + .setMaxIterations(1) + .setRuns(1) + .setInitializationMode(KMeans.RANDOM()) + .run(data.rdd()); + assertSetsEqual(model.clusterCenters(), expectedCenter); + } +} diff --git a/mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java new file mode 100644 index 0000000000..7993629a6d --- /dev/null +++ b/mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package spark.mllib.recommendation; + +import java.io.Serializable; +import java.util.List; + +import scala.Tuple2; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +import org.jblas.DoubleMatrix; + +public class JavaALSSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaALS"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, + DoubleMatrix trueRatings, double matchThreshold) { + DoubleMatrix predictedU = new DoubleMatrix(users, features); + List> userFeatures = model.userFeatures().toJavaRDD().collect(); + for (int i = 0; i < features; ++i) { + for (scala.Tuple2 userFeature : userFeatures) { + predictedU.put((Integer)userFeature._1(), i, userFeature._2()[i]); + } + } + DoubleMatrix predictedP = new DoubleMatrix(products, features); + + List> productFeatures = + model.productFeatures().toJavaRDD().collect(); + for (int i = 0; i < features; ++i) { + for (scala.Tuple2 productFeature : productFeatures) { + predictedP.put((Integer)productFeature._1(), i, productFeature._2()[i]); + } + } + + DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose()); + + for (int u = 0; u < users; ++u) { + for (int p = 0; p < products; ++p) { + double prediction = predictedRatings.get(u, p); + double correct = trueRatings.get(u, p); + Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold); + } + } + } + + @Test + public void runALSUsingStaticMethods() { + int features = 1; + int iterations = 15; + int users = 10; + int products = 10; + scala.Tuple2, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7); + + JavaRDD data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); + validatePrediction(model, users, products, features, testData._2(), 0.3); + } + + @Test + public void runALSUsingConstructor() { + int features = 2; + int iterations = 15; + int users = 20; + int products = 30; + scala.Tuple2, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7); + + JavaRDD data = sc.parallelize(testData._1()); + + MatrixFactorizationModel model = new ALS().setRank(features) + .setIterations(iterations) + .run(data.rdd()); + validatePrediction(model, users, products, features, testData._2(), 0.3); + } +} diff --git a/mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java deleted file mode 100644 index 3f2d82bfb4..0000000000 --- a/mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.mllib.clustering; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; - -public class JavaKMeansSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaKMeans"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - // L1 distance between two points - double distance1(double[] v1, double[] v2) { - double distance = 0.0; - for (int i = 0; i < v1.length; ++i) { - distance = Math.max(distance, Math.abs(v1[i] - v2[i])); - } - return distance; - } - - // Assert that two sets of points are equal, within EPSILON tolerance - void assertSetsEqual(double[][] v1, double[][] v2) { - double EPSILON = 1e-4; - Assert.assertTrue(v1.length == v2.length); - for (int i = 0; i < v1.length; ++i) { - double minDistance = Double.MAX_VALUE; - for (int j = 0; j < v2.length; ++j) { - minDistance = Math.min(minDistance, distance1(v1[i], v2[j])); - } - Assert.assertTrue(minDistance <= EPSILON); - } - - for (int i = 0; i < v2.length; ++i) { - double minDistance = Double.MAX_VALUE; - for (int j = 0; j < v1.length; ++j) { - minDistance = Math.min(minDistance, distance1(v2[i], v1[j])); - } - Assert.assertTrue(minDistance <= EPSILON); - } - } - - - @Test - public void runKMeansUsingStaticMethods() { - List points = new ArrayList(); - points.add(new double[]{1.0, 2.0, 6.0}); - points.add(new double[]{1.0, 3.0, 0.0}); - points.add(new double[]{1.0, 4.0, 6.0}); - - double[][] expectedCenter = { {1.0, 3.0, 4.0} }; - - JavaRDD data = sc.parallelize(points, 2); - KMeansModel model = KMeans.train(data.rdd(), 1, 1); - assertSetsEqual(model.clusterCenters(), expectedCenter); - - model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM()); - assertSetsEqual(model.clusterCenters(), expectedCenter); - } - - @Test - public void runKMeansUsingConstructor() { - List points = new ArrayList(); - points.add(new double[]{1.0, 2.0, 6.0}); - points.add(new double[]{1.0, 3.0, 0.0}); - points.add(new double[]{1.0, 4.0, 6.0}); - - double[][] expectedCenter = { {1.0, 3.0, 4.0} }; - - JavaRDD data = sc.parallelize(points, 2); - KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); - assertSetsEqual(model.clusterCenters(), expectedCenter); - - model = new KMeans().setK(1) - .setMaxIterations(1) - .setRuns(1) - .setInitializationMode(KMeans.RANDOM()) - .run(data.rdd()); - assertSetsEqual(model.clusterCenters(), expectedCenter); - } -} diff --git a/mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java deleted file mode 100644 index 7993629a6d..0000000000 --- a/mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.mllib.recommendation; - -import java.io.Serializable; -import java.util.List; - -import scala.Tuple2; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; - -import org.jblas.DoubleMatrix; - -public class JavaALSSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaALS"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, - DoubleMatrix trueRatings, double matchThreshold) { - DoubleMatrix predictedU = new DoubleMatrix(users, features); - List> userFeatures = model.userFeatures().toJavaRDD().collect(); - for (int i = 0; i < features; ++i) { - for (scala.Tuple2 userFeature : userFeatures) { - predictedU.put((Integer)userFeature._1(), i, userFeature._2()[i]); - } - } - DoubleMatrix predictedP = new DoubleMatrix(products, features); - - List> productFeatures = - model.productFeatures().toJavaRDD().collect(); - for (int i = 0; i < features; ++i) { - for (scala.Tuple2 productFeature : productFeatures) { - predictedP.put((Integer)productFeature._1(), i, productFeature._2()[i]); - } - } - - DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose()); - - for (int u = 0; u < users; ++u) { - for (int p = 0; p < products; ++p) { - double prediction = predictedRatings.get(u, p); - double correct = trueRatings.get(u, p); - Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold); - } - } - } - - @Test - public void runALSUsingStaticMethods() { - int features = 1; - int iterations = 15; - int users = 10; - int products = 10; - scala.Tuple2, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); - - JavaRDD data = sc.parallelize(testData._1()); - MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); - validatePrediction(model, users, products, features, testData._2(), 0.3); - } - - @Test - public void runALSUsingConstructor() { - int features = 2; - int iterations = 15; - int users = 20; - int products = 30; - scala.Tuple2, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7); - - JavaRDD data = sc.parallelize(testData._1()); - - MatrixFactorizationModel model = new ALS().setRank(features) - .setIterations(iterations) - .run(data.rdd()); - validatePrediction(model, users, products, features, testData._2(), 0.3); - } -} -- cgit v1.2.3
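The L1Updater scaladoc added in this patch describes its proximal step as setting each weight component to signum(w) * max(0.0, abs(w) - shrinkageVal). The standalone Scala sketch below is illustrative only and not part of the patch: shrinkageVal is passed in as a plain argument, and how the actual updater derives it from the step size, iteration number and regularization parameter is not restated here.

import scala.math.{abs, max, signum}

// Standalone illustration of the elementwise soft-thresholding rule
// documented on L1Updater:
//   w := signum(w) * max(0.0, abs(w) - shrinkageVal)
// shrinkageVal is an assumed input here, not the value the real updater computes.
object SoftThresholdingSketch {
  def softThreshold(weights: Array[Double], shrinkageVal: Double): Array[Double] =
    weights.map(w => signum(w) * max(0.0, abs(w) - shrinkageVal))

  def main(args: Array[String]) {
    // Weights with magnitude below shrinkageVal are driven exactly to zero;
    // larger weights are shrunk towards zero by shrinkageVal.
    val updated = softThreshold(Array(0.8, -0.05, 0.02, -1.5), shrinkageVal = 0.1)
    println(updated.mkString(", "))
  }
}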
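The MLUtils scaladoc added in this patch documents the labeled-data text format as one point per line, with a comma between the label and the feature values and spaces between feature values, matching the x.label + "," + x.features.mkString(" ") expression shown in saveLabeledData. The standalone sketch below is illustrative only and not part of the patch; the object and method names are invented for the example, and it formats and parses one line of that format without depending on Spark.

// Standalone illustration of the labeled-data text format used by
// MLUtils.saveLabeledData / loadLabeledData: "<label>,<f1> <f2> ... <fn>".
object LabeledDataFormatSketch {
  // Format one (label, features) pair the same way saveLabeledData does.
  def formatLine(label: Double, features: Array[Double]): String =
    label + "," + features.mkString(" ")

  // Parse one line back into a (label, features) pair.
  def parseLine(line: String): (Double, Array[Double]) = {
    val parts = line.split(',')
    (parts(0).toDouble, parts(1).trim.split(' ').map(_.toDouble))
  }

  def main(args: Array[String]) {
    val line = formatLine(1.0, Array(0.5, -1.2, 3.0))  // "1.0,0.5 -1.2 3.0"
    val (label, features) = parseLine(line)
    println(label + " -> " + features.mkString(" "))   // "1.0 -> 0.5 -1.2 3.0"
  }
}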