author    | Zheng RuiFeng <ruifengz@foxmail.com>      | 2017-01-17 15:39:51 -0800
committer | Joseph K. Bradley <joseph@databricks.com> | 2017-01-17 15:39:51 -0800
commit    | e7f982b20d8a1c0db711e0dcfe26b2f39f98dd64 (patch)
tree      | 1862dcff95b817e75e0074e472f5df630f89ac2c
parent    | 2992a0e79edcabf3b7434b566239e2a861bc99ea (diff)
[SPARK-18206][ML] Add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic,LiR
## What changes were proposed in this pull request?
Add instrumentation to MLP (MultilayerPerceptronClassifier), NB (NaiveBayes), LDA, AFT (AFTSurvivalRegression), GLM (GeneralizedLinearRegression), Isotonic (IsotonicRegression), and LiR (LinearRegression).
## How was this patch tested?
Local tests in spark-shell.
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>
Closes #15671 from zhengruifeng/lir_instr.
13 files changed, 105 insertions, 33 deletions
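For context, the shape of the change is sketched below before the full diff. This is a minimal, hypothetical `train()` body — the estimator, model class (`MyModel`), and the particular params logged are illustrative stand-ins, not code from this patch — showing the `Instrumentation` calls the patch threads through each real implementation: create a session, log an explicit param list instead of `params: _*`, record dataset-derived counts, and mark success before returning the fitted model.

```scala
// Hypothetical estimator sketch -- not part of this patch. Only the
// Instrumentation calls (create / logParams / logNumFeatures / logSuccess)
// mirror what the diff below adds to the real estimators.
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

override protected def train(dataset: Dataset[_]): MyModel = {
  // One instrumentation session per fit, tied to this estimator and its input.
  val instr = Instrumentation.create(this, dataset)
  // Log an explicit list of params rather than the generic `params: _*`.
  instr.logParams(labelCol, featuresCol, predictionCol, maxIter, tol)

  // Record dataset-derived metadata as soon as it is available.
  val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
  instr.logNumFeatures(numFeatures)

  // ... existing training logic, unchanged ...
  val model = copyValues(new MyModel(uid).setParent(this))

  // Mark the session as successfully finished before returning the model.
  instr.logSuccess(model)
  model
}
```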
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index c99b63b25d..bb93ba5d9c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -160,7 +160,9 @@ class GBTClassifier @Since("1.4.0") (
     val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+      maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+      seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
     instr.logNumFeatures(numFeatures)
     instr.logNumClasses(2)
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index aaaf7df345..93cc1e6f09 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -232,8 +232,15 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
    * @return Fitted model
    */
   override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
+      blockSize, solver, stepSize, seed)
+
     val myLayers = $(layers)
     val labels = myLayers.last
+    instr.logNumClasses(labels)
+    instr.logNumFeatures(myLayers.head)
+
     val lpData = extractLabeledPoints(dataset)
     val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
     val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
@@ -258,7 +265,10 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
     }
     trainer.setStackSize($(blockSize))
     val mlpModel = trainer.train(data)
-    new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+    val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+
+    instr.logSuccess(model)
+    model
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index e90040dbf1..e571359940 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -147,7 +147,12 @@ class NaiveBayes @Since("1.5.0") (
       }
     }
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
+      probabilityCol, modelType, smoothing, thresholds)
+
     val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+    instr.logNumFeatures(numFeatures)
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
 
     // Aggregates term frequencies per label.
@@ -169,6 +174,7 @@ class NaiveBayes @Since("1.5.0") (
       }).collect().sortBy(_._1)
 
     val numLabels = aggregated.length
+    instr.logNumClasses(numLabels)
     val numDocuments = aggregated.map(_._2._1).sum
 
     val labelArray = new Array[Double](numLabels)
@@ -198,7 +204,9 @@ class NaiveBayes @Since("1.5.0") (
 
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
-    new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+    val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.5.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index 5bbaafeff3..ce834f1d17 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -131,7 +131,9 @@ class RandomForestClassifier @Since("1.4.0") (
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
+      impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
 
     val trees = RandomForest
       .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 728a883b1a..03f4ac5b28 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -888,6 +888,12 @@ class LDA @Since("1.6.0") (
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): LDAModel = {
     transformSchema(dataset.schema, logging = true)
+
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
+      checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
+      learningDecay, optimizer, learningOffset, seed)
+
     val oldLDA = new OldLDA()
       .setK($(k))
       .setDocConcentration(getOldDocConcentration)
@@ -905,7 +911,11 @@ class LDA @Since("1.6.0") (
       case m: OldDistributedLDAModel =>
         new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None)
     }
-    copyValues(newModel).setParent(this)
+
+    instr.logNumFeatures(newModel.vocabSize)
+    val model = copyValues(newModel).setParent(this)
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.6.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index cdea90ec1a..995780bf64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -457,10 +457,12 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       .map { row =>
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
       }
+
     val instr = Instrumentation.create(this, ratings)
-    instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha,
-      userCol, itemCol, ratingCol, predictionCol, maxIter,
-      regParam, nonnegative, checkpointInterval, seed)
+    instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
+      itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
+      seed, intermediateStorageLevel, finalStorageLevel)
+
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
       maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index af68e7b9d5..2f78dd30b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -227,6 +227,12 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
     val numFeatures = featuresStd.size
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
+      fitIntercept, maxIter, tol, aggregationDepth)
+    instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
+    instr.logNumFeatures(numFeatures)
+
     if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
       featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
       logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +
@@ -276,8 +282,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val coefficients = Vectors.dense(rawCoefficients)
     val intercept = parameters(1)
     val scale = math.exp(parameters(0))
-    val model = new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
-    copyValues(model.setParent(this))
+    val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
+      intercept, scale).setParent(this))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.6.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
index f8ab3d3a45..08d175cb94 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
@@ -148,7 +148,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+      maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+      seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
     instr.logNumFeatures(numFeatures)
 
     val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 3891ae63a4..a32302bf5d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -251,6 +251,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
     val familyAndLink = new FamilyAndLink(familyObj, linkObj)
 
     val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, linkPredictionCol,
+      family, solver, fitIntercept, link, maxIter, regParam, tol)
+    instr.logNumFeatures(numFeatures)
+
     if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
       val msg = "Currently, GeneralizedLinearRegression only supports number of features" +
         s" <= ${WeightedLeastSquares.MAX_NUM_FEATURES}. Found $numFeatures in the input dataset."
@@ -264,7 +269,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
         Instance(label, weight, features)
     }
 
-    if (familyObj == Gaussian && linkObj == Identity) {
+    val model = if (familyObj == Gaussian && linkObj == Identity) {
       // TODO: Make standardizeFeatures and standardizeLabel configurable.
       val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0,
         standardizeFeatures = true, standardizeLabel = true)
@@ -274,21 +279,23 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
           .setParent(this))
       val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
         wlsModel.diagInvAtWA.toArray, 1, getSolver)
-      return model.setSummary(Some(trainingSummary))
+      model.setSummary(Some(trainingSummary))
+    } else {
+      // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
+      val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
+      val optimizer = new IterativelyReweightedLeastSquares(initialModel,
+        familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))
+      val irlsModel = optimizer.fit(instances)
+      val model = copyValues(
+        new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
+          .setParent(this))
+      val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
+        irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
+      model.setSummary(Some(trainingSummary))
     }
 
-    // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
-    val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
-    val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc,
-      $(fitIntercept), $(regParam), $(maxIter), $(tol))
-    val irlsModel = optimizer.fit(instances)
-
-    val model = copyValues(
-      new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
-        .setParent(this))
-    val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
-      irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
-    model.setSummary(Some(trainingSummary))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("2.0.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index c378a99e3c..1ed9d3c809 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -171,10 +171,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
+    instr.logNumFeatures(1)
+
     val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
     val oldModel = isotonicRegression.run(instances)
 
-    copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+    val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.5.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 534ef87ec6..2de7e81d8d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -91,7 +91,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   setDefault(regParam -> 0.0)
 
   /**
-   * Set if we should fit the intercept
+   * Set if we should fit the intercept.
    * Default is true.
    *
    * @group setParam
@@ -204,6 +204,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         Instance(label, weight, features)
     }
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol,
+      elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth)
+    instr.logNumFeatures(numFeatures)
+
     if (($(solver) == "auto" &&
       numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") {
       // For low dimensional data, WeightedLeastSquares is more efficient since the
@@ -226,7 +231,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         model.diagInvAtWA.toArray,
         model.objectiveHistory)
 
-      return lrModel.setSummary(Some(trainingSummary))
+      lrModel.setSummary(Some(trainingSummary))
+      instr.logSuccess(lrModel)
+      return lrModel
     }
 
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
@@ -251,10 +258,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val rawYStd = math.sqrt(ySummarizer.variance(0))
     if (rawYStd == 0.0) {
       if ($(fitIntercept) || yMean == 0.0) {
-        // If the rawYStd is zero and fitIntercept=true, then the intercept is yMean with
+        // If the rawYStd==0 and fitIntercept==true, then the intercept is yMean with
         // zero coefficient; as a result, training is not needed.
         // Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of
-        // the fitIntercept
+        // the fitIntercept.
         if (yMean == 0.0) {
           logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " +
             s"and the intercept will all be zero; as a result, training is not needed.")
@@ -279,7 +286,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           model,
           Array(0D),
           Array(0D))
-        return model.setSummary(Some(trainingSummary))
+
+        model.setSummary(Some(trainingSummary))
+        instr.logSuccess(model)
+        return model
       } else {
         require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
           "Model cannot be regularized.")
@@ -401,7 +411,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       model,
       Array(0D),
       objectiveHistory)
+
     model.setSummary(Some(trainingSummary))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.4.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index ca4a50b825..2f524a8c57 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -122,7 +122,9 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
+      featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)
 
     val trees = RandomForest
       .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index a2794368db..7c46f45c59 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -96,7 +96,7 @@ private[spark] class Instrumentation[E <: Estimator[_]] private (
   }
 
   /**
-   * Logs the successful completion of the training session and the value of the learned model.
+   * Logs the successful completion of the training session.
    */
   def logSuccess(model: Model[_]): Unit = {
     log(s"training finished")