-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala       160
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala   42
-rw-r--r--  python/pyspark/ml/feature.py                                                 58
3 files changed, 176 insertions(+), 84 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index eb6ec49f85..8f34878c8d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -17,94 +17,152 @@
package org.apache.spark.ml.feature
-import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.UnaryTransformer
-import org.apache.spark.ml.attribute.{Attribute, BinaryAttribute, NominalAttribute}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.attribute._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors}
-import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.types.{DoubleType, StructType}
/**
* :: Experimental ::
- * A one-hot encoder that maps a column of label indices to a column of binary vectors, with
- * at most a single one-value. By default, the binary vector has an element for each category, so
- * with 5 categories, an input value of 2.0 would map to an output vector of
- * (0.0, 0.0, 1.0, 0.0, 0.0). If includeFirst is set to false, the first category is omitted, so the
- * output vector for the previous example would be (0.0, 1.0, 0.0, 0.0) and an input value
- * of 0.0 would map to a vector of all zeros. Including the first category makes the vector columns
- * linearly dependent because they sum up to one.
+ * A one-hot encoder that maps a column of category indices to a column of binary vectors, with
+ * at most a single one-value per row that indicates the input category index.
+ * For example, with 5 categories, an input value of 2.0 would map to an output vector of
+ * `[0.0, 0.0, 1.0, 0.0]`.
+ * The last category is not included by default (configurable via [[OneHotEncoder!.dropLast]])
+ * because it makes the vector entries sum up to one, and hence linearly dependent.
+ * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
+ * Note that this is different from scikit-learn's OneHotEncoder, which keeps all categories.
+ * The output vectors are sparse.
+ *
+ * @see [[StringIndexer]] for converting categorical values into category indices
*/
@Experimental
-class OneHotEncoder(override val uid: String)
- extends UnaryTransformer[Double, Vector, OneHotEncoder] with HasInputCol with HasOutputCol {
+class OneHotEncoder(override val uid: String) extends Transformer
+ with HasInputCol with HasOutputCol {
def this() = this(Identifiable.randomUID("oneHot"))
/**
- * Whether to include a component in the encoded vectors for the first category, defaults to true.
+ * Whether to drop the last category in the encoded vector (default: true)
* @group param
*/
- final val includeFirst: BooleanParam =
- new BooleanParam(this, "includeFirst", "include first category")
- setDefault(includeFirst -> true)
-
- private var categories: Array[String] = _
+ final val dropLast: BooleanParam =
+ new BooleanParam(this, "dropLast", "whether to drop the last category")
+ setDefault(dropLast -> true)
/** @group setParam */
- def setIncludeFirst(value: Boolean): this.type = set(includeFirst, value)
+ def setDropLast(value: Boolean): this.type = set(dropLast, value)
/** @group setParam */
- override def setInputCol(value: String): this.type = set(inputCol, value)
+ def setInputCol(value: String): this.type = set(inputCol, value)
/** @group setParam */
- override def setOutputCol(value: String): this.type = set(outputCol, value)
+ def setOutputCol(value: String): this.type = set(outputCol, value)
override def transformSchema(schema: StructType): StructType = {
- SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType)
- val inputFields = schema.fields
+ val is = "_is_"
+ val inputColName = $(inputCol)
val outputColName = $(outputCol)
- require(inputFields.forall(_.name != $(outputCol)),
- s"Output column ${$(outputCol)} already exists.")
- val inputColAttr = Attribute.fromStructField(schema($(inputCol)))
- categories = inputColAttr match {
+ SchemaUtils.checkColumnType(schema, inputColName, DoubleType)
+ val inputFields = schema.fields
+ require(!inputFields.exists(_.name == outputColName),
+ s"Output column $outputColName already exists.")
+
+ val inputAttr = Attribute.fromStructField(schema(inputColName))
+ val outputAttrNames: Option[Array[String]] = inputAttr match {
case nominal: NominalAttribute =>
- nominal.values.getOrElse((0 until nominal.numValues.get).map(_.toString).toArray)
- case binary: BinaryAttribute => binary.values.getOrElse(Array("0", "1"))
+ if (nominal.values.isDefined) {
+ nominal.values.map(_.map(v => inputColName + is + v))
+ } else if (nominal.numValues.isDefined) {
+ nominal.numValues.map(n => Array.tabulate(n)(i => inputColName + is + i))
+ } else {
+ None
+ }
+ case binary: BinaryAttribute =>
+ if (binary.values.isDefined) {
+ binary.values.map(_.map(v => inputColName + is + v))
+ } else {
+ Some(Array.tabulate(2)(i => inputColName + is + i))
+ }
+ case _: NumericAttribute =>
+ throw new RuntimeException(
+ s"The input column $inputColName cannot be numeric.")
case _ =>
- throw new SparkException(s"OneHotEncoder input column ${$(inputCol)} is not nominal")
+ None // optimistic about unknown attributes
}
- val attrValues = (if ($(includeFirst)) categories else categories.drop(1)).toArray
- val attr = NominalAttribute.defaultAttr.withName(outputColName).withValues(attrValues)
- val outputFields = inputFields :+ attr.toStructField()
+ val filteredOutputAttrNames = outputAttrNames.map { names =>
+ if ($(dropLast)) {
+ require(names.length > 1,
+ s"The input column $inputColName should have at least two distinct values.")
+ names.dropRight(1)
+ } else {
+ names
+ }
+ }
+
+ val outputAttrGroup = if (filteredOutputAttrNames.isDefined) {
+ val attrs: Array[Attribute] = filteredOutputAttrNames.get.map { name =>
+ BinaryAttribute.defaultAttr.withName(name)
+ }
+ new AttributeGroup($(outputCol), attrs)
+ } else {
+ new AttributeGroup($(outputCol))
+ }
+
+ val outputFields = inputFields :+ outputAttrGroup.toStructField()
StructType(outputFields)
}
- protected override def createTransformFunc(): (Double) => Vector = {
- val first = $(includeFirst)
- val vecLen = if (first) categories.length else categories.length - 1
+ override def transform(dataset: DataFrame): DataFrame = {
+ // schema transformation
+ val is = "_is_"
+ val inputColName = $(inputCol)
+ val outputColName = $(outputCol)
+ val shouldDropLast = $(dropLast)
+ var outputAttrGroup = AttributeGroup.fromStructField(
+ transformSchema(dataset.schema)(outputColName))
+ if (outputAttrGroup.size < 0) {
+ // If the number of attributes is unknown, we check the values from the input column.
+ val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).map(_.getDouble(0))
+ .aggregate(0.0)(
+ (m, x) => {
+          assert(x >= 0.0 && x == x.toInt,
+ s"Values from column $inputColName must be indices, but got $x.")
+ math.max(m, x)
+ },
+ (m0, m1) => {
+ math.max(m0, m1)
+ }
+ ).toInt + 1
+ val outputAttrNames = Array.tabulate(numAttrs)(i => inputColName + is + i)
+ val filtered = if (shouldDropLast) outputAttrNames.dropRight(1) else outputAttrNames
+ val outputAttrs: Array[Attribute] =
+ filtered.map(name => BinaryAttribute.defaultAttr.withName(name))
+ outputAttrGroup = new AttributeGroup(outputColName, outputAttrs)
+ }
+ val metadata = outputAttrGroup.toMetadata()
+
+ // data transformation
+ val size = outputAttrGroup.size
val oneValue = Array(1.0)
val emptyValues = Array[Double]()
val emptyIndices = Array[Int]()
- label: Double => {
- val values = if (first || label != 0.0) oneValue else emptyValues
- val indices = if (first) {
- Array(label.toInt)
- } else if (label != 0.0) {
- Array(label.toInt - 1)
+ val encode = udf { label: Double =>
+ if (label < size) {
+ Vectors.sparse(size, Array(label.toInt), oneValue)
} else {
- emptyIndices
+ Vectors.sparse(size, emptyIndices, emptyValues)
}
- Vectors.sparse(vecLen, indices, values)
}
- }
- /**
- * Returns the data type of the output column.
- */
- protected def outputDataType: DataType = new VectorUDT
+ dataset.select(col("*"), encode(col(inputColName).cast(DoubleType)).as(outputColName, metadata))
+ }
}
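For context, a minimal end-to-end sketch of the new default behavior (a hedged example, assuming a live `sqlContext` and the same six-row data as the suite below):

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Six rows over three labels; StringIndexer assigns indices by descending
// frequency, so for this data: a -> 0, c -> 1, b -> 2.
val df = sqlContext.createDataFrame(Seq(
  (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))).toDF("id", "label")

val indexed = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndex")
  .fit(df)
  .transform(df)

// Default dropLast = true: three categories encode into 2-element sparse
// vectors, and the last index (here "b") maps to the all-zeros vector.
val encoded = new OneHotEncoder()
  .setInputCol("labelIndex")
  .setOutputCol("labelVec")
  .transform(indexed)

encoded.select("label", "labelVec").show()
```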
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
index 056b9eda86..9018d0024d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
@@ -19,10 +19,11 @@ package org.apache.spark.ml.feature
import org.scalatest.FunSuite
+import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.DataFrame
-
+import org.apache.spark.sql.functions.col
class OneHotEncoderSuite extends FunSuite with MLlibTestSparkContext {
@@ -36,15 +37,16 @@ class OneHotEncoderSuite extends FunSuite with MLlibTestSparkContext {
indexer.transform(df)
}
- test("OneHotEncoder includeFirst = true") {
+ test("OneHotEncoder dropLast = false") {
val transformed = stringIndexed()
val encoder = new OneHotEncoder()
.setInputCol("labelIndex")
.setOutputCol("labelVec")
+ .setDropLast(false)
val encoded = encoder.transform(transformed)
val output = encoded.select("id", "labelVec").map { r =>
- val vec = r.get(1).asInstanceOf[Vector]
+ val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1), vec(2))
}.collect().toSet
// a -> 0, b -> 2, c -> 1
@@ -53,22 +55,46 @@ class OneHotEncoderSuite extends FunSuite with MLlibTestSparkContext {
assert(output === expected)
}
- test("OneHotEncoder includeFirst = false") {
+ test("OneHotEncoder dropLast = true") {
val transformed = stringIndexed()
val encoder = new OneHotEncoder()
- .setIncludeFirst(false)
.setInputCol("labelIndex")
.setOutputCol("labelVec")
val encoded = encoder.transform(transformed)
val output = encoded.select("id", "labelVec").map { r =>
- val vec = r.get(1).asInstanceOf[Vector]
+ val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1))
}.collect().toSet
// a -> 0, b -> 2, c -> 1
- val expected = Set((0, 0.0, 0.0), (1, 0.0, 1.0), (2, 1.0, 0.0),
- (3, 0.0, 0.0), (4, 0.0, 0.0), (5, 1.0, 0.0))
+ val expected = Set((0, 1.0, 0.0), (1, 0.0, 0.0), (2, 0.0, 1.0),
+ (3, 1.0, 0.0), (4, 1.0, 0.0), (5, 0.0, 1.0))
assert(output === expected)
}
+ test("input column with ML attribute") {
+ val attr = NominalAttribute.defaultAttr.withValues("small", "medium", "large")
+ val df = sqlContext.createDataFrame(Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply)).toDF("size")
+ .select(col("size").as("size", attr.toMetadata()))
+ val encoder = new OneHotEncoder()
+ .setInputCol("size")
+ .setOutputCol("encoded")
+ val output = encoder.transform(df)
+ val group = AttributeGroup.fromStructField(output.schema("encoded"))
+ assert(group.size === 2)
+ assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("size_is_small").withIndex(0))
+ assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("size_is_medium").withIndex(1))
+ }
+
+ test("input column without ML attribute") {
+ val df = sqlContext.createDataFrame(Seq(0.0, 1.0, 2.0, 1.0).map(Tuple1.apply)).toDF("index")
+ val encoder = new OneHotEncoder()
+ .setInputCol("index")
+ .setOutputCol("encoded")
+ val output = encoder.transform(df)
+ val group = AttributeGroup.fromStructField(output.schema("encoded"))
+ assert(group.size === 2)
+ assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("index_is_0").withIndex(0))
+ assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("index_is_1").withIndex(1))
+ }
}
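The two attribute tests rely on the `inputCol + "_is_" + value` naming scheme from `transformSchema`. A short sketch of how downstream code can recover those names, reusing the `output` DataFrame from the "input column with ML attribute" test above:

```scala
import org.apache.spark.ml.attribute.AttributeGroup

val group = AttributeGroup.fromStructField(output.schema("encoded"))
// attributes is an Option[Array[Attribute]]; each attribute's name is optional too.
group.attributes.foreach(_.foreach(attr => println(attr.name.getOrElse("(unnamed)"))))
// With dropLast = true the last category ("large") is dropped, so this prints:
//   size_is_small
//   size_is_medium
```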
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index b0479d9b07..ddb33f427a 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -324,65 +324,73 @@ class Normalizer(JavaTransformer, HasInputCol, HasOutputCol):
@inherit_doc
class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol):
"""
- A one-hot encoder that maps a column of label indices to a column of binary vectors, with
- at most a single one-value. By default, the binary vector has an element for each category, so
- with 5 categories, an input value of 2.0 would map to an output vector of
- (0.0, 0.0, 1.0, 0.0, 0.0). If includeFirst is set to false, the first category is omitted, so
- the output vector for the previous example would be (0.0, 1.0, 0.0, 0.0) and an input value
- of 0.0 would map to a vector of all zeros. Including the first category makes the vector columns
- linearly dependent because they sum up to one.
-
- TODO: This method requires the use of StringIndexer first. Decouple them.
+ A one-hot encoder that maps a column of category indices to a
+ column of binary vectors, with at most a single one-value per row
+ that indicates the input category index.
+    For example, with 5 categories, an input value of 2.0 would map to
+    an output vector of `[0.0, 0.0, 1.0, 0.0]`.
+ The last category is not included by default (configurable via
+ :py:attr:`dropLast`) because it makes the vector entries sum up to
+ one, and hence linearly dependent.
+ So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
+ Note that this is different from scikit-learn's OneHotEncoder,
+ which keeps all categories.
+ The output vectors are sparse.
+
+ .. seealso::
+
+ :py:class:`StringIndexer` for converting categorical values into
+ category indices
>>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
>>> model = stringIndexer.fit(stringIndDf)
>>> td = model.transform(stringIndDf)
- >>> encoder = OneHotEncoder(includeFirst=False, inputCol="indexed", outputCol="features")
+ >>> encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
>>> encoder.transform(td).head().features
- SparseVector(2, {})
+ SparseVector(2, {0: 1.0})
>>> encoder.setParams(outputCol="freqs").transform(td).head().freqs
- SparseVector(2, {})
- >>> params = {encoder.includeFirst: True, encoder.outputCol: "test"}
+ SparseVector(2, {0: 1.0})
+ >>> params = {encoder.dropLast: False, encoder.outputCol: "test"}
>>> encoder.transform(td, params).head().test
SparseVector(3, {0: 1.0})
"""
# a placeholder to make it appear in the generated doc
- includeFirst = Param(Params._dummy(), "includeFirst", "include first category")
+ dropLast = Param(Params._dummy(), "dropLast", "whether to drop the last category")
@keyword_only
- def __init__(self, includeFirst=True, inputCol=None, outputCol=None):
+ def __init__(self, dropLast=True, inputCol=None, outputCol=None):
"""
-        __init__(self, includeFirst=True, inputCol=None, outputCol=None)
+        __init__(self, dropLast=True, inputCol=None, outputCol=None)
"""
super(OneHotEncoder, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.OneHotEncoder", self.uid)
- self.includeFirst = Param(self, "includeFirst", "include first category")
- self._setDefault(includeFirst=True)
+ self.dropLast = Param(self, "dropLast", "whether to drop the last category")
+ self._setDefault(dropLast=True)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
- def setParams(self, includeFirst=True, inputCol=None, outputCol=None):
+ def setParams(self, dropLast=True, inputCol=None, outputCol=None):
"""
- setParams(self, includeFirst=True, inputCol=None, outputCol=None)
+ setParams(self, dropLast=True, inputCol=None, outputCol=None)
Sets params for this OneHotEncoder.
"""
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
- def setIncludeFirst(self, value):
+ def setDropLast(self, value):
"""
- Sets the value of :py:attr:`includeFirst`.
+ Sets the value of :py:attr:`dropLast`.
"""
- self._paramMap[self.includeFirst] = value
+ self._paramMap[self.dropLast] = value
return self
- def getIncludeFirst(self):
+ def getDropLast(self):
"""
- Gets the value of includeFirst or its default value.
+ Gets the value of dropLast or its default value.
"""
- return self.getOrDefault(self.includeFirst)
+ return self.getOrDefault(self.dropLast)
@inherit_doc
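For parity with the doctest's `params` override, the same per-call override in Scala (a sketch, assuming an indexed DataFrame `td` like the one built in the doctest):

```scala
import org.apache.spark.ml.param.ParamMap

val encoder = new OneHotEncoder()
  .setInputCol("indexed")
  .setOutputCol("features")

// Default dropLast = true: three categories -> 2-element vectors.
encoder.transform(td).select("features").show()

// Override dropLast for this call only; the encoder's own params are untouched.
encoder.transform(td, ParamMap(encoder.dropLast -> false)).select("features").show()
```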