aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorXinghao <pxinghao@gmail.com>2013-07-26 18:57:39 -0700
committerXinghao <pxinghao@gmail.com>2013-07-26 18:57:39 -0700
commitb0bbc7f6a8da8e4c8e4e5bb656d7c7eed9b24511 (patch)
tree97c76e6b49ba88598a4d66a009d0d4f910ada8fd /mllib
parent071afe2a333b6ea4fac183c7045f83e168d482a9 (diff)
downloadspark-b0bbc7f6a8da8e4c8e4e5bb656d7c7eed9b24511.tar.gz
spark-b0bbc7f6a8da8e4c8e4e5bb656d7c7eed9b24511.tar.bz2
spark-b0bbc7f6a8da8e4c8e4e5bb656d7c7eed9b24511.zip
Resolve conflicts with master, removed regParam for LogisticRegression
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala120
-rw-r--r--mllib/src/main/scala/spark/mllib/classification/SVM.scala96
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/Gradient.scala18
-rw-r--r--mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala37
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/Lasso.scala97
-rw-r--r--mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala108
6 files changed, 412 insertions, 64 deletions
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 243a346364..40b96fbe3a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.mllib.classification
import spark.{Logging, RDD, SparkContext}
@@ -11,32 +28,39 @@ import org.jblas.DoubleMatrix
* Based on Matlab code written by John Duchi.
*/
class LogisticRegressionModel(
- val weights: DoubleMatrix,
+ val weights: Array[Double],
val intercept: Double,
- val losses: Array[Double]) extends ClassificationModel {
+ val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
override def predict(testData: spark.RDD[Array[Double]]) = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
testData.map { x =>
- val margin = new DoubleMatrix(1, x.length, x:_*).mmul(this.weights).get(0) + this.intercept
+ val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
1.0/ (1.0 + math.exp(margin * -1))
}
}
override def predict(testData: Array[Double]): Double = {
val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- val margin = dataMat.mmul(this.weights).get(0) + this.intercept
+ val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
1.0/ (1.0 + math.exp(margin * -1))
}
}
-class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
+class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
var numIters: Int)
extends Logging {
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(1.0, 0.0, 1.0, 100)
+ def this() = this(1.0, 1.0, 100)
/**
* Set the step size per-iteration of SGD. Default 1.0.
@@ -46,6 +70,14 @@ class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var regPa
this
}
+
+
+
+
+
+
+
+
/**
* Set fraction of data to be used for each SGD iteration. Default 1.0.
*/
@@ -63,51 +95,97 @@ class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var regPa
}
def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Double, Array[Double])],
+ initialWeights: Array[Double]): LogisticRegressionModel = {
+
// Add a extra variable consisting of all 1.0's for the intercept.
val data = input.map { case (y, features) =>
(y, Array(1.0, features:_*))
}
- val (weights, losses) = GradientDescent.runMiniBatchSGD(
- data, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters, regParam, miniBatchFraction)
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
- val weightsScaled = weights.getRange(1, weights.length)
- val intercept = weights.get(0)
+ val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+ data,
+ new LogisticGradient(),
+ new SimpleUpdater(),
+ stepSize,
+ numIters,
+ 0.0,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
- val model = new LogisticRegressionModel(weightsScaled, intercept, losses)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
- logInfo("Final model weights " + model.weights)
+ val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+ logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
model
}
}
/**
* Top-level methods for calling Logistic Regression.
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ * Java programs.
*/
object LogisticRegression_LocalRandomSGD {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LogisticRegressionModel =
+ {
+ new LogisticRegression_LocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+ }
+
+ /**
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
- * @param regParam Regularization parameter.
+
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
input: RDD[(Double, Array[Double])],
numIterations: Int,
stepSize: Double,
- regParam: Double,
+
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegression_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ new LogisticRegression_LocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
}
/**
@@ -117,18 +195,18 @@ object LogisticRegression_LocalRandomSGD {
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
- * @param regParam Regularization parameter.
+
* @param numIterations Number of iterations of gradient descent to run.
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[(Double, Array[Double])],
numIterations: Int,
- stepSize: Double,
- regParam: Double)
+ stepSize: Double
+ )
: LogisticRegressionModel =
{
- train(input, numIterations, stepSize, regParam, 1.0)
+ train(input, numIterations, stepSize, 1.0)
}
/**
@@ -145,7 +223,7 @@ object LogisticRegression_LocalRandomSGD {
numIterations: Int)
: LogisticRegressionModel =
{
- train(input, numIterations, 1.0, 0.0, 1.0)
+ train(input, numIterations, 1.0, 1.0)
}
def main(args: Array[String]) {
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index bf10493bf5..2cd1d668eb 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.mllib.classification
import scala.math.signum
@@ -11,22 +28,31 @@ import org.jblas.DoubleMatrix
* SVM using Stochastic Gradient Descent.
*/
class SVMModel(
- val weights: DoubleMatrix,
+ val weights: Array[Double],
val intercept: Double,
- val losses: Array[Double]) extends ClassificationModel {
+ val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
override def predict(testData: spark.RDD[Array[Double]]) = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
testData.map { x =>
- signum(new DoubleMatrix(1, x.length, x:_*).dot(this.weights) + this.intercept)
+ signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept)
}
}
override def predict(testData: Array[Double]): Double = {
val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- signum(dataMat.dot(this.weights) + this.intercept)
+ signum(dataMat.dot(weightsMatrix) + this.intercept)
}
}
+
+
class SVM_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
var numIters: Int)
extends Logging {
@@ -69,34 +95,80 @@ class SVM_LocalRandomSGD private (var stepSize: Double, var regParam: Double, va
}
def train(input: RDD[(Double, Array[Double])]): SVMModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Double, Array[Double])],
+ initialWeights: Array[Double]): SVMModel = {
+
// Add a extra variable consisting of all 1.0's for the intercept.
val data = input.map { case (y, features) =>
(y, Array(1.0, features:_*))
}
- val (weights, losses) = GradientDescent.runMiniBatchSGD(
- data, new HingeGradient(), new SquaredL2Updater(), stepSize, numIters, regParam, miniBatchFraction)
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+ val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+ data,
+ new HingeGradient(),
+ new SquaredL2Updater(),
+ stepSize,
+ numIters,
+ regParam,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
- val weightsScaled = weights.getRange(1, weights.length)
- val intercept = weights.get(0)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
- val model = new SVMModel(weightsScaled, intercept, losses)
+ val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
- logInfo("Final model weights " + model.weights)
+ logInfo("Final model weights " + model.weights.mkString(","))
logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+ logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
model
}
}
/**
* Top-level methods for calling SVM.
+
+
*/
object SVM_LocalRandomSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : SVMModel =
+ {
+ new SVM_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+ }
+
+ /**
+ * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
@@ -151,7 +223,7 @@ object SVM_LocalRandomSGD {
numIterations: Int)
: SVMModel =
{
- train(input, numIterations, 1.0, 0.10, 1.0)
+ train(input, numIterations, 1.0, 1.0, 1.0)
}
def main(args: Array[String]) {
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index 6ffc3b128b..4864ab7ccf 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.mllib.optimization
import org.jblas.DoubleMatrix
@@ -58,4 +75,3 @@ class HingeGradient extends Gradient {
(DoubleMatrix.zeros(1,weights.length), 0.0)
}
}
-
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index bd8489c386..8387d4939b 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.mllib.optimization
import spark.{Logging, RDD, SparkContext}
@@ -23,8 +40,9 @@ object GradientDescent {
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
*
- * @return weights - Column matrix containing weights for every feature.
- * @return lossHistory - Array containing the loss computed for every iteration.
+ * @return A tuple containing two elements. The first element is a column matrix containing
+ * weights for every feature, and the second element is an array containing the stochastic
+ * loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Array[Double])],
@@ -33,16 +51,16 @@ object GradientDescent {
stepSize: Double,
numIters: Int,
regParam: Double,
- miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = {
+ initialWeights: Array[Double],
+ miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
- val lossHistory = new ArrayBuffer[Double](numIters)
+ val stochasticLossHistory = new ArrayBuffer[Double](numIters)
- val nfeatures: Int = data.take(1)(0)._2.length
val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction
- // Initialize weights as a column matrix
- var weights = DoubleMatrix.ones(nfeatures)
+ // Initialize weights as a column vector
+ var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var reg_val = 0.0
for (i <- 1 to numIters) {
@@ -53,16 +71,17 @@ object GradientDescent {
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+ stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
reg_val = update._2
- lossHistory.append(lossSum / miniBatchSize + reg_val)
+ stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
/***
Xinghao: The loss here is sum of lossSum computed using the weights before applying updater,
and reg_val using weights after applying updater
***/
}
- (weights, lossHistory.toArray)
+ (weights.toArray, stochasticLossHistory.toArray)
}
}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index bb2305c811..64364323a2 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.mllib.regression
import spark.{Logging, RDD, SparkContext}
@@ -8,24 +25,34 @@ import org.jblas.DoubleMatrix
/**
* Lasso using Stochastic Gradient Descent.
+ *
*/
class LassoModel(
- val weights: DoubleMatrix,
+ val weights: Array[Double],
val intercept: Double,
- val losses: Array[Double]) extends RegressionModel {
+ val stochasticLosses: Array[Double]) extends RegressionModel {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
override def predict(testData: spark.RDD[Array[Double]]) = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
testData.map { x =>
- new DoubleMatrix(1, x.length, x:_*).dot(this.weights) + this.intercept
+ new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
}
}
+
override def predict(testData: Array[Double]): Double = {
val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- dataMat.dot(this.weights) + this.intercept
+ dataMat.dot(weightsMatrix) + this.intercept
}
}
+
class Lasso_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
var numIters: Int)
extends Logging {
@@ -68,34 +95,80 @@ class Lasso_LocalRandomSGD private (var stepSize: Double, var regParam: Double,
}
def train(input: RDD[(Double, Array[Double])]): LassoModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Double, Array[Double])],
+ initialWeights: Array[Double]): LassoModel = {
+
// Add a extra variable consisting of all 1.0's for the intercept.
val data = input.map { case (y, features) =>
(y, Array(1.0, features:_*))
}
- val (weights, losses) = GradientDescent.runMiniBatchSGD(
- data, new SquaredGradient(), new L1Updater(), stepSize, numIters, regParam, miniBatchFraction)
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+ val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+ data,
+ new SquaredGradient(),
+ new L1Updater(),
+ stepSize,
+ numIters,
+ regParam,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
- val weightsScaled = weights.getRange(1, weights.length)
- val intercept = weights.get(0)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
- val model = new LassoModel(weightsScaled, intercept, losses)
+ val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
- logInfo("Final model weights " + model.weights)
+ logInfo("Final model weights " + model.weights.mkString(","))
logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+ logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
model
}
}
/**
* Top-level methods for calling Lasso.
+ *
+ *
*/
object Lasso_LocalRandomSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LassoModel =
+ {
+ new Lasso_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+ }
+
+ /**
+ * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
@@ -150,7 +223,7 @@ object Lasso_LocalRandomSGD {
numIterations: Int)
: LassoModel =
{
- train(input, numIterations, 1.0, 0.10, 1.0)
+ train(input, numIterations, 1.0, 1.0, 1.0)
}
def main(args: Array[String]) {
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 5741906a14..827ca66330 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -1,4 +1,25 @@
+<<<<<<< HEAD:mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
package spark.mllib.classification
+=======
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+>>>>>>> FETCH_HEAD:mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
import scala.util.Random
@@ -6,7 +27,6 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
-import spark.SparkContext._
class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
@@ -17,16 +37,23 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
System.clearProperty("spark.driver.port")
}
+<<<<<<< HEAD:mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
// Test if we can correctly learn A, B where Y = logistic(A + B*X)
test("LogisticRegression_LocalRandomSGD") {
val nPoints = 10000
val rnd = new Random(42)
+=======
+ // Generate input of the form Y = logistic(offset + scale*X)
+ def generateLogisticInput(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): Seq[(Double, Array[Double])] = {
+ val rnd = new Random(seed)
+>>>>>>> FETCH_HEAD:mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
- val A = 2.0
- val B = -1.5
-
// NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
val unifRand = new scala.util.Random(45)
val rLogis = (0 until nPoints).map { i =>
@@ -34,24 +61,87 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
math.log(u) - math.log(1.0-u)
}
- // y <- A + B*x + rlogis(100)
+ // y <- A + B*x + rLogis()
// y <- as.numeric(y > 0)
- val y = (0 until nPoints).map { i =>
- val yVal = A + B * x1(i) + rLogis(i)
+ val y: Seq[Double] = (0 until nPoints).map { i =>
+ val yVal = offset + scale * x1(i) + rLogis(i)
if (yVal > 0) 1.0 else 0.0
}
- val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i)))).toArray
+ val testData = (0 until nPoints).map(i => (y(i), Array(x1(i))))
+ testData
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ math.abs(prediction - expected) > 0.5
+ }.size
+ // At least 80% of the predictions should be on.
+ assert(numOffPredictions < input.length / 5)
+ }
+
+ // Test if we can correctly learn A, B where Y = logistic(A + B*X)
+ test("logistic regression") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = generateLogisticInput(A, B, nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
+<<<<<<< HEAD:mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
val lr = new LogisticRegression_LocalRandomSGD().setStepSize(10.0)
.setNumIterations(20)
+=======
+ val lr = new LogisticRegression().setStepSize(10.0).setNumIterations(20)
+>>>>>>> FETCH_HEAD:mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
val model = lr.train(testRDD)
- val weight0 = model.weights.get(0)
+ // Test the weights
+ val weight0 = model.weights(0)
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ }
+
+ test("logistic regression with initial weights") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = generateLogisticInput(A, B, nPoints, 42)
+
+ val initialB = -1.0
+ val initialWeights = Array(initialB)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ // Use half as many iterations as the previous test.
+ val lr = new LogisticRegression().setStepSize(10.0).setNumIterations(10)
+
+ val model = lr.train(testRDD, initialWeights)
+
+ val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+ val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
+ // Test prediction on RDD.
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+ // Test prediction on Array.
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
}
}