diff options
author | Xusen Yin <yinxusen@gmail.com> | 2016-03-22 14:16:51 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2016-03-22 14:16:51 -0700 |
commit | d6dc12ef0146ae409834c78737c116050961f350 (patch) | |
tree | 7e99255f2a15ee2d088677253465ec6951b0a8d4 /mllib/src/main | |
parent | b2b1ad7d4cc3b3469c3d2c841b40b58ed0e34447 (diff) | |
download | spark-d6dc12ef0146ae409834c78737c116050961f350.tar.gz spark-d6dc12ef0146ae409834c78737c116050961f350.tar.bz2 spark-d6dc12ef0146ae409834c78737c116050961f350.zip |
[SPARK-13449] Naive Bayes wrapper in SparkR
## What changes were proposed in this pull request?
This PR continues the work in #11486 from yinxusen with some code refactoring. In R package e1071, `naiveBayes` supports both categorical (Bernoulli) and continuous features (Gaussian), while in MLlib we support Bernoulli and multinomial. This PR implements the common subset: Bernoulli.
I moved the implementation out from SparkRWrappers to NaiveBayesWrapper to make it easier to read. Argument names, default values, and summary now match e1071's naiveBayes.
I removed the preprocess part that omit NA values because we don't know which columns to process.
## How was this patch tested?
Test against output from R package e1071's naiveBayes.
cc: yanboliang yinxusen
Closes #11486
Author: Xusen Yin <yinxusen@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #11890 from mengxr/SPARK-13449.
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala new file mode 100644 index 0000000000..07383d393d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} +import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel} +import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.sql.DataFrame + +private[r] class NaiveBayesWrapper private ( + pipeline: PipelineModel, + val labels: Array[String], + val features: Array[String]) { + + import NaiveBayesWrapper._ + + private val naiveBayesModel: NaiveBayesModel = pipeline.stages(1).asInstanceOf[NaiveBayesModel] + + lazy val apriori: Array[Double] = naiveBayesModel.pi.toArray.map(math.exp) + + lazy val tables: Array[Double] = naiveBayesModel.theta.toArray.map(math.exp) + + def transform(dataset: DataFrame): DataFrame = { + pipeline.transform(dataset).drop(PREDICTED_LABEL_INDEX_COL) + } +} + +private[r] object NaiveBayesWrapper { + + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" + val PREDICTED_LABEL_COL = "prediction" + + def fit(formula: String, data: DataFrame, laplace: Double): NaiveBayesWrapper = { + val rFormula = new RFormula() + .setFormula(formula) + .fit(data) + // get labels and feature names from output schema + val schema = rFormula.transform(data).schema + val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) + .asInstanceOf[NominalAttribute] + val labels = labelAttr.values.get + val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline + val naiveBayes = new NaiveBayes() + .setSmoothing(laplace) + .setModelType("bernoulli") + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + val pipeline = new Pipeline() + .setStages(Array(rFormula, naiveBayes, idxToStr)) + .fit(data) + new NaiveBayesWrapper(pipeline, labels, features) + } +} |