aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorz001qdp <Nicholas.Eggert@target.com>2016-07-15 12:30:22 +0100
committerSean Owen <sowen@cloudera.com>2016-07-15 12:30:22 +0100
commit71ad945bbbdd154eae852cd7f841e98f7a83e8d4 (patch)
tree9d6d5b62dba642b46978a729a968e0057faecaf8 /mllib
parent1832423827fd518853b63f91c321e4568a39107d (diff)
downloadspark-71ad945bbbdd154eae852cd7f841e98f7a83e8d4.tar.gz
spark-71ad945bbbdd154eae852cd7f841e98f7a83e8d4.tar.bz2
spark-71ad945bbbdd154eae852cd7f841e98f7a83e8d4.zip
[SPARK-16426][MLLIB] Fix bug that caused NaNs in IsotonicRegression
## What changes were proposed in this pull request? Fixed a bug that caused `NaN`s in `IsotonicRegression`. The problem occurs when training rows with the same feature value but different labels end up on different partitions. This patch changes a `sortBy` call to a `partitionBy(RangePartitioner)` followed by a `mapPartitions(sortBy)` in order to ensure that all rows with the same feature value end up on the same partition. ## How was this patch tested? Added a unit test. Author: z001qdp <Nicholas.Eggert@target.com> Closes #14140 from neggert/SPARK-16426-isotonic-nan.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala9
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala11
2 files changed, 17 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 1cd6f2a896..377326f873 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
+import org.apache.spark.RangePartitioner
/**
* Regression model for isotonic regression.
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
*/
private def parallelPoolAdjacentViolators(
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
- val parallelStepResult = input
- .sortBy(x => (x._2, x._1))
- .glom()
+ val keyedInput = input.keyBy(_._2)
+ val parallelStepResult = keyedInput
+ .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
+ .values
+ .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index ea4f286575..94da626d92 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
assert(model.predictions === Array(1, 2, 2))
}
+ test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
+ val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1),
+ (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+ 2)
+
+ val model = new IsotonicRegression().run(trainRDD)
+
+ assert(model.boundaries === Array(1.0, 3.0))
+ assert(model.predictions === Array(0.75, 0.75))
+ }
+
test("isotonic regression prediction") {
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)