author     VinceShieh <vincent.xie@intel.com>          2017-03-07 11:24:20 -0800
committer  Joseph K. Bradley <joseph@databricks.com>   2017-03-07 11:24:20 -0800
commit     4a9034b17374cf19c77cb74e36c86cd085d59602 (patch)
tree       1bb589f4efd445295897173aff2146ecfec425f8
parent     c05baabf10dd4c808929b4ae7a6d118aba6dd665 (diff)
[SPARK-17498][ML] StringIndexer enhancement for handling unseen labels
## What changes were proposed in this pull request?

This PR is an enhancement to ML StringIndexer. Before this PR, StringIndexer only supported the "skip" and "error" options for dealing with unseen records. But those unseen records might still be useful, and in certain use cases users would like to keep the unseen labels. This PR enables StringIndexer to keep unseen labels by assigning them the index numLabels.

Before:

    StringIndexer().setHandleInvalid("skip")
    StringIndexer().setHandleInvalid("error")

After (with the new third option "keep"):

    StringIndexer().setHandleInvalid("keep")

## How was this patch tested?

Tests added in StringIndexerSuite.

Signed-off-by: VinceShieh <vincent.xie@intel.com>

Author: VinceShieh <vincent.xie@intel.com>

Closes #16883 from VinceShieh/spark-17498.
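For a quick feel for the new option, here is a minimal end-to-end sketch. It assumes a spark-shell session (so a SparkSession named `spark` and its implicits are available); the toy data and column names are illustrative, not from the patch:

~~~~
import org.apache.spark.ml.feature.StringIndexer
import spark.implicits._  // assumes a SparkSession named `spark` (e.g. spark-shell)

// Illustrative data: "d" never appears in the fitting frame.
val train = Seq((0, "a"), (1, "b"), (2, "c")).toDF("id", "label")
val test  = Seq((0, "a"), (1, "d")).toDF("id", "label")

val model = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndex")
  .setHandleInvalid("keep")  // the third option introduced by this patch
  .fit(train)

// With "keep", the unseen "d" maps to numLabels (3.0 here) instead of the row
// being dropped ("skip") or a SparkException being thrown ("error").
model.transform(test).show()
~~~~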
-rw-r--r--  docs/ml-features.md                                                        | 22
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala       | 65
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala  | 34
-rw-r--r--  project/MimaExcludes.scala                                                 |  4
4 files changed, 95 insertions, 30 deletions
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 57605bafbf..dad1c6db18 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -503,6 +503,7 @@ for more details on the API.
`StringIndexer` encodes a string column of labels to a column of label indices.
The indices are in `[0, numLabels)`, ordered by label frequencies, so the most frequent label gets index `0`.
+Unseen labels are put at index numLabels if the user chooses to keep them.
If the input column is numeric, we cast it to string and index the string
values. When downstream pipeline components such as `Estimator` or
`Transformer` make use of this string-indexed label, you must set the input
@@ -542,12 +543,13 @@ column, we should get the following:
"a" gets index `0` because it is the most frequent, followed by "c" with index `1` and "b" with
index `2`.
-Additionally, there are two strategies regarding how `StringIndexer` will handle
+Additionally, there are three strategies regarding how `StringIndexer` will handle
unseen labels when you have fit a `StringIndexer` on one dataset and then use it
to transform another:
- throw an exception (which is the default)
- skip the row containing the unseen label entirely
+- put unseen labels in a special additional bucket, at index numLabels
**Examples**
@@ -561,6 +563,7 @@ Let's go back to our previous example but this time reuse our previously defined
1 | b
2 | c
3 | d
+ 4 | e
~~~~
If you've not set how `StringIndexer` handles unseen labels or set it to
@@ -576,7 +579,22 @@ will be generated:
2 | c | 1.0
~~~~
-Notice that the row containing "d" does not appear.
+Notice that the rows containing "d" or "e" do not appear.
+
+If you call `setHandleInvalid("keep")`, the following dataset
+will be generated:
+
+~~~~
+ id | category | categoryIndex
+----|----------|---------------
+ 0 | a | 0.0
+ 1 | b | 2.0
+ 2 | c | 1.0
+ 3 | d | 3.0
+ 4 | e | 3.0
+~~~~
+
+Notice that the rows containing "d" or "e" are mapped to index "3.0"
<div class="codetabs">
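The documentation example above can be reproduced directly. A minimal sketch under the same assumptions as before (a SparkSession named `spark` with implicits imported; the data follows the docs tables):

~~~~
import org.apache.spark.ml.feature.StringIndexer
import spark.implicits._

// Fitting data from the docs example: "a" three times, "c" twice, "b" once.
val fitDF = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
  .toDF("id", "category")
// Data to transform, including the unseen labels "d" and "e".
val testDF = Seq((0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e"))
  .toDF("id", "category")

val indexed = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setHandleInvalid("keep")
  .fit(fitDF)
  .transform(testDF)

// a -> 0.0, c -> 1.0, b -> 2.0; the unseen "d" and "e" both -> 3.0 (numLabels).
indexed.show()
~~~~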
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index a503411b63..810b02febb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ml.feature
+import scala.language.existentials
+
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
@@ -24,7 +26,7 @@ import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model, Transformer}
import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
import org.apache.spark.ml.param._
-import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
@@ -34,8 +36,27 @@ import org.apache.spark.util.collection.OpenHashMap
/**
* Base trait for [[StringIndexer]] and [[StringIndexerModel]].
*/
-private[feature] trait StringIndexerBase extends Params with HasInputCol with HasOutputCol
- with HasHandleInvalid {
+private[feature] trait StringIndexerBase extends Params with HasInputCol with HasOutputCol {
+
+ /**
+ * Param for how to handle unseen labels. Options are 'skip' (filter out rows with
+ * unseen labels), 'error' (throw an error), or 'keep' (put unseen labels in a special additional
+ * bucket, at index numLabels).
+ * Default: "error"
+ * @group param
+ */
+ @Since("1.6.0")
+ val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle " +
+ "unseen labels. Options are 'skip' (filter out rows with unseen labels), " +
+ "error (throw an error), or 'keep' (put unseen labels in a special additional bucket, " +
+ "at index numLabels).",
+ ParamValidators.inArray(StringIndexer.supportedHandleInvalids))
+
+ setDefault(handleInvalid, StringIndexer.ERROR_UNSEEN_LABEL)
+
+ /** @group getParam */
+ @Since("1.6.0")
+ def getHandleInvalid: String = $(handleInvalid)
/** Validates and transforms the input schema. */
protected def validateAndTransformSchema(schema: StructType): StructType = {
@@ -73,7 +94,6 @@ class StringIndexer @Since("1.4.0") (
/** @group setParam */
@Since("1.6.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
- setDefault(handleInvalid, "error")
/** @group setParam */
@Since("1.4.0")
@@ -105,6 +125,11 @@ class StringIndexer @Since("1.4.0") (
@Since("1.6.0")
object StringIndexer extends DefaultParamsReadable[StringIndexer] {
+ private[feature] val SKIP_UNSEEN_LABEL: String = "skip"
+ private[feature] val ERROR_UNSEEN_LABEL: String = "error"
+ private[feature] val KEEP_UNSEEN_LABEL: String = "keep"
+ private[feature] val supportedHandleInvalids: Array[String] =
+ Array(SKIP_UNSEEN_LABEL, ERROR_UNSEEN_LABEL, KEEP_UNSEEN_LABEL)
@Since("1.6.0")
override def load(path: String): StringIndexer = super.load(path)
@@ -144,7 +169,6 @@ class StringIndexerModel (
/** @group setParam */
@Since("1.6.0")
def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
- setDefault(handleInvalid, "error")
/** @group setParam */
@Since("1.4.0")
@@ -163,25 +187,34 @@ class StringIndexerModel (
}
transformSchema(dataset.schema, logging = true)
- val indexer = udf { label: String =>
- if (labelToIndex.contains(label)) {
- labelToIndex(label)
- } else {
- throw new SparkException(s"Unseen label: $label.")
- }
+ val filteredLabels = getHandleInvalid match {
+ case StringIndexer.KEEP_UNSEEN_LABEL => labels :+ "__unknown"
+ case _ => labels
}
val metadata = NominalAttribute.defaultAttr
- .withName($(outputCol)).withValues(labels).toMetadata()
+ .withName($(outputCol)).withValues(filteredLabels).toMetadata()
// If we are skipping invalid records, filter them out.
- val filteredDataset = getHandleInvalid match {
- case "skip" =>
+ val (filteredDataset, keepInvalid) = getHandleInvalid match {
+ case StringIndexer.SKIP_UNSEEN_LABEL =>
val filterer = udf { label: String =>
labelToIndex.contains(label)
}
- dataset.where(filterer(dataset($(inputCol))))
- case _ => dataset
+ (dataset.where(filterer(dataset($(inputCol)))), false)
+ case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_UNSEEN_LABEL)
}
+
+ val indexer = udf { label: String =>
+ if (labelToIndex.contains(label)) {
+ labelToIndex(label)
+ } else if (keepInvalid) {
+ labels.length
+ } else {
+ throw new SparkException(s"Unseen label: $label. To handle unseen labels, " +
+ s"set Param handleInvalid to ${StringIndexer.KEEP_UNSEEN_LABEL}.")
+ }
+ }
+
filteredDataset.select(col("*"),
indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol), metadata))
}
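The heart of the patch is the fallthrough in the indexing UDF above: a seen label keeps its learned index, an unseen one gets `labels.length` when `handleInvalid` is "keep", and otherwise a SparkException is thrown. A plain-Scala model of that decision (the helper name and data are illustrative, not part of the patch):

~~~~
// Hypothetical standalone model of the indexing decision in the UDF.
def index(labelToIndex: Map[String, Double], keepInvalid: Boolean)
         (label: String): Double =
  labelToIndex.get(label) match {
    case Some(i)             => i                           // seen: learned index
    case None if keepInvalid => labelToIndex.size.toDouble  // unseen: numLabels bucket
    case None                => sys.error(s"Unseen label: $label")
  }

val labelToIndex = Map("b" -> 0.0, "a" -> 1.0)  // as fit on (a, b, b) in the suite
val keep = index(labelToIndex, keepInvalid = true) _
assert(keep("a") == 1.0)
assert(keep("d") == 2.0)  // any unseen label lands on numLabels = 2
~~~~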
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 2d0e63c9d6..188dffb3dd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -64,7 +64,7 @@ class StringIndexerSuite
test("StringIndexerUnseen") {
val data = Seq((0, "a"), (1, "b"), (4, "b"))
- val data2 = Seq((0, "a"), (1, "b"), (2, "c"))
+ val data2 = Seq((0, "a"), (1, "b"), (2, "c"), (3, "d"))
val df = data.toDF("id", "label")
val df2 = data2.toDF("id", "label")
val indexer = new StringIndexer()
@@ -75,22 +75,32 @@ class StringIndexerSuite
intercept[SparkException] {
indexer.transform(df2).collect()
}
- val indexerSkipInvalid = new StringIndexer()
- .setInputCol("label")
- .setOutputCol("labelIndex")
- .setHandleInvalid("skip")
- .fit(df)
+
+ indexer.setHandleInvalid("skip")
// Verify that we skip the records with the unseen labels "c" and "d"
- val transformed = indexerSkipInvalid.transform(df2)
- val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
+ val transformedSkip = indexer.transform(df2)
+ val attrSkip = Attribute.fromStructField(transformedSkip.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
- assert(attr.values.get === Array("b", "a"))
- val output = transformed.select("id", "labelIndex").rdd.map { r =>
+ assert(attrSkip.values.get === Array("b", "a"))
+ val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// a -> 1, b -> 0
- val expected = Set((0, 1.0), (1, 0.0))
- assert(output === expected)
+ val expectedSkip = Set((0, 1.0), (1, 0.0))
+ assert(outputSkip === expectedSkip)
+
+ indexer.setHandleInvalid("keep")
+ // Verify that we keep the unseen records
+ val transformedKeep = indexer.transform(df2)
+ val attrKeep = Attribute.fromStructField(transformedKeep.schema("labelIndex"))
+ .asInstanceOf[NominalAttribute]
+ assert(attrKeep.values.get === Array("b", "a", "__unknown"))
+ val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r =>
+ (r.getInt(0), r.getDouble(1))
+ }.collect().toSet
+ // a -> 1, b -> 0; the unseen "c" and "d" both map to 2 (numLabels)
+ val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0))
+ assert(outputKeep === expectedKeep)
}
test("StringIndexer with a numeric input column") {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 56b8c0b95e..bd4528bd21 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -915,6 +915,10 @@ object MimaExcludes {
// [SPARK-17163] Unify logistic regression interface. Private constructor has new signature.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.this")
) ++ Seq(
+ // [SPARK-17498] StringIndexer enhancement for handling unseen labels
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.StringIndexer"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.StringIndexerModel")
+ ) ++ Seq(
// [SPARK-17365][Core] Remove/Kill multiple executors together to reduce RPC call time
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkContext")
) ++ Seq(