author    Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-07-17 14:04:05 -0700
committer Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-07-17 14:04:05 -0700
commit    84fa20c2a135f54745ddde9abb4f5e60af8856d1 (patch)
tree      10041963447a4ee4151ea2139053cfd4fa9a075f /mllib
parent    cad48edb70c58a48f66851e963f51db712ab7863 (diff)
download  spark-84fa20c2a135f54745ddde9abb4f5e60af8856d1.tar.gz
          spark-84fa20c2a135f54745ddde9abb4f5e60af8856d1.tar.bz2
          spark-84fa20c2a135f54745ddde9abb4f5e60af8856d1.zip
Allow initial weight vectors in LogisticRegression.
Also move LogisticGradient to the LogisticRegression file and fix the unit test log path.
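
The new overload can be driven roughly as follows. This is a minimal sketch, not part of the commit: the SparkContext setup and the toy data are illustrative, while the five-argument train signature and the model accessors mirror the diff and tests below.

    import spark.SparkContext
    import spark.mllib.regression.LogisticRegression

    val sc = new SparkContext("local", "LogisticRegressionExample")
    // (label, features) pairs; labels are 0.0/1.0.
    val input = sc.parallelize(Seq(
      (1.0, Array(0.5)),
      (0.0, Array(-0.5))))
    // One initial weight per feature; the intercept column is added internally.
    val initialWeights = Array(-1.0)
    val model = LogisticRegression.train(input, 20, 10.0, 1.0, initialWeights)
    println(model.weights.get(0) + " " + model.intercept)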
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Gradient.scala               18
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala         5
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala       61
-rw-r--r--  mllib/src/test/resources/log4j.properties                                   2
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala  52
5 files changed, 106 insertions, 32 deletions
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index 2fb0c8136f..d5338360c8 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -30,21 +30,3 @@ abstract class Gradient extends Serializable {
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
}
-
-class LogisticGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double) = {
- val margin: Double = -1.0 * data.dot(weights)
- val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
- val gradient = data.mul(gradientMultiplier)
- val loss =
- if (margin > 0) {
- math.log(1 + math.exp(0 - margin))
- } else {
- math.log(1 + math.exp(margin)) - margin
- }
-
- (gradient, loss)
- }
-}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index e1b73bc25e..2ac0808357 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -48,16 +48,17 @@ object GradientDescent {
updater: Updater,
stepSize: Double,
numIters: Int,
+ initialWeights: Array[Double],
miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = {
val lossHistory = new ArrayBuffer[Double](numIters)
- val nfeatures: Int = data.take(1)(0)._2.length
val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction
// Initialize weights as a column matrix
- var weights = DoubleMatrix.ones(nfeatures)
+ var weights = new DoubleMatrix(1, initialWeights.length,
+ initialWeights:_*)
var reg_val = 0.0
for (i <- 1 to numIters) {
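
For reference, the caller-supplied array replaces the old DoubleMatrix.ones(nfeatures) initialization by being wrapped into a 1 x n jblas matrix, exactly as in the hunk above. A tiny standalone sketch (assuming jblas on the classpath; the values are hypothetical):

    import org.jblas.DoubleMatrix

    val initialWeights = Array(1.0, -1.0, 0.5)   // hypothetical values
    // Mirrors the change above: a 1 x 3 row matrix built from the plain array.
    val weights = new DoubleMatrix(1, initialWeights.length, initialWeights: _*)
    println(weights.length)   // 3
    println(weights.get(0))   // 1.0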
diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
index bb294c2257..dab15aa386 100644
--- a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
@@ -46,6 +46,24 @@ class LogisticRegressionModel(
}
}
+class LogisticGradient extends Gradient {
+ override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+ (DoubleMatrix, Double) = {
+ val margin: Double = -1.0 * data.dot(weights)
+ val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+ val gradient = data.mul(gradientMultiplier)
+ val loss =
+ if (margin > 0) {
+ math.log(1 + math.exp(0 - margin))
+ } else {
+ math.log(1 + math.exp(margin)) - margin
+ }
+
+ (gradient, loss)
+ }
+}
+
class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double,
var numIters: Int)
extends Logging {
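
A side note on the loss computation in LogisticGradient.compute above: both branches evaluate the same quantity, log(1 + exp(-margin)), but branching on the sign of margin keeps the argument of exp non-positive so it cannot overflow. A standalone sketch of the identity (illustrative helper, not part of the commit):

    // Both branches equal log(1 + exp(-margin)):
    //   margin <= 0:  log(1 + exp(margin)) - margin = log((1 + exp(margin)) * exp(-margin))
    //                                               = log(exp(-margin) + 1)
    def stableLogisticLoss(margin: Double): Double =
      if (margin > 0) math.log(1 + math.exp(-margin))   // exp of a non-positive value
      else math.log(1 + math.exp(margin)) - margin      // exp argument likewise <= 0

    // stableLogisticLoss(-1000.0) == 1000.0, whereas the naive
    // math.log(1 + math.exp(1000.0)) overflows to Infinity.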
@@ -80,13 +98,30 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
}
def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+ val nfeatures: Int = input.take(1)(0)._2.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ train(input, initialWeights)
+ }
+
+ def train(
+ input: RDD[(Double, Array[Double])],
+ initialWeights: Array[Double]): LogisticRegressionModel = {
+
// Add a extra variable consisting of all 1.0's for the intercept.
val data = input.map { case (y, features) =>
(y, Array(1.0, features:_*))
}
+ val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
val (weights, losses) = GradientDescent.runMiniBatchSGD(
- data, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters, miniBatchFraction)
+ data,
+ new LogisticGradient(),
+ new SimpleUpdater(),
+ stepSize,
+ numIters,
+ initalWeightsWithIntercept,
+ miniBatchFraction)
val weightsScaled = weights.getRange(1, weights.length)
val intercept = weights.get(0)
@@ -108,6 +143,30 @@ object LogisticRegression {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[(Double, Array[Double])],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LogisticRegressionModel =
+ {
+ new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+ }
+
+ /**
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
diff --git a/mllib/src/test/resources/log4j.properties b/mllib/src/test/resources/log4j.properties
index a112e0b506..4265ba6e5d 100644
--- a/mllib/src/test/resources/log4j.properties
+++ b/mllib/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=ml/target/unit-tests.log
+log4j.appender.file.file=mllib/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
index bc9bfd054f..2ff248d256 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala
@@ -34,16 +34,14 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
System.clearProperty("spark.driver.port")
}
- // Test if we can correctly learn A, B where Y = logistic(A + B*X)
- test("logistic regression") {
- val nPoints = 10000
+ // Generate input of the form Y = logistic(offset + scale*X)
+ def generateLogisticInput(
+ offset: Double,
+ scale: Double,
+ nPoints: Int) : Seq[(Double, Array[Double])] = {
val rnd = new Random(42)
-
val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
- val A = 2.0
- val B = -1.5
-
// NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
val unifRand = new scala.util.Random(45)
val rLogis = (0 until nPoints).map { i =>
@@ -51,14 +49,24 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
math.log(u) - math.log(1.0-u)
}
- // y <- A + B*x + rlogis(100)
+ // y <- A + B*x + rLogis()
// y <- as.numeric(y > 0)
val y = (0 until nPoints).map { i =>
- val yVal = A + B * x1(i) + rLogis(i)
+ val yVal = offset + scale * x1(i) + rLogis(i)
if (yVal > 0) 1.0 else 0.0
}
- val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i)))).toArray
+ val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i))))
+ testData
+ }
+
+ // Test if we can correctly learn A, B where Y = logistic(A + B*X)
+ test("logistic regression") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = generateLogisticInput(A, B, nPoints)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
@@ -71,4 +79,28 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
}
+
+ test("logistic regression with initial weights") {
+ val nPoints = 10000
+ val A = 2.0
+ val B = -1.5
+
+ val testData = generateLogisticInput(A, B, nPoints)
+
+ val initialB = -1.0
+ val initialWeights = Array(initialB)
+
+ val testRDD = sc.parallelize(testData, 2)
+ testRDD.cache()
+
+ // Use half as many iterations as the previous test.
+ val lr = new LogisticRegression().setStepSize(10.0)
+ .setNumIterations(10)
+
+ val model = lr.train(testRDD, initialWeights)
+
+ val weight0 = model.weights.get(0)
+ assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+ assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+ }
}
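
A closing note on the data generator: generateLogisticInput relies on the latent-variable view of logistic regression, which is why both tests can expect to recover A and B. A worked sketch (not part of the commit):

    F(eps)       = 1 / (1 + exp(-eps))                    standard Logistic(0,1) CDF
    F^-1(u)      = ln(u / (1 - u)) = ln(u) - ln(1 - u)    inverse-CDF sampling, as in rLogis
    P(y = 1 | x) = P(offset + scale*x + eps > 0)
                 = 1 - F(-(offset + scale*x))
                 = 1 / (1 + exp(-(offset + scale*x)))     i.e. logistic(offset + scale*x)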