about summary refs log tree commit diff
diff options
context:
space:
mode:
author: Yanbo Liang <ybliang8@gmail.com> 2015-09-14 12:08:52 -0700
committer: Xiangrui Meng <meng@databricks.com> 2015-09-14 12:08:52 -0700
commit: ce6f3f163bc667cb5da9ab4331c8bad10cc0d701 (patch)
tree: b45c7d15811bef5d745ebe2b33b496a91ae9ab21
parent: cf2821ef5fd9965eb6256e8e8b3f1e00c0788098 (diff)
download: spark-ce6f3f163bc667cb5da9ab4331c8bad10cc0d701.tar.gz
spark-ce6f3f163bc667cb5da9ab4331c8bad10cc0d701.tar.bz2
spark-ce6f3f163bc667cb5da9ab4331c8bad10cc0d701.zip
[SPARK-10194] [MLLIB] [PYSPARK] SGD algorithms need convergenceTol parameter in Python
[SPARK-3382](https://issues.apache.org/jira/browse/SPARK-3382) added a ```convergenceTol``` parameter for GradientDescent-based methods in Scala. We need that parameter in Python; otherwise, Python users will not be able to adjust that behavior (or even reproduce behavior from previous releases since the default changed). Author: Yanbo Liang <ybliang8@gmail.com> Closes #8457 from yanboliang/spark-10194.
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 20
-rw-r--r-- python/pyspark/mllib/classification.py | 17
-rw-r--r-- python/pyspark/mllib/regression.py | 32
3 files changed, 48 insertions, 21 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f585aacd45..69ce7f5070 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -132,7 +132,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
- validateData: Boolean): JList[Object] = {
+ validateData: Boolean,
+ convergenceTol: Double): JList[Object] = {
val lrAlg = new LinearRegressionWithSGD()
lrAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -141,6 +142,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
+ .setConvergenceTol(convergenceTol)
lrAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
lrAlg,
@@ -159,7 +161,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
- validateData: Boolean): JList[Object] = {
+ validateData: Boolean,
+ convergenceTol: Double): JList[Object] = {
val lassoAlg = new LassoWithSGD()
lassoAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -168,6 +171,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
+ .setConvergenceTol(convergenceTol)
trainRegressionModel(
lassoAlg,
data,
@@ -185,7 +189,8 @@ private[python] class PythonMLLibAPI extends Serializable {
miniBatchFraction: Double,
initialWeights: Vector,
intercept: Boolean,
- validateData: Boolean): JList[Object] = {
+ validateData: Boolean,
+ convergenceTol: Double): JList[Object] = {
val ridgeAlg = new RidgeRegressionWithSGD()
ridgeAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -194,6 +199,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
+ .setConvergenceTol(convergenceTol)
trainRegressionModel(
ridgeAlg,
data,
@@ -212,7 +218,8 @@ private[python] class PythonMLLibAPI extends Serializable {
initialWeights: Vector,
regType: String,
intercept: Boolean,
- validateData: Boolean): JList[Object] = {
+ validateData: Boolean,
+ convergenceTol: Double): JList[Object] = {
val SVMAlg = new SVMWithSGD()
SVMAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -221,6 +228,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
+ .setConvergenceTol(convergenceTol)
SVMAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
SVMAlg,
@@ -240,7 +248,8 @@ private[python] class PythonMLLibAPI extends Serializable {
regParam: Double,
regType: String,
intercept: Boolean,
- validateData: Boolean): JList[Object] = {
+ validateData: Boolean,
+ convergenceTol: Double): JList[Object] = {
val LogRegAlg = new LogisticRegressionWithSGD()
LogRegAlg.setIntercept(intercept)
.setValidateData(validateData)
@@ -249,6 +258,7 @@ private[python] class PythonMLLibAPI extends Serializable {
.setRegParam(regParam)
.setStepSize(stepSize)
.setMiniBatchFraction(miniBatchFraction)
+ .setConvergenceTol(convergenceTol)
LogRegAlg.optimizer.setUpdater(getUpdaterFromString(regType))
trainRegressionModel(
LogRegAlg,
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 8f27c446a6..cb4ee83678 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -241,7 +241,7 @@ class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=0.01, regType="l2", intercept=False,
- validateData=True):
+ validateData=True, convergenceTol=0.001):
"""
Train a logistic regression model on the given data.
@@ -274,11 +274,13 @@ class LogisticRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
+ :param convergenceTol: A condition which decides iteration termination.
+ (default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLogisticRegressionModelWithSGD", rdd, int(iterations),
float(step), float(miniBatchFraction), i, float(regParam), regType,
- bool(intercept), bool(validateData))
+ bool(intercept), bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
@@ -439,7 +441,7 @@ class SVMWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, regType="l2",
- intercept=False, validateData=True):
+ intercept=False, validateData=True, convergenceTol=0.001):
"""
Train a support vector machine on the given data.
@@ -472,11 +474,13 @@ class SVMWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
+ :param convergenceTol: A condition which decides iteration termination.
+ (default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainSVMModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, regType,
- bool(intercept), bool(validateData))
+ bool(intercept), bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, SVMModel, data, initialWeights)
@@ -600,12 +604,15 @@ class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
:param miniBatchFraction: Fraction of data on which SGD is run for each
iteration.
:param regParam: L2 Regularization parameter.
+ :param convergenceTol: A condition which decides iteration termination.
"""
- def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
+ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01,
+ convergenceTol=0.001):
self.stepSize = stepSize
self.numIterations = numIterations
self.regParam = regParam
self.miniBatchFraction = miniBatchFraction
+ self.convergenceTol = convergenceTol
self._model = None
super(StreamingLogisticRegressionWithSGD, self).__init__(
model=self._model)
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 41946e3674..256b7537fe 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -28,7 +28,8 @@ __all__ = ['LabeledPoint', 'LinearModel',
'LinearRegressionModel', 'LinearRegressionWithSGD',
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
- 'IsotonicRegression']
+ 'IsotonicRegression', 'StreamingLinearAlgorithm',
+ 'StreamingLinearRegressionWithSGD']
class LabeledPoint(object):
@@ -202,7 +203,7 @@ class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
initialWeights=None, regParam=0.0, regType=None, intercept=False,
- validateData=True):
+ validateData=True, convergenceTol=0.001):
"""
Train a linear regression model using Stochastic Gradient
Descent (SGD).
@@ -244,11 +245,14 @@ class LinearRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
+ :param convergenceTol: A condition which decides iteration termination.
+ (default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLinearRegressionModelWithSGD", rdd, int(iterations),
float(step), float(miniBatchFraction), i, float(regParam),
- regType, bool(intercept), bool(validateData))
+ regType, bool(intercept), bool(validateData),
+ float(convergenceTol))
return _regression_train_wrapper(train, LinearRegressionModel, data, initialWeights)
@@ -330,7 +334,7 @@ class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, intercept=False,
- validateData=True):
+ validateData=True, convergenceTol=0.001):
"""
Train a regression model with L1-regularization using
Stochastic Gradient Descent.
@@ -362,11 +366,13 @@ class LassoWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
+ :param convergenceTol: A condition which decides iteration termination.
+ (default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainLassoModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, bool(intercept),
- bool(validateData))
+ bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, LassoModel, data, initialWeights)
@@ -449,7 +455,7 @@ class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=0.01,
miniBatchFraction=1.0, initialWeights=None, intercept=False,
- validateData=True):
+ validateData=True, convergenceTol=0.001):
"""
Train a regression model with L2-regularization using
Stochastic Gradient Descent.
@@ -481,11 +487,13 @@ class RidgeRegressionWithSGD(object):
:param validateData: Boolean parameter which indicates if
the algorithm should validate data
before training. (default: True)
+ :param convergenceTol: A condition which decides iteration termination.
+ (default: 0.001)
"""
def train(rdd, i):
return callMLlibFunc("trainRidgeModelWithSGD", rdd, int(iterations), float(step),
float(regParam), float(miniBatchFraction), i, bool(intercept),
- bool(validateData))
+ bool(validateData), float(convergenceTol))
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
@@ -636,15 +644,17 @@ class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
After training on a batch of data, the weights obtained at the end of
training are used as initial weights for the next batch.
- :param: stepSize Step size for each iteration of gradient descent.
- :param: numIterations Total number of iterations run.
- :param: miniBatchFraction Fraction of data on which SGD is run for each
+ :param stepSize: Step size for each iteration of gradient descent.
+ :param numIterations: Total number of iterations run.
+ :param miniBatchFraction: Fraction of data on which SGD is run for each
iteration.
+ :param convergenceTol: A condition which decides iteration termination.
"""
- def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
+ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001):
self.stepSize = stepSize
self.numIterations = numIterations
self.miniBatchFraction = miniBatchFraction
+ self.convergenceTol = convergenceTol
self._model = None
super(StreamingLinearRegressionWithSGD, self).__init__(
model=self._model)