From 2328bdd00f701ca3b1bc7fdf8b2968fafc58fd11 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Thu, 19 Dec 2013 22:45:16 -0500 Subject: Python side of python bindings for linear, Lasso, and ridge regression --- python/pyspark/__init__.py | 6 ++-- python/pyspark/mllib.py | 81 ++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 15 deletions(-) (limited to 'python') diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 9f71db397d..7c8f9148d5 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -42,7 +42,9 @@ from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.storagelevel import StorageLevel -from pyspark.mllib import LinearRegressionModel +from pyspark.mllib import LinearRegressionModel, LassoModel, \ + RidgeRegressionModel -__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "LinearRegressionModel"]; +__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \ + "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"]; diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py index 0dfc4909c7..d3127874be 100644 --- a/python/pyspark/mllib.py +++ b/python/pyspark/mllib.py @@ -75,7 +75,7 @@ def _deserialize_double_matrix(ba): else: raise TypeError("_deserialize_double_matrix called on a non-bytearray") -class LinearRegressionModel(object): +class LinearModel(object): def __init__(self, coeff, intercept): self._coeff = coeff self._intercept = intercept @@ -83,7 +83,7 @@ class LinearRegressionModel(object): def predict(self, x): if (type(x) == ndarray): if (x.ndim == 1): - return dot(_coeff, x) - _intercept + return dot(_coeff, x) + _intercept else: raise RuntimeError("Bulk predict not yet supported.") elif (type(x) == RDD): @@ -92,16 +92,71 @@ class LinearRegressionModel(object): raise TypeError("Bad type argument to " "LinearRegressionModel::predict") +# Map a pickled Python RDD of numpy double vectors to a Java RDD of +# _serialized_double_vectors +def _get_unmangled_double_vector_rdd(data): + dataBytes = data.map(_serialize_double_vector) + dataBytes._bypass_serializer = True + dataBytes.cache() + return dataBytes; + +# If we weren't given initial weights, take a zero vector of the appropriate +# length. +def _get_initial_weights(initial_weights, data): + if initial_weights is None: + initial_weights = data.first() + if type(initial_weights) != ndarray: + raise TypeError("At least one data element has type " + + type(initial_weights) + " which is not ndarray") + if initial_weights.ndim != 1: + raise TypeError("At least one data element has " + + initial_weights.ndim + " dimensions, which is not 1") + initial_weights = zeros([initial_weights.shape[0] - 1]); + return initial_weights; + +# train_func should take two parameters, namely data and initial_weights, and +# return the result of a call to the appropriate JVM stub. +# _regression_train_wrapper is responsible for setup and error checking. +def _regression_train_wrapper(sc, train_func, klass, data, initial_weights): + initial_weights = _get_initial_weights(initial_weights, data) + dataBytes = _get_unmangled_double_vector_rdd(data) + ans = train_func(dataBytes, _serialize_double_vector(initial_weights)) + if len(ans) != 2: + raise RuntimeError("JVM call result had unexpected length"); + elif type(ans[0]) != bytearray: + raise RuntimeError("JVM call result had first element of type " + + type(ans[0]) + " which is not bytearray"); + elif type(ans[1]) != float: + raise RuntimeError("JVM call result had second element of type " + + type(ans[0]) + " which is not float"); + return klass(_deserialize_double_vector(ans[0]), ans[1]); + +class LinearRegressionModel(LinearModel): @classmethod - def train(cls, sc, data): + def train(cls, sc, data, iterations=100, step=1.0, + mini_batch_fraction=1.0, initial_weights=None): """Train a linear regression model on the given data.""" - dataBytes = data.map(_serialize_double_vector) - dataBytes._bypass_serializer = True - dataBytes.cache() - api = sc._jvm.PythonMLLibAPI() - ans = api.trainLinearRegressionModel(dataBytes._jrdd) - if (len(ans) != 2 or type(ans[0]) != bytearray - or type(ans[1]) != float): - raise RuntimeError("train_linear_regression_model received " - "garbage from JVM") - return LinearRegressionModel(_deserialize_double_vector(ans[0]), ans[1]) + return _regression_train_wrapper(sc, lambda d, i: + sc._jvm.PythonMLLibAPI().trainLinearRegressionModel( + d._jrdd, iterations, step, mini_batch_fraction, i), + LinearRegressionModel, data, initial_weights) + +class LassoModel(LinearModel): + @classmethod + def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + mini_batch_fraction=1.0, initial_weights=None): + """Train a Lasso regression model on the given data.""" + return _regression_train_wrapper(sc, lambda d, i: + sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd, + iterations, step, reg_param, mini_batch_fraction, i), + LassoModel, data, initial_weights) + +class RidgeRegressionModel(LinearModel): + @classmethod + def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + mini_batch_fraction=1.0, initial_weights=None): + """Train a ridge regression model on the given data.""" + return _regression_train_wrapper(sc, lambda d, i: + sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd, + iterations, step, reg_param, mini_batch_fraction, i), + RidgeRegressionModel, data, initial_weights) -- cgit v1.2.3