diff options
author | Yanbo Liang <ybliang8@gmail.com> | 2016-11-16 01:04:18 -0800 |
---|---|---|
committer | Yanbo Liang <ybliang8@gmail.com> | 2016-11-16 01:04:18 -0800 |
commit | 95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f (patch) | |
tree | 24610489c5c3c41a4f6e29da144abcc28dcbdfc0 /mllib/src/main/scala | |
parent | 4ac9759f807d217b6f67badc6d5f6b7138eb92d2 (diff) | |
download | spark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.tar.gz spark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.tar.bz2 spark-95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f.zip |
[SPARK-18438][SPARKR][ML] spark.mlp should support RFormula.
## What changes were proposed in this pull request?
```spark.mlp``` should support ```RFormula``` like other ML algorithm wrappers.
BTW, I did some cleanup and improvement for ```spark.mlp```.
## How was this patch tested?
Unit tests.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes #15883 from yanboliang/spark-18438.
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 61 |
1 files changed, 34 insertions, 27 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala index 2193eb80e9..d34de30931 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala @@ -24,19 +24,29 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier} +import org.apache.spark.ml.feature.{IndexToString, RFormula} import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.r.RWrapperUtils._ import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter} import org.apache.spark.sql.{DataFrame, Dataset} private[r] class MultilayerPerceptronClassifierWrapper private ( - val pipeline: PipelineModel, - val labelCount: Long, - val layers: Array[Int], - val weights: Array[Double] + val pipeline: PipelineModel ) extends MLWritable { + import MultilayerPerceptronClassifierWrapper._ + + val mlpModel: MultilayerPerceptronClassificationModel = + pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel] + + val weights: Array[Double] = mlpModel.weights.toArray + val layers: Array[Int] = mlpModel.layers + def transform(dataset: Dataset[_]): DataFrame = { pipeline.transform(dataset) + .drop(mlpModel.getFeaturesCol) + .drop(mlpModel.getLabelCol) + .drop(PREDICTED_LABEL_INDEX_COL) } /** @@ -49,10 +59,12 @@ private[r] class MultilayerPerceptronClassifierWrapper private ( private[r] object MultilayerPerceptronClassifierWrapper extends MLReadable[MultilayerPerceptronClassifierWrapper] { + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" val PREDICTED_LABEL_COL = "prediction" def fit( data: DataFrame, + formula: String, blockSize: Int, layers: Array[Int], solver: String, @@ -62,8 +74,13 @@ private[r] object MultilayerPerceptronClassifierWrapper seed: String, initialWeights: Array[Double] ): MultilayerPerceptronClassifierWrapper = { + val rFormula = new RFormula() + .setFormula(formula) + .setForceIndexLabel(true) + checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) // get labels and feature names from output schema - val schema = data.schema + val (_, labels) = getFeaturesAndLabels(rFormulaModel, data) // assemble and fit the pipeline val mlp = new MultilayerPerceptronClassifier() @@ -73,25 +90,25 @@ private[r] object MultilayerPerceptronClassifierWrapper .setMaxIter(maxIter) .setTol(tol) .setStepSize(stepSize) - .setPredictionCol(PREDICTED_LABEL_COL) + .setFeaturesCol(rFormula.getFeaturesCol) + .setLabelCol(rFormula.getLabelCol) + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) if (seed != null && seed.length > 0) mlp.setSeed(seed.toInt) if (initialWeights != null) { require(initialWeights.length > 0) mlp.setInitialWeights(Vectors.dense(initialWeights)) } + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + val pipeline = new Pipeline() - .setStages(Array(mlp)) + .setStages(Array(rFormulaModel, mlp, idxToStr)) .fit(data) - val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel = - pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel] - - val weights = multilayerPerceptronClassificationModel.weights.toArray - val layersFromPipeline = multilayerPerceptronClassificationModel.layers - val labelCount = data.select("label").distinct().count() - - new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights) + new MultilayerPerceptronClassifierWrapper(pipeline) } /** @@ -107,17 +124,10 @@ private[r] object MultilayerPerceptronClassifierWrapper override def load(path: String): MultilayerPerceptronClassifierWrapper = { implicit val format = DefaultFormats - val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() - val rMetadata = parse(rMetadataStr) - val labelCount = (rMetadata \ "labelCount").extract[Long] - val layers = (rMetadata \ "layers").extract[Array[Int]] - val weights = (rMetadata \ "weights").extract[Array[Double]] - val pipeline = PipelineModel.load(pipelinePath) - new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights) + new MultilayerPerceptronClassifierWrapper(pipeline) } } @@ -128,10 +138,7 @@ private[r] object MultilayerPerceptronClassifierWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadata = ("class" -> instance.getClass.getName) ~ - ("labelCount" -> instance.labelCount) ~ - ("layers" -> instance.layers.toSeq) ~ - ("weights" -> instance.weights.toArray.toSeq) + val rMetadata = "class" -> instance.getClass.getName val rMetadataJson: String = compact(render(rMetadata)) sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) |