aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/generics.R4
-rw-r--r--R/pkg/R/mllib.R125
-rw-r--r--R/pkg/inst/tests/testthat/test_mllib.R32
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala134
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala2
6 files changed, 293 insertions, 5 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 709057675e..ad587a6b7d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -27,6 +27,7 @@ exportMethods("glm",
"summary",
"spark.kmeans",
"fitted",
+ "spark.mlp",
"spark.naiveBayes",
"spark.survreg",
"spark.lda",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 88884e6257..7e626be508 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1330,6 +1330,10 @@ setGeneric("spark.kmeans", function(data, formula, ...) { standardGeneric("spark
#' @export
setGeneric("fitted")
+#' @rdname spark.mlp
+#' @export
+setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
+
#' @rdname spark.naiveBayes
#' @export
setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("spark.naiveBayes") })
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index a40310d194..a670600ca6 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -60,6 +60,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))
#' @note KMeansModel since 2.0.0
setClass("KMeansModel", representation(jobj = "jobj"))
+#' S4 class that represents a MultilayerPerceptronClassificationModel
+#'
+#' @param jobj a Java object reference to the backing Scala MultilayerPerceptronClassifierWrapper
+#' @export
+#' @note MultilayerPerceptronClassificationModel since 2.1.0
+setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"))
+
#' S4 class that represents an IsotonicRegressionModel
#'
#' @param jobj a Java object reference to the backing Scala IsotonicRegressionModel
@@ -90,7 +97,7 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
-#' @seealso \link{spark.lda}, \link{spark.naiveBayes}, \link{spark.survreg},
+#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
#' @seealso \link{read.ml}
NULL
@@ -103,7 +110,7 @@ NULL
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
-#' @seealso \link{spark.naiveBayes}, \link{spark.survreg},
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
NULL
write_internal <- function(object, path, overwrite = FALSE) {
@@ -631,6 +638,95 @@ setMethod("predict", signature(object = "KMeansModel"),
predict_internal(object, newData)
})
+#' Multilayer Perceptron Classification Model
+#'
+#' \code{spark.mlp} fits a multi-layer perceptron neural network model against a SparkDataFrame.
+#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
+#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
+#' Only categorical data is supported.
+#' For more details, see
+#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html}{
+#' Multilayer Perceptron}
+#'
+#' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
+#' @param blockSize blockSize parameter.
+#' @param layers integer vector containing the number of nodes for each layer
+#' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs".
+#' @param maxIter maximum iteration number.
+#' @param tol convergence tolerance of iterations.
+#' @param stepSize stepSize parameter.
+#' @param seed seed parameter for weights initialization.
+#' @param ... additional arguments passed to the method.
+#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
+#' @rdname spark.mlp
+#' @aliases spark.mlp,SparkDataFrame-method
+#' @name spark.mlp
+#' @seealso \link{read.ml}
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
+#'
+#' # fit a Multilayer Perceptron Classification Model
+#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs",
+#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
+#'
+#' # get the summary of the model
+#' summary(model)
+#'
+#' # make predictions
+#' predictions <- predict(model, df)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.mlp since 2.1.0
+setMethod("spark.mlp", signature(data = "SparkDataFrame"),
+ function(data, blockSize = 128, layers = c(3, 5, 2), solver = "l-bfgs", maxIter = 100,
+ tol = 0.5, stepSize = 1, seed = 1) {
+ jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
+ "fit", data@sdf, as.integer(blockSize), as.array(layers),
+ as.character(solver), as.integer(maxIter), as.numeric(tol),
+ as.numeric(stepSize), as.integer(seed))
+ new("MultilayerPerceptronClassificationModel", jobj = jobj)
+ })
+
+# Makes predictions from a model produced by spark.mlp().
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named
+#' "prediction".
+#' @rdname spark.mlp
+#' @aliases predict,MultilayerPerceptronClassificationModel-method
+#' @export
+#' @note predict(MultilayerPerceptronClassificationModel) since 2.1.0
+setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel"),
+ function(object, newData) {
+ predict_internal(object, newData)
+ })
+
+# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}
+
+#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
+#' @return \code{summary} returns a list containing \code{layers}, the label distribution, and
+#' \code{tables}, conditional probabilities given the target label.
+#' @rdname spark.mlp
+#' @export
+#' @aliases summary,MultilayerPerceptronClassificationModel-method
+#' @note summary(MultilayerPerceptronClassificationModel) since 2.1.0
+setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"),
+ function(object) {
+ jobj <- object@jobj
+ labelCount <- callJMethod(jobj, "labelCount")
+ layers <- unlist(callJMethod(jobj, "layers"))
+ weights <- callJMethod(jobj, "weights")
+ weights <- matrix(weights, nrow = length(weights))
+ list(labelCount = labelCount, layers = layers, weights = weights)
+ })
+
#' Naive Bayes Models
#'
#' \code{spark.naiveBayes} fits a Bernoulli naive Bayes model against a SparkDataFrame.
@@ -685,7 +781,7 @@ setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "form
#'
#' @rdname spark.naiveBayes
#' @export
-#' @seealso \link{read.ml}
+#' @seealso \link{write.ml}
#' @note write.ml(NaiveBayesModel, character) since 2.0.0
setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"),
function(object, path, overwrite = FALSE) {
@@ -700,7 +796,7 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"),
#' @rdname spark.survreg
#' @export
#' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0
-#' @seealso \link{read.ml}
+#' @seealso \link{write.ml}
setMethod("write.ml", signature(object = "AFTSurvivalRegressionModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
@@ -734,6 +830,23 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
write_internal(object, path, overwrite)
})
+# Saves the Multilayer Perceptron Classification Model to the input path.
+
+#' @param path the directory where the model is saved.
+#' @param overwrite overwrites or not if the output path already exists. Default is FALSE
+#' which means throw exception if the output path exists.
+#'
+#' @rdname spark.mlp
+#' @aliases write.ml,MultilayerPerceptronClassificationModel,character-method
+#' @export
+#' @seealso \link{write.ml}
+#' @note write.ml(MultilayerPerceptronClassificationModel, character) since 2.1.0
+setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationModel",
+ path = "character"),
+ function(object, path, overwrite = FALSE) {
+ write_internal(object, path, overwrite)
+ })
+
# Save fitted IsotonicRegressionModel to the input path
#' @param path The directory where the model is saved
@@ -791,6 +904,8 @@ read.ml <- function(path) {
new("KMeansModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
new("LDAModel", jobj = jobj)
+ } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) {
+ new("MultilayerPerceptronClassificationModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
new("IsotonicRegressionModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) {
@@ -798,7 +913,7 @@ read.ml <- function(path) {
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) {
new("ALSModel", jobj = jobj)
} else {
- stop(paste("Unsupported model: ", jobj))
+ stop("Unsupported model: ", jobj)
}
}
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index de9bd48662..1e6da650d1 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -347,6 +347,38 @@ test_that("spark.kmeans", {
unlink(modelPath)
})
+test_that("spark.mlp", {
+ df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
+ model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
+ tol = 0.5, stepSize = 1, seed = 1)
+
+ # Test summary method
+ summary <- summary(model)
+ expect_equal(summary$labelCount, 3)
+ expect_equal(summary$layers, c(4, 5, 4, 3))
+ expect_equal(length(summary$weights), 64)
+
+ # Test predict method
+ mlpTestDF <- df
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))
+
+ # Test model save/load
+ modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ summary2 <- summary(model2)
+
+ expect_equal(summary2$labelCount, 3)
+ expect_equal(summary2$layers, c(4, 5, 4, 3))
+ expect_equal(length(summary2$weights), 64)
+
+ unlink(modelPath)
+
+})
+
test_that("spark.naiveBayes", {
# R code to reproduce the result.
# We do not support instance weights yet. So we ignore the frequencies.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
new file mode 100644
index 0000000000..be51e74187
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
+import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+private[r] class MultilayerPerceptronClassifierWrapper private (
+ val pipeline: PipelineModel,
+ val labelCount: Long,
+ val layers: Array[Int],
+ val weights: Array[Double]
+ ) extends MLWritable {
+
+ def transform(dataset: Dataset[_]): DataFrame = {
+ pipeline.transform(dataset)
+ }
+
+ /**
+ * Returns an [[MLWriter]] instance for this ML instance.
+ */
+ override def write: MLWriter =
+ new MultilayerPerceptronClassifierWrapper.MultilayerPerceptronClassifierWrapperWriter(this)
+}
+
+private[r] object MultilayerPerceptronClassifierWrapper
+ extends MLReadable[MultilayerPerceptronClassifierWrapper] {
+
+ val PREDICTED_LABEL_COL = "prediction"
+
+ def fit(
+ data: DataFrame,
+ blockSize: Int,
+ layers: Array[Double],
+ solver: String,
+ maxIter: Int,
+ tol: Double,
+ stepSize: Double,
+ seed: Int
+ ): MultilayerPerceptronClassifierWrapper = {
+ // get labels and feature names from output schema
+ val schema = data.schema
+
+ // assemble and fit the pipeline
+ val mlp = new MultilayerPerceptronClassifier()
+ .setLayers(layers.map(_.toInt))
+ .setBlockSize(blockSize)
+ .setSolver(solver)
+ .setMaxIter(maxIter)
+ .setTol(tol)
+ .setStepSize(stepSize)
+ .setSeed(seed)
+ .setPredictionCol(PREDICTED_LABEL_COL)
+ val pipeline = new Pipeline()
+ .setStages(Array(mlp))
+ .fit(data)
+
+ val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
+ pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
+
+ val weights = multilayerPerceptronClassificationModel.weights.toArray
+ val layersFromPipeline = multilayerPerceptronClassificationModel.layers
+ val labelCount = data.select("label").distinct().count()
+
+ new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
+ }
+
+ /**
+ * Returns an [[MLReader]] instance for this class.
+ */
+ override def read: MLReader[MultilayerPerceptronClassifierWrapper] =
+ new MultilayerPerceptronClassifierWrapperReader
+
+ override def load(path: String): MultilayerPerceptronClassifierWrapper = super.load(path)
+
+ class MultilayerPerceptronClassifierWrapperReader
+ extends MLReader[MultilayerPerceptronClassifierWrapper]{
+
+ override def load(path: String): MultilayerPerceptronClassifierWrapper = {
+ implicit val format = DefaultFormats
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadata = parse(rMetadataStr)
+ val labelCount = (rMetadata \ "labelCount").extract[Long]
+ val layers = (rMetadata \ "layers").extract[Array[Int]]
+ val weights = (rMetadata \ "weights").extract[Array[Double]]
+
+ val pipeline = PipelineModel.load(pipelinePath)
+ new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
+ }
+ }
+
+ class MultilayerPerceptronClassifierWrapperWriter(instance: MultilayerPerceptronClassifierWrapper)
+ extends MLWriter {
+
+ override protected def saveImpl(path: String): Unit = {
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadata = ("class" -> instance.getClass.getName) ~
+ ("labelCount" -> instance.labelCount) ~
+ ("layers" -> instance.layers.toSeq) ~
+ ("weights" -> instance.weights.toArray.toSeq)
+ val rMetadataJson: String = compact(render(rMetadata))
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+
+ instance.pipeline.save(pipelinePath)
+ }
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 51a65f7fc4..d64de1b6ab 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -44,6 +44,8 @@ private[r] object RWrappers extends MLReader[Object] {
GeneralizedLinearRegressionWrapper.load(path)
case "org.apache.spark.ml.r.KMeansWrapper" =>
KMeansWrapper.load(path)
+ case "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper" =>
+ MultilayerPerceptronClassifierWrapper.load(path)
case "org.apache.spark.ml.r.LDAWrapper" =>
LDAWrapper.load(path)
case "org.apache.spark.ml.r.IsotonicRegressionWrapper" =>