aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/inst/tests/testthat/test_mllib.R18
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala18
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala14
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala36
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala18
6 files changed, 68 insertions, 41 deletions
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index b76f75dbdc..07df4b6d6f 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -881,7 +881,8 @@ test_that("spark.kstest", {
expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
})
-test_that("spark.randomForest Regression", {
+test_that("spark.randomForest", {
+ # regression
data <- suppressWarnings(createDataFrame(longley))
model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
numTrees = 1)
@@ -923,9 +924,8 @@ test_that("spark.randomForest Regression", {
expect_equal(stats$treeWeights, stats2$treeWeights)
unlink(modelPath)
-})
-test_that("spark.randomForest Classification", {
+ # classification
data <- suppressWarnings(createDataFrame(iris))
model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification",
maxDepth = 5, maxBins = 16)
@@ -971,6 +971,12 @@ test_that("spark.randomForest Classification", {
predictions <- collect(predict(model, data))$prediction
expect_equal(length(grep("1.0", predictions)), 50)
expect_equal(length(grep("2.0", predictions)), 50)
+
+ # spark.randomForest classification can work on libsvm data
+ data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+ source = "libsvm")
+ model <- spark.randomForest(data, label ~ features, "classification")
+ expect_equal(summary(model)$numFeatures, 4)
})
test_that("spark.gbt", {
@@ -1039,6 +1045,12 @@ test_that("spark.gbt", {
expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
expect_equal(s$numFeatures, 5)
expect_equal(s$numTrees, 20)
+
+ # spark.gbt classification can work on libsvm data
+ data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
+ source = "libsvm")
+ model <- spark.gbt(data, label ~ features, "classification")
+ expect_equal(summary(model)$numFeatures, 692)
})
sparkR.session.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
index 8946025032..aacb41ee26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassificationWrapper.scala
@@ -23,10 +23,10 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -51,6 +51,7 @@ private[r] class GBTClassifierWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(gbtcModel.getFeaturesCol)
+ .drop(gbtcModel.getLabelCol)
}
override def write: MLWriter = new
@@ -81,19 +82,11 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
- // get feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
-
- // get label names from output schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
+ // get labels and feature names from output schema
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val rfc = new GBTClassifier()
@@ -109,6 +102,7 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
.setMaxMemoryInMB(maxMemoryInMB)
.setCacheNodeIds(cacheNodeIds)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
if (seed != null && seed.length > 0) rfc.setSeed(seed.toLong)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 995b1ef03b..add4d49110 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.regression._
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -64,6 +65,7 @@ private[r] class GeneralizedLinearRegressionWrapper private (
.drop(PREDICTED_LABEL_PROB_COL)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(glm.getFeaturesCol)
+ .drop(glm.getLabelCol)
} else {
pipeline.transform(dataset)
.drop(glm.getFeaturesCol)
@@ -92,7 +94,7 @@ private[r] object GeneralizedLinearRegressionWrapper
regParam: Double): GeneralizedLinearRegressionWrapper = {
val rFormula = new RFormula().setFormula(formula)
if (family == "binomial") rFormula.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
val schema = rFormulaModel.transform(data).schema
@@ -109,6 +111,7 @@ private[r] object GeneralizedLinearRegressionWrapper
.setWeightCol(weightCol)
.setRegParam(regParam)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
val pipeline = if (family == "binomial") {
// Convert prediction from probability to label index.
val probToPred = new ProbabilityToPrediction()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index 4fdab2dd94..0afea4be3d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -23,9 +23,9 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -46,6 +46,7 @@ private[r] class NaiveBayesWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(naiveBayesModel.getFeaturesCol)
+ .drop(naiveBayesModel.getLabelCol)
}
override def write: MLWriter = new NaiveBayesWrapper.NaiveBayesWrapperWriter(this)
@@ -60,21 +61,16 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val naiveBayes = new NaiveBayes()
.setSmoothing(smoothing)
.setModelType("bernoulli")
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
index 379007c4d9..665e50af67 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
@@ -18,11 +18,12 @@
package org.apache.spark.ml.r
import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
+import org.apache.spark.ml.feature.{RFormula, RFormulaModel}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
-object RWrapperUtils extends Logging {
+private[r] object RWrapperUtils extends Logging {
/**
* DataFrame column check.
@@ -32,14 +33,41 @@ object RWrapperUtils extends Logging {
*
* @param rFormula RFormula instance
* @param data Input dataset
- * @return Unit
*/
def checkDataColumns(rFormula: RFormula, data: Dataset[_]): Unit = {
if (data.schema.fieldNames.contains(rFormula.getFeaturesCol)) {
val newFeaturesName = s"${Identifiable.randomUID(rFormula.getFeaturesCol)}"
- logWarning(s"data containing ${rFormula.getFeaturesCol} column, " +
+ logInfo(s"data containing ${rFormula.getFeaturesCol} column, " +
s"using new name $newFeaturesName instead")
rFormula.setFeaturesCol(newFeaturesName)
}
+
+ if (rFormula.getForceIndexLabel && data.schema.fieldNames.contains(rFormula.getLabelCol)) {
+ val newLabelName = s"${Identifiable.randomUID(rFormula.getLabelCol)}"
+ logInfo(s"data containing ${rFormula.getLabelCol} column and we force to index label, " +
+ s"using new name $newLabelName instead")
+ rFormula.setLabelCol(newLabelName)
+ }
+ }
+
+ /**
+ * Get the feature names and original labels from the schema
+ * of DataFrame transformed by RFormulaModel.
+ *
+ * @param rFormulaModel The RFormulaModel instance.
+ * @param data Input dataset.
+ * @return The feature names and original labels.
+ */
+ def getFeaturesAndLabels(
+ rFormulaModel: RFormulaModel,
+ data: Dataset[_]): (Array[String], Array[String]) = {
+ val schema = rFormulaModel.transform(data).schema
+ val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
+ .attributes.get
+ val features = featureAttrs.map(_.name.get)
+ val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
+ .asInstanceOf[NominalAttribute]
+ val labels = labelAttr.values.get
+ (features, labels)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
index 31f846dc6c..0b860e5af9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassificationWrapper.scala
@@ -23,10 +23,10 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.ml.{Pipeline, PipelineModel}
-import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
@@ -51,6 +51,7 @@ private[r] class RandomForestClassifierWrapper private (
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(rfcModel.getFeaturesCol)
+ .drop(rfcModel.getLabelCol)
}
override def write: MLWriter = new
@@ -82,19 +83,11 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC
val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
- RWrapperUtils.checkDataColumns(rFormula, data)
+ checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
- // get feature names from output schema
- val schema = rFormulaModel.transform(data).schema
- val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol))
- .attributes.get
- val features = featureAttrs.map(_.name.get)
-
- // get label names from output schema
- val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
- .asInstanceOf[NominalAttribute]
- val labels = labelAttr.values.get
+ // get labels and feature names from output schema
+ val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)
// assemble and fit the pipeline
val rfc = new RandomForestClassifier()
@@ -111,6 +104,7 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC
.setCacheNodeIds(cacheNodeIds)
.setProbabilityCol(probabilityCol)
.setFeaturesCol(rFormula.getFeaturesCol)
+ .setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
if (seed != null && seed.length > 0) rfc.setSeed(seed.toLong)