aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala304
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java89
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala241
3 files changed, 634 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
new file mode 100644
index 0000000000..5ed6477bae
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import java.io.Serializable
+import java.lang.{Double => JDouble}
+import java.util.Arrays.binarySearch
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Regression model for isotonic regression.
+ *
+ * @param boundaries Array of boundaries for which predictions are known.
+ * Boundaries must be sorted in increasing order.
+ * @param predictions Array of predictions associated to the boundaries at the same index.
+ * Results of isotonic regression and therefore monotone.
+ * @param isotonic indicates whether this is isotonic or antitonic.
+ */
+class IsotonicRegressionModel (
+ val boundaries: Array[Double],
+ val predictions: Array[Double],
+ val isotonic: Boolean) extends Serializable {
+
+ private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse
+
+ require(boundaries.length == predictions.length)
+ assertOrdered(boundaries)
+ assertOrdered(predictions)(predictionOrd)
+
+ /** Asserts the input array is monotone with the given ordering. */
+ private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
+ var i = 1
+ while (i < xs.length) {
+ require(ord.compare(xs(i - 1), xs(i)) <= 0,
+ s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
+ i += 1
+ }
+ }
+
+ /**
+ * Predict labels for provided features.
+ * Using a piecewise linear function.
+ *
+ * @param testData Features to be labeled.
+ * @return Predicted labels.
+ */
+ def predict(testData: RDD[Double]): RDD[Double] = {
+ testData.map(predict)
+ }
+
+ /**
+ * Predict labels for provided features.
+ * Using a piecewise linear function.
+ *
+ * @param testData Features to be labeled.
+ * @return Predicted labels.
+ */
+ def predict(testData: JavaDoubleRDD): JavaDoubleRDD = {
+ JavaDoubleRDD.fromRDD(predict(testData.rdd.retag.asInstanceOf[RDD[Double]]))
+ }
+
+ /**
+ * Predict a single label.
+ * Using a piecewise linear function.
+ *
+ * @param testData Feature to be labeled.
+ * @return Predicted label.
+ * 1) If testData exactly matches a boundary then associated prediction is returned.
+ * In case there are multiple predictions with the same boundary then one of them
+ * is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
+ * 2) If testData is lower or higher than all boundaries then first or last prediction
+ * is returned respectively. In case there are multiple predictions with the same
+ * boundary then the lowest or highest is returned respectively.
+ * 3) If testData falls between two values in boundary array then prediction is treated
+ * as piecewise linear function and interpolated value is returned. In case there are
+ * multiple values with the same boundary then the same rules as in 2) are used.
+ */
+ def predict(testData: Double): Double = {
+
+ def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
+ y1 + (y2 - y1) * (x - x1) / (x2 - x1)
+ }
+
+ val foundIndex = binarySearch(boundaries, testData)
+ val insertIndex = -foundIndex - 1
+
+ // Find if the index was lower than all values,
+ // higher than all values, in between two values or exact match.
+ if (insertIndex == 0) {
+ predictions.head
+ } else if (insertIndex == boundaries.length){
+ predictions.last
+ } else if (foundIndex < 0) {
+ linearInterpolation(
+ boundaries(insertIndex - 1),
+ predictions(insertIndex - 1),
+ boundaries(insertIndex),
+ predictions(insertIndex),
+ testData)
+ } else {
+ predictions(foundIndex)
+ }
+ }
+}
+
+/**
+ * Isotonic regression.
+ * Currently implemented using parallelized pool adjacent violators algorithm.
+ * Only univariate (single feature) algorithm supported.
+ *
+ * Sequential PAV implementation based on:
+ * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+ * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+ * Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf
+ *
+ * Sequential PAV parallelization based on:
+ * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+ * "An approach to parallelizing isotonic regression."
+ * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
+ * Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf
+ */
+class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
+
+ /**
+ * Constructs IsotonicRegression instance with default parameter isotonic = true.
+ *
+ * @return New instance of IsotonicRegression.
+ */
+ def this() = this(true)
+
+ /**
+ * Sets the isotonic parameter.
+ *
+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
+ * @return This instance of IsotonicRegression.
+ */
+ def setIsotonic(isotonic: Boolean): this.type = {
+ this.isotonic = isotonic
+ this
+ }
+
+ /**
+ * Run IsotonicRegression algorithm to obtain isotonic regression model.
+ *
+ * @param input RDD of tuples (label, feature, weight) where label is dependent variable
+ * for which we calculate isotonic regression, feature is independent variable
+ * and weight represents number of measures with default 1.
+ * If multiple labels share the same feature value then they are ordered before
+ * the algorithm is executed.
+ * @return Isotonic regression model.
+ */
+ def run(input: RDD[(Double, Double, Double)]): IsotonicRegressionModel = {
+ val preprocessedInput = if (isotonic) {
+ input
+ } else {
+ input.map(x => (-x._1, x._2, x._3))
+ }
+
+ val pooled = parallelPoolAdjacentViolators(preprocessedInput)
+
+ val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
+ val boundaries = pooled.map(_._2)
+
+ new IsotonicRegressionModel(boundaries, predictions, isotonic)
+ }
+
+ /**
+ * Run pool adjacent violators algorithm to obtain isotonic regression model.
+ *
+ * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
+ * for which we calculate isotonic regression, feature is independent variable
+ * and weight represents number of measures with default 1.
+ * If multiple labels share the same feature value then they are ordered before
+ * the algorithm is executed.
+ * @return Isotonic regression model.
+ */
+ def run(input: JavaRDD[(JDouble, JDouble, JDouble)]): IsotonicRegressionModel = {
+ run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
+ }
+
+ /**
+ * Performs a pool adjacent violators algorithm (PAV).
+ * Uses approach with single processing of data where violators
+ * in previously processed data created by pooling are fixed immediately.
+ * Uses optimization of discovering monotonicity violating sequences (blocks).
+ *
+ * @param input Input data of tuples (label, feature, weight).
+ * @return Result tuples (label, feature, weight) where labels were updated
+ * to form a monotone sequence as per isotonic regression definition.
+ */
+ private def poolAdjacentViolators(
+ input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+
+ if (input.isEmpty) {
+ return Array.empty
+ }
+
+ // Pools sub array within given bounds assigning weighted average value to all elements.
+ def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
+ val poolSubArray = input.slice(start, end + 1)
+
+ val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
+ val weight = poolSubArray.map(_._3).sum
+
+ var i = start
+ while (i <= end) {
+ input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
+ i = i + 1
+ }
+ }
+
+ var i = 0
+ while (i < input.length) {
+ var j = i
+
+ // Find monotonicity violating sequence, if any.
+ while (j < input.length - 1 && input(j)._1 > input(j + 1)._1) {
+ j = j + 1
+ }
+
+ // If monotonicity was not violated, move to next data point.
+ if (i == j) {
+ i = i + 1
+ } else {
+ // Otherwise pool the violating sequence
+ // and check if pooling caused monotonicity violation in previously processed points.
+ while (i >= 0 && input(i)._1 > input(i + 1)._1) {
+ pool(input, i, j)
+ i = i - 1
+ }
+
+ i = j
+ }
+ }
+
+ // For points having the same prediction, we only keep two boundary points.
+ val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+
+ var (curLabel, curFeature, curWeight) = input.head
+ var rightBound = curFeature
+ def merge(): Unit = {
+ compressed += ((curLabel, curFeature, curWeight))
+ if (rightBound > curFeature) {
+ compressed += ((curLabel, rightBound, 0.0))
+ }
+ }
+ i = 1
+ while (i < input.length) {
+ val (label, feature, weight) = input(i)
+ if (label == curLabel) {
+ curWeight += weight
+ rightBound = feature
+ } else {
+ merge()
+ curLabel = label
+ curFeature = feature
+ curWeight = weight
+ rightBound = curFeature
+ }
+ i += 1
+ }
+ merge()
+
+ compressed.toArray
+ }
+
+ /**
+ * Performs parallel pool adjacent violators algorithm.
+ * Performs Pool adjacent violators algorithm on each partition and then again on the result.
+ *
+ * @param input Input data of tuples (label, feature, weight).
+ * @return Result tuples (label, feature, weight) where labels were updated
+ * to form a monotone sequence as per isotonic regression definition.
+ */
+ private def parallelPoolAdjacentViolators(
+ input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+ val parallelStepResult = input
+ .sortBy(x => (x._2, x._1))
+ .glom()
+ .flatMap(poolAdjacentViolators)
+ .collect()
+ .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
+ poolAdjacentViolators(parallelStepResult)
+ }
+}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
new file mode 100644
index 0000000000..d38fc91ace
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import scala.Tuple3;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class JavaIsotonicRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ private List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] labels) {
+ List<Tuple3<Double, Double, Double>> input = Lists.newArrayList();
+
+ for (int i = 1; i <= labels.length; i++) {
+ input.add(new Tuple3<Double, Double, Double>(labels[i-1], (double) i, 1d));
+ }
+
+ return input;
+ }
+
+ private IsotonicRegressionModel runIsotonicRegression(double[] labels) {
+ JavaRDD<Tuple3<Double, Double, Double>> trainRDD =
+ sc.parallelize(generateIsotonicInput(labels), 2).cache();
+
+ return new IsotonicRegression().run(trainRDD);
+ }
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Test
+ public void testIsotonicRegressionJavaRDD() {
+ IsotonicRegressionModel model =
+ runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});
+
+ Assert.assertArrayEquals(
+ new double[] {1, 2, 7d/3, 7d/3, 6, 7, 8, 10, 10, 12}, model.predictions(), 1e-14);
+ }
+
+ @Test
+ public void testIsotonicRegressionPredictionsJavaRDD() {
+ IsotonicRegressionModel model =
+ runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 12});
+
+ JavaDoubleRDD testRDD = sc.parallelizeDoubles(Lists.newArrayList(0.0, 1.0, 9.5, 12.0, 13.0));
+ List<Double> predictions = model.predict(testRDD).collect();
+
+ Assert.assertTrue(predictions.get(0) == 1d);
+ Assert.assertTrue(predictions.get(1) == 1d);
+ Assert.assertTrue(predictions.get(2) == 10d);
+ Assert.assertTrue(predictions.get(3) == 12d);
+ Assert.assertTrue(predictions.get(4) == 12d);
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
new file mode 100644
index 0000000000..7ef4524828
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.scalatest.{Matchers, FunSuite}
+
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+
+class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
+
+ private def round(d: Double) = {
+ Math.round(d * 100).toDouble / 100
+ }
+
+ private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
+ Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d))
+ }
+
+ private def generateIsotonicInput(
+ labels: Seq[Double],
+ weights: Seq[Double]): Seq[(Double, Double, Double)] = {
+ Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i)))
+ }
+
+ private def runIsotonicRegression(
+ labels: Seq[Double],
+ weights: Seq[Double],
+ isotonic: Boolean): IsotonicRegressionModel = {
+ val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache()
+ new IsotonicRegression().setIsotonic(isotonic).run(trainRDD)
+ }
+
+ private def runIsotonicRegression(
+ labels: Seq[Double],
+ isotonic: Boolean): IsotonicRegressionModel = {
+ runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic)
+ }
+
+ test("increasing isotonic regression") {
+ /*
+ The following result could be re-produced with sklearn.
+
+ > from sklearn.isotonic import IsotonicRegression
+ > x = range(9)
+ > y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
+ > ir = IsotonicRegression(x, y)
+ > print ir.predict(x)
+
+ array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ])
+ */
+ val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)
+
+ assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
+
+ assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
+ assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
+ assert(model.isotonic)
+ }
+
+ test("isotonic regression with size 0") {
+ val model = runIsotonicRegression(Seq(), true)
+
+ assert(model.predictions === Array())
+ }
+
+ test("isotonic regression with size 1") {
+ val model = runIsotonicRegression(Seq(1), true)
+
+ assert(model.predictions === Array(1.0))
+ }
+
+ test("isotonic regression strictly increasing sequence") {
+ val model = runIsotonicRegression(Seq(1, 2, 3, 4, 5), true)
+
+ assert(model.predictions === Array(1, 2, 3, 4, 5))
+ }
+
+ test("isotonic regression strictly decreasing sequence") {
+ val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)
+
+ assert(model.boundaries === Array(0, 4))
+ assert(model.predictions === Array(3, 3))
+ }
+
+ test("isotonic regression with last element violating monotonicity") {
+ val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)
+
+ assert(model.boundaries === Array(0, 1, 2, 4))
+ assert(model.predictions === Array(1, 2, 3, 3))
+ }
+
+ test("isotonic regression with first element violating monotonicity") {
+ val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)
+
+ assert(model.boundaries === Array(0, 2, 3, 4))
+ assert(model.predictions === Array(3, 3, 4, 5))
+ }
+
+ test("isotonic regression with negative labels") {
+ val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)
+
+ assert(model.boundaries === Array(0, 1, 2, 4))
+ assert(model.predictions === Array(-1.5, -1.5, 0, 0))
+ }
+
+ test("isotonic regression with unordered input") {
+ val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache()
+
+ val model = new IsotonicRegression().run(trainRDD)
+ assert(model.predictions === Array(1, 2, 3, 4, 5))
+ }
+
+ test("weighted isotonic regression") {
+ val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)
+
+ assert(model.boundaries === Array(0, 1, 2, 4))
+ assert(model.predictions === Array(1, 2, 2.75, 2.75))
+ }
+
+ test("weighted isotonic regression with weights lower than 1") {
+ val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)
+
+ assert(model.boundaries === Array(0, 1, 2, 4))
+ assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
+ }
+
+ test("weighted isotonic regression with negative weights") {
+ val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
+
+ assert(model.boundaries === Array(0.0, 1.0, 4.0))
+ assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
+ }
+
+ test("weighted isotonic regression with zero weights") {
+ val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
+
+ assert(model.boundaries === Array(0.0, 1.0, 4.0))
+ assert(model.predictions === Array(1, 2, 2))
+ }
+
+ test("isotonic regression prediction") {
+ val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
+
+ assert(model.predict(-2) === 1)
+ assert(model.predict(-1) === 1)
+ assert(model.predict(0.5) === 1.5)
+ assert(model.predict(0.75) === 1.75)
+ assert(model.predict(1) === 2)
+ assert(model.predict(2) === 10d/3)
+ assert(model.predict(9) === 10d/3)
+ }
+
+ test("isotonic regression prediction with duplicate features") {
+ val trainRDD = sc.parallelize(
+ Seq[(Double, Double, Double)](
+ (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache()
+ val model = new IsotonicRegression().run(trainRDD)
+
+ assert(model.predict(0) === 1)
+ assert(model.predict(1.5) === 2)
+ assert(model.predict(2.5) === 4.5)
+ assert(model.predict(4) === 6)
+ }
+
+ test("antitonic regression prediction with duplicate features") {
+ val trainRDD = sc.parallelize(
+ Seq[(Double, Double, Double)](
+ (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache()
+ val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)
+
+ assert(model.predict(0) === 6)
+ assert(model.predict(1.5) === 4.5)
+ assert(model.predict(2.5) === 2)
+ assert(model.predict(4) === 1)
+ }
+
+ test("isotonic regression RDD prediction") {
+ val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
+
+ val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache()
+ val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
+ assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
+ }
+
+ test("antitonic regression prediction") {
+ val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)
+
+ assert(model.predict(-2) === 7)
+ assert(model.predict(-1) === 7)
+ assert(model.predict(0.5) === 6)
+ assert(model.predict(0.75) === 5.5)
+ assert(model.predict(1) === 5)
+ assert(model.predict(2) === 4)
+ assert(model.predict(9) === 1)
+ }
+
+ test("model construction") {
+ val model = new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = true)
+ assert(model.predict(-0.5) === 1.0)
+ assert(model.predict(0.0) === 1.0)
+ assert(model.predict(0.5) ~== 1.5 absTol 1e-14)
+ assert(model.predict(1.0) === 2.0)
+ assert(model.predict(1.5) === 2.0)
+
+ intercept[IllegalArgumentException] {
+ // different array sizes.
+ new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0), isotonic = true)
+ }
+
+ intercept[IllegalArgumentException] {
+ // unordered boundaries
+ new IsotonicRegressionModel(Array(1.0, 0.0), Array(1.0, 2.0), isotonic = true)
+ }
+
+ intercept[IllegalArgumentException] {
+ // unordered predictions (isotonic)
+ new IsotonicRegressionModel(Array(0.0, 1.0), Array(2.0, 1.0), isotonic = true)
+ }
+
+ intercept[IllegalArgumentException] {
+ // unordered predictions (antitonic)
+ new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false)
+ }
+ }
+}