about summary refs log tree commit diff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2016-11-16 01:04:18 -0800
committerYanbo Liang <ybliang8@gmail.com>2016-11-16 01:04:18 -0800
commit95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f (patch)
tree24610489c5c3c41a4f6e29da144abcc28dcbdfc0 /mllib/src/main
parent4ac9759f807d217b6f67badc6d5f6b7138eb92d2 (diff)
downloadspark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.tar.gz
spark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.tar.bz2
spark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.zip
[SPARK-18438][SPARKR][ML] spark.mlp should support RFormula.
## What changes were proposed in this pull request?

```spark.mlp``` should support ```RFormula``` like other ML algorithm wrappers. BTW, I did some cleanup and improvement for ```spark.mlp```.

## How was this patch tested?

Unit tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #15883 from yanboliang/spark-18438.
Diffstat (limited to 'mllib/src/main')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala  61
1 file changed, 34 insertions, 27 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index 2193eb80e9..d34de30931 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -24,19 +24,29 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
+import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
import org.apache.spark.sql.{DataFrame, Dataset}
private[r] class MultilayerPerceptronClassifierWrapper private (
- val pipeline: PipelineModel,
- val labelCount: Long,
- val layers: Array[Int],
- val weights: Array[Double]
+ val pipeline: PipelineModel
) extends MLWritable {
+ import MultilayerPerceptronClassifierWrapper._
+
+ val mlpModel: MultilayerPerceptronClassificationModel =
+ pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel]
+
+ val weights: Array[Double] = mlpModel.weights.toArray
+ val layers: Array[Int] = mlpModel.layers
+
def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset)
+ .drop(mlpModel.getFeaturesCol)
+ .drop(mlpModel.getLabelCol)
+ .drop(PREDICTED_LABEL_INDEX_COL)
}
/**
@@ -49,10 +59,12 @@ private[r] class MultilayerPerceptronClassifierWrapper private (
private[r] object MultilayerPerceptronClassifierWrapper
extends MLReadable[MultilayerPerceptronClassifierWrapper] {
+ val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
val PREDICTED_LABEL_COL = "prediction"
def fit(
data: DataFrame,
+ formula: String,
blockSize: Int,
layers: Array[Int],
solver: String,
@@ -62,8 +74,13 @@ private[r] object MultilayerPerceptronClassifierWrapper
seed: String,
initialWeights: Array[Double]
): MultilayerPerceptronClassifierWrapper = {
+ val rFormula = new RFormula()
+ .setFormula(formula)
+ .setForceIndexLabel(true)
+ checkDataColumns(rFormula, data)
+ val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
- val schema = data.schema
+ val (_, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val mlp = new MultilayerPerceptronClassifier()
@@ -73,25 +90,25 @@ private[r] object MultilayerPerceptronClassifierWrapper
.setMaxIter(maxIter)
.setTol(tol)
.setStepSize(stepSize)
- .setPredictionCol(PREDICTED_LABEL_COL)
+ .setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
+ .setPredictionCol(PREDICTED_LABEL_INDEX_COL)
if (seed != null && seed.length > 0) mlp.setSeed(seed.toInt)
if (initialWeights != null) {
require(initialWeights.length > 0)
mlp.setInitialWeights(Vectors.dense(initialWeights))
}
+ val idxToStr = new IndexToString()
+ .setInputCol(PREDICTED_LABEL_INDEX_COL)
+ .setOutputCol(PREDICTED_LABEL_COL)
+ .setLabels(labels)
+
val pipeline = new Pipeline()
- .setStages(Array(mlp))
+ .setStages(Array(rFormulaModel, mlp, idxToStr))
.fit(data)
- val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
- pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
-
- val weights = multilayerPerceptronClassificationModel.weights.toArray
- val layersFromPipeline = multilayerPerceptronClassificationModel.layers
- val labelCount = data.select("label").distinct().count()
-
- new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
+ new MultilayerPerceptronClassifierWrapper(pipeline)
}
/**
@@ -107,17 +124,10 @@ private[r] object MultilayerPerceptronClassifierWrapper
override def load(path: String): MultilayerPerceptronClassifierWrapper = {
implicit val format = DefaultFormats
- val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
- val rMetadata = parse(rMetadataStr)
- val labelCount = (rMetadata \ "labelCount").extract[Long]
- val layers = (rMetadata \ "layers").extract[Array[Int]]
- val weights = (rMetadata \ "weights").extract[Array[Double]]
-
val pipeline = PipelineModel.load(pipelinePath)
- new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
+ new MultilayerPerceptronClassifierWrapper(pipeline)
}
}
@@ -128,10 +138,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadata = ("class" -> instance.getClass.getName) ~
- ("labelCount" -> instance.labelCount) ~
- ("layers" -> instance.layers.toSeq) ~
- ("weights" -> instance.weights.toArray.toSeq)
+ val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)