diff options
author | WeichenXu <WeichenXu123@outlook.com> | 2016-09-22 04:35:54 -0700 |
---|---|---|
committer | Yanbo Liang <ybliang8@gmail.com> | 2016-09-22 04:35:54 -0700 |
commit | 72d9fba26c19aae73116fd0d00b566967934c6fc (patch) | |
tree | ec8a590dd79baa81f7949c471634ea7443b7f526 /mllib/src/main/scala | |
parent | 646f383465c123062cbcce288a127e23984c7c7f (diff) | |
download | spark-72d9fba26c19aae73116fd0d00b566967934c6fc.tar.gz spark-72d9fba26c19aae73116fd0d00b566967934c6fc.tar.bz2 spark-72d9fba26c19aae73116fd0d00b566967934c6fc.zip |
[SPARK-17281][ML][MLLIB] Add treeAggregateDepth parameter for AFTSurvivalRegression
## What changes were proposed in this pull request?
Add treeAggregateDepth parameter for AFTSurvivalRegression to keep consistent with LiR/LoR.
## How was this patch tested?
Existing tests.
Author: WeichenXu <WeichenXu123@outlook.com>
Closes #14851 from WeichenXu123/add_treeAggregate_param_for_survival_regression.
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 3179f4882f..9d5ba99978 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -46,7 +46,7 @@ import org.apache.spark.storage.StorageLevel */ private[regression] trait AFTSurvivalRegressionParams extends Params with HasFeaturesCol with HasLabelCol with HasPredictionCol with HasMaxIter - with HasTol with HasFitIntercept with Logging { + with HasTol with HasFitIntercept with HasAggregationDepth with Logging { /** * Param for censor column name. @@ -184,6 +184,17 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S setDefault(tol -> 1E-6) /** + * Suggested depth for treeAggregate (>= 2). + * If the dimensions of features or the number of partitions are large, + * this param could be adjusted to a larger size. + * Default is 2. + * @group expertSetParam + */ + @Since("2.1.0") + def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) + setDefault(aggregationDepth -> 2) + + /** * Extract [[featuresCol]], [[labelCol]] and [[censorCol]] from input dataset, * and put it in an RDD with strong types. */ @@ -207,7 +218,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val combOp = (c1: MultivariateOnlineSummarizer, c2: MultivariateOnlineSummarizer) => { c1.merge(c2) } - instances.treeAggregate(new MultivariateOnlineSummarizer)(seqOp, combOp) + instances.treeAggregate( + new MultivariateOnlineSummarizer + )(seqOp, combOp, $(aggregationDepth)) } val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) @@ -222,7 +235,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val bcFeaturesStd = instances.context.broadcast(featuresStd) - val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd) + val costFun = new AFTCostFun(instances, $(fitIntercept), bcFeaturesStd, $(aggregationDepth)) val optimizer = new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) /* @@ -591,7 +604,8 @@ private class AFTAggregator( private class AFTCostFun( data: RDD[AFTPoint], fitIntercept: Boolean, - bcFeaturesStd: Broadcast[Array[Double]]) extends DiffFunction[BDV[Double]] { + bcFeaturesStd: Broadcast[Array[Double]], + aggregationDepth: Int) extends DiffFunction[BDV[Double]] { override def calculate(parameters: BDV[Double]): (Double, BDV[Double]) = { @@ -604,7 +618,7 @@ private class AFTCostFun( }, combOp = (c1, c2) => (c1, c2) match { case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) - }) + }, depth = aggregationDepth) bcParameters.destroy(blocking = false) (aftAggregator.loss, aftAggregator.gradient) |