aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main/scala
diff options
context:
space:
mode:
authorEvan Sparks <evan.sparks@gmail.com>2013-08-11 10:52:55 -0700
committerEvan Sparks <evan.sparks@gmail.com>2013-08-11 10:52:55 -0700
commitff9ebfabb47e3439c7b78cb4e3c33423a1467a9a (patch)
tree8ce3ef1b12dbe1f0a8e9c64fc5a684bcab66e6bb /mllib/src/main/scala
parent95c62ca3060c89a44aa19aaab1fc9a9fff5a1196 (diff)
parenta65a6ed5140446651916aff1761a9a755194eaf4 (diff)
downloadspark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.tar.gz
spark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.tar.bz2
spark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.zip
Merge pull request #762 from shivaram/sgd-cleanup
Refactor SGD options into a new class.
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala4
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala141
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/SVM.scala143
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala91
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala29
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala116
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala26
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/Lasso.scala139
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala7
-rw-r--r--mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala4
-rw-r--r--mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala5
-rw-r--r--mllib/src/main/scala/spark/mllib/util/MLUtils.scala9
-rw-r--r--mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala7
-rw-r--r--mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala5
14 files changed, 389 insertions, 337 deletions
diff --git a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
index d6154b66ae..70fae8c15a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
@@ -9,7 +9,7 @@ trait ClassificationModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
*/
- def predict(testData: RDD[Array[Double]]): RDD[Int]
+ def predict(testData: RDD[Array[Double]]): RDD[Double]
/**
* Predict values for a single data point using the model trained.
@@ -17,5 +17,5 @@ trait ClassificationModel extends Serializable {
* @param testData array representing a single data point
* @return Int prediction from the trained model
*/
- def predict(testData: Array[Double]): Int
+ def predict(testData: Array[Double]): Double
}
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 203aa8fdd4..73949b0103 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import scala.math.round
@@ -30,109 +31,49 @@ import org.jblas.DoubleMatrix
* Based on Matlab code written by John Duchi.
*/
class LogisticRegressionModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
- round(1.0/ (1.0 + math.exp(margin * -1))).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
- round(1.0/ (1.0 + math.exp(margin * -1))).toInt
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
+ round(1.0/ (1.0 + math.exp(margin * -1)))
}
}
-class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
- var numIters: Int)
- extends Logging {
-
+class LogisticRegressionWithSGD (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
+ with Serializable {
+
+ val gradient = new LogisticGradient()
+ val updater = new SimpleUpdater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): LogisticRegressionModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new LogisticGradient(),
- new SimpleUpdater(),
- stepSize,
- numIters,
- 0.0,
- initalWeightsWithIntercept,
- miniBatchFraction)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+ def this() = this(1.0, 100, 0.0, 1.0, true)
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LogisticRegressionModel(weights, intercept)
}
}
/**
* Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
* Java programs.
*/
-object LogisticRegressionLocalRandomSGD {
+object LogisticRegressionWithSGD {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -148,14 +89,14 @@ object LogisticRegressionLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
input, initialWeights)
}
@@ -171,13 +112,14 @@ object LogisticRegressionLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
+ input)
}
/**
@@ -192,7 +134,7 @@ object LogisticRegressionLocalRandomSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double)
: LogisticRegressionModel =
@@ -210,7 +152,7 @@ object LogisticRegressionLocalRandomSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: LogisticRegressionModel =
{
@@ -218,15 +160,14 @@ object LogisticRegressionLocalRandomSGD {
}
def main(args: Array[String]) {
- if (args.length != 5) {
+ if (args.length != 4) {
println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
- "<regularization_parameter> <niters>")
+ "<niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "LogisticRegression")
- val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = LogisticRegressionLocalRandomSGD.train(
- data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 3a6a12814a..fa9d5a9471 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,6 +20,7 @@ package spark.mllib.classification
import scala.math.signum
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
@@ -28,117 +29,45 @@ import org.jblas.DoubleMatrix
* SVM using Stochastic Gradient Descent.
*/
class SVMModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ signum(dataMatrix.dot(weightMatrix) + intercept)
}
}
-
-
-class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
-
+class SVMWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
+
+ val gradient = new HingeGradient()
+ val updater = new SquaredL2Updater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a SVM object with default parameters
*/
- def this() = this(1.0, 1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Int, Array[Double])]): SVMModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): SVMModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new HingeGradient(),
- new SquaredL2Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
-
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new SVMModel(weights, intercept)
}
}
/**
* Top-level methods for calling SVM.
-
-
*/
-object SVMLocalRandomSGD {
+object SVMWithSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +84,7 @@ object SVMLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -163,8 +92,8 @@ object SVMLocalRandomSGD {
initialWeights: Array[Double])
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ initialWeights)
}
/**
@@ -179,14 +108,14 @@ object SVMLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double)
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}
/**
@@ -201,7 +130,7 @@ object SVMLocalRandomSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double)
@@ -220,7 +149,7 @@ object SVMLocalRandomSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: SVMModel =
{
@@ -233,8 +162,8 @@ object SVMLocalRandomSGD {
System.exit(1)
}
val sc = new SparkContext(args(0), "SVM")
- val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 19cda26446..1f04398d0c 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,10 +24,81 @@ import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
+class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer {
-object GradientDescent {
+ var stepSize: Double = 1.0
+ var numIterations: Int = 100
+ var regParam: Double = 0.0
+ var miniBatchFraction: Double = 1.0
/**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double): this.type = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double): this.type = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int): this.type = {
+ this.numIterations = iters
+ this
+ }
+
+ /**
+ * Set the regularization parameter used for SGD. Default 0.0.
+ */
+ def setRegParam(regParam: Double): this.type = {
+ this.regParam = regParam
+ this
+ }
+
+ /**
+ * Set the gradient function to be used for SGD.
+ */
+ def setGradient(gradient: Gradient): this.type = {
+ this.gradient = gradient
+ this
+ }
+
+
+ /**
+ * Set the updater function to be used for SGD.
+ */
+ def setUpdater(updater: Updater): this.type = {
+ this.updater = updater
+ this
+ }
+
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+ : Array[Double] = {
+
+ val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+ data,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFraction,
+ initialWeights)
+ weights
+ }
+
+}
+
+object GradientDescent extends Logging {
+ /**
* Run gradient descent in parallel using mini batches.
* Based on Matlab code written by John Duchi.
*
@@ -35,7 +106,7 @@ object GradientDescent {
* @param gradient - Gradient object that will be used to compute the gradient.
* @param updater - Updater object that will be used to update the model.
* @param stepSize - stepSize to be used during update.
- * @param numIters - number of iterations that SGD should be run.
+ * @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
@@ -49,12 +120,12 @@ object GradientDescent {
gradient: Gradient,
updater: Updater,
stepSize: Double,
- numIters: Int,
+ numIterations: Int,
regParam: Double,
- initialWeights: Array[Double],
- miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
+ miniBatchFraction: Double,
+ initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
- val stochasticLossHistory = new ArrayBuffer[Double](numIters)
+ val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction
@@ -63,7 +134,7 @@ object GradientDescent {
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var regVal = 0.0
- for (i <- 1 to numIters) {
+ for (i <- 1 to numIterations) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
case (y, features) =>
val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
@@ -76,11 +147,15 @@ object GradientDescent {
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
- val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+ val update = updater.compute(
+ weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}
+ logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+ stochasticLossHistory.takeRight(10).mkString(", ")))
+
(weights.toArray, stochasticLossHistory.toArray)
}
}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+ /**
+ * Solve the provided convex optimization problem.
+ */
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..8ea823b307
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD}
+import spark.mllib.optimization._
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector,
+ * an intercept.
+ */
+abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+ extends Serializable {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+ /**
+ * Predict the result given a data point and the weights learned.
+ *
+ * @param dataMatrix Row vector containing the features for this data point
+ * @param weightMatrix Column vector containing the weights of the model
+ * @param intercept Intercept of the model.
+ */
+ def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double): Double
+
+ def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
+
+ testData.map { x =>
+ val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+ predictPoint(dataMatrix, localWeights, localIntercept)
+ }
+ }
+
+ def predict(testData: Array[Double]): Double = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ predictPoint(dataMat, weightsMatrix, intercept)
+ }
+}
+
+/**
+ * GeneralizedLinearAlgorithm abstracts out the training for all GLMs.
+ * This class should be extended with an Optimizer to create a new GLM.
+ */
+abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
+ extends Logging with Serializable {
+
+ val optimizer: Optimizer
+
+ def createModel(weights: Array[Double], intercept: Double): M
+
+ var addIntercept: Boolean
+
+ /**
+ * Set if the algorithm should add an intercept. Default true.
+ */
+ def setIntercept(addIntercept: Boolean): this.type = {
+ this.addIntercept = addIntercept
+ this
+ }
+
+ def run(input: RDD[LabeledPoint]) : M = {
+ val nfeatures: Int = input.first().features.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ run(input, initialWeights)
+ }
+
+ def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+
+ // Add a extra variable consisting of all 1.0's for the intercept.
+ val data = if (addIntercept) {
+ input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
+ } else {
+ input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
+ }
+
+ val initialWeightsWithIntercept = if (addIntercept) {
+ Array(1.0, initialWeights:_*)
+ } else {
+ initialWeights
+ }
+
+ val weights = optimizer.optimize(data, initialWeightsWithIntercept)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
+
+ val model = createModel(weightsScaled, intercept)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
+ logInfo("Final model intercept " + model.intercept)
+ model
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
new file mode 100644
index 0000000000..3de60482c5
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+/**
+ * Class that represents the features and labels of a data point.
+ *
+ * @param label Label for this data point.
+ * @param features List of features for this data point.
+ */
+case class LabeledPoint(val label: Double, val features: Array[Double])
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index e8b1ed8a48..989e5ded58 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -28,117 +28,48 @@ import org.jblas.DoubleMatrix
*
*/
class LassoModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends RegressionModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]) = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
- }
- }
-
-
- override def predict(testData: Array[Double]): Double = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- dataMat.dot(weightsMatrix) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
}
}
-class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
+class LassoWithSGD (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[LassoModel]
+ with Serializable {
- /**
- * Construct a Lasso object with default parameters
- */
- def this() = this(1.0, 1.0, 1.0, 100)
+ val gradient = new SquaredGradient()
+ val updater = new L1Updater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
+ * Construct a Lasso object with default parameters
*/
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Double, Array[Double])]): LassoModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Double, Array[Double])],
- initialWeights: Array[Double]): LassoModel = {
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new SquaredGradient(),
- new L1Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
-
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LassoModel(weights, intercept)
}
}
/**
* Top-level methods for calling Lasso.
- *
- *
*/
-object LassoLocalRandomSGD {
+object LassoWithSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +86,7 @@ object LassoLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -163,8 +94,8 @@ object LassoLocalRandomSGD {
initialWeights: Array[Double])
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ initialWeights)
}
/**
@@ -179,14 +110,14 @@ object LassoLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double)
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}
/**
@@ -201,7 +132,7 @@ object LassoLocalRandomSGD {
* @return a LassoModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double)
@@ -220,7 +151,7 @@ object LassoLocalRandomSGD {
* @return a LassoModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: LassoModel =
{
@@ -234,7 +165,7 @@ object LassoLocalRandomSGD {
}
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index 6ba141e8fb..de790dde51 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -71,7 +71,8 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
this
}
- def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = {
+ def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = {
+ val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
val nfeatures: Int = input.take(1)(0)._2.length
val nexamples: Long = input.count()
@@ -183,7 +184,7 @@ object RidgeRegression {
* @param lambdaHigh upper bound used in binary search for lambda
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
lambdaLow: Double,
lambdaHigh: Double)
: RidgeRegressionModel =
@@ -199,7 +200,7 @@ object RidgeRegression {
*
* @param input RDD of (response, array of features) pairs.
*/
- def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = {
+ def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
train(input, 0.0, 100.0)
}
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
index ef4f42a494..1f185c9de7 100644
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -29,14 +29,14 @@ object LassoGenerator {
val trueWeights = new DoubleMatrix(1, nfeatures+1,
Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
- (y, x)
+ LabeledPoint(y, x)
}
MLUtils.saveLabeledData(data, outputPath)
diff --git a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 8d659cd97c..4fa19c3c23 100644
--- a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -20,6 +20,7 @@ package spark.mllib.util
import scala.util.Random
import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
object LogisticRegressionDataGenerator {
@@ -40,7 +41,7 @@ object LogisticRegressionDataGenerator {
nfeatures: Int,
eps: Double,
nparts: Int = 2,
- probOne: Double = 0.5): RDD[(Double, Array[Double])] = {
+ probOne: Double = 0.5): RDD[LabeledPoint] = {
val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
val rnd = new Random(42 + idx)
@@ -48,7 +49,7 @@ object LogisticRegressionDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextGaussian() + (y * eps)
}
- (y, x)
+ LabeledPoint(y, x)
}
data
}
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index 25d9673004..9174e8cea7 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -21,6 +21,7 @@ import spark.{RDD, SparkContext}
import spark.SparkContext._
import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
/**
* Helper methods to load and save data
@@ -36,17 +37,17 @@ object MLUtils {
* @return An RDD of tuples. For each tuple, the first element is the label, and the second
* element represents the feature values (an array of Double).
*/
- def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
+ def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
val parts = line.split(',')
val label = parts(0).toDouble
val features = parts(1).trim().split(' ').map(_.toDouble)
- (label, features)
+ LabeledPoint(label, features)
}
}
- def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) {
- val dataStr = data.map(x => x._1 + "," + x._2.mkString(" "))
+ def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
+ val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
dataStr.saveAsTextFile(dir)
}
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
index c5b8a29942..c4d65c3f9a 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
object RidgeRegressionDataGenerator {
@@ -41,14 +42,14 @@ object RidgeRegressionDataGenerator {
nexamples: Int,
nfeatures: Int,
eps: Double,
- nparts: Int = 2) : RDD[(Double, Array[Double])] = {
+ nparts: Int = 2) : RDD[LabeledPoint] = {
org.jblas.util.Random.seed(42)
// Random values distributed uniformly in [-0.5, 0.5]
val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
w.put(0, 0, 10)
w.put(1, 0, 10)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
org.jblas.util.Random.seed(42 + p)
val examplesInPartition = nexamples / nparts
@@ -61,7 +62,7 @@ object RidgeRegressionDataGenerator {
val yObs = new DoubleMatrix(normalValues).addi(y)
Iterator.tabulate(examplesInPartition) { i =>
- (yObs.get(i, 0), X.getRow(i).toArray)
+ LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
}
}
data
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
index 00a54d9a70..a37f6eb3b3 100644
--- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -9,6 +9,7 @@ import spark.{RDD, SparkContext}
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
object SVMGenerator {
@@ -32,14 +33,14 @@ object SVMGenerator {
val trueWeights = new DoubleMatrix(1, nfeatures+1,
Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
- (y, x)
+ LabeledPoint(y, x)
}
MLUtils.saveLabeledData(data, outputPath)