From 363793f2bf57205f1d753d4705583aaf441849b5 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Wed, 17 Aug 2016 06:15:04 -0700
Subject: [SPARK-16444][SPARKR] Isotonic Regression wrapper in SparkR

## What changes were proposed in this pull request?

Add an Isotonic Regression wrapper in SparkR. Wrappers in R and Scala are added, along with unit tests and documentation.

## How was this patch tested?

Manually tested with sudo ./R/run-tests.sh.

Author: wm624@hotmail.com

Closes #14182 from wangmiao1981/isoR.
---
 R/pkg/NAMESPACE                        |   3 +-
 R/pkg/R/generics.R                     |   4 ++
 R/pkg/R/mllib.R                        | 118 +++++++++++++++++++++++++++++++++
 R/pkg/inst/tests/testthat/test_mllib.R |  32 +++++++++
 4 files changed, 156 insertions(+), 1 deletion(-)
(limited to 'R/pkg')

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index aaab92f5cf..1e23b233c1 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -24,7 +24,8 @@ exportMethods("glm",
               "spark.kmeans",
               "fitted",
               "spark.naiveBayes",
-              "spark.survreg")
+              "spark.survreg",
+              "spark.isoreg")
 
 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 52ab730e21..ebacc11741 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1304,6 +1304,10 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s
 #' @export
 setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
 
+#' @rdname spark.isoreg
+#' @export
+setGeneric("spark.isoreg", function(data, formula, ...) { standardGeneric("spark.isoreg") })
+
 #' @rdname write.ml
 #' @export
 setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 6f6e2fc255..0dcc54d7af 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -53,6 +53,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))
 #' @note KMeansModel since 2.0.0
 setClass("KMeansModel", representation(jobj = "jobj"))
 
+#' S4 class that represents an IsotonicRegressionModel
+#'
+#' @param jobj a Java object reference to the backing Scala IsotonicRegressionModel
+#' @export
+#' @note IsotonicRegressionModel since 2.1.0
+setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
+
 #' Saves the MLlib model to the input path
 #'
 #' Saves the MLlib model to the input path. For more information, see the specific
@@ -62,6 +69,7 @@ setClass("KMeansModel", representation(jobj = "jobj"))
 #' @export
 #' @seealso \link{spark.glm}, \link{glm}
 #' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.isoreg}
 #' @seealso \link{read.ml}
 NULL
 
@@ -74,6 +82,7 @@ NULL
 #' @export
 #' @seealso \link{spark.glm}, \link{glm}
 #' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.isoreg}
 NULL
 
 #' Generalized Linear Models
@@ -299,6 +308,94 @@ setMethod("summary", signature(object = "NaiveBayesModel"),
             return(list(apriori = apriori, tables = tables))
           })
 
+#' Isotonic Regression Model
+#'
+#' Fits an Isotonic Regression model against a Spark DataFrame, similarly to R's isoreg().
+#' Users can print, make predictions on the produced model and save the model to the input path.
+#'
+#' @param data SparkDataFrame for training
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#' operators are supported, including '~', '.', ':', '+', and '-'.
+#' @param isotonic Whether the output sequence should be isotonic/increasing (TRUE) or
+#' antitonic/decreasing (FALSE)
+#' @param featureIndex The index of the feature if \code{featuresCol} is a vector column (default: `0`),
+#' no effect otherwise
+#' @param weightCol The weight column name.
+#' @return \code{spark.isoreg} returns a fitted Isotonic Regression model
+#' @rdname spark.isoreg
+#' @aliases spark.isoreg,SparkDataFrame,formula-method
+#' @name spark.isoreg
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' data <- list(list(7.0, 0.0), list(5.0, 1.0), list(3.0, 2.0),
+#'              list(5.0, 3.0), list(1.0, 4.0))
+#' df <- createDataFrame(data, c("label", "feature"))
+#' model <- spark.isoreg(df, label ~ feature, isotonic = FALSE)
+#' # return model boundaries and prediction as lists
+#' result <- summary(model, df)
+#' # prediction based on fitted model
+#' predict_data <- list(list(-2.0), list(-1.0), list(0.5),
+#'                      list(0.75), list(1.0), list(2.0), list(9.0))
+#' predict_df <- createDataFrame(predict_data, c("feature"))
+#' # get prediction column
+#' predict_result <- collect(select(predict(model, predict_df), "prediction"))
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and print
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.isoreg since 2.1.0
+setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol = NULL) {
+            formula <- paste0(deparse(formula), collapse = "")
+
+            if (is.null(weightCol)) {
+              weightCol <- ""
+            }
+
+            jobj <- callJStatic("org.apache.spark.ml.r.IsotonicRegressionWrapper", "fit",
+                                data@sdf, formula, as.logical(isotonic), as.integer(featureIndex),
+                                as.character(weightCol))
+            return(new("IsotonicRegressionModel", jobj = jobj))
+          })
+
+# Predicted values based on an isotonicRegression model
+
+#' @param object a fitted IsotonicRegressionModel
+#' @param newData SparkDataFrame for testing
+#' @return \code{predict} returns a SparkDataFrame containing predicted values
+#' @rdname spark.isoreg
+#' @aliases predict,IsotonicRegressionModel,SparkDataFrame-method
+#' @export
+#' @note predict(IsotonicRegressionModel) since 2.1.0
+setMethod("predict", signature(object = "IsotonicRegressionModel"),
+          function(object, newData) {
+            return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
+          })
+
+# Get the summary of an IsotonicRegressionModel model
+
+#' @param object a fitted IsotonicRegressionModel
+#' @param ... Other optional arguments to summary of an IsotonicRegressionModel
+#' @return \code{summary} returns the model's boundaries and prediction as lists
+#' @rdname spark.isoreg
+#' @aliases summary,IsotonicRegressionModel-method
+#' @export
+#' @note summary(IsotonicRegressionModel) since 2.1.0
+setMethod("summary", signature(object = "IsotonicRegressionModel"),
+          function(object, ...) {
+            jobj <- object@jobj
+            boundaries <- callJMethod(jobj, "boundaries")
+            predictions <- callJMethod(jobj, "predictions")
+            return(list(boundaries = boundaries, predictions = predictions))
+          })
+
 #' K-Means Clustering Model
 #'
 #' Fits a k-means clustering model against a Spark DataFrame, similarly to R's kmeans().
@@ -533,6 +630,25 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
     invisible(callJMethod(writer, "save", path))
   })
 
+# Save fitted IsotonicRegressionModel to the input path
+
+#' @param path The directory where the model is saved
+#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
+#' which means throw exception if the output path exists.
+#'
+#' @rdname spark.isoreg
+#' @aliases write.ml,IsotonicRegressionModel,character-method
+#' @export
+#' @note write.ml(IsotonicRegression, character) since 2.1.0
+setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            writer <- callJMethod(object@jobj, "write")
+            if (overwrite) {
+              writer <- callJMethod(writer, "overwrite")
+            }
+            invisible(callJMethod(writer, "save", path))
+          })
+
 #' Load a fitted MLlib model from the input path.
 #'
 #' @param path Path of the model to read.
@@ -558,6 +674,8 @@ read.ml <- function(path) {
     return(new("GeneralizedLinearRegressionModel", jobj = jobj))
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) {
     return(new("KMeansModel", jobj = jobj))
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
+    return(new("IsotonicRegressionModel", jobj = jobj))
   } else {
     stop(paste("Unsupported model: ", jobj))
   }
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index bc18224680..b759b28927 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -476,4 +476,36 @@ test_that("spark.survreg", {
   }
 })
 
+test_that("spark.isotonicRegression", {
+  label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
+  feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
+  weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
+  data <- as.data.frame(cbind(label, feature, weight))
+  df <- suppressWarnings(createDataFrame(data))
+
+  model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
+                        weightCol = "weight")
+  # only allow one variable on the right hand side of the formula
+  expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
+  result <- summary(model, df)
+  expect_equal(result$predictions, list(7, 5, 4, 4, 1))
+
+  # Test model prediction
+  predict_data <- list(list(-2.0), list(-1.0), list(0.5),
+                       list(0.75), list(1.0), list(2.0), list(9.0))
+  predict_df <- createDataFrame(predict_data, c("feature"))
+  predict_result <- collect(select(predict(model, predict_df), "prediction"))
+  expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  expect_equal(result, summary(model2, df))
+
+  unlink(modelPath)
+})
+
 sparkR.session.stop()
--
cgit v1.2.3
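
For reference, here is a minimal end-to-end usage sketch of the new spark.isoreg API, assembled from the roxygen example and the unit test in this patch. It is not part of the patch itself; it assumes a SparkR build that already includes this change (Spark 2.1.0+) and a local Spark session, and it reuses the same toy antitonic data as above.

```r
library(SparkR)
sparkR.session()

# Toy training data: label mostly decreases as feature increases.
train <- createDataFrame(list(list(7.0, 0.0), list(5.0, 1.0), list(3.0, 2.0),
                              list(5.0, 3.0), list(1.0, 4.0)),
                         c("label", "feature"))

# Fit an antitonic (decreasing) model; isotonic = TRUE would fit an increasing one.
model <- spark.isoreg(train, label ~ feature, isotonic = FALSE)

# Boundaries and predictions of the fitted piecewise-constant function.
summary(model, train)

# Predict on new feature values; features outside the training range get the
# prediction of the nearest boundary (e.g. -1.0 -> 7.0, 9.0 -> 1.0 in the test above).
test <- createDataFrame(list(list(-1.0), list(0.5), list(2.0), list(9.0)), c("feature"))
collect(select(predict(model, test), "prediction"))

# Save the fitted model and read it back.
modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp")
write.ml(model, modelPath)
saved <- read.ml(modelPath)
summary(saved)

sparkR.session.stop()
```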