aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorhqzizania <hqzizania@gmail.com>2016-08-20 18:52:44 -0700
committerDB Tsai <dbt@netflix.com>2016-08-20 18:52:44 -0700
commit61ef74f2272faa7ce8f2badc7e00039908e3551f (patch)
treee6e39f47aea0f9a4e45e47b1d9ccabe465b32ff0 /mllib
parent9f37d4eac28dd179dd523fa7d645be97bb52af9c (diff)
downloadspark-61ef74f2272faa7ce8f2badc7e00039908e3551f.tar.gz
spark-61ef74f2272faa7ce8f2badc7e00039908e3551f.tar.bz2
spark-61ef74f2272faa7ce8f2badc7e00039908e3551f.zip
[SPARK-17090][ML] Make tree aggregation level in linear/logistic regression configurable
## What changes were proposed in this pull request? Linear/logistic regression use treeAggregate with default depth (always = 2) for collecting coefficient gradient updates to the driver. For high dimensional problems, this can cause OOM error on the driver. This patch makes it configurable to avoid this problem if users' input data has many features. It adds a HasTreeDepth API in `sharedParams.scala`, and extends it to both Linear regression and logistic regression in .ml Author: hqzizania <hqzizania@gmail.com> Closes #14717 from hqzizania/SPARK-17090.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala24
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala16
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala25
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala22
5 files changed, 74 insertions, 17 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index ea31c68e4c..757d52052d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -48,7 +48,7 @@ import org.apache.spark.storage.StorageLevel
*/
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
- with HasStandardization with HasWeightCol with HasThreshold {
+ with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
/**
* Set threshold in binary classification, in range [0, 1].
@@ -256,6 +256,17 @@ class LogisticRegression @Since("1.2.0") (
@Since("1.5.0")
override def getThresholds: Array[Double] = super.getThresholds
+ /**
+ * Suggested depth for treeAggregate (>= 2).
+ * If the dimensions of features or the number of partitions are large,
+ * this param could be adjusted to a larger size.
+ * Default is 2.
+ * @group expertSetParam
+ */
+ @Since("2.1.0")
+ def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+ setDefault(aggregationDepth -> 2)
+
private var optInitialModel: Option[LogisticRegressionModel] = None
/** @group setParam */
@@ -294,7 +305,8 @@ class LogisticRegression @Since("1.2.0") (
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
- new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
+ new MultivariateOnlineSummarizer, new MultiClassSummarizer
+ )(seqOp, combOp, $(aggregationDepth))
}
val histogram = labelSummarizer.histogram
@@ -358,7 +370,7 @@ class LogisticRegression @Since("1.2.0") (
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
- $(standardization), bcFeaturesStd, regParamL2, multinomial = false)
+ $(standardization), bcFeaturesStd, regParamL2, multinomial = false, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -1331,8 +1343,8 @@ private class LogisticCostFun(
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
regParamL2: Double,
- multinomial: Boolean) extends DiffFunction[BDV[Double]] {
-
+ multinomial: Boolean,
+ aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
@@ -1347,7 +1359,7 @@ private class LogisticCostFun(
instances.treeAggregate(
new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
multinomial)
- )(seqOp, combOp)
+ )(seqOp, combOp, aggregationDepth)
}
val totalGradientArray = logisticAggregator.gradient.toArray
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
index dfadd68c5f..f85ac76a8d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
@@ -44,7 +44,8 @@ import org.apache.spark.storage.StorageLevel
*/
private[classification] trait MultinomialLogisticRegressionParams
extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter
- with HasFitIntercept with HasTol with HasStandardization with HasWeightCol {
+ with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
+ with HasAggregationDepth {
/**
* Set thresholds in multiclass (or binary) classification to adjust the probability of
@@ -163,6 +164,17 @@ class MultinomialLogisticRegression @Since("2.1.0") (
@Since("2.1.0")
override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value)
+ /**
+ * Suggested depth for treeAggregate (>= 2).
+ * If the dimensions of features or the number of partitions are large,
+ * this param could be adjusted to a larger size.
+ * Default is 2.
+ * @group expertSetParam
+ */
+ @Since("2.1.0")
+ def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+ setDefault(aggregationDepth -> 2)
+
override protected[spark] def train(dataset: Dataset[_]): MultinomialLogisticRegressionModel = {
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] =
@@ -245,7 +257,7 @@ class MultinomialLogisticRegression @Since("2.1.0") (
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
- $(standardization), bcFeaturesStd, regParamL2, multinomial = true)
+ $(standardization), bcFeaturesStd, regParamL2, multinomial = true, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 4ab0c16a1b..0f48a16a42 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -78,7 +78,9 @@ private[shared] object SharedParamsCodeGen {
ParamDesc[String]("weightCol", "weight column name. If this is not set or empty, we treat " +
"all instance weights as 1.0"),
ParamDesc[String]("solver", "the solver algorithm for optimization. If this is not set or " +
- "empty, default value is 'auto'", Some("\"auto\"")))
+ "empty, default value is 'auto'", Some("\"auto\"")),
+ ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
+ isValid = "ParamValidators.gtEq(2)"))
val code = genSharedParams(params)
val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 64d6af2766..6803772c63 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -334,10 +334,10 @@ private[ml] trait HasElasticNetParam extends Params {
private[ml] trait HasTol extends Params {
/**
- * Param for the convergence tolerance for iterative algorithms.
+ * Param for the convergence tolerance for iterative algorithms (>= 0).
* @group param
*/
- final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms")
+ final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms (>= 0)", ParamValidators.gtEq(0))
/** @group getParam */
final def getTol: Double = $(tol)
@@ -349,10 +349,10 @@ private[ml] trait HasTol extends Params {
private[ml] trait HasStepSize extends Params {
/**
- * Param for Step size to be used for each iteration of optimization.
+ * Param for Step size to be used for each iteration of optimization (> 0).
* @group param
*/
- final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization")
+ final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization (> 0)", ParamValidators.gt(0))
/** @group getParam */
final def getStepSize: Double = $(stepSize)
@@ -389,4 +389,21 @@ private[ml] trait HasSolver extends Params {
/** @group getParam */
final def getSolver: String = $(solver)
}
+
+/**
+ * Trait for shared param aggregationDepth (default: 2).
+ */
+private[ml] trait HasAggregationDepth extends Params {
+
+ /**
+ * Param for suggested depth for treeAggregate (>= 2).
+ * @group param
+ */
+ final val aggregationDepth: IntParam = new IntParam(this, "aggregationDepth", "suggested depth for treeAggregate (>= 2)", ParamValidators.gtEq(2))
+
+ setDefault(aggregationDepth, 2)
+
+ /** @group getParam */
+ final def getAggregationDepth: Int = $(aggregationDepth)
+}
// scalastyle:on
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 76be4204e9..b1bb9b9fe0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -53,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
private[regression] trait LinearRegressionParams extends PredictorParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
+ with HasAggregationDepth
/**
* Linear regression.
@@ -172,6 +173,17 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setSolver(value: String): this.type = set(solver, value)
setDefault(solver -> "auto")
+ /**
+ * Suggested depth for treeAggregate (>= 2).
+ * If the dimensions of features or the number of partitions are large,
+ * this param could be adjusted to a larger size.
+ * Default is 2.
+ * @group expertSetParam
+ */
+ @Since("2.1.0")
+ def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+ setDefault(aggregationDepth -> 2)
+
override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
// Extract the number of features before deciding optimization solver.
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
@@ -230,7 +242,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
- new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)(seqOp, combOp)
+ new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+ )(seqOp, combOp, $(aggregationDepth))
}
val yMean = ySummarizer.mean(0)
@@ -296,7 +309,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
- $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
+ $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam, $(aggregationDepth))
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -1016,7 +1029,8 @@ private class LeastSquaresCostFun(
standardization: Boolean,
bcFeaturesStd: Broadcast[Array[Double]],
bcFeaturesMean: Broadcast[Array[Double]],
- effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
+ effectiveL2regParam: Double,
+ aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
@@ -1029,7 +1043,7 @@ private class LeastSquaresCostFun(
instances.treeAggregate(
new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
- bcFeaturesMean))(seqOp, combOp)
+ bcFeaturesMean))(seqOp, combOp, aggregationDepth)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray