diff options
author | Eric Liang <ekl@databricks.com> | 2015-07-27 17:17:49 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-07-27 17:17:49 -0700 |
commit | 8ddfa52c208bf329c2b2c8909c6be04301e36083 (patch) | |
tree | e8482d5cee69d187b7f30c5807766c90539e518c /mllib/src/main | |
parent | dafe8d857dff4c61981476282cbfe11f5c008078 (diff) | |
download | spark-8ddfa52c208bf329c2b2c8909c6be04301e36083.tar.gz spark-8ddfa52c208bf329c2b2c8909c6be04301e36083.tar.bz2 spark-8ddfa52c208bf329c2b2c8909c6be04301e36083.zip |
[SPARK-9230] [ML] Support StringType features in RFormula
This adds StringType feature support via OneHotEncoder. As part of this task it was necessary to change RFormula to an Estimator, so that factor levels could be determined from the training dataset.
Not sure if I am using uids correctly here, would be good to get reviewer help on that.
cc mengxr
Umbrella design doc: https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/edit#
Author: Eric Liang <ekl@databricks.com>
Closes #7574 from ericl/string-features and squashes the following commits:
f99131a [Eric Liang] comments
0bf3c26 [Eric Liang] update docs
c302a2c [Eric Liang] fix tests
9d1ac82 [Eric Liang] Merge remote-tracking branch 'upstream/master' into string-features
e713da3 [Eric Liang] comments
4d79193 [Eric Liang] revert to seq + distinct
169a085 [Eric Liang] tweak functional test
a230a47 [Eric Liang] Merge branch 'master' into string-features
72bd6f3 [Eric Liang] fix merge
d841cec [Eric Liang] Merge branch 'master' into string-features
5b2c4a2 [Eric Liang] Mon Jul 20 18:45:33 PDT 2015
b01c7c5 [Eric Liang] add test
8a637db [Eric Liang] encoder wip
a1d03f4 [Eric Liang] refactor into estimator
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala | 133 |
1 files changed, 103 insertions, 30 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index f7b46efa10..0a95b1ee8d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -17,26 +17,42 @@ package org.apache.spark.ml.feature +import scala.collection.mutable.ArrayBuffer import scala.util.parsing.combinator.RegexParsers import org.apache.spark.annotation.Experimental -import org.apache.spark.ml.Transformer +import org.apache.spark.ml.{Estimator, Model, Transformer, Pipeline, PipelineModel, PipelineStage} import org.apache.spark.ml.param.{Param, ParamMap} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol} import org.apache.spark.ml.util.Identifiable +import org.apache.spark.mllib.linalg.VectorUDT import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ /** + * Base trait for [[RFormula]] and [[RFormulaModel]]. + */ +private[feature] trait RFormulaBase extends HasFeaturesCol with HasLabelCol { + /** @group getParam */ + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** @group getParam */ + def setLabelCol(value: String): this.type = set(labelCol, value) + + protected def hasLabelCol(schema: StructType): Boolean = { + schema.map(_.name).contains($(labelCol)) + } +} + +/** * :: Experimental :: * Implements the transforms required for fitting a dataset against an R model formula. Currently * we support a limited subset of the R operators, including '~' and '+'. Also see the R formula * docs here: http://stat.ethz.ch/R-manual/R-patched/library/stats/html/formula.html */ @Experimental -class RFormula(override val uid: String) - extends Transformer with HasFeaturesCol with HasLabelCol { +class RFormula(override val uid: String) extends Estimator[RFormulaModel] with RFormulaBase { def this() = this(Identifiable.randomUID("rFormula")) @@ -62,19 +78,74 @@ class RFormula(override val uid: String) /** @group getParam */ def getFormula: String = $(formula) - /** @group getParam */ - def setFeaturesCol(value: String): this.type = set(featuresCol, value) + override def fit(dataset: DataFrame): RFormulaModel = { + require(parsedFormula.isDefined, "Must call setFormula() first.") + // StringType terms and terms representing interactions need to be encoded before assembly. + // TODO(ekl) add support for feature interactions + var encoderStages = ArrayBuffer[PipelineStage]() + var tempColumns = ArrayBuffer[String]() + val encodedTerms = parsedFormula.get.terms.map { term => + dataset.schema(term) match { + case column if column.dataType == StringType => + val indexCol = term + "_idx_" + uid + val encodedCol = term + "_onehot_" + uid + encoderStages += new StringIndexer().setInputCol(term).setOutputCol(indexCol) + encoderStages += new OneHotEncoder().setInputCol(indexCol).setOutputCol(encodedCol) + tempColumns += indexCol + tempColumns += encodedCol + encodedCol + case _ => + term + } + } + encoderStages += new VectorAssembler(uid) + .setInputCols(encodedTerms.toArray) + .setOutputCol($(featuresCol)) + encoderStages += new ColumnPruner(tempColumns.toSet) + val pipelineModel = new Pipeline(uid).setStages(encoderStages.toArray).fit(dataset) + copyValues(new RFormulaModel(uid, parsedFormula.get, pipelineModel).setParent(this)) + } - /** @group getParam */ - def setLabelCol(value: String): this.type = set(labelCol, value) + // optimistic schema; does not contain any ML attributes + override def transformSchema(schema: StructType): StructType = { + if (hasLabelCol(schema)) { + StructType(schema.fields :+ StructField($(featuresCol), new VectorUDT, true)) + } else { + StructType(schema.fields :+ StructField($(featuresCol), new VectorUDT, true) :+ + StructField($(labelCol), DoubleType, true)) + } + } + + override def copy(extra: ParamMap): RFormula = defaultCopy(extra) + + override def toString: String = s"RFormula(${get(formula)})" +} + +/** + * :: Experimental :: + * A fitted RFormula. Fitting is required to determine the factor levels of formula terms. + * @param parsedFormula a pre-parsed R formula. + * @param pipelineModel the fitted feature model, including factor to index mappings. + */ +@Experimental +class RFormulaModel private[feature]( + override val uid: String, + parsedFormula: ParsedRFormula, + pipelineModel: PipelineModel) + extends Model[RFormulaModel] with RFormulaBase { + + override def transform(dataset: DataFrame): DataFrame = { + checkCanTransform(dataset.schema) + transformLabel(pipelineModel.transform(dataset)) + } override def transformSchema(schema: StructType): StructType = { checkCanTransform(schema) - val withFeatures = transformFeatures.transformSchema(schema) + val withFeatures = pipelineModel.transformSchema(schema) if (hasLabelCol(schema)) { withFeatures - } else if (schema.exists(_.name == parsedFormula.get.label)) { - val nullable = schema(parsedFormula.get.label).dataType match { + } else if (schema.exists(_.name == parsedFormula.label)) { + val nullable = schema(parsedFormula.label).dataType match { case _: NumericType | BooleanType => false case _ => true } @@ -86,24 +157,19 @@ class RFormula(override val uid: String) } } - override def transform(dataset: DataFrame): DataFrame = { - checkCanTransform(dataset.schema) - transformLabel(transformFeatures.transform(dataset)) - } - - override def copy(extra: ParamMap): RFormula = defaultCopy(extra) + override def copy(extra: ParamMap): RFormulaModel = copyValues( + new RFormulaModel(uid, parsedFormula, pipelineModel)) - override def toString: String = s"RFormula(${get(formula)})" + override def toString: String = s"RFormulaModel(${parsedFormula})" private def transformLabel(dataset: DataFrame): DataFrame = { - val labelName = parsedFormula.get.label + val labelName = parsedFormula.label if (hasLabelCol(dataset.schema)) { dataset } else if (dataset.schema.exists(_.name == labelName)) { dataset.schema(labelName).dataType match { case _: NumericType | BooleanType => dataset.withColumn($(labelCol), dataset(labelName).cast(DoubleType)) - // TODO(ekl) add support for string-type labels case other => throw new IllegalArgumentException("Unsupported type for label: " + other) } @@ -114,25 +180,32 @@ class RFormula(override val uid: String) } } - private def transformFeatures: Transformer = { - // TODO(ekl) add support for non-numeric features and feature interactions - new VectorAssembler(uid) - .setInputCols(parsedFormula.get.terms.toArray) - .setOutputCol($(featuresCol)) - } - private def checkCanTransform(schema: StructType) { - require(parsedFormula.isDefined, "Must call setFormula() first.") val columnNames = schema.map(_.name) require(!columnNames.contains($(featuresCol)), "Features column already exists.") require( !columnNames.contains($(labelCol)) || schema($(labelCol)).dataType == DoubleType, "Label column already exists and is not of type DoubleType.") } +} - private def hasLabelCol(schema: StructType): Boolean = { - schema.map(_.name).contains($(labelCol)) +/** + * Utility transformer for removing temporary columns from a DataFrame. + * TODO(ekl) make this a public transformer + */ +private class ColumnPruner(columnsToPrune: Set[String]) extends Transformer { + override val uid = Identifiable.randomUID("columnPruner") + + override def transform(dataset: DataFrame): DataFrame = { + val columnsToKeep = dataset.columns.filter(!columnsToPrune.contains(_)) + dataset.select(columnsToKeep.map(dataset.col) : _*) } + + override def transformSchema(schema: StructType): StructType = { + StructType(schema.fields.filter(col => !columnsToPrune.contains(col.name))) + } + + override def copy(extra: ParamMap): ColumnPruner = defaultCopy(extra) } /** @@ -149,7 +222,7 @@ private[ml] object RFormulaParser extends RegexParsers { def expr: Parser[List[String]] = term ~ rep("+" ~> term) ^^ { case a ~ list => a :: list } def formula: Parser[ParsedRFormula] = - (term ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t) } + (term ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.distinct) } def parse(value: String): ParsedRFormula = parseAll(formula, value) match { case Success(result, _) => result |