From 75e05a5a964c9585dd09a2ef6178881929bab1f1 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Tue, 12 Apr 2016 10:51:07 -0700
Subject: [SPARK-12566][SPARK-14324][ML] GLM model family, link function support in SparkR:::glm

* SparkR glm supports families and link functions which match R's signature for family.
* SparkR glm API refactor. The new API takes R's glm as its reference, so I only expose the arguments that R glm supports: ```formula, family, data, epsilon and maxit```.
* This PR focuses on glm() and predict(); summary statistics will be done in a separate PR after this one gets in.
* This PR depends on #12287, which makes GLMs support link prediction on the Scala side. After that is merged, I will add more tests for predict() to this PR.

Unit tests.

cc mengxr jkbradley hhbyyh

Author: Yanbo Liang

Closes #12294 from yanboliang/spark-12566.
---
 R/pkg/R/mllib.R                                    | 139 +++++++++------------
 R/pkg/inst/tests/testthat/test_mllib.R             |  95 +++++---------
 .../ml/r/GeneralizedLinearRegressionWrapper.scala  |  79 ++++++++++++
 .../org/apache/spark/ml/r/SparkRWrappers.scala     | 115 -----------------
 4 files changed, 169 insertions(+), 259 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
 delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index f3152cc232..31bca16580 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -17,10 +17,10 @@
 
 # mllib.R: Provides methods for MLlib integration
 
-#' @title S4 class that represents a PipelineModel
-#' @param model A Java object reference to the backing Scala PipelineModel
+#' @title S4 class that represents a generalized linear model
+#' @param jobj a Java object reference to the backing Scala GeneralizedLinearRegressionWrapper
 #' @export
-setClass("PipelineModel", representation(model = "jobj"))
+setClass("GeneralizedLinearRegressionModel", representation(jobj = "jobj"))
 
 #' @title S4 class that represents a NaiveBayesModel
 #' @param jobj a Java object reference to the backing Scala NaiveBayesWrapper
@@ -39,21 +39,18 @@ setClass("KMeansModel", representation(jobj = "jobj"))
 
 #' Fits a generalized linear model
 #'
-#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
+#' Fits a generalized linear model, similarly to R's glm().
 #'
 #' @param formula A symbolic description of the model to be fitted. Currently only a few formula
 #'                operators are supported, including '~', '.', ':', '+', and '-'.
-#' @param data DataFrame for training
-#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
-#' @param lambda Regularization parameter
-#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details)
-#' @param standardize Whether to standardize features before training
-#' @param solver The solver algorithm used for optimization, this can be "l-bfgs", "normal" and
-#'               "auto". "l-bfgs" denotes Limited-memory BFGS which is a limited-memory
-#'               quasi-Newton optimization method. "normal" denotes using Normal Equation as an
-#'               analytical solution to the linear regression problem. The default value is "auto"
-#'               which means that the solver algorithm is selected automatically.
-#' @return a fitted MLlib model
+#' @param data DataFrame for training.
+#' @param family A description of the error distribution and link function to be used in the model.
+#' This can be a character string naming a family function, a family function or +#' the result of a call to a family function. Refer R family at +#' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}. +#' @param epsilon Positive convergence tolerance of iterations. +#' @param maxit Integer giving the maximal number of IRLS iterations. +#' @return a fitted generalized linear model #' @rdname glm #' @export #' @examples @@ -64,25 +61,59 @@ setClass("KMeansModel", representation(jobj = "jobj")) #' df <- createDataFrame(sqlContext, iris) #' model <- glm(Sepal_Length ~ Sepal_Width, df, family="gaussian") #' summary(model) -#'} +#' } setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"), - function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0, - standardize = TRUE, solver = "auto") { - family <- match.arg(family) + function(formula, family = gaussian, data, epsilon = 1e-06, maxit = 25) { + if (is.character(family)) { + family <- get(family, mode = "function", envir = parent.frame()) + } + if (is.function(family)) { + family <- family() + } + if (is.null(family$family)) { + print(family) + stop("'family' not recognized") + } + formula <- paste(deparse(formula), collapse = "") - model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "fitRModelFormula", formula, data@sdf, family, lambda, - alpha, standardize, solver) - return(new("PipelineModel", model = model)) + + jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper", + "fit", formula, data@sdf, family$family, family$link, + epsilon, as.integer(maxit)) + return(new("GeneralizedLinearRegressionModel", jobj = jobj)) }) -#' Make predictions from a model +#' Get the summary of a generalized linear model #' -#' Makes predictions from a model produced by glm(), similarly to R's predict(). +#' Returns the summary of a model produced by glm(), similarly to R's summary(). #' -#' @param object A fitted MLlib model +#' @param object A fitted generalized linear model +#' @return coefficients the model's coefficients, intercept +#' @rdname summary +#' @export +#' @examples +#' \dontrun{ +#' model <- glm(y ~ x, trainingData) +#' summary(model) +#' } +setMethod("summary", signature(object = "GeneralizedLinearRegressionModel"), + function(object, ...) { + jobj <- object@jobj + features <- callJMethod(jobj, "rFeatures") + coefficients <- callJMethod(jobj, "rCoefficients") + coefficients <- as.matrix(unlist(coefficients)) + colnames(coefficients) <- c("Estimate") + rownames(coefficients) <- unlist(features) + return(list(coefficients = coefficients)) + }) + +#' Make predictions from a generalized linear model +#' +#' Makes predictions from a generalized linear model produced by glm(), similarly to R's predict(). 
+#' +#' @param object A fitted generalized linear model #' @param newData DataFrame for testing -#' @return DataFrame containing predicted values +#' @return DataFrame containing predicted labels in a column named "prediction" #' @rdname predict #' @export #' @examples @@ -90,10 +121,10 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram #' model <- glm(y ~ x, trainingData) #' predicted <- predict(model, testData) #' showDF(predicted) -#'} -setMethod("predict", signature(object = "PipelineModel"), +#' } +setMethod("predict", signature(object = "GeneralizedLinearRegressionModel"), function(object, newData) { - return(dataFrame(callJMethod(object@model, "transform", newData@sdf))) + return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) }) #' Make predictions from a naive Bayes model @@ -116,54 +147,6 @@ setMethod("predict", signature(object = "NaiveBayesModel"), return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) }) -#' Get the summary of a model -#' -#' Returns the summary of a model produced by glm(), similarly to R's summary(). -#' -#' @param object A fitted MLlib model -#' @return a list with 'devianceResiduals' and 'coefficients' components for gaussian family -#' or a list with 'coefficients' component for binomial family. \cr -#' For gaussian family: the 'devianceResiduals' gives the min/max deviance residuals -#' of the estimation, the 'coefficients' gives the estimated coefficients and their -#' estimated standard errors, t values and p-values. (It only available when model -#' fitted by normal solver.) \cr -#' For binomial family: the 'coefficients' gives the estimated coefficients. -#' See summary.glm for more information. \cr -#' @rdname summary -#' @export -#' @examples -#' \dontrun{ -#' model <- glm(y ~ x, trainingData) -#' summary(model) -#'} -setMethod("summary", signature(object = "PipelineModel"), - function(object, ...) { - modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelName", object@model) - features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelFeatures", object@model) - coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelCoefficients", object@model) - if (modelName == "LinearRegressionModel") { - devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelDevianceResiduals", object@model) - devianceResiduals <- matrix(devianceResiduals, nrow = 1) - colnames(devianceResiduals) <- c("Min", "Max") - rownames(devianceResiduals) <- rep("", times = 1) - coefficients <- matrix(coefficients, ncol = 4) - colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)") - rownames(coefficients) <- unlist(features) - return(list(devianceResiduals = devianceResiduals, coefficients = coefficients)) - } else if (modelName == "LogisticRegressionModel") { - coefficients <- as.matrix(unlist(coefficients)) - colnames(coefficients) <- c("Estimate") - rownames(coefficients) <- unlist(features) - return(list(coefficients = coefficients)) - } else { - stop(paste("Unsupported model", modelName, sep = " ")) - } - }) - #' Get the summary of a naive Bayes model #' #' Returns the summary of a naive Bayes model produced by naiveBayes(), similarly to R's summary(). 
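
The family handling added to the new `setMethod("glm", ...)` above mirrors how base R's `glm()` normalizes its `family` argument before only `family$family` and `family$link` are forwarded to the Scala wrapper. A minimal standalone sketch of that normalization is below; `resolveFamily` is a hypothetical helper name used only for illustration, since the patch inlines this logic directly in the method body.

```r
# Sketch of the family-argument normalization used by the new SparkR glm():
# a character string, a family function, or a family object all end up as a
# stats family object, whose $family and $link strings are what get passed
# to GeneralizedLinearRegressionWrapper.fit(). "resolveFamily" is a
# hypothetical name for illustration only.
resolveFamily <- function(family) {
  if (is.character(family)) {
    family <- get(family, mode = "function", envir = parent.frame())
  }
  if (is.function(family)) {
    family <- family()
  }
  if (is.null(family$family)) {
    stop("'family' not recognized")
  }
  family
}

resolveFamily("gaussian")$link                 # "identity"
resolveFamily(binomial)$link                   # "logit"
resolveFamily(poisson(link = identity))$link   # "identity"
```
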
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index fdb591756e..a9dbd2bdc4 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -25,20 +25,21 @@ sc <- sparkR.init() sqlContext <- sparkRSQL.init(sc) -test_that("glm and predict", { +test_that("formula of glm", { training <- suppressWarnings(createDataFrame(sqlContext, iris)) - test <- select(training, "Sepal_Length") - model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian") - prediction <- predict(model, test) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") + # dot minus and intercept vs native glm + model <- glm(Sepal_Width ~ . - Species + 0, data = training) + vals <- collect(select(predict(model, training), "prediction")) + rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) + expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - # Test stats::predict is working - x <- rnorm(15) - y <- x + rnorm(15) - expect_equal(length(predict(lm(y ~ x))), 15) -}) + # feature interaction vs native glm + model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training) + vals <- collect(select(predict(model, training), "prediction")) + rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) + expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -test_that("glm should work with long formula", { + # glm should work with long formula training <- suppressWarnings(createDataFrame(sqlContext, iris)) training$LongLongLongLongLongName <- training$Sepal_Width training$VeryLongLongLongLonLongName <- training$Sepal_Length @@ -50,68 +51,30 @@ test_that("glm should work with long formula", { expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) }) -test_that("predictions match with native glm", { +test_that("glm and predict", { training <- suppressWarnings(createDataFrame(sqlContext, iris)) + # gaussian family model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - vals <- collect(select(predict(model, training), "prediction")) + prediction <- predict(model, training) + expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") + vals <- collect(select(prediction, "prediction")) rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("dot minus and intercept vs native glm", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) - model <- glm(Sepal_Width ~ . - Species + 0, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . 
- Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) -test_that("feature interaction vs native glm", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) - model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) + # poisson family + model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, + family = poisson(link = identity)) + prediction <- predict(model, training) + expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") + vals <- collect(select(prediction, "prediction")) + rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, + data = iris, family = poisson(link = identity)), iris)) expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) -test_that("summary coefficients match with native glm", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) - stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal")) - coefs <- unlist(stats$coefficients) - devianceResiduals <- unlist(stats$devianceResiduals) - - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) - rCoefs <- unlist(rStats$coefficients) - rDevianceResiduals <- c(-0.95096, 0.72918) - - expect_true(all(abs(rCoefs - coefs) < 1e-5)) - expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) -}) - -test_that("summary coefficients match with native glm of family 'binomial'", { - df <- suppressWarnings(createDataFrame(sqlContext, iris)) - training <- filter(df, df$Species != "setosa") - stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, - family = "binomial")) - coefs <- as.vector(stats$coefficients[, 1]) - - rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ] - rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, - family = binomial(link = "logit")))) - - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Sepal_Width"))) -}) - -test_that("summary works on base GLM models", { - baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) - baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) + # Test stats::predict is working + x <- rnorm(15) + y <- x + rnorm(15) + expect_equal(length(predict(lm(y ~ x))), 15) }) test_that("kmeans", { diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala new file mode 100644 index 0000000000..475a308385 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.feature.RFormula +import org.apache.spark.ml.regression._ +import org.apache.spark.sql._ + +private[r] class GeneralizedLinearRegressionWrapper private ( + pipeline: PipelineModel, + val features: Array[String]) { + + private val glm: GeneralizedLinearRegressionModel = + pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel] + + lazy val rCoefficients: Array[Double] = if (glm.getFitIntercept) { + Array(glm.intercept) ++ glm.coefficients.toArray + } else { + glm.coefficients.toArray + } + + lazy val rFeatures: Array[String] = if (glm.getFitIntercept) { + Array("(Intercept)") ++ features + } else { + features + } + + def transform(dataset: DataFrame): DataFrame = { + pipeline.transform(dataset).drop(glm.getFeaturesCol) + } +} + +private[r] object GeneralizedLinearRegressionWrapper { + + def fit( + formula: String, + data: DataFrame, + family: String, + link: String, + epsilon: Double, + maxit: Int): GeneralizedLinearRegressionWrapper = { + val rFormula = new RFormula() + .setFormula(formula) + val rFormulaModel = rFormula.fit(data) + // get labels and feature names from output schema + val schema = rFormulaModel.transform(data).schema + val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline + val glm = new GeneralizedLinearRegression() + .setFamily(family) + .setLink(link) + .setFitIntercept(rFormula.hasIntercept) + .setTol(epsilon) + .setMaxIter(maxit) + val pipeline = new Pipeline() + .setStages(Array(rFormulaModel, glm)) + .fit(data) + new GeneralizedLinearRegressionWrapper(pipeline, features) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala deleted file mode 100644 index fa143715be..0000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.api.r - -import org.apache.spark.ml.{Pipeline, PipelineModel} -import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} -import org.apache.spark.ml.feature.RFormula -import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} -import org.apache.spark.sql.DataFrame - -private[r] object SparkRWrappers { - def fitRModelFormula( - value: String, - df: DataFrame, - family: String, - lambda: Double, - alpha: Double, - standardize: Boolean, - solver: String): PipelineModel = { - val formula = new RFormula().setFormula(value) - val estimator = family match { - case "gaussian" => new LinearRegression() - .setRegParam(lambda) - .setElasticNetParam(alpha) - .setFitIntercept(formula.hasIntercept) - .setStandardization(standardize) - .setSolver(solver) - case "binomial" => new LogisticRegression() - .setRegParam(lambda) - .setElasticNetParam(alpha) - .setFitIntercept(formula.hasIntercept) - .setStandardization(standardize) - } - val pipeline = new Pipeline().setStages(Array(formula, estimator)) - pipeline.fit(df) - } - - def getModelCoefficients(model: PipelineModel): Array[Double] = { - model.stages.last match { - case m: LinearRegressionModel => - val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++ - m.summary.coefficientStandardErrors.dropRight(1) - val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1) - val pValuesR = Array(m.summary.pValues.last) ++ m.summary.pValues.dropRight(1) - if (m.getFitIntercept) { - Array(m.intercept) ++ m.coefficients.toArray ++ coefficientStandardErrorsR ++ - tValuesR ++ pValuesR - } else { - m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR - } - case m: LogisticRegressionModel => - if (m.getFitIntercept) { - Array(m.intercept) ++ m.coefficients.toArray - } else { - m.coefficients.toArray - } - } - } - - def getModelDevianceResiduals(model: PipelineModel): Array[Double] = { - model.stages.last match { - case m: LinearRegressionModel => - m.summary.devianceResiduals - case m: LogisticRegressionModel => - throw new UnsupportedOperationException( - "No deviance residuals available for LogisticRegressionModel") - } - } - - def getModelFeatures(model: PipelineModel): Array[String] = { - model.stages.last match { - case m: LinearRegressionModel => - val attrs = AttributeGroup.fromStructField( - m.summary.predictions.schema(m.summary.featuresCol)) - if (m.getFitIntercept) { - Array("(Intercept)") ++ attrs.attributes.get.map(_.name.get) - } else { - attrs.attributes.get.map(_.name.get) - } - case m: LogisticRegressionModel => - val attrs = AttributeGroup.fromStructField( - m.summary.predictions.schema(m.summary.featuresCol)) - if (m.getFitIntercept) { - Array("(Intercept)") ++ attrs.attributes.get.map(_.name.get) - } else { - attrs.attributes.get.map(_.name.get) - } - } - } - - def getModelName(model: PipelineModel): String = { - model.stages.last match { - case m: LinearRegressionModel => - "LinearRegressionModel" - case m: LogisticRegressionModel => - "LogisticRegressionModel" - } - } -} -- cgit v1.2.3
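
With the new wrapper in place, the SparkR API matches base R's glm() much more closely. A minimal usage sketch follows, assuming a SparkR session initialized the same way as in the tests above (sparkR.init() / sparkRSQL.init()); the fitted formulas and the poisson(link = identity) family come from the roxygen examples and new tests, but the snippet itself is illustrative rather than part of the change.

```r
library(SparkR)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# iris as a Spark DataFrame; "." in column names becomes "_".
training <- suppressWarnings(createDataFrame(sqlContext, iris))

# Gaussian family (the default), as in the roxygen example above.
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
summary(model)$coefficients        # a single "Estimate" column in this patch

# Poisson family with an identity link, as exercised by the new test,
# with the (default) convergence controls exposed by the new API.
poissonModel <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
                    family = poisson(link = identity),
                    epsilon = 1e-06, maxit = 25)

# predict() returns a DataFrame with the predicted labels in "prediction".
predictions <- predict(poissonModel, training)
showDF(select(predictions, "prediction"))
```

As in the new tests, the predictions can be brought back to R with collect(select(predictions, "prediction")) and compared against stats::glm() fitted on the same data.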