aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala18
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala19
-rw-r--r--python/pyspark/mllib/regression.py73
3 files changed, 106 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b086cec083..426306d78c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -283,6 +283,24 @@ private[python] class PythonMLLibAPI extends Serializable {
}
/**
+ * Java stub for Python mllib IsotonicRegression.run()
+ */
+ def trainIsotonicRegressionModel(
+ data: JavaRDD[Vector],
+ isotonic: Boolean): JList[Object] = {
+ val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
+ val input = data.rdd.map { x =>
+ (x(0), x(1), x(2))
+ }.persist(StorageLevel.MEMORY_AND_DISK)
+ try {
+ val model = isotonicRegressionAlg.run(input)
+ List[AnyRef](model.boundaryVector, model.predictionVector).asJava
+ } finally {
+ data.rdd.unpersist(blocking = false)
+ }
+ }
+
+ /**
* Java stub for Python mllib KMeans.run()
*/
def trainKMeansModel(
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 1d7617046b..be2a00c2df 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -21,18 +21,20 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.SQLContext
/**
* :: Experimental ::
@@ -57,6 +59,13 @@ class IsotonicRegressionModel (
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)
+ /** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */
+ def this(boundaries: java.lang.Iterable[Double],
+ predictions: java.lang.Iterable[Double],
+ isotonic: java.lang.Boolean) = {
+ this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
+ }
+
/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
@@ -132,6 +141,12 @@ class IsotonicRegressionModel (
}
}
+ /** A convenient method for boundaries called by the Python API. */
+ private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)
+
+ /** A convenient method for boundaries called by the Python API. */
+ private[mllib] def predictionVector: Vector = Vectors.dense(predictions)
+
override def save(sc: SparkContext, path: String): Unit = {
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
}
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 4bc6351bdf..41bde2ce3e 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -18,14 +18,16 @@
import numpy as np
from numpy import array
+from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader
__all__ = ['LabeledPoint', 'LinearModel',
'LinearRegressionModel', 'LinearRegressionWithSGD',
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
- 'LassoModel', 'LassoWithSGD']
+ 'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
+ 'IsotonicRegression']
class LabeledPoint(object):
@@ -396,6 +398,73 @@ class RidgeRegressionWithSGD(object):
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
+class IsotonicRegressionModel(Saveable, Loader):
+
+ """Regression model for isotonic regression.
+
+ >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
+ >>> irm = IsotonicRegression.train(sc.parallelize(data))
+ >>> irm.predict(3)
+ 2.0
+ >>> irm.predict(5)
+ 16.5
+ >>> irm.predict(sc.parallelize([3, 5])).collect()
+ [2.0, 16.5]
+ >>> import os, tempfile
+ >>> path = tempfile.mkdtemp()
+ >>> irm.save(sc, path)
+ >>> sameModel = IsotonicRegressionModel.load(sc, path)
+ >>> sameModel.predict(3)
+ 2.0
+ >>> sameModel.predict(5)
+ 16.5
+ >>> try:
+ ... os.removedirs(path)
+ ... except OSError:
+ ... pass
+ """
+
+ def __init__(self, boundaries, predictions, isotonic):
+ self.boundaries = boundaries
+ self.predictions = predictions
+ self.isotonic = isotonic
+
+ def predict(self, x):
+ if isinstance(x, RDD):
+ return x.map(lambda v: self.predict(v))
+ return np.interp(x, self.boundaries, self.predictions)
+
+ def save(self, sc, path):
+ java_boundaries = _py2java(sc, self.boundaries.tolist())
+ java_predictions = _py2java(sc, self.predictions.tolist())
+ java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
+ java_boundaries, java_predictions, self.isotonic)
+ java_model.save(sc._jsc.sc(), path)
+
+ @classmethod
+ def load(cls, sc, path):
+ java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
+ sc._jsc.sc(), path)
+ py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
+ py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
+ return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)
+
+
+class IsotonicRegression(object):
+ """
+ Run IsotonicRegression algorithm to obtain isotonic regression model.
+
+ :param data: RDD of (label, feature, weight) tuples.
+ :param isotonic: Whether this is isotonic or antitonic.
+ """
+ @classmethod
+ def train(cls, data, isotonic=True):
+ """Train a isotonic regression model on the given data."""
+ boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
+ data.map(_convert_to_vector), bool(isotonic))
+ return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
+
+
def _test():
import doctest
from pyspark import SparkContext