aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwm624@hotmail.com <wm624@hotmail.com>2016-08-17 06:15:04 -0700
committerFelix Cheung <felixcheung@apache.org>2016-08-17 06:15:04 -0700
commit363793f2bf57205f1d753d4705583aaf441849b5 (patch)
tree86f1b24956b4fc3f082e7ebcf61654928355ff16
parent4d0cc84afca9efd4541a2e8d583e3e0f2df37c0d (diff)
downloadspark-363793f2bf57205f1d753d4705583aaf441849b5.tar.gz
spark-363793f2bf57205f1d753d4705583aaf441849b5.tar.bz2
spark-363793f2bf57205f1d753d4705583aaf441849b5.zip
[SPARK-16444][SPARKR] Isotonic Regression wrapper in SparkR
## What changes were proposed in this pull request? Add an Isotonic Regression wrapper in SparkR. Wrappers in R and Scala are added, together with unit tests and documentation. ## How was this patch tested? Manually tested with sudo ./R/run-tests.sh (unit tests plus manual testing). Author: wm624@hotmail.com <wm624@hotmail.com> Closes #14182 from wangmiao1981/isoR.
-rw-r--r--R/pkg/NAMESPACE3
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/R/mllib.R118
-rw-r--r--R/pkg/inst/tests/testthat/test_mllib.R32
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala119
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala2
6 files changed, 277 insertions, 1 deletion
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index aaab92f5cf..1e23b233c1 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -24,7 +24,8 @@ exportMethods("glm",
"spark.kmeans",
"fitted",
"spark.naiveBayes",
- "spark.survreg")
+ "spark.survreg",
+ "spark.isoreg")
# Job group lifecycle management methods
export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 52ab730e21..ebacc11741 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1304,6 +1304,10 @@ setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("s
#' @export
setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
+#' @rdname spark.isoreg
+#' @export
+setGeneric("spark.isoreg", function(data, formula, ...) { standardGeneric("spark.isoreg") })
+
#' @rdname write.ml
#' @export
setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 6f6e2fc255..0dcc54d7af 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -53,6 +53,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))
#' @note KMeansModel since 2.0.0
setClass("KMeansModel", representation(jobj = "jobj"))
+#' S4 class that represents an IsotonicRegressionModel
+#'
+#' @param jobj a Java object reference to the backing Scala IsotonicRegressionModel
+#' @export
+#' @note IsotonicRegressionModel since 2.1.0
+setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
+
#' Saves the MLlib model to the input path
#'
#' Saves the MLlib model to the input path. For more information, see the specific
@@ -62,6 +69,7 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @export
#' @seealso \link{spark.glm}, \link{glm}
#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.isoreg}
#' @seealso \link{read.ml}
NULL
@@ -74,6 +82,7 @@ NULL
#' @export
#' @seealso \link{spark.glm}, \link{glm}
#' @seealso \link{spark.kmeans}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.isoreg}
NULL
#' Generalized Linear Models
@@ -299,6 +308,94 @@ setMethod("summary", signature(object = "NaiveBayesModel"),
return(list(apriori = apriori, tables = tables))
})
+#' Isotonic Regression Model
+#'
+#' Fits an Isotonic Regression model against a Spark DataFrame, similarly to R's isoreg().
+#' Users can print, make predictions on the produced model and save the model to the input path.
+#'
+#' @param data SparkDataFrame for training
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#' operators are supported, including '~', '.', ':', '+', and '-'.
+#' @param isotonic Whether the output sequence should be isotonic/increasing (TRUE) or
+#' antitonic/decreasing (FALSE)
+#' @param featureIndex The index of the feature if \code{featuresCol} is a vector column (default: `0`),
+#' no effect otherwise
+#' @param weightCol The weight column name.
+#' @return \code{spark.isoreg} returns a fitted Isotonic Regression model
+#' @rdname spark.isoreg
+#' @aliases spark.isoreg,SparkDataFrame,formula-method
+#' @name spark.isoreg
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' data <- list(list(7.0, 0.0), list(5.0, 1.0), list(3.0, 2.0),
+#' list(5.0, 3.0), list(1.0, 4.0))
+#' df <- createDataFrame(data, c("label", "feature"))
+#' model <- spark.isoreg(df, label ~ feature, isotonic = FALSE)
+#' # return model boundaries and prediction as lists
+#' result <- summary(model, df)
+#' # prediction based on fitted model
+#' predict_data <- list(list(-2.0), list(-1.0), list(0.5),
+#' list(0.75), list(1.0), list(2.0), list(9.0))
+#' predict_df <- createDataFrame(predict_data, c("feature"))
+#' # get prediction column
+#' predict_result <- collect(select(predict(model, predict_df), "prediction"))
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and print
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.isoreg since 2.1.0
+setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = "formula"),
+ function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol = NULL) {
+ formula <- paste0(deparse(formula), collapse = "")
+
+ if (is.null(weightCol)) {
+ weightCol <- ""
+ }
+
+ jobj <- callJStatic("org.apache.spark.ml.r.IsotonicRegressionWrapper", "fit",
+ data@sdf, formula, as.logical(isotonic), as.integer(featureIndex),
+ as.character(weightCol))
+ return(new("IsotonicRegressionModel", jobj = jobj))
+ })
+
+# Predicted values based on an isotonicRegression model
+
+#' @param object a fitted IsotonicRegressionModel
+#' @param newData SparkDataFrame for testing
+#' @return \code{predict} returns a SparkDataFrame containing predicted values
+#' @rdname spark.isoreg
+#' @aliases predict,IsotonicRegressionModel,SparkDataFrame-method
+#' @export
+#' @note predict(IsotonicRegressionModel) since 2.1.0
+setMethod("predict", signature(object = "IsotonicRegressionModel"),
+ function(object, newData) {
+ return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
+ })
+
+# Get the summary of an IsotonicRegressionModel model
+
+#' @param object a fitted IsotonicRegressionModel
+#' @param ... Other optional arguments to summary of an IsotonicRegressionModel
+#' @return \code{summary} returns the model's boundaries and prediction as lists
+#' @rdname spark.isoreg
+#' @aliases summary,IsotonicRegressionModel-method
+#' @export
+#' @note summary(IsotonicRegressionModel) since 2.1.0
+setMethod("summary", signature(object = "IsotonicRegressionModel"),
+ function(object, ...) {
+ jobj <- object@jobj
+ boundaries <- callJMethod(jobj, "boundaries")
+ predictions <- callJMethod(jobj, "predictions")
+ return(list(boundaries = boundaries, predictions = predictions))
+ })
+
#' K-Means Clustering Model
#'
#' Fits a k-means clustering model against a Spark DataFrame, similarly to R's kmeans().
@@ -533,6 +630,25 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
invisible(callJMethod(writer, "save", path))
})
+# Save fitted IsotonicRegressionModel to the input path
+
+#' @param path The directory where the model is saved
+#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
+#' which means throw exception if the output path exists.
+#'
+#' @rdname spark.isoreg
+#' @aliases write.ml,IsotonicRegressionModel,character-method
+#' @export
+#' @note write.ml(IsotonicRegression, character) since 2.1.0
+setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "character"),
+ function(object, path, overwrite = FALSE) {
+ writer <- callJMethod(object@jobj, "write")
+ if (overwrite) {
+ writer <- callJMethod(writer, "overwrite")
+ }
+ invisible(callJMethod(writer, "save", path))
+ })
+
#' Load a fitted MLlib model from the input path.
#'
#' @param path Path of the model to read.
@@ -558,6 +674,8 @@ read.ml <- function(path) {
return(new("GeneralizedLinearRegressionModel", jobj = jobj))
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) {
return(new("KMeansModel", jobj = jobj))
+ } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
+ return(new("IsotonicRegressionModel", jobj = jobj))
} else {
stop(paste("Unsupported model: ", jobj))
}
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index bc18224680..b759b28927 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -476,4 +476,36 @@ test_that("spark.survreg", {
}
})
+test_that("spark.isotonicRegression", {
+ label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
+ feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
+ weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
+ data <- as.data.frame(cbind(label, feature, weight))
+ df <- suppressWarnings(createDataFrame(data))
+
+ model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
+ weightCol = "weight")
+ # only allow one variable on the right hand side of the formula
+ expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
+ result <- summary(model, df)
+ expect_equal(result$predictions, list(7, 5, 4, 4, 1))
+
+ # Test model prediction
+ predict_data <- list(list(-2.0), list(-1.0), list(0.5),
+ list(0.75), list(1.0), list(2.0), list(9.0))
+ predict_df <- createDataFrame(predict_data, c("feature"))
+ predict_result <- collect(select(predict(model, predict_df), "prediction"))
+ expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
+
+ # Test model save/load
+ modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ expect_equal(result, summary(model2, df))
+
+ unlink(modelPath)
+})
+
sparkR.session.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
new file mode 100644
index 0000000000..1ea80cb46a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.attribute.{AttributeGroup}
+import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.regression.{IsotonicRegression, IsotonicRegressionModel}
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+private[r] class IsotonicRegressionWrapper private (
+ val pipeline: PipelineModel,
+ val features: Array[String]) extends MLWritable {
+
+ private val isotonicRegressionModel: IsotonicRegressionModel =
+ pipeline.stages(1).asInstanceOf[IsotonicRegressionModel]
+
+ lazy val boundaries: Array[Double] = isotonicRegressionModel.boundaries.toArray
+
+ lazy val predictions: Array[Double] = isotonicRegressionModel.predictions.toArray
+
+ def transform(dataset: Dataset[_]): DataFrame = {
+ pipeline.transform(dataset).drop(isotonicRegressionModel.getFeaturesCol)
+ }
+
+ override def write: MLWriter = new IsotonicRegressionWrapper.IsotonicRegressionWrapperWriter(this)
+}
+
+private[r] object IsotonicRegressionWrapper
+ extends MLReadable[IsotonicRegressionWrapper] {
+
+ def fit(
+ data: DataFrame,
+ formula: String,
+ isotonic: Boolean,
+ featureIndex: Int,
+ weightCol: String): IsotonicRegressionWrapper = {
+
+ val rFormulaModel = new RFormula()
+ .setFormula(formula)
+ .setFeaturesCol("features")
+ .fit(data)
+
+ // get feature names from output schema
+ val schema = rFormulaModel.transform(data).schema
+ val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
+ .attributes.get
+ val features = featureAttrs.map(_.name.get)
+ require(features.size == 1)
+
+ // assemble and fit the pipeline
+ val isotonicRegression = new IsotonicRegression()
+ .setIsotonic(isotonic)
+ .setFeatureIndex(featureIndex)
+ .setWeightCol(weightCol)
+
+ val pipeline = new Pipeline()
+ .setStages(Array(rFormulaModel, isotonicRegression))
+ .fit(data)
+
+ new IsotonicRegressionWrapper(pipeline, features)
+ }
+
+ override def read: MLReader[IsotonicRegressionWrapper] = new IsotonicRegressionWrapperReader
+
+ override def load(path: String): IsotonicRegressionWrapper = super.load(path)
+
+ class IsotonicRegressionWrapperWriter(instance: IsotonicRegressionWrapper) extends MLWriter {
+
+ override protected def saveImpl(path: String): Unit = {
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadata = ("class" -> instance.getClass.getName) ~
+ ("features" -> instance.features.toSeq)
+ val rMetadataJson: String = compact(render(rMetadata))
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+
+ instance.pipeline.save(pipelinePath)
+ }
+ }
+
+ class IsotonicRegressionWrapperReader extends MLReader[IsotonicRegressionWrapper] {
+
+ override def load(path: String): IsotonicRegressionWrapper = {
+ implicit val format = DefaultFormats
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadata = parse(rMetadataStr)
+ val features = (rMetadata \ "features").extract[Array[String]]
+
+ val pipeline = PipelineModel.load(pipelinePath)
+ new IsotonicRegressionWrapper(pipeline, features)
+ }
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 568c160ee5..f9a44d60e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -44,6 +44,8 @@ private[r] object RWrappers extends MLReader[Object] {
GeneralizedLinearRegressionWrapper.load(path)
case "org.apache.spark.ml.r.KMeansWrapper" =>
KMeansWrapper.load(path)
+ case "org.apache.spark.ml.r.IsotonicRegressionWrapper" =>
+ IsotonicRegressionWrapper.load(path)
case _ =>
throw new SparkException(s"SparkR read.ml does not support load $className")
}