aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVinceShieh <vincent.xie@intel.com>2016-09-21 10:20:57 +0100
committerSean Owen <sowen@cloudera.com>2016-09-21 10:20:57 +0100
commit57dc326bd00cf0a49da971e9c573c48ae28acaa2 (patch)
tree3e57f59b33e42beddd1df72dbd85266e7b09ef7f
parentb366f18496e1ce8bd20fe58a0245ef7d91819a03 (diff)
downloadspark-57dc326bd00cf0a49da971e9c573c48ae28acaa2.tar.gz
spark-57dc326bd00cf0a49da971e9c573c48ae28acaa2.tar.bz2
spark-57dc326bd00cf0a49da971e9c573c48ae28acaa2.zip
[SPARK-17219][ML] Add NaN value handling in Bucketizer
## What changes were proposed in this pull request? This PR fixes an issue when Bucketizer is called to handle a dataset containing NaN value. Sometimes, null value might also be useful to users, so in these cases, Bucketizer should reserve one extra bucket for NaN values, instead of throwing an illegal exception. Before: ``` Bucketizer.transform on NaN value threw an illegal exception. ``` After: ``` NaN values will be grouped in an extra bucket. ``` ## How was this patch tested? New test cases added in `BucketizerSuite`. Signed-off-by: VinceShieh <vincent.xieintel.com> Author: VinceShieh <vincent.xie@intel.com> Closes #14858 from VinceShieh/spark-17219.
-rw-r--r--docs/ml-features.md6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala13
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala9
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala31
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala29
-rwxr-xr-xpython/pyspark/ml/feature.py5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala4
7 files changed, 85 insertions, 12 deletions
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 746593fb9e..a39b31c8f7 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1102,7 +1102,11 @@ for more details on the API.
## QuantileDiscretizer
`QuantileDiscretizer` takes a column with continuous features and outputs a column with binned
-categorical features. The number of bins is set by the `numBuckets` parameter.
+categorical features. The number of bins is set by the `numBuckets` parameter. It is possible
+that the number of buckets used will be less than this value, for example, if there are too few
+distinct values of the input to create enough distinct quantiles. Note also that NaN values are
+handled specially and placed into their own bucket. For example, if 4 buckets are used, then
+non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].
The bin ranges are chosen using an approximate algorithm (see the documentation for
[approxQuantile](api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions) for a
detailed description). The precision of the approximation can be controlled with the
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index 100d9e7f6c..ec0ea05f9e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -106,7 +106,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
@Since("1.6.0")
object Bucketizer extends DefaultParamsReadable[Bucketizer] {
- /** We require splits to be of length >= 3 and to be in strictly increasing order. */
+ /**
+ * We require splits to be of length >= 3 and to be in strictly increasing order.
+ * No NaN split should be accepted.
+ */
private[feature] def checkSplits(splits: Array[Double]): Boolean = {
if (splits.length < 3) {
false
@@ -114,10 +117,10 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
var i = 0
val n = splits.length - 1
while (i < n) {
- if (splits(i) >= splits(i + 1)) return false
+ if (splits(i) >= splits(i + 1) || splits(i).isNaN) return false
i += 1
}
- true
+ !splits(n).isNaN
}
}
@@ -126,7 +129,9 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] {
* @throws SparkException if a feature is < splits.head or > splits.last
*/
private[feature] def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = {
- if (feature == splits.last) {
+ if (feature.isNaN) {
+ splits.length - 1
+ } else if (feature == splits.last) {
splits.length - 2
} else {
val idx = ju.Arrays.binarySearch(splits, feature)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index e09800877c..1e59d71a70 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -39,7 +39,7 @@ private[feature] trait QuantileDiscretizerBase extends Params
* default: 2
* @group param
*/
- val numBuckets = new IntParam(this, "numBuckets", "Maximum number of buckets (quantiles, or " +
+ val numBuckets = new IntParam(this, "numBuckets", "Number of buckets (quantiles, or " +
"categories) into which data points are grouped. Must be >= 2.",
ParamValidators.gtEq(2))
setDefault(numBuckets -> 2)
@@ -65,7 +65,12 @@ private[feature] trait QuantileDiscretizerBase extends Params
/**
* `QuantileDiscretizer` takes a column with continuous features and outputs a column with binned
- * categorical features. The number of bins can be set using the `numBuckets` parameter.
+ * categorical features. The number of bins can be set using the `numBuckets` parameter. It is
+ * possible that the number of buckets used will be less than this value, for example, if there
+ * are too few distinct values of the input to create enough distinct quantiles. Note also that
+ * NaN values are handled specially and placed into their own bucket. For example, if 4 buckets
+ * are used, then non-NaN data will be put into buckets(0-3), but NaNs will be counted in a special
+ * bucket(4).
* The bin ranges are chosen using an approximate algorithm (see the documentation for
* [[org.apache.spark.sql.DataFrameStatFunctions.approxQuantile approxQuantile]]
* for a detailed description). The precision of the approximation can be controlled with the
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
index cd10c78311..c7f5093e74 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -88,6 +88,37 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
}
}
+ test("Bucket continuous features, with NaN data but non-NaN splits") {
+ val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
+ val validData = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN)
+ val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0)
+ val dataFrame: DataFrame =
+ spark.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected")
+
+ val bucketizer: Bucketizer = new Bucketizer()
+ .setInputCol("feature")
+ .setOutputCol("result")
+ .setSplits(splits)
+
+ bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
+ case Row(x: Double, y: Double) =>
+ assert(x === y,
+ s"The feature value is not correct after bucketing. Expected $y but found $x")
+ }
+ }
+
+ test("Bucket continuous features, with NaN splits") {
+ val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN)
+ withClue("Invalid NaN split was not caught as an invalid split!") {
+ intercept[IllegalArgumentException] {
+ val bucketizer: Bucketizer = new Bucketizer()
+ .setInputCol("feature")
+ .setOutputCol("result")
+ .setSplits(splits)
+ }
+ }
+ }
+
test("Binary search correctness on hand-picked examples") {
import BucketizerSuite.checkBinarySearch
// length 3, with -inf
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 18f1e89ee8..6822594044 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -52,12 +52,12 @@ class QuantileDiscretizerSuite
"Bucket sizes are not within expected relative error tolerance.")
}
- test("Test Bucketizer on duplicated splits") {
+ test("Test on data with high proportion of duplicated values") {
val spark = this.spark
import spark.implicits._
- val datasetSize = 12
val numBuckets = 5
+ val expectedNumBuckets = 3
val df = sc.parallelize(Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 1.0, 3.0))
.map(Tuple1.apply).toDF("input")
val discretizer = new QuantileDiscretizer()
@@ -65,10 +65,31 @@ class QuantileDiscretizerSuite
.setOutputCol("result")
.setNumBuckets(numBuckets)
val result = discretizer.fit(df).transform(df)
+ val observedNumBuckets = result.select("result").distinct.count
+ assert(observedNumBuckets == expectedNumBuckets,
+ s"Observed number of buckets are not correct." +
+ s" Expected $expectedNumBuckets but found $observedNumBuckets")
+ }
+
+ test("Test transform on data with NaN value") {
+ val spark = this.spark
+ import spark.implicits._
+
+ val numBuckets = 3
+ val df = sc.parallelize(Array(1.0, 1.0, 1.0, Double.NaN))
+ .map(Tuple1.apply).toDF("input")
+ val discretizer = new QuantileDiscretizer()
+ .setInputCol("input")
+ .setOutputCol("result")
+ .setNumBuckets(numBuckets)
+ // Reserve extra one bucket for NaN
+ val expectedNumBuckets = discretizer.fit(df).getSplits.length - 1
+ val result = discretizer.fit(df).transform(df)
val observedNumBuckets = result.select("result").distinct.count
- assert(2 <= observedNumBuckets && observedNumBuckets <= numBuckets,
- "Observed number of buckets are not within expected range.")
+ assert(observedNumBuckets == expectedNumBuckets,
+ s"Observed number of buckets are not correct." +
+ s" Expected $expectedNumBuckets but found $observedNumBuckets")
}
test("Test transform method on unseen data") {
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 2881380152..c45434f1a5 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -1155,6 +1155,11 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadab
`QuantileDiscretizer` takes a column with continuous features and outputs a column with binned
categorical features. The number of bins can be set using the :py:attr:`numBuckets` parameter.
+ It is possible that the number of buckets used will be less than this value, for example, if
+ there are too few distinct values of the input to create enough distinct quantiles. Note also
+ that NaN values are handled specially and placed into their own bucket. For example, if 4
+ buckets are used, then non-NaN data will be put into buckets(0-3), but NaNs will be counted in
+ a special bucket(4).
The bin ranges are chosen using an approximate algorithm (see the documentation for
:py:meth:`~.DataFrameStatFunctions.approxQuantile` for a detailed description).
The precision of the approximation can be controlled with the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 1855eab96e..d69be36917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -52,6 +52,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient
* Online Computation of Quantile Summaries]] by Greenwald and Khanna.
*
+ * Note that NaN values will be removed from the numerical column before calculation
* @param col the name of the numerical column
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
@@ -67,7 +68,8 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double] = {
- StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, relativeError).head.toArray
+ StatFunctions.multipleApproxQuantiles(df.select(col).na.drop(),
+ Seq(col), probabilities, relativeError).head.toArray
}
/**