aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala9
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala11
2 files changed, 17 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 1cd6f2a896..377326f873 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
+import org.apache.spark.RangePartitioner
/**
* Regression model for isotonic regression.
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
*/
private def parallelPoolAdjacentViolators(
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
- val parallelStepResult = input
- .sortBy(x => (x._2, x._1))
- .glom()
+ val keyedInput = input.keyBy(_._2)
+ val parallelStepResult = keyedInput
+ .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
+ .values
+ .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index ea4f286575..94da626d92 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
assert(model.predictions === Array(1, 2, 2))
}
+ test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
+ val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1),
+ (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+ 2)
+
+ val model = new IsotonicRegression().run(trainRDD)
+
+ assert(model.boundaries === Array(1.0, 3.0))
+ assert(model.predictions === Array(0.75, 0.75))
+ }
+
test("isotonic regression prediction") {
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)