Diffstat (limited to 'mllib/src/main/scala/org/apache')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala | 425
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala | 620
2 files changed, 959 insertions, 86 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index fce3935d39..ea31c68e4c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -63,6 +63,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas * equivalent. * * Default is 0.5. + * * @group setParam */ def setThreshold(value: Double): this.type = { @@ -131,6 +132,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas /** * If [[threshold]] and [[thresholds]] are both set, ensures they are consistent. + * * @throws IllegalArgumentException if [[threshold]] and [[thresholds]] are not equivalent */ protected def checkThresholdConsistency(): Unit = { @@ -153,8 +155,8 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas /** * Logistic regression. - * Currently, this class only supports binary classification. It will support multiclass - * in the future. + * Currently, this class only supports binary classification. For multiclass classification, + * use [[MultinomialLogisticRegression]] */ @Since("1.2.0") class LogisticRegression @Since("1.2.0") ( @@ -168,6 +170,7 @@ class LogisticRegression @Since("1.2.0") ( /** * Set the regularization parameter. * Default is 0.0. + * * @group setParam */ @Since("1.2.0") @@ -179,6 +182,7 @@ class LogisticRegression @Since("1.2.0") ( * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. * For 0 < alpha < 1, the penalty is a combination of L1 and L2. * Default is 0.0 which is an L2 penalty. + * * @group setParam */ @Since("1.4.0") @@ -188,6 +192,7 @@ class LogisticRegression @Since("1.2.0") ( /** * Set the maximum number of iterations. * Default is 100. + * * @group setParam */ @Since("1.2.0") @@ -198,6 +203,7 @@ class LogisticRegression @Since("1.2.0") ( * Set the convergence tolerance of iterations. * Smaller value will lead to higher accuracy with the cost of more iterations. * Default is 1E-6. + * * @group setParam */ @Since("1.4.0") @@ -207,6 +213,7 @@ class LogisticRegression @Since("1.2.0") ( /** * Whether to fit an intercept term. * Default is true. + * * @group setParam */ @Since("1.4.0") @@ -220,6 +227,7 @@ class LogisticRegression @Since("1.2.0") ( * the models should be always converged to the same solution when no regularization * is applied. In R's GLMNET package, the default behavior is true as well. * Default is true. + * * @group setParam */ @Since("1.5.0") @@ -233,9 +241,10 @@ class LogisticRegression @Since("1.2.0") ( override def getThreshold: Double = super.getThreshold /** - * Whether to over-/under-sample training instances according to the given weights in weightCol. - * If not set or empty String, all instances are treated equally (weight 1.0). + * Sets the value of param [[weightCol]]. + * If this is not set or empty, we treat all instance weights as 1.0. * Default is not set, so all instances have weight one. + * * @group setParam */ @Since("1.6.0") @@ -310,12 +319,15 @@ class LogisticRegression @Since("1.2.0") ( throw new SparkException(msg) } + val isConstantLabel = histogram.count(_ != 0) == 1 + if (numClasses > 2) { - val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " + - s"binary classification. Found $numClasses in the input dataset." 
+ val msg = s"LogisticRegression with ElasticNet in ML package only supports " + + s"binary classification. Found $numClasses in the input dataset. Consider using " + + s"MultinomialLogisticRegression instead." logError(msg) throw new SparkException(msg) - } else if ($(fitIntercept) && numClasses == 2 && histogram(0) == 0.0) { + } else if ($(fitIntercept) && numClasses == 2 && isConstantLabel) { logWarning(s"All labels are one and fitIntercept=true, so the coefficients will be " + s"zeros and the intercept will be positive infinity; as a result, " + s"training is not needed.") @@ -326,12 +338,9 @@ class LogisticRegression @Since("1.2.0") ( s"training is not needed.") (Vectors.sparse(numFeatures, Seq()), Double.NegativeInfinity, Array.empty[Double]) } else { - if (!$(fitIntercept) && numClasses == 2 && histogram(0) == 0.0) { - logWarning(s"All labels are one and fitIntercept=false. It's a dangerous ground, " + - s"so the algorithm may not converge.") - } else if (!$(fitIntercept) && numClasses == 1) { - logWarning(s"All labels are zero and fitIntercept=false. It's a dangerous ground, " + - s"so the algorithm may not converge.") + if (!$(fitIntercept) && isConstantLabel) { + logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " + + s"dangerous ground, so the algorithm may not converge.") } val featuresMean = summarizer.mean.toArray @@ -349,7 +358,7 @@ class LogisticRegression @Since("1.2.0") ( val bcFeaturesStd = instances.context.broadcast(featuresStd) val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), - $(standardization), bcFeaturesStd, regParamL2) + $(standardization), bcFeaturesStd, regParamL2, multinomial = false) val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) @@ -416,7 +425,7 @@ class LogisticRegression @Since("1.2.0") ( /* Note that in Logistic Regression, the objective history (loss + regularization) - is log-likelihood which is invariance under feature standardization. As a result, + is log-likelihood which is invariant under feature standardization. As a result, the objective history from optimizer is the same as the one in the original space. */ val arrayBuilder = mutable.ArrayBuilder.make[Double] @@ -559,6 +568,7 @@ class LogisticRegressionModel private[spark] ( /** * Evaluates the model on a test dataset. + * * @param dataset Test dataset to evaluate model on. */ @Since("2.0.0") @@ -681,6 +691,7 @@ object LogisticRegressionModel extends MLReadable[LogisticRegressionModel] { val data = sparkSession.read.format("parquet").load(dataPath) // We will need numClasses, numFeatures in the future for multinomial logreg support. + // TODO: remove numClasses and numFeatures fields? val Row(numClasses: Int, numFeatures: Int, intercept: Double, coefficients: Vector) = MLUtils.convertVectorColumnsToML(data, "coefficients") .select("numClasses", "numFeatures", "intercept", "coefficients") @@ -710,6 +721,7 @@ private[classification] class MultiClassSummarizer extends Serializable { /** * Add a new label into this MultilabelSummarizer, and update the distinct map. + * * @param label The label for this data point. * @param weight The weight of this instances. * @return This MultilabelSummarizer @@ -933,32 +945,310 @@ class BinaryLogisticRegressionSummary private[classification] ( } /** - * LogisticAggregator computes the gradient and loss for binary logistic loss function, as used - * in binary classification for instances in sparse or dense vector in an online fashion. 
- * - * Note that multinomial logistic loss is not supported yet! + * LogisticAggregator computes the gradient and loss for binary or multinomial logistic (softmax) + * loss function, as used in classification for instances in sparse or dense vector in an online + * fashion. * - * Two LogisticAggregator can be merged together to have a summary of loss and gradient of + * Two LogisticAggregators can be merged together to have a summary of loss and gradient of * the corresponding joint dataset. * + * For improving the convergence rate during the optimization process and also to prevent against + * features with very large variances exerting an overly large influence during model training, + * packages like R's GLMNET perform the scaling to unit variance and remove the mean in order to + * reduce the condition number. The model is then trained in this scaled space, but returns the + * coefficients in the original scale. See page 9 in + * http://cran.r-project.org/web/packages/glmnet/glmnet.pdf + * + * However, we don't want to apply the [[org.apache.spark.ml.feature.StandardScaler]] on the + * training dataset, and then cache the standardized dataset since it will create a lot of overhead. + * As a result, we perform the scaling implicitly when we compute the objective function (though + * we do not subtract the mean). + * + * Note that there is a difference between multinomial (softmax) and binary loss. The binary case + * uses one outcome class as a "pivot" and regresses the other class against the pivot. In the + * multinomial case, the softmax loss function is used to model each class probability + * independently. Using softmax loss produces `K` sets of coefficients, while using a pivot class + * produces `K - 1` sets of coefficients (a single coefficient vector in the binary case). In the + * binary case, we can say that the coefficients are shared between the positive and negative + * classes. When regularization is applied, multinomial (softmax) loss will produce a result + * different from binary loss since the positive and negative don't share the coefficients while the + * binary regression shares the coefficients between positive and negative. + * + * The following is a mathematical derivation for the multinomial (softmax) loss. + * + * The probability of the multinomial outcome $y$ taking on any of the K possible outcomes is: + * + * <p><blockquote> + * $$ + * P(y_i=0|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}} \\ + * P(y_i=1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_1}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}}\\ + * P(y_i=K-1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_{K-1}}\,}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}} + * $$ + * </blockquote></p> + * + * The model coefficients $\beta = (\beta_0, \beta_1, \beta_2, ..., \beta_{K-1})$ become a matrix + * which has dimension of $K \times (N+1)$ if the intercepts are added. If the intercepts are not + * added, the dimension will be $K \times N$. + * + * Note that the coefficients in the model above lack identifiability. That is, any constant scalar + * can be added to all of the coefficients and the probabilities remain the same. 
+ * + * <p><blockquote> + * $$ + * \begin{align} + * \frac{e^{\vec{x}_i^T \left(\vec{\beta}_0 + \vec{c}\right)}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \left(\vec{\beta}_k + \vec{c}\right)}} + * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}e^{\vec{x}_i^T \vec{c}}\,}{e^{\vec{x}_i^T \vec{c}} + * \sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} + * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} + * \end{align} + * $$ + * </blockquote></p> + * + * However, when regularization is added to the loss function, the coefficients are indeed + * identifiable because there is only one set of coefficients which minimizes the regularization + * term. When no regularization is applied, we choose the coefficients with the minimum L2 + * penalty for consistency and reproducibility. For further discussion see: + * + * Friedman, et al. "Regularization Paths for Generalized Linear Models via Coordinate Descent" + * + * The loss of objective function for a single instance of data (we do not include the + * regularization term here for simplicity) can be written as + * + * <p><blockquote> + * $$ + * \begin{align} + * \ell\left(\beta, x_i\right) &= -log{P\left(y_i \middle| \vec{x}_i, \beta\right)} \\ + * &= log\left(\sum_{k=0}^{K-1}e^{\vec{x}_i^T \vec{\beta}_k}\right) - \vec{x}_i^T \vec{\beta}_y\\ + * &= log\left(\sum_{k=0}^{K-1} e^{margins_k}\right) - margins_y + * \end{align} + * $$ + * </blockquote></p> + * + * where ${margins}_k = \vec{x}_i^T \vec{\beta}_k$. + * + * For optimization, we have to calculate the first derivative of the loss function, and a simple + * calculation shows that + * + * <p><blockquote> + * $$ + * \begin{align} + * \frac{\partial \ell(\beta, \vec{x}_i, w_i)}{\partial \beta_{j, k}} + * &= x_{i,j} \cdot w_i \cdot \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k'=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_{k'}}\,} - I_{y=k}\right) \\ + * &= x_{i, j} \cdot w_i \cdot multiplier_k + * \end{align} + * $$ + * </blockquote></p> + * + * where $w_i$ is the sample weight, $I_{y=k}$ is an indicator function + * + * <p><blockquote> + * $$ + * I_{y=k} = \begin{cases} + * 1 & y = k \\ + * 0 & else + * \end{cases} + * $$ + * </blockquote></p> + * + * and + * + * <p><blockquote> + * $$ + * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_k}} - I_{y=k}\right) + * $$ + * </blockquote></p> + * + * If any of margins is larger than 709.78, the numerical computation of multiplier and loss + * function will suffer from arithmetic overflow. This issue occurs when there are outliers in + * data which are far away from the hyperplane, and this will cause the failing of training once + * infinity is introduced. Note that this is only a concern when max(margins) > 0. + * + * Fortunately, when max(margins) = maxMargin > 0, the loss function and the multiplier can easily + * be rewritten into the following equivalent numerically stable formula. + * + * <p><blockquote> + * $$ + * \ell\left(\beta, x\right) = log\left(\sum_{k=0}^{K-1} e^{margins_k - maxMargin}\right) - + * margins_{y} + maxMargin + * $$ + * </blockquote></p> + * + * Note that each term, $(margins_k - maxMargin)$ in the exponential is no greater than zero; as a + * result, overflow will not happen with this formula. 
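As a standalone illustration of this max-margin shift (a minimal sketch outside the patch; the helper name stableSoftmaxLoss is hypothetical), the stabilized loss can be computed as:

    // Numerically stable softmax loss for a single instance,
    // where margins(k) = x^T beta_k and label is the true class index.
    def stableSoftmaxLoss(margins: Array[Double], label: Int): Double = {
      val maxMargin = margins.max
      // Shift only when max(margins) > 0; every exponent is then <= 0.
      val shift = if (maxMargin > 0) maxMargin else 0.0
      val sumExp = margins.map(m => math.exp(m - shift)).sum
      // log(sum_k e^{m_k - s}) - (m_y - s) == log(sum_k e^{m_k}) - m_y
      math.log(sumExp) - (margins(label) - shift)
    }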
+ * + * For $multiplier$, a similar trick can be applied as the following, + * + * <p><blockquote> + * $$ + * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k - maxMargin}}{\sum_{k'=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_{k'} - maxMargin}} - I_{y=k}\right) + * $$ + * </blockquote></p> + * * @param bcCoefficients The broadcast coefficients corresponding to the features. * @param bcFeaturesStd The broadcast standard deviation values of the features. * @param numClasses the number of possible outcomes for k classes classification problem in * Multinomial Logistic Regression. * @param fitIntercept Whether to fit an intercept term. + * @param multinomial Whether to use multinomial (softmax) or binary loss */ private class LogisticAggregator( - val bcCoefficients: Broadcast[Vector], - val bcFeaturesStd: Broadcast[Array[Double]], - private val numFeatures: Int, + bcCoefficients: Broadcast[Vector], + bcFeaturesStd: Broadcast[Array[Double]], numClasses: Int, - fitIntercept: Boolean) extends Serializable { + fitIntercept: Boolean, + multinomial: Boolean) extends Serializable with Logging { + + private val numFeatures = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + private val coefficientSize = bcCoefficients.value.size + if (multinomial) { + require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + + s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") + } else { + require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + + s"coefficients but got $coefficientSize") + require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + + s"in {1, 2} but found $numClasses.") + } private var weightSum = 0.0 private var lossSum = 0.0 - private val gradientSumArray = - Array.ofDim[Double](if (fitIntercept) numFeatures + 1 else numFeatures) + private val gradientSumArray = Array.ofDim[Double](coefficientSize) + + if (multinomial && numClasses <= 2) { + logInfo(s"Multinomial logistic regression for binary classification yields separate " + + s"coefficients for positive and negative classes. When no regularization is applied, the" + + s"result will be effectively the same as binary logistic regression. When regularization" + + s"is applied, multinomial loss will produce a result different from binary loss.") + } + + /** Update gradient and loss using binary loss function. */ + private def binaryUpdateInPlace( + features: Vector, + weight: Double, + label: Double): Unit = { + + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = bcCoefficients.value + val localGradientArray = gradientSumArray + val margin = - { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + + val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) + + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientArray(index) += multiplier * value / localFeaturesStd(index) + } + } + + if (fitIntercept) { + localGradientArray(numFeaturesPlusIntercept - 1) += multiplier + } + + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. 
+ lossSum += weight * MLUtils.log1pExp(margin) + } else { + lossSum += weight * (MLUtils.log1pExp(margin) - margin) + } + } + + /** Update gradient and loss using multinomial (softmax) loss function. */ + private def multinomialUpdateInPlace( + features: Vector, + weight: Double, + label: Double): Unit = { + // TODO: use level 2 BLAS operations + /* + Note: this can still be used when numClasses = 2 for binary + logistic regression without pivoting. + */ + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = bcCoefficients.value + val localGradientArray = gradientSumArray + + // marginOfLabel is margins(label) in the formula + var marginOfLabel = 0.0 + var maxMargin = Double.NegativeInfinity + + val margins = Array.tabulate(numClasses) { i => + var margin = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + margin += localCoefficients(i * numFeaturesPlusIntercept + index) * + value / localFeaturesStd(index) + } + } + + if (fitIntercept) { + margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures) + } + if (i == label.toInt) marginOfLabel = margin + if (margin > maxMargin) { + maxMargin = margin + } + margin + } + + /** + * When maxMargin > 0, the original formula could cause overflow. + * We address this by subtracting maxMargin from all the margins, so it's guaranteed + * that all of the new margins will be smaller than zero to prevent arithmetic overflow. + */ + val sum = { + var temp = 0.0 + if (maxMargin > 0) { + for (i <- 0 until numClasses) { + margins(i) -= maxMargin + temp += math.exp(margins(i)) + } + } else { + for (i <- 0 until numClasses) { + temp += math.exp(margins(i)) + } + } + temp + } + + for (i <- 0 until numClasses) { + val multiplier = math.exp(margins(i)) / sum - { + if (label == i) 1.0 else 0.0 + } + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientArray(i * numFeaturesPlusIntercept + index) += + weight * multiplier * value / localFeaturesStd(index) + } + } + if (fitIntercept) { + localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier + } + } + + val loss = if (maxMargin > 0) { + math.log(sum) - marginOfLabel + maxMargin + } else { + math.log(sum) - marginOfLabel + } + lossSum += weight * loss + } /** * Add a new training instance to this LogisticAggregator, and update the loss and gradient @@ -975,52 +1265,10 @@ private class LogisticAggregator( if (weight == 0.0) return this - val coefficientsArray = bcCoefficients.value match { - case dv: DenseVector => dv.values - case _ => - throw new IllegalArgumentException( - "coefficients only supports dense vector" + - s"but got type ${bcCoefficients.value.getClass}.") - } - val localGradientSumArray = gradientSumArray - - val featuresStd = bcFeaturesStd.value - numClasses match { - case 2 => - // For Binary Logistic Regression. 
- val margin = - { - var sum = 0.0 - features.foreachActive { (index, value) => - if (featuresStd(index) != 0.0 && value != 0.0) { - sum += coefficientsArray(index) * (value / featuresStd(index)) - } - } - sum + { - if (fitIntercept) coefficientsArray(numFeatures) else 0.0 - } - } - - val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) - - features.foreachActive { (index, value) => - if (featuresStd(index) != 0.0 && value != 0.0) { - localGradientSumArray(index) += multiplier * (value / featuresStd(index)) - } - } - - if (fitIntercept) { - localGradientSumArray(numFeatures) += multiplier - } - - if (label > 0) { - // The following is equivalent to log(1 + exp(margin)) but more numerically stable. - lossSum += weight * MLUtils.log1pExp(margin) - } else { - lossSum += weight * (MLUtils.log1pExp(margin) - margin) - } - case _ => - new NotImplementedError("LogisticRegression with ElasticNet in ML package " + - "only supports binary classification for now.") + if (multinomial) { + multinomialUpdateInPlace(features, weight, label) + } else { + binaryUpdateInPlace(features, weight, label) } weightSum += weight this @@ -1071,8 +1319,8 @@ private class LogisticAggregator( } /** - * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial logistic loss function, - * as used in multi-class classification (it is also used in binary logistic regression). + * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss + * function, as used in multi-class classification (it is also used in binary logistic regression). * It returns the loss and gradient with L2 regularization at a particular point (coefficients). * It's used in Breeze's convex optimization routines. */ @@ -1082,36 +1330,36 @@ private class LogisticCostFun( fitIntercept: Boolean, standardization: Boolean, bcFeaturesStd: Broadcast[Array[Double]], - regParamL2: Double) extends DiffFunction[BDV[Double]] { + regParamL2: Double, + multinomial: Boolean) extends DiffFunction[BDV[Double]] { - val featuresStd = bcFeaturesStd.value override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { - val numFeatures = featuresStd.length val coeffs = Vectors.fromBreeze(coefficients) val bcCoeffs = instances.context.broadcast(coeffs) - val n = coeffs.size + val featuresStd = bcFeaturesStd.value + val numFeatures = featuresStd.length val logisticAggregator = { val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) instances.treeAggregate( - new LogisticAggregator(bcCoeffs, bcFeaturesStd, numFeatures, numClasses, fitIntercept) + new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept, + multinomial) )(seqOp, combOp) } val totalGradientArray = logisticAggregator.gradient.toArray - // regVal is the sum of coefficients squares excluding intercept for L2 regularization. val regVal = if (regParamL2 == 0.0) { 0.0 } else { var sum = 0.0 - coeffs.foreachActive { (index, value) => - // If `fitIntercept` is true, the last term which is intercept doesn't - // contribute to the regularization. - if (index != numFeatures) { + coeffs.foreachActive { case (index, value) => + // We do not apply regularization to the intercepts + val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0) + if (!isIntercept) { // The following code will compute the loss of the regularization; also // the gradient of the regularization, and add back to totalGradientArray. 
sum += { @@ -1119,13 +1367,18 @@ private class LogisticCostFun( totalGradientArray(index) += regParamL2 * value value * value } else { - if (featuresStd(index) != 0.0) { + val featureIndex = if (fitIntercept) { + index % (numFeatures + 1) + } else { + index % numFeatures + } + if (featuresStd(featureIndex) != 0.0) { // If `standardization` is false, we still standardize the data // to improve the rate of convergence; as a result, we have to // perform this reverse standardization by penalizing each component // differently to get effectively the same objective function when // the training dataset is not standardized. - val temp = value / (featuresStd(index) * featuresStd(index)) + val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) totalGradientArray(index) += regParamL2 * temp value * temp } else { diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala new file mode 100644 index 0000000000..dfadd68c5f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.classification + +import scala.collection.mutable + +import breeze.linalg.{DenseVector => BDV} +import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.storage.StorageLevel + +/** + * Params for multinomial logistic (softmax) regression. + */ +private[classification] trait MultinomialLogisticRegressionParams + extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter + with HasFitIntercept with HasTol with HasStandardization with HasWeightCol { + + /** + * Set thresholds in multiclass (or binary) classification to adjust the probability of + * predicting each class. Array must have length equal to the number of classes, with values >= 0. 
+ * The class with largest value p/t is predicted, where p is the original probability of that + * class and t is the class' threshold. + * + * @group setParam + */ + def setThresholds(value: Array[Double]): this.type = { + set(thresholds, value) + } + + /** + * Get thresholds for binary or multiclass classification. + * + * @group getParam + */ + override def getThresholds: Array[Double] = { + $(thresholds) + } +} + +/** + * :: Experimental :: + * Multinomial Logistic (softmax) regression. + */ +@Since("2.1.0") +@Experimental +class MultinomialLogisticRegression @Since("2.1.0") ( + @Since("2.1.0") override val uid: String) + extends ProbabilisticClassifier[Vector, + MultinomialLogisticRegression, MultinomialLogisticRegressionModel] + with MultinomialLogisticRegressionParams with DefaultParamsWritable with Logging { + + @Since("2.1.0") + def this() = this(Identifiable.randomUID("mlogreg")) + + /** + * Set the regularization parameter. + * Default is 0.0. + * + * @group setParam + */ + @Since("2.1.0") + def setRegParam(value: Double): this.type = set(regParam, value) + setDefault(regParam -> 0.0) + + /** + * Set the ElasticNet mixing parameter. + * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. + * For 0 < alpha < 1, the penalty is a combination of L1 and L2. + * Default is 0.0 which is an L2 penalty. + * + * @group setParam + */ + @Since("2.1.0") + def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value) + setDefault(elasticNetParam -> 0.0) + + /** + * Set the maximum number of iterations. + * Default is 100. + * + * @group setParam + */ + @Since("2.1.0") + def setMaxIter(value: Int): this.type = set(maxIter, value) + setDefault(maxIter -> 100) + + /** + * Set the convergence tolerance of iterations. + * Smaller value will lead to higher accuracy with the cost of more iterations. + * Default is 1E-6. + * + * @group setParam + */ + @Since("2.1.0") + def setTol(value: Double): this.type = set(tol, value) + setDefault(tol -> 1E-6) + + /** + * Whether to fit an intercept term. + * Default is true. + * + * @group setParam + */ + @Since("2.1.0") + def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) + setDefault(fitIntercept -> true) + + /** + * Whether to standardize the training features before fitting the model. + * The coefficients of models will be always returned on the original scale, + * so it will be transparent for users. Note that with/without standardization, + * the models should always converge to the same solution when no regularization + * is applied. In R's GLMNET package, the default behavior is true as well. + * Default is true. + * + * @group setParam + */ + @Since("2.1.0") + def setStandardization(value: Boolean): this.type = set(standardization, value) + setDefault(standardization -> true) + + /** + * Sets the value of param [[weightCol]]. + * If this is not set or empty, we treat all instance weights as 1.0. + * Default is not set, so all instances have weight one. 
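Taken together, the setters above give the new estimator the usual spark.ml fit/transform shape; a minimal usage sketch, assuming a DataFrame named training with "label" and "features" columns (the DataFrame and column names are assumptions, not part of this patch):

    import org.apache.spark.ml.classification.MultinomialLogisticRegression

    val mlr = new MultinomialLogisticRegression()
      .setMaxIter(100)
      .setRegParam(0.1)
      .setElasticNetParam(0.0) // alpha = 0 means a pure L2 penalty
    val model = mlr.fit(training)
    // coefficients is a numClasses x numFeatures matrix;
    // intercepts holds one entry per class.
    println(model.coefficients)
    println(model.intercepts)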
+ * + * @group setParam + */ + @Since("2.1.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + + @Since("2.1.0") + override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value) + + override protected[spark] def train(dataset: Dataset[_]): MultinomialLogisticRegressionModel = { + val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + val instances: RDD[Instance] = + dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, features: Vector) => + Instance(label, weight, features) + } + + val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + + val instr = Instrumentation.create(this, instances) + instr.logParams(regParam, elasticNetParam, standardization, thresholds, + maxIter, tol, fitIntercept) + + val (summarizer, labelSummarizer) = { + val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer), + instance: Instance) => + (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)) + + val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer), + c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) => + (c1._1.merge(c2._1), c1._2.merge(c2._2)) + + instances.treeAggregate( + new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp) + } + + val histogram = labelSummarizer.histogram + val numInvalid = labelSummarizer.countInvalid + val numFeatures = summarizer.mean.size + val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures + + val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { + case Some(n: Int) => + require(n >= histogram.length, s"Specified number of classes $n was " + + s"less than the number of unique labels ${histogram.length}") + n + case None => histogram.length + } + + instr.logNumClasses(numClasses) + instr.logNumFeatures(numFeatures) + + val (coefficients, intercepts, objectiveHistory) = { + if (numInvalid != 0) { + val msg = s"Classification labels should be in {0 to ${numClasses - 1} " + + s"Found $numInvalid invalid labels." + logError(msg) + throw new SparkException(msg) + } + + val isConstantLabel = histogram.count(_ != 0) == 1 + + if ($(fitIntercept) && isConstantLabel) { + // we want to produce a model that will always predict the constant label so all the + // coefficients will be zero, and the constant label class intercept will be +inf + val constantLabelIndex = Vectors.dense(histogram).argmax + (Matrices.sparse(numClasses, numFeatures, Array.fill(numFeatures + 1)(0), + Array.empty[Int], Array.empty[Double]), + Vectors.sparse(numClasses, Seq((constantLabelIndex, Double.PositiveInfinity))), + Array.empty[Double]) + } else { + if (!$(fitIntercept) && isConstantLabel) { + logWarning(s"All labels belong to a single class and fitIntercept=false. It's" + + s"a dangerous ground, so the algorithm may not converge.") + } + + val featuresStd = summarizer.variance.toArray.map(math.sqrt) + val featuresMean = summarizer.mean.toArray + if (!$(fitIntercept) && (0 until numFeatures).exists { i => + featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { + logWarning("Fitting MultinomialLogisticRegressionModel without intercept on dataset " + + "with constant nonzero column, Spark MLlib outputs zero coefficients for constant " + + "nonzero columns. 
This behavior is the same as R glmnet but different from LIBSVM.") + } + + val regParamL1 = $(elasticNetParam) * $(regParam) + val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) + + val bcFeaturesStd = instances.context.broadcast(featuresStd) + val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), + $(standardization), bcFeaturesStd, regParamL2, multinomial = true) + + val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { + new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + } else { + val standardizationParam = $(standardization) + def regParamL1Fun = (index: Int) => { + // Remove the L1 penalization on the intercept + val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0) + if (isIntercept) { + 0.0 + } else { + if (standardizationParam) { + regParamL1 + } else { + val featureIndex = if ($(fitIntercept)) { + index % numFeaturesPlusIntercept + } else { + index % numFeatures + } + // If `standardization` is false, we still standardize the data + // to improve the rate of convergence; as a result, we have to + // perform this reverse standardization by penalizing each component + // differently to get effectively the same objective function when + // the training dataset is not standardized. + if (featuresStd(featureIndex) != 0.0) { + regParamL1 / featuresStd(featureIndex) + } else { + 0.0 + } + } + } + } + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + } + + val initialCoefficientsWithIntercept = Vectors.zeros(numClasses * numFeaturesPlusIntercept) + + if ($(fitIntercept)) { + /* + For multinomial logistic regression, when we initialize the coefficients as zeros, + it will converge faster if we initialize the intercepts such that + it follows the distribution of the labels. + {{{ + P(1) = \exp(b_1) / Z + ... + P(K) = \exp(b_K) / Z + where Z = \sum_{k=1}^{K} \exp(b_k) + }}} + Since this doesn't have a unique solution, one of the solutions that satisfies the + above equations is + {{{ + \exp(b_k) = count_k * \exp(\lambda) + b_k = \log(count_k) * \lambda + }}} + \lambda is a free parameter, so choose the phase \lambda such that the + mean is centered. This yields + {{{ + b_k = \log(count_k) + b_k' = b_k - \mean(b_k) + }}} + */ + val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing + val rawMean = rawIntercepts.sum / rawIntercepts.length + rawIntercepts.indices.foreach { i => + initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) = + rawIntercepts(i) - rawMean + } + } + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + initialCoefficientsWithIntercept.asBreeze.toDenseVector) + + /* + Note that in Multinomial Logistic Regression, the objective history + (loss + regularization) is log-likelihood which is invariant under feature + standardization. As a result, the objective history from optimizer is the same as the + one in the original space. + */ + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + arrayBuilder += state.adjustedValue + } + + if (state == null) { + val msg = s"${optimizer.getClass.getName} failed." + logError(msg) + throw new SparkException(msg) + } + bcFeaturesStd.destroy(blocking = false) + + /* + The coefficients are trained in the scaled space; we're converting them back to + the original space. + Note that the intercept in scaled space and original space is the same; + as a result, no scaling is needed. 
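The intercept initialization above amounts to centering the smoothed log class counts; a minimal sketch of just that step (the histogram values here are made-up):

    // b_k = log(count_k + 1), then subtract the mean to pin down the
    // free parameter lambda from the derivation above.
    val histogram = Array(30.0, 50.0, 20.0) // assumed per-class weighted counts
    val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
    val rawMean = rawIntercepts.sum / rawIntercepts.length
    val initialIntercepts = rawIntercepts.map(_ - rawMean)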
+ */ + val rawCoefficients = state.x.toArray + val interceptsArray: Array[Double] = if ($(fitIntercept)) { + Array.tabulate(numClasses) { i => + val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1 + rawCoefficients(coefIndex) + } + } else { + Array[Double]() + } + + val coefficientArray: Array[Double] = Array.tabulate(numClasses * numFeatures) { i => + // flatIndex will loop though rawCoefficients, and skip the intercept terms. + val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i + val featureIndex = i % numFeatures + if (featuresStd(featureIndex) != 0.0) { + rawCoefficients(flatIndex) / featuresStd(featureIndex) + } else { + 0.0 + } + } + val coefficientMatrix = + new DenseMatrix(numClasses, numFeatures, coefficientArray, isTransposed = true) + + /* + When no regularization is applied, the coefficients lack identifiability because + we do not use a pivot class. We can add any constant value to the coefficients and + get the same likelihood. So here, we choose the mean centered coefficients for + reproducibility. This method follows the approach in glmnet, described here: + + Friedman, et al. "Regularization Paths for Generalized Linear Models via + Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf + */ + if ($(regParam) == 0.0) { + val coefficientMean = coefficientMatrix.values.sum / (numClasses * numFeatures) + coefficientMatrix.update(_ - coefficientMean) + } + /* + The intercepts are never regularized, so we always center the mean. + */ + val interceptVector = if (interceptsArray.nonEmpty) { + val interceptMean = interceptsArray.sum / numClasses + interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean } + Vectors.dense(interceptsArray) + } else { + Vectors.sparse(numClasses, Seq()) + } + + (coefficientMatrix, interceptVector, arrayBuilder.result()) + } + } + + if (handlePersistence) instances.unpersist() + + val model = copyValues( + new MultinomialLogisticRegressionModel(uid, coefficients, intercepts, numClasses)) + instr.logSuccess(model) + model + } + + @Since("2.1.0") + override def copy(extra: ParamMap): MultinomialLogisticRegression = defaultCopy(extra) +} + +@Since("2.1.0") +object MultinomialLogisticRegression extends DefaultParamsReadable[MultinomialLogisticRegression] { + + @Since("2.1.0") + override def load(path: String): MultinomialLogisticRegression = super.load(path) +} + +/** + * :: Experimental :: + * Model produced by [[MultinomialLogisticRegression]]. + */ +@Since("2.1.0") +@Experimental +class MultinomialLogisticRegressionModel private[spark] ( + @Since("2.1.0") override val uid: String, + @Since("2.1.0") val coefficients: Matrix, + @Since("2.1.0") val intercepts: Vector, + @Since("2.1.0") val numClasses: Int) + extends ProbabilisticClassificationModel[Vector, MultinomialLogisticRegressionModel] + with MultinomialLogisticRegressionParams with MLWritable { + + @Since("2.1.0") + override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value) + + @Since("2.1.0") + override def getThresholds: Array[Double] = super.getThresholds + + @Since("2.1.0") + override val numFeatures: Int = coefficients.numCols + + /** Margin (rawPrediction) for each class label. */ + private val margins: Vector => Vector = (features) => { + val m = intercepts.toDense.copy + BLAS.gemv(1.0, coefficients, features, 1.0, m) + m + } + + /** Score (probability) for each class label. 
*/ + private val scores: Vector => Vector = (features) => { + val m = margins(features) + val maxMarginIndex = m.argmax + val marginArray = m.toArray + val maxMargin = marginArray(maxMarginIndex) + + // adjust margins for overflow + val sum = { + var temp = 0.0 + var k = 0 + while (k < numClasses) { + marginArray(k) = if (maxMargin > 0) { + math.exp(marginArray(k) - maxMargin) + } else { + math.exp(marginArray(k)) + } + temp += marginArray(k) + k += 1 + } + temp + } + + val scores = Vectors.dense(marginArray) + BLAS.scal(1 / sum, scores) + scores + } + + /** + * Predict label for the given feature vector. + * The behavior of this can be adjusted using [[thresholds]]. + */ + override protected def predict(features: Vector): Double = { + if (isDefined(thresholds)) { + val thresholds: Array[Double] = getThresholds + val probabilities = scores(features).toArray + var argMax = 0 + var max = Double.NegativeInfinity + var i = 0 + while (i < numClasses) { + if (thresholds(i) == 0.0) { + max = Double.PositiveInfinity + argMax = i + } else { + val scaled = probabilities(i) / thresholds(i) + if (scaled > max) { + max = scaled + argMax = i + } + } + i += 1 + } + argMax + } else { + scores(features).argmax + } + } + + override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = { + rawPrediction match { + case dv: DenseVector => + val size = dv.size + val values = dv.values + + // get the maximum margin + val maxMarginIndex = rawPrediction.argmax + val maxMargin = rawPrediction(maxMarginIndex) + + if (maxMargin == Double.PositiveInfinity) { + var k = 0 + while (k < size) { + values(k) = if (k == maxMarginIndex) 1.0 else 0.0 + k += 1 + } + } else { + val sum = { + var temp = 0.0 + var k = 0 + while (k < numClasses) { + values(k) = if (maxMargin > 0) { + math.exp(values(k) - maxMargin) + } else { + math.exp(values(k)) + } + temp += values(k) + k += 1 + } + temp + } + BLAS.scal(1 / sum, dv) + } + dv + case sv: SparseVector => + throw new RuntimeException("Unexpected error in MultinomialLogisticRegressionModel:" + + " raw2probabilitiesInPlace encountered SparseVector") + } + } + + override protected def predictRaw(features: Vector): Vector = margins(features) + + @Since("2.1.0") + override def copy(extra: ParamMap): MultinomialLogisticRegressionModel = { + val newModel = + copyValues( + new MultinomialLogisticRegressionModel(uid, coefficients, intercepts, numClasses), extra) + newModel.setParent(parent) + } + + /** + * Returns a [[org.apache.spark.ml.util.MLWriter]] instance for this ML instance. + * + * This does not save the [[parent]] currently. 
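The thresholded prediction path above scales each class probability by its threshold and takes the argmax, with a zero threshold always winning; a small self-contained illustration with made-up values:

    val probabilities = Array(0.2, 0.5, 0.3)
    val thresholds = Array(0.5, 0.5, 0.2)
    val prediction = probabilities.indices.maxBy { i =>
      if (thresholds(i) == 0.0) Double.PositiveInfinity
      else probabilities(i) / thresholds(i)
    }
    // prediction == 2 here: 0.3 / 0.2 = 1.5 beats 0.5 / 0.5 = 1.0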
+ */ + @Since("2.1.0") + override def write: MLWriter = + new MultinomialLogisticRegressionModel.MultinomialLogisticRegressionModelWriter(this) +} + + +@Since("2.1.0") +object MultinomialLogisticRegressionModel extends MLReadable[MultinomialLogisticRegressionModel] { + + @Since("2.1.0") + override def read: MLReader[MultinomialLogisticRegressionModel] = + new MultinomialLogisticRegressionModelReader + + @Since("2.1.0") + override def load(path: String): MultinomialLogisticRegressionModel = super.load(path) + + /** [[MLWriter]] instance for [[MultinomialLogisticRegressionModel]] */ + private[MultinomialLogisticRegressionModel] + class MultinomialLogisticRegressionModelWriter(instance: MultinomialLogisticRegressionModel) + extends MLWriter with Logging { + + private case class Data( + numClasses: Int, + numFeatures: Int, + intercepts: Vector, + coefficients: Matrix) + + override protected def saveImpl(path: String): Unit = { + // Save metadata and Params + DefaultParamsWriter.saveMetadata(instance, path, sc) + // Save model data: numClasses, numFeatures, intercept, coefficients + val data = Data(instance.numClasses, instance.numFeatures, instance.intercepts, + instance.coefficients) + val dataPath = new Path(path, "data").toString + sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + } + } + + private class MultinomialLogisticRegressionModelReader + extends MLReader[MultinomialLogisticRegressionModel] { + + /** Checked against metadata when loading model */ + private val className = classOf[MultinomialLogisticRegressionModel].getName + + override def load(path: String): MultinomialLogisticRegressionModel = { + val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + + val dataPath = new Path(path, "data").toString + val data = sqlContext.read.format("parquet").load(dataPath) + .select("numClasses", "numFeatures", "intercepts", "coefficients").head() + val numClasses = data.getAs[Int](data.fieldIndex("numClasses")) + val intercepts = data.getAs[Vector](data.fieldIndex("intercepts")) + val coefficients = data.getAs[Matrix](data.fieldIndex("coefficients")) + val model = + new MultinomialLogisticRegressionModel(metadata.uid, coefficients, intercepts, numClasses) + + DefaultParamsReader.getAndSetParams(model, metadata) + model + } + } +} |