Diffstat (limited to 'mllib/src')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala       151
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala   17
2 files changed, 97 insertions(+), 71 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 36894d5234..2d236509d5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -236,9 +236,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
* Only univariate (single feature) algorithm supported.
*
* Sequential PAV implementation based on:
- * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
- * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
- * Available from <a href="http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf">here</a>
+ * Grotzinger, S. J., and C. Witzgall.
+ * "Projections onto order simplexes." Applied Mathematics and Optimization 12.1 (1984): 247-270.
*
* Sequential PAV parallelization based on:
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
@@ -312,90 +311,118 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
}
/**
- * Performs a pool adjacent violators algorithm (PAV).
- * Uses approach with single processing of data where violators
- * in previously processed data created by pooling are fixed immediately.
- * Uses optimization of discovering monotonicity violating sequences (blocks).
+ * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
+ * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
+ * and end indices of blocks.
*
- * @param input Input data of tuples (label, feature, weight).
+ * [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied
+ * Mathematics and Optimization 12.1 (1984): 247-270.
+ *
+ * [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic
+ * regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439.
+ *
+ * [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex
+ * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
+ * 658-672.
+ *
+ * @param input Input data of tuples (label, feature, weight). Weights must
+ *              be non-negative.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
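For intuition, here is a minimal standalone sketch of the sequential PAV idea documented above, written against plain (value, weight) arrays; it illustrates the technique only and is not the patch's implementation:

    import scala.collection.mutable.ArrayBuffer

    // Pool adjacent violators: repeatedly merge neighboring blocks whose weighted
    // averages violate the non-decreasing constraint, then expand blocks back out.
    def pavSketch(values: Array[Double], weights: Array[Double]): Array[Double] = {
      // Each block stores (weighted sum, total weight, number of points covered).
      val blocks = ArrayBuffer.empty[(Double, Double, Int)]
      for (i <- values.indices) {
        blocks += ((values(i) * weights(i), weights(i), 1))
        // Pool while the last two blocks are out of order.
        while (blocks.length > 1 &&
            blocks(blocks.length - 2)._1 / blocks(blocks.length - 2)._2 >
            blocks.last._1 / blocks.last._2) {
          val b = blocks.remove(blocks.length - 1)
          val a = blocks.remove(blocks.length - 1)
          blocks += ((a._1 + b._1, a._2 + b._2, a._3 + b._3))
        }
      }
      // One fitted value (the block average) per original point.
      blocks.toArray.flatMap { case (sum, w, n) => Array.fill(n)(sum / w) }
    }

    pavSketch(Array(1.0, 3.0, 2.0), Array(1.0, 1.0, 1.0))  // Array(1.0, 2.5, 2.5)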
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
- if (input.isEmpty) {
- return Array.empty
+ val cleanInput = input.filter { case (y, x, weight) =>
+ require(
+ weight >= 0.0,
+ s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
+ )
+ weight > 0
}
- // Pools sub array within given bounds assigning weighted average value to all elements.
- def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
- val poolSubArray = input.slice(start, end + 1)
+ if (cleanInput.isEmpty) {
+ return Array.empty
+ }
- val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
- val weight = poolSubArray.map(_._3).sum
+ // Keeps track of the start and end indices of the blocks. If [i, j] is a valid block from
+ // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i.
+ // Initially, each data point is its own block.
+ val blockBounds = Array.range(0, cleanInput.length)
- var i = start
- while (i <= end) {
- input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
- i = i + 1
- }
+ // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
+ // gives the values for the block. Entries that are not at the start of a block
+ // are meaningless.
+ val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+ (weight, weight * y)
}
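To make this encoding concrete, here is a hypothetical state for five points in which points 1 through 3 have already been pooled into one block:

    // Blocks: [0], [1, 2, 3], [4]. For a block starting at index i, blockBounds(i)
    // is its end index; for a block ending at j, blockBounds(j) is its start.
    // Interior entries (index 2 here) are stale and never read.
    val blockBounds = Array(0, 3, 2, 1, 4)
    // If the pooled block holds unit-weight points with labels 3, 1, 2, then
    // weights(1) == (3.0, 6.0), i.e. a block average of 6.0 / 3.0 = 2.0.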
- var i = 0
- val len = input.length
- while (i < len) {
- var j = i
+ // a few convenience functions to make the code more readable
- // Find monotonicity violating sequence, if any.
- while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
- j = j + 1
- }
+ // blockStart and blockEnd have identical implementations. We create two different
+ // functions to make the code more expressive
+ def blockEnd(start: Int): Int = blockBounds(start)
+ def blockStart(end: Int): Int = blockBounds(end)
- // If monotonicity was not violated, move to next data point.
- if (i == j) {
- i = i + 1
- } else {
- // Otherwise pool the violating sequence
- // and check if pooling caused monotonicity violation in previously processed points.
- while (i >= 0 && input(i)._1 > input(i + 1)._1) {
- pool(input, i, j)
- i = i - 1
- }
+ // the next block starts at the index after the end of this block
+ def nextBlock(start: Int): Int = blockEnd(start) + 1
- i = j
- }
+ // the previous block ends at the index before the start of this block
+ // we then use blockStart to find the start
+ def prevBlock(start: Int): Int = blockStart(start - 1)
+
+ // Merge two adjacent blocks, updating blockBounds and weights to reflect the merge
+ // Return the start index of the merged block
+ def merge(block1: Int, block2: Int): Int = {
+ assert(
+ blockEnd(block1) + 1 == block2,
+ s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" +
+ s" and [${block2}, ${blockEnd(block2)}]. This is likely a bug in the isotonic regression" +
+ " implementation. Please file a bug report."
+ )
+ blockBounds(block1) = blockEnd(block2)
+ blockBounds(blockEnd(block2)) = block1
+ val w1 = weights(block1)
+ val w2 = weights(block2)
+ weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
+ block1
}
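A self-contained sketch of the same merge bookkeeping on a toy three-point input (hypothetical names, mirroring rather than reproducing the patch):

    // Labels 1, 3, 2 with unit weights; every point starts as its own block.
    val bounds = Array.range(0, 3)
    val ws = Array((1.0, 1.0), (1.0, 3.0), (1.0, 2.0)) // (weight, weight * y) per block start

    def end(start: Int): Int = bounds(start)

    def mergeBlocks(b1: Int, b2: Int): Int = {
      bounds(b1) = end(b2)        // b1 now ends where b2 ended
      bounds(end(b2)) = b1        // ...and that end index points back at b1
      val (w1, s1) = ws(b1)
      val (w2, s2) = ws(b2)
      ws(b1) = (w1 + w2, s1 + s2) // pooled weight and pooled weighted sum
      b1
    }

    mergeBlocks(1, 2)
    assert(bounds(1) == 2 && bounds(2) == 1)
    assert(ws(1) == (2.0, 5.0))   // pooled average: 5.0 / 2.0 = 2.5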
- // For points having the same prediction, we only keep two boundary points.
- val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+ // average value of a block
+ def average(start: Int): Double = weights(start)._2 / weights(start)._1
- var (curLabel, curFeature, curWeight) = input.head
- var rightBound = curFeature
- def merge(): Unit = {
- compressed += ((curLabel, curFeature, curWeight))
- if (rightBound > curFeature) {
- compressed += ((curLabel, rightBound, 0.0))
+ // Implement Algorithm PAV from [3].
+ // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
+ // want to compress our output as much as possible. Both give correct results.
+ var i = 0
+ while (nextBlock(i) < cleanInput.length) {
+ if (average(i) >= average(nextBlock(i))) {
+ merge(i, nextBlock(i))
+ while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
+ i = merge(prevBlock(i), i)
+ }
+ } else {
+ i = nextBlock(i)
}
}
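As a worked trace (assumed unit-weight labels 1, 3, 2): the loop advances past the first block because 1 < 3, merges the second and third blocks because 3 >= 2 (pooled average 2.5), finds no backward violation since 1 < 2.5, and terminates because the merged block is now the last one.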
- i = 1
- while (i < input.length) {
- val (label, feature, weight) = input(i)
- if (label == curLabel) {
- curWeight += weight
- rightBound = feature
+
+ // construct the output by walking through the blocks in order
+ val output = ArrayBuffer.empty[(Double, Double, Double)]
+ i = 0
+ while (i < cleanInput.length) {
+ // If the block has more than one point, emit a point at the start and at the
+ // end of the block, each receiving half the weight. Otherwise, emit a single
+ // point with all the weight.
+ if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
+ output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
+ output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
} else {
- merge()
- curLabel = label
- curFeature = feature
- curWeight = weight
- rightBound = curFeature
+ output += ((average(i), cleanInput(i)._2, weights(i)._1))
}
- i += 1
+ i = nextBlock(i)
}
- merge()
- compressed.toArray
+ output.toArray
}
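A hedged numeric illustration of the compression in that output loop: a pooled block spanning features 2.0 through 5.0 with total weight 4.0 and average 2.5 becomes two boundary tuples carrying half the weight each, while a single-point block keeps one tuple with its full weight:

    // (label, feature, weight) tuples emitted for the two block shapes:
    val pooledBlock = Seq((2.5, 2.0, 2.0), (2.5, 5.0, 2.0)) // boundaries, half weight each
    val singleBlock = Seq((3.0, 7.0, 1.0))                  // one point, full weight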
/**
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 94da626d92..02ea74b87f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression
import org.scalatest.Matchers
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
@@ -163,17 +163,16 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
}
test("weighted isotonic regression with negative weights") {
- val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
-
- assert(model.boundaries === Array(0.0, 1.0, 4.0))
- assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
+ val ex = intercept[SparkException] {
+ runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
+ }
+ assert(ex.getCause.isInstanceOf[IllegalArgumentException])
}
test("weighted isotonic regression with zero weights") {
- val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
-
- assert(model.boundaries === Array(0.0, 1.0, 4.0))
- assert(model.predictions === Array(1, 2, 2))
+ val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1, 0), Seq(0, 0, 0, 1, 1, 0), true)
+ assert(model.boundaries === Array(3, 4))
+ assert(model.predictions === Array(1.5, 1.5))
}
test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {