-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala      | 151
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala |  17
2 files changed, 97 insertions(+), 71 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 36894d5234..2d236509d5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -236,9 +236,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
  * Only univariate (single feature) algorithm supported.
  *
  * Sequential PAV implementation based on:
- * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
- * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
- * Available from <a href="http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf">here</a>
+ * Grotzinger, S. J., and C. Witzgall.
+ * "Projections onto order simplexes." Applied Mathematics and Optimization 12.1 (1984): 247-270.
  *
  * Sequential PAV parallelization based on:
  * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
@@ -312,90 +311,118 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Serializable {
   }
 
   /**
-   * Performs a pool adjacent violators algorithm (PAV).
-   * Uses approach with single processing of data where violators
-   * in previously processed data created by pooling are fixed immediately.
-   * Uses optimization of discovering monotonicity violating sequences (blocks).
+   * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
+   * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
+   * and end indices of blocks.
    *
-   * @param input Input data of tuples (label, feature, weight).
+   * [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied
+   * Mathematics and Optimization 12.1 (1984): 247-270.
+   *
+   * [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic
+   * regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439.
+   *
+   * [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex
+   * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
+   * 658-672.
+   *
+   * @param input Input data of tuples (label, feature, weight). Weights must
+   *              be non-negative.
    * @return Result tuples (label, feature, weight) where labels were updated
    *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
       input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
-    if (input.isEmpty) {
-      return Array.empty
+    val cleanInput = input.filter { case (y, x, weight) =>
+      require(
+        weight >= 0.0,
+        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
+      )
+      weight > 0
     }
 
-    // Pools sub array within given bounds assigning weighted average value to all elements.
-    def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
-      val poolSubArray = input.slice(start, end + 1)
+    if (cleanInput.isEmpty) {
+      return Array.empty
+    }
 
-      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
-      val weight = poolSubArray.map(_._3).sum
+    // Keeps track of the start and end indices of the blocks. If [i, j] is a valid block from
+    // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i.
+    // Initially, each data point is its own block.
+    val blockBounds = Array.range(0, cleanInput.length)
 
-      var i = start
-      while (i <= end) {
-        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
-        i = i + 1
-      }
+    // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
+    // gives the values for the block. Entries that are not at the start of a block
+    // are meaningless.
+    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+      (weight, weight * y)
     }
 
-    var i = 0
-    val len = input.length
-    while (i < len) {
-      var j = i
+    // A few convenience functions to make the code more readable.
 
-      // Find monotonicity violating sequence, if any.
-      while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
-        j = j + 1
-      }
+    // blockStart and blockEnd have identical implementations. We create two different
+    // functions to make the code more expressive.
+    def blockEnd(start: Int): Int = blockBounds(start)
+    def blockStart(end: Int): Int = blockBounds(end)
 
-      // If monotonicity was not violated, move to next data point.
-      if (i == j) {
-        i = i + 1
-      } else {
-        // Otherwise pool the violating sequence
-        // and check if pooling caused monotonicity violation in previously processed points.
-        while (i >= 0 && input(i)._1 > input(i + 1)._1) {
-          pool(input, i, j)
-          i = i - 1
-        }
+    // The next block starts at the index after the end of this block.
+    def nextBlock(start: Int): Int = blockEnd(start) + 1
 
-        i = j
-      }
+    // The previous block ends at the index before the start of this block;
+    // we then use blockStart to find the start.
+    def prevBlock(start: Int): Int = blockStart(start - 1)
+
+    // Merge two adjacent blocks, updating blockBounds and weights to reflect the merge.
+    // Return the start index of the merged block.
+    def merge(block1: Int, block2: Int): Int = {
+      assert(
+        blockEnd(block1) + 1 == block2,
+        s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" +
+          s" and [${block2}, ${blockEnd(block2)}]. This is likely a bug in the isotonic regression" +
+          " implementation. Please file a bug report."
+      )
+      blockBounds(block1) = blockEnd(block2)
+      blockBounds(blockEnd(block2)) = block1
+      val w1 = weights(block1)
+      val w2 = weights(block2)
+      weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
+      block1
     }
 
-    // For points having the same prediction, we only keep two boundary points.
-    val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+    // Average value of a block.
+    def average(start: Int): Double = weights(start)._2 / weights(start)._1
 
-    var (curLabel, curFeature, curWeight) = input.head
-    var rightBound = curFeature
-    def merge(): Unit = {
-      compressed += ((curLabel, curFeature, curWeight))
-      if (rightBound > curFeature) {
-        compressed += ((curLabel, rightBound, 0.0))
+    // Implement Algorithm PAV from [3].
+    // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
+    // want to compress our output as much as possible. Both give correct results.
+    var i = 0
+    while (nextBlock(i) < cleanInput.length) {
+      if (average(i) >= average(nextBlock(i))) {
+        merge(i, nextBlock(i))
+        while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
+          i = merge(prevBlock(i), i)
+        }
+      } else {
+        i = nextBlock(i)
       }
     }
 
-    i = 1
-    while (i < input.length) {
-      val (label, feature, weight) = input(i)
-      if (label == curLabel) {
-        curWeight += weight
-        rightBound = feature
+    // Construct the output by walking through the blocks in order.
+    val output = ArrayBuffer.empty[(Double, Double, Double)]
+    i = 0
+    while (i < cleanInput.length) {
+      // If the block has more than one point, output a point at the start and at the end of
+      // the block, each receiving half the block's weight. Otherwise, output a single point
+      // with all of the weight.
+      if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
+        output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
+        output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
       } else {
-        merge()
-        curLabel = label
-        curFeature = feature
-        curWeight = weight
-        rightBound = curFeature
+        output += ((average(i), cleanInput(i)._2, weights(i)._1))
       }
-      i += 1
+      i = nextBlock(i)
     }
 
-    merge()
-    compressed.toArray
+    output.toArray
   }
 
   /**
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 94da626d92..02ea74b87f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.Matchers
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
@@ -163,17 +163,16 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers {
   }
 
   test("weighted isotonic regression with negative weights") {
-    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
-
-    assert(model.boundaries === Array(0.0, 1.0, 4.0))
-    assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
+    val ex = intercept[SparkException] {
+      runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
+    }
+    assert(ex.getCause.isInstanceOf[IllegalArgumentException])
   }
 
   test("weighted isotonic regression with zero weights") {
-    val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
-
-    assert(model.boundaries === Array(0.0, 1.0, 4.0))
-    assert(model.predictions === Array(1, 2, 2))
+    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1, 0), Seq(0, 0, 0, 1, 1, 0), true)
+    assert(model.boundaries === Array(3, 4))
+    assert(model.predictions === Array(1.5, 1.5))
   }
 
   test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
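
For readers following the algorithm change, below is a minimal, self-contained Scala sketch of the block-merging PAV scheme this patch implements. It is a simplified distillation, not Spark's code: the names (PavSketch, pav) are invented for illustration, the input is assumed pre-sorted by feature with strictly positive weights (the patch's require/filter handles validation), and instead of compressing each block to its two boundary points, it expands each block's average back to one fitted value per input point.

// Standalone sketch of block-based PAV (illustrative names, not Spark's API).
// Assumes points are already sorted by feature and weights are > 0.
object PavSketch {
  // input: (label, weight) pairs in feature order; returns one fitted value per point
  def pav(input: Array[(Double, Double)]): Array[Double] = {
    val n = input.length
    if (n == 0) return Array.empty[Double]

    // blockBounds(i) = j and blockBounds(j) = i for a block spanning [i, j];
    // initially every point is its own block
    val blockBounds = Array.range(0, n)
    // per-block (sum of weights, sum of weight * label), valid at block starts
    val weights = input.map { case (y, w) => (w, w * y) }

    def blockEnd(start: Int): Int = blockBounds(start)
    def blockStart(end: Int): Int = blockBounds(end)
    def nextBlock(start: Int): Int = blockEnd(start) + 1
    def prevBlock(start: Int): Int = blockStart(start - 1)
    def average(start: Int): Double = weights(start)._2 / weights(start)._1

    // Merge the adjacent block starting at block2 into block1; return merged start.
    def merge(block1: Int, block2: Int): Int = {
      blockBounds(block1) = blockEnd(block2)
      blockBounds(blockEnd(block2)) = block1
      val (w1, s1) = weights(block1)
      val (w2, s2) = weights(block2)
      weights(block1) = (w1 + w2, s1 + s2)
      block1
    }

    var i = 0
    while (nextBlock(i) < n) {
      if (average(i) >= average(nextBlock(i))) {
        merge(i, nextBlock(i))
        // Pooling may create a violation with earlier blocks, so cascade backwards.
        while (i > 0 && average(prevBlock(i)) >= average(i)) {
          i = merge(prevBlock(i), i)
        }
      } else {
        i = nextBlock(i)
      }
    }

    // Expand block averages back to one fitted value per input point.
    val fitted = new Array[Double](n)
    i = 0
    while (i < n) {
      val avg = average(i)
      (i to blockEnd(i)).foreach(j => fitted(j) = avg)
      i = nextBlock(i)
    }
    fitted
  }

  def main(args: Array[String]): Unit = {
    // Labels 1, 2, 3, 2, 1 with unit weights: the points 2, 3, 2, 1 pool into
    // one block with average 2, giving the monotone fit 1, 2, 2, 2, 2.
    val fitted = pav(Array((1.0, 1.0), (2.0, 1.0), (3.0, 1.0), (2.0, 1.0), (1.0, 1.0)))
    println(fitted.mkString(", "))
  }
}

The blockBounds array is the central design choice: since a block's start and end indices point at each other, a block's extent, successor, and predecessor are all O(1) lookups, each merge is O(1), and at most n - 1 merges can occur, so the whole pass is linear after sorting.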