aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/R/mllib.R139
-rw-r--r--R/pkg/inst/tests/testthat/test_mllib.R95
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala79
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala115
4 files changed, 169 insertions, 259 deletions
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index f3152cc232..31bca16580 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -17,10 +17,10 @@
# mllib.R: Provides methods for MLlib integration
-#' @title S4 class that represents a PipelineModel
-#' @param model A Java object reference to the backing Scala PipelineModel
+#' @title S4 class that represents a generalized linear model
+#' @param jobj a Java object reference to the backing Scala GeneralizedLinearRegressionWrapper
#' @export
-setClass("PipelineModel", representation(model = "jobj"))
+setClass("GeneralizedLinearRegressionModel", representation(jobj = "jobj"))
#' @title S4 class that represents a NaiveBayesModel
#' @param jobj a Java object reference to the backing Scala NaiveBayesWrapper
@@ -39,21 +39,18 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' Fits a generalized linear model
#'
-#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
+#' Fits a generalized linear model, similarly to R's glm().
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
-#' @param data DataFrame for training
-#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
-#' @param lambda Regularization parameter
-#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details)
-#' @param standardize Whether to standardize features before training
-#' @param solver The solver algorithm used for optimization, this can be "l-bfgs", "normal" and
-#' "auto". "l-bfgs" denotes Limited-memory BFGS which is a limited-memory
-#' quasi-Newton optimization method. "normal" denotes using Normal Equation as an
-#' analytical solution to the linear regression problem. The default value is "auto"
-#' which means that the solver algorithm is selected automatically.
-#' @return a fitted MLlib model
+#' @param data DataFrame for training.
+#' @param family A description of the error distribution and link function to be used in the model.
+#' This can be a character string naming a family function, a family function or
+#' the result of a call to a family function. Refer R family at
+#' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
+#' @param epsilon Positive convergence tolerance of iterations.
+#' @param maxit Integer giving the maximal number of IRLS iterations.
+#' @return a fitted generalized linear model
#' @rdname glm
#' @export
#' @examples
@@ -64,25 +61,59 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' df <- createDataFrame(sqlContext, iris)
#' model <- glm(Sepal_Length ~ Sepal_Width, df, family="gaussian")
#' summary(model)
-#'}
+#' }
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
- function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
- standardize = TRUE, solver = "auto") {
- family <- match.arg(family)
+ function(formula, family = gaussian, data, epsilon = 1e-06, maxit = 25) {
+ if (is.character(family)) {
+ family <- get(family, mode = "function", envir = parent.frame())
+ }
+ if (is.function(family)) {
+ family <- family()
+ }
+ if (is.null(family$family)) {
+ print(family)
+ stop("'family' not recognized")
+ }
+
formula <- paste(deparse(formula), collapse = "")
- model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
- "fitRModelFormula", formula, data@sdf, family, lambda,
- alpha, standardize, solver)
- return(new("PipelineModel", model = model))
+
+ jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
+ "fit", formula, data@sdf, family$family, family$link,
+ epsilon, as.integer(maxit))
+ return(new("GeneralizedLinearRegressionModel", jobj = jobj))
})
-#' Make predictions from a model
+#' Get the summary of a generalized linear model
#'
-#' Makes predictions from a model produced by glm(), similarly to R's predict().
+#' Returns the summary of a model produced by glm(), similarly to R's summary().
#'
-#' @param object A fitted MLlib model
+#' @param object A fitted generalized linear model
+#' @return coefficients the model's coefficients, intercept
+#' @rdname summary
+#' @export
+#' @examples
+#' \dontrun{
+#' model <- glm(y ~ x, trainingData)
+#' summary(model)
+#' }
+setMethod("summary", signature(object = "GeneralizedLinearRegressionModel"),
+ function(object, ...) {
+ jobj <- object@jobj
+ features <- callJMethod(jobj, "rFeatures")
+ coefficients <- callJMethod(jobj, "rCoefficients")
+ coefficients <- as.matrix(unlist(coefficients))
+ colnames(coefficients) <- c("Estimate")
+ rownames(coefficients) <- unlist(features)
+ return(list(coefficients = coefficients))
+ })
+
+#' Make predictions from a generalized linear model
+#'
+#' Makes predictions from a generalized linear model produced by glm(), similarly to R's predict().
+#'
+#' @param object A fitted generalized linear model
#' @param newData DataFrame for testing
-#' @return DataFrame containing predicted values
+#' @return DataFrame containing predicted labels in a column named "prediction"
#' @rdname predict
#' @export
#' @examples
@@ -90,10 +121,10 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram
#' model <- glm(y ~ x, trainingData)
#' predicted <- predict(model, testData)
#' showDF(predicted)
-#'}
-setMethod("predict", signature(object = "PipelineModel"),
+#' }
+setMethod("predict", signature(object = "GeneralizedLinearRegressionModel"),
function(object, newData) {
- return(dataFrame(callJMethod(object@model, "transform", newData@sdf)))
+ return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
})
#' Make predictions from a naive Bayes model
@@ -116,54 +147,6 @@ setMethod("predict", signature(object = "NaiveBayesModel"),
return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
})
-#' Get the summary of a model
-#'
-#' Returns the summary of a model produced by glm(), similarly to R's summary().
-#'
-#' @param object A fitted MLlib model
-#' @return a list with 'devianceResiduals' and 'coefficients' components for gaussian family
-#' or a list with 'coefficients' component for binomial family. \cr
-#' For gaussian family: the 'devianceResiduals' gives the min/max deviance residuals
-#' of the estimation, the 'coefficients' gives the estimated coefficients and their
-#' estimated standard errors, t values and p-values. (It only available when model
-#' fitted by normal solver.) \cr
-#' For binomial family: the 'coefficients' gives the estimated coefficients.
-#' See summary.glm for more information. \cr
-#' @rdname summary
-#' @export
-#' @examples
-#' \dontrun{
-#' model <- glm(y ~ x, trainingData)
-#' summary(model)
-#'}
-setMethod("summary", signature(object = "PipelineModel"),
- function(object, ...) {
- modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
- "getModelName", object@model)
- features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
- "getModelFeatures", object@model)
- coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
- "getModelCoefficients", object@model)
- if (modelName == "LinearRegressionModel") {
- devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
- "getModelDevianceResiduals", object@model)
- devianceResiduals <- matrix(devianceResiduals, nrow = 1)
- colnames(devianceResiduals) <- c("Min", "Max")
- rownames(devianceResiduals) <- rep("", times = 1)
- coefficients <- matrix(coefficients, ncol = 4)
- colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
- rownames(coefficients) <- unlist(features)
- return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
- } else if (modelName == "LogisticRegressionModel") {
- coefficients <- as.matrix(unlist(coefficients))
- colnames(coefficients) <- c("Estimate")
- rownames(coefficients) <- unlist(features)
- return(list(coefficients = coefficients))
- } else {
- stop(paste("Unsupported model", modelName, sep = " "))
- }
- })
-
#' Get the summary of a naive Bayes model
#'
#' Returns the summary of a naive Bayes model produced by naiveBayes(), similarly to R's summary().
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index fdb591756e..a9dbd2bdc4 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -25,20 +25,21 @@ sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
-test_that("glm and predict", {
+test_that("formula of glm", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
- test <- select(training, "Sepal_Length")
- model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
- prediction <- predict(model, test)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ # dot minus and intercept vs native glm
+ model <- glm(Sepal_Width ~ . - Species + 0, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
- # Test stats::predict is working
- x <- rnorm(15)
- y <- x + rnorm(15)
- expect_equal(length(predict(lm(y ~ x))), 15)
-})
+ # feature interaction vs native glm
+ model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-test_that("glm should work with long formula", {
+ # glm should work with long formula
training <- suppressWarnings(createDataFrame(sqlContext, iris))
training$LongLongLongLongLongName <- training$Sepal_Width
training$VeryLongLongLongLonLongName <- training$Sepal_Length
@@ -50,68 +51,30 @@ test_that("glm should work with long formula", {
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})
-test_that("predictions match with native glm", {
+test_that("glm and predict", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ # gaussian family
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("dot minus and intercept vs native glm", {
- training <- suppressWarnings(createDataFrame(sqlContext, iris))
- model <- glm(Sepal_Width ~ . - Species + 0, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-test_that("feature interaction vs native glm", {
- training <- suppressWarnings(createDataFrame(sqlContext, iris))
- model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
+ # poisson family
+ model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
+ family = poisson(link = identity))
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+ rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
+ data = iris, family = poisson(link = identity)), iris))
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-test_that("summary coefficients match with native glm", {
- training <- suppressWarnings(createDataFrame(sqlContext, iris))
- stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
- coefs <- unlist(stats$coefficients)
- devianceResiduals <- unlist(stats$devianceResiduals)
-
- rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
- rCoefs <- unlist(rStats$coefficients)
- rDevianceResiduals <- c(-0.95096, 0.72918)
-
- expect_true(all(abs(rCoefs - coefs) < 1e-5))
- expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
-})
-
-test_that("summary coefficients match with native glm of family 'binomial'", {
- df <- suppressWarnings(createDataFrame(sqlContext, iris))
- training <- filter(df, df$Species != "setosa")
- stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
- family = "binomial"))
- coefs <- as.vector(stats$coefficients[, 1])
-
- rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
- rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
- family = binomial(link = "logit"))))
-
- expect_true(all(abs(rCoefs - coefs) < 1e-4))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Sepal_Width")))
-})
-
-test_that("summary works on base GLM models", {
- baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
- baseSummary <- summary(baseModel)
- expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+ # Test stats::predict is working
+ x <- rnorm(15)
+ y <- x + rnorm(15)
+ expect_equal(length(predict(lm(y ~ x))), 15)
})
test_that("kmeans", {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
new file mode 100644
index 0000000000..475a308385
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.regression._
+import org.apache.spark.sql._
+
+private[r] class GeneralizedLinearRegressionWrapper private (
+ pipeline: PipelineModel,
+ val features: Array[String]) {
+
+ private val glm: GeneralizedLinearRegressionModel =
+ pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]
+
+ lazy val rCoefficients: Array[Double] = if (glm.getFitIntercept) {
+ Array(glm.intercept) ++ glm.coefficients.toArray
+ } else {
+ glm.coefficients.toArray
+ }
+
+ lazy val rFeatures: Array[String] = if (glm.getFitIntercept) {
+ Array("(Intercept)") ++ features
+ } else {
+ features
+ }
+
+ def transform(dataset: DataFrame): DataFrame = {
+ pipeline.transform(dataset).drop(glm.getFeaturesCol)
+ }
+}
+
+private[r] object GeneralizedLinearRegressionWrapper {
+
+ def fit(
+ formula: String,
+ data: DataFrame,
+ family: String,
+ link: String,
+ epsilon: Double,
+ maxit: Int): GeneralizedLinearRegressionWrapper = {
+ val rFormula = new RFormula()
+ .setFormula(formula)
+ val rFormulaModel = rFormula.fit(data)
+ // get labels and feature names from output schema
+ val schema = rFormulaModel.transform(data).schema
+ val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol))
+ .attributes.get
+ val features = featureAttrs.map(_.name.get)
+ // assemble and fit the pipeline
+ val glm = new GeneralizedLinearRegression()
+ .setFamily(family)
+ .setLink(link)
+ .setFitIntercept(rFormula.hasIntercept)
+ .setTol(epsilon)
+ .setMaxIter(maxit)
+ val pipeline = new Pipeline()
+ .setStages(Array(rFormulaModel, glm))
+ .fit(data)
+ new GeneralizedLinearRegressionWrapper(pipeline, features)
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
deleted file mode 100644
index fa143715be..0000000000
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ml.api.r
-
-import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute._
-import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
-import org.apache.spark.ml.feature.RFormula
-import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
-import org.apache.spark.sql.DataFrame
-
-private[r] object SparkRWrappers {
- def fitRModelFormula(
- value: String,
- df: DataFrame,
- family: String,
- lambda: Double,
- alpha: Double,
- standardize: Boolean,
- solver: String): PipelineModel = {
- val formula = new RFormula().setFormula(value)
- val estimator = family match {
- case "gaussian" => new LinearRegression()
- .setRegParam(lambda)
- .setElasticNetParam(alpha)
- .setFitIntercept(formula.hasIntercept)
- .setStandardization(standardize)
- .setSolver(solver)
- case "binomial" => new LogisticRegression()
- .setRegParam(lambda)
- .setElasticNetParam(alpha)
- .setFitIntercept(formula.hasIntercept)
- .setStandardization(standardize)
- }
- val pipeline = new Pipeline().setStages(Array(formula, estimator))
- pipeline.fit(df)
- }
-
- def getModelCoefficients(model: PipelineModel): Array[Double] = {
- model.stages.last match {
- case m: LinearRegressionModel =>
- val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++
- m.summary.coefficientStandardErrors.dropRight(1)
- val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1)
- val pValuesR = Array(m.summary.pValues.last) ++ m.summary.pValues.dropRight(1)
- if (m.getFitIntercept) {
- Array(m.intercept) ++ m.coefficients.toArray ++ coefficientStandardErrorsR ++
- tValuesR ++ pValuesR
- } else {
- m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR
- }
- case m: LogisticRegressionModel =>
- if (m.getFitIntercept) {
- Array(m.intercept) ++ m.coefficients.toArray
- } else {
- m.coefficients.toArray
- }
- }
- }
-
- def getModelDevianceResiduals(model: PipelineModel): Array[Double] = {
- model.stages.last match {
- case m: LinearRegressionModel =>
- m.summary.devianceResiduals
- case m: LogisticRegressionModel =>
- throw new UnsupportedOperationException(
- "No deviance residuals available for LogisticRegressionModel")
- }
- }
-
- def getModelFeatures(model: PipelineModel): Array[String] = {
- model.stages.last match {
- case m: LinearRegressionModel =>
- val attrs = AttributeGroup.fromStructField(
- m.summary.predictions.schema(m.summary.featuresCol))
- if (m.getFitIntercept) {
- Array("(Intercept)") ++ attrs.attributes.get.map(_.name.get)
- } else {
- attrs.attributes.get.map(_.name.get)
- }
- case m: LogisticRegressionModel =>
- val attrs = AttributeGroup.fromStructField(
- m.summary.predictions.schema(m.summary.featuresCol))
- if (m.getFitIntercept) {
- Array("(Intercept)") ++ attrs.attributes.get.map(_.name.get)
- } else {
- attrs.attributes.get.map(_.name.get)
- }
- }
- }
-
- def getModelName(model: PipelineModel): String = {
- model.stages.last match {
- case m: LinearRegressionModel =>
- "LinearRegressionModel"
- case m: LogisticRegressionModel =>
- "LogisticRegressionModel"
- }
- }
-}