aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2016-11-13 20:25:12 -0800
committerYanbo Liang <ybliang8@gmail.com>2016-11-13 20:25:12 -0800
commit07be232ea12dfc8dc3701ca948814be7dbebf4ee (patch)
tree856a17d84397a6205c6cc5c83c25c8cdfe09d21b /mllib
parentb91a51bb231af321860415075a7f404bc46e0a74 (diff)
downloadspark-07be232ea12dfc8dc3701ca948814be7dbebf4ee.tar.gz
spark-07be232ea12dfc8dc3701ca948814be7dbebf4ee.tar.bz2
spark-07be232ea12dfc8dc3701ca948814be7dbebf4ee.zip
[SPARK-18412][SPARKR][ML] Fix exception for some SparkR ML algorithms training on libsvm data
## What changes were proposed in this pull request? * Fix the following exceptions, which were thrown when ```spark.randomForest```(classification), ```spark.gbt```(classification), ```spark.naiveBayes``` and ```spark.glm```(binomial family) were fitted on libsvm data. ``` java.lang.IllegalArgumentException: requirement failed: If label column already exists, forceIndexLabel can not be set with true. ``` See [SPARK-18412](https://issues.apache.org/jira/browse/SPARK-18412) for more detail about how to reproduce this bug. * Refactor out ```getFeaturesAndLabels``` to RWrapperUtils, since lots of ML algorithm wrappers use this function. * Drop some unwanted columns when making prediction. ## How was this patch tested? Add unit test. Author: Yanbo Liang <ybliang8@gmail.com> Closes #15851 from yanboliang/spark-18412.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala18
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala14
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala36
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala18
5 files changed, 53 insertions, 38 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
index 8946025032..aacb41ee26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
@@ -23,10 +23,10 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -51,6 +51,7 @@ private[r] class GBTClassifierWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(gbtcModel.getFeaturesCol)
+ .drop(gbtcModel.getLabelCol)
}
override def write: MLWriter = new
@@ -81,19 +82,11 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
- // get feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
-
- // get label names from output schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
+ // get labels and feature names from output schema
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val rfc = new GBTClassifier()
@@ -109,6 +102,7 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
.setMaxMemoryInMB(maxMemoryInMB)
.setCacheNodeIds(cacheNodeIds)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
if (seed != null && seed.length > 0) rfc.setSeed(seed.toLong)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 995b1ef03b..add4d49110 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.regression._
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -64,6 +65,7 @@ private[r] class GeneralizedLinearRegressionWrapper private (
.drop(PREDICTED_LABEL_PROB_COL)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(glm.getFeaturesCol)
+ .drop(glm.getLabelCol)
} else {
pipeline.transform(dataset)
.drop(glm.getFeaturesCol)
@@ -92,7 +94,7 @@ private[r] object GeneralizedLinearRegressionWrapper
regParam: Double): GeneralizedLinearRegressionWrapper = {
val rFormula = new RFormula().setFormula(formula)
if (family == "binomial") rFormula.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
val schema = rFormulaModel.transform(data).schema
@@ -109,6 +111,7 @@ private[r] object GeneralizedLinearRegressionWrapper
.setWeightCol(weightCol)
.setRegParam(regParam)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
val pipeline = if (family == "binomial") {
// Convert prediction from probability to label index.
val probToPred = new ProbabilityToPrediction()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index 4fdab2dd94..0afea4be3d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -23,9 +23,9 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -46,6 +46,7 @@ private[r] class NaiveBayesWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(naiveBayesModel.getFeaturesCol)
+ .drop(naiveBayesModel.getLabelCol)
}
override def write: MLWriter = new NaiveBayesWrapper.NaiveBayesWrapperWriter(this)
@@ -60,21 +61,16 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val naiveBayes = new NaiveBayes()
.setSmoothing(smoothing)
.setModelType("bernoulli")
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
index 379007c4d9..665e50af67 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
@@ -18,11 +18,12 @@
package org.apache.spark.ml.r
import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
+import org.apache.spark.ml.feature.{RFormula, RFormulaModel}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
-object RWrapperUtils extends Logging {
+private[r] object RWrapperUtils extends Logging {
/**
* DataFrame column check.
@@ -32,14 +33,41 @@ object RWrapperUtils extends Logging {
*
* @param rFormula RFormula instance
* @param data Input dataset
- * @return Unit
*/
def checkDataColumns(rFormula: RFormula, data: Dataset[_]): Unit = {
if (data.schema.fieldNames.contains(rFormula.getFeaturesCol)) {
val newFeaturesName = s"${Identifiable.randomUID(rFormula.getFeaturesCol)}"
- logWarning(s"data containing ${rFormula.getFeaturesCol} column, " +
+ logInfo(s"data containing ${rFormula.getFeaturesCol} column, " +
s"using new name $newFeaturesName instead")
rFormula.setFeaturesCol(newFeaturesName)
}
+
+ if (rFormula.getForceIndexLabel && data.schema.fieldNames.contains(rFormula.getLabelCol)) {
+ val newLabelName = s"${Identifiable.randomUID(rFormula.getLabelCol)}"
+ logInfo(s"data containing ${rFormula.getLabelCol} column and we force to index label, " +
+ s"using new name $newLabelName instead")
+ rFormula.setLabelCol(newLabelName)
+ }
+ }
+
+ /**
+ * Get the feature names and original labels from the schema
+ * of DataFrame transformed by RFormulaModel.
+ *
+ * @param rFormulaModel The RFormulaModel instance.
+ * @param data Input dataset.
+ * @return The feature names and original labels.
+ */
+ def getFeaturesAndLabels(
+ rFormulaModel: RFormulaModel,
+ data: Dataset[_]): (Array[String], Array[String]) = {
+ val schema = rFormulaModel.transform(data).schema
+ val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
+ .attributes.get
+ val features = featureAttrs.map(_.name.get)
+ val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
+ .asInstanceOf[NominalAttribute]
+ val labels = labelAttr.values.get
+ (features, labels)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
index 31f846dc6c..0b860e5af9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
@@ -23,10 +23,10 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -51,6 +51,7 @@ private[r] class RandomForestClassifierWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(rfcModel.getFeaturesCol)
+ .drop(rfcModel.getLabelCol)
}
override def write: MLWriter = new
@@ -82,19 +83,11 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
- // get feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
-
- // get label names from output schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
+ // get labels and feature names from output schema
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val rfc = new RandomForestClassifier()
@@ -111,6 +104,7 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC
.setCacheNodeIds(cacheNodeIds)
.setProbabilityCol(probabilityCol)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
if (seed != null && seed.length > 0) rfc.setSeed(seed.toLong)