path: root/mllib
author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>   2013-08-02 19:15:34 -0700
committer  Shivaram Venkataraman <shivaram@eecs.berkeley.edu>   2013-08-02 19:15:34 -0700
commit     00339cc0328b692417ad0b9aeb23d522edbb93e5 (patch)
tree       2042d3aced47d4af5d252ef6677130fd5860d02c /mllib
parent     cef178873b04960c36647d9899fcd13715fef62c (diff)
download   spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.tar.gz
           spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.tar.bz2
           spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.zip
Refactor optimizers and create GLMs
This change refactors the structure of GLMs to use mixins that keep an interface similar to the other MLlib algorithms. It also creates an Optimizer trait, which allows GLMs to be extended to use other optimization techniques.
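
A minimal sketch of the pattern this commit introduces (a hypothetical simplification for illustration: Seq stands in for Spark's RDD so the snippet is self-contained, and most parameters and the SGD loop are omitted; the real definitions are in the files changed below):

    trait Optimizer {
      // Solve the optimization problem and return the final weight vector.
      def optimize(data: Seq[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
    }

    // The real GradientDescent trait implements optimize via mini-batch SGD; omitted here.
    trait GradientDescent extends Optimizer {
      var stepSize: Double
      var numIterations: Int

      // Chained setters keep the builder-style interface used by other MLlib algorithms.
      def setStepSize(step: Double): this.type = { stepSize = step; this }
      def setNumIterations(iters: Int): this.type = { numIterations = iters; this }
    }

    // A GLM declares only how to turn optimized weights into a model; the optimization
    // strategy comes from whichever Optimizer implementation is mixed in.
    abstract class GeneralizedLinearAlgorithm[M] { self: Optimizer =>
      def createModel(weights: Array[Double], intercept: Double): M

      def train(input: Seq[(Double, Array[Double])]): M = {
        // Prepend a 1.0 to each feature vector so the first weight acts as the intercept.
        val initial = Array.fill(input.head._2.length + 1)(1.0)
        val weights = optimize(input.map { case (y, f) => (y, 1.0 +: f) }, initial)
        createModel(weights.tail, weights.head)
      }
    }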
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala                      76
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala        93
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/SVM.scala                       88
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala             80
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala                   29
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala   125
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Lasso.scala                         89
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala    6
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala                   9
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala                    11
10 files changed, 320 insertions, 286 deletions
diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
deleted file mode 100644
index d9c2be2a19..0000000000
--- a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.optimization
-
-/**
- * Class used to configure options used for GradientDescent based optimization
- * algorithms.
- */
-
-class GradientDescentOpts private (
- var stepSize: Double,
- var numIters: Int,
- var regParam: Double,
- var miniBatchFraction: Double) {
-
- def this() = this(1.0, 100, 0.0, 1.0)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- /**
- * Set the regularization parameter used for SGD. Default 0.0.
- */
- def setRegParam(regParam: Double) = {
- this.regParam = regParam
- this
- }
-}
-
-object GradientDescentOpts {
-
- def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = {
- new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction)
- }
-
- def apply() = {
- new GradientDescentOpts()
- }
-
-}
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index bc711fd2d8..0af99c616d 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import scala.math.round
@@ -30,80 +31,46 @@ import org.jblas.DoubleMatrix
* Based on Matlab code written by John Duchi.
*/
class LogisticRegressionModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
- round(1.0/ (1.0 + math.exp(margin * -1))).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel[Int](weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
round(1.0/ (1.0 + math.exp(margin * -1))).toInt
}
}
-class LogisticRegression(val opts: GradientDescentOpts) extends Logging {
+class LogisticRegressionWithSGD (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel]
+ with GradientDescent with Serializable {
+
+ val gradient = new LogisticGradient()
+ val updater = new SimpleUpdater()
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(new GradientDescentOpts())
-
- def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): LogisticRegressionModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new LogisticGradient(),
- new SimpleUpdater(),
- opts,
- initalWeightsWithIntercept)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+ def this() = this(1.0, 100, 0.0, 1.0, true)
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LogisticRegressionModel(weights, intercept)
}
}
/**
* Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
* Java programs.
*/
-object LogisticRegression {
+object LogisticRegressionWithSGD {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -126,8 +93,8 @@ object LogisticRegression {
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
- new LogisticRegression(sgdOpts).train(input, initialWeights)
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+ input, initialWeights)
}
/**
@@ -148,8 +115,8 @@ object LogisticRegression {
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
- new LogisticRegression(sgdOpts).train(input)
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+ input)
}
/**
@@ -197,7 +164,7 @@ object LogisticRegression {
}
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = LogisticRegression.train(
+ val model = LogisticRegressionWithSGD.train(
data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 1c137168b6..caf9e3cb93 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,6 +20,7 @@ package spark.mllib.classification
import scala.math.signum
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
@@ -28,78 +29,42 @@ import org.jblas.DoubleMatrix
* SVM using Stochastic Gradient Descent.
*/
class SVMModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel[Int](weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ signum(dataMatrix.dot(weightMatrix) + intercept).toInt
}
}
+class SVMWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable {
-
-class SVM(val opts: GradientDescentOpts) extends Logging {
+ val gradient = new HingeGradient()
+ val updater = new SquaredL2Updater()
/**
* Construct a SVM object with default parameters
*/
- def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
- def train(input: RDD[(Int, Array[Double])]): SVMModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): SVMModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new HingeGradient(),
- new SquaredL2Updater(),
- opts,
- initalWeightsWithIntercept)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new SVMModel(weights, intercept)
}
}
/**
* Top-level methods for calling SVM.
*/
-object SVM {
+object SVMWithSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +89,8 @@ object SVM {
initialWeights: Array[Double])
: SVMModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
- new SVM(sgdOpts).train(input, initialWeights)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+ initialWeights)
}
/**
@@ -147,8 +112,7 @@ object SVM {
miniBatchFraction: Double)
: SVMModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
- new SVM(sgdOpts).train(input)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
}
/**
@@ -196,7 +160,7 @@ object SVM {
}
val sc = new SparkContext(args(0), "SVM")
val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 67451ff053..f7d09a2bd3 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,8 +24,66 @@ import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
-object GradientDescent {
+trait GradientDescent extends Optimizer {
+ val gradient: Gradient
+ val updater: Updater
+
+ var stepSize: Double
+ var numIterations: Int
+ var regParam: Double
+ var miniBatchFraction: Double
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double): this.type = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double): this.type = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int): this.type = {
+ this.numIterations = iters
+ this
+ }
+
+ /**
+ * Set the regularization parameter used for SGD. Default 0.0.
+ */
+ def setRegParam(regParam: Double): this.type = {
+ this.regParam = regParam
+ this
+ }
+
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+ : Array[Double] = {
+
+ val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+ data,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFraction,
+ initialWeights)
+ weights
+ }
+
+}
+
+object GradientDescent extends Logging {
/**
* Run gradient descent in parallel using mini batches.
* Based on Matlab code written by John Duchi.
@@ -34,7 +92,7 @@ object GradientDescent {
* @param gradient - Gradient object that will be used to compute the gradient.
* @param updater - Updater object that will be used to update the model.
* @param stepSize - stepSize to be used during update.
- * @param numIters - number of iterations that SGD should be run.
+ * @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
@@ -47,20 +105,23 @@ object GradientDescent {
data: RDD[(Double, Array[Double])],
gradient: Gradient,
updater: Updater,
- opts: GradientDescentOpts,
+ stepSize: Double,
+ numIterations: Int,
+ regParam: Double,
+ miniBatchFraction: Double,
initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
- val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters)
+ val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
val nexamples: Long = data.count()
- val miniBatchSize = nexamples * opts.miniBatchFraction
+ val miniBatchSize = nexamples * miniBatchFraction
// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var regVal = 0.0
- for (i <- 1 to opts.numIters) {
- val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map {
+ for (i <- 1 to numIterations) {
+ val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
case (y, features) =>
val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
val (grad, loss) = gradient.compute(featuresRow, y, weights)
@@ -73,11 +134,14 @@ object GradientDescent {
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
- weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam)
+ weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}
+ logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+ stochasticLossHistory.takeRight(10).mkString(", ")))
+
(weights.toArray, stochasticLossHistory.toArray)
}
}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+ /**
+ * Solve the provided convex optimization problem.
+ */
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..0bbc9424e6
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext, SparkException}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import scala.math.round
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector,
+ * an intercept.
+ */
+abstract class GeneralizedLinearModel[T: ClassManifest](
+ val weights: Array[Double],
+ val intercept: Double)
+ extends Serializable {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+ def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double): T
+
+ def predict(testData: spark.RDD[Array[Double]]): RDD[T] = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
+
+ testData.map { x =>
+ val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+ predictPoint(dataMatrix, localWeights, localIntercept)
+ }
+ }
+
+ def predict(testData: Array[Double]): T = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ predictPoint(dataMat, weightsMatrix, intercept)
+ }
+}
+
+/**
+ * GeneralizedLinearAlgorithm abstracts out the training for all GLMs.
+ * This class should be mixed in with an Optimizer to create a new GLM.
+ *
+ * NOTE(shivaram): This is an abstract class rather than a trait as we use
+ * a view bound to convert labels to Double.
+ */
+abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearModel[T]]
+ extends Logging with Serializable {
+
+ // We need an optimizer mixin to solve the GLM
+ self : Optimizer =>
+
+ var addIntercept: Boolean
+
+ def createModel(weights: Array[Double], intercept: Double): M
+
+ /**
+ * Set if the algorithm should add an intercept. Default true.
+ */
+ def setIntercept(addIntercept: Boolean): this.type = {
+ this.addIntercept = addIntercept
+ this
+ }
+
+ def train(input: RDD[(T, Array[Double])])(implicit mt: Manifest[T]) : M = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(T, Array[Double])],
+ initialWeights: Array[Double])(implicit mt: Manifest[T])
+ : M = {
+
+ // Add a extra variable consisting of all 1.0's for the intercept.
+ val data = if (addIntercept) {
+ input.map { case (y, features) =>
+ (y.toDouble, Array(1.0, features:_*))
+ }
+ } else {
+ input.map { case (y, features) =>
+ (y.toDouble, features)
+ }
+ }
+
+ val initialWeightsWithIntercept = if (addIntercept) {
+ Array(1.0, initialWeights:_*)
+ } else {
+ initialWeights
+ }
+
+ val weights = optimize(data, initialWeightsWithIntercept)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
+
+ val model = createModel(weightsScaled, intercept)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
+ logInfo("Final model intercept " + model.intercept)
+ model
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index 7f6fa8025c..f8b15033aa 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -28,78 +28,44 @@ import org.jblas.DoubleMatrix
*
*/
class LassoModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends RegressionModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]) = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
- }
- }
-
-
- override def predict(testData: Array[Double]): Double = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- dataMat.dot(weightsMatrix) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel[Double](weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
}
}
-class Lasso(val opts: GradientDescentOpts) extends Logging {
+class LassoWithSGD (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[Double, LassoModel]
+ with GradientDescent with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new L1Updater()
/**
* Construct a Lasso object with default parameters
*/
- def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
- def train(input: RDD[(Double, Array[Double])]): LassoModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Double, Array[Double])],
- initialWeights: Array[Double]): LassoModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new SquaredGradient(),
- new L1Updater(),
- opts,
- initalWeightsWithIntercept)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LassoModel(weights, intercept)
}
}
/**
* Top-level methods for calling Lasso.
*/
-object Lasso {
+object LassoWithSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +90,8 @@ object Lasso {
initialWeights: Array[Double])
: LassoModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
- new Lasso(sgdOpts).train(input, initialWeights)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+ initialWeights)
}
/**
@@ -147,8 +113,7 @@ object Lasso {
miniBatchFraction: Double)
: LassoModel =
{
- val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
- new Lasso(sgdOpts).train(input)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
}
/**
@@ -196,7 +161,7 @@ object Lasso {
}
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 439867d163..ee38486212 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -80,8 +80,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20)
- val lr = new LogisticRegression(sgdOpts)
+ val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20)
val model = lr.train(testRDD)
@@ -113,8 +112,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
testRDD.cache()
// Use half as many iterations as the previous test.
- val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10)
- val lr = new LogisticRegression(sgdOpts)
+ val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(10)
val model = lr.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index a624b42c38..1eef9387e3 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -44,7 +44,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
seed: Int): Seq[(Int, Array[Double])] = {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
- val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
val y = x.map { xi =>
signum(
(new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
@@ -75,8 +76,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
- val svm = new SVM(sgdOpts)
+ val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
val model = svm.train(testRDD)
@@ -106,8 +106,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
- val svm = new SVM(sgdOpts)
+ val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
val model = svm.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index 531746ec02..ab1d07b879 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -44,10 +44,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
seed: Int): Seq[(Double, Array[Double])] = {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
- val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
val y = x.map(xi =>
(new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
- )
+ )
y zip x
}
@@ -72,8 +73,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
- val ls = new Lasso(sgdOpts)
+ val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
val model = ls.train(testRDD)
@@ -109,8 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
- val ls = new Lasso(sgdOpts)
+ val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
val model = ls.train(testRDD, initialWeights)
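
For reference, a rough usage sketch after this change, mirroring the updated LogisticRegressionSuite above (assumes an existing SparkContext named sc; the sample data points are made up for illustration):

    import spark.{RDD, SparkContext}
    import spark.mllib.classification.LogisticRegressionWithSGD

    // Labeled points: (label, features); labels are Ints for classification.
    val data: RDD[(Int, Array[Double])] =
      sc.parallelize(Seq((1, Array(0.5, 1.2)), (0, Array(-0.3, 0.7))))

    // Configure through the chained setters contributed by the GradientDescent mixin, then train.
    val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20)
    val model = lr.train(data)

    // The returned LogisticRegressionModel predicts a 0/1 label for a single point or an RDD.
    val prediction: Int = model.predict(Array(0.5, 1.2))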