Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/mllib/_common.py         396
-rw-r--r--  python/pyspark/mllib/classification.py   75
-rw-r--r--  python/pyspark/mllib/clustering.py        51
-rw-r--r--  python/pyspark/mllib/linalg.py           245
-rw-r--r--  python/pyspark/mllib/regression.py       128
-rw-r--r--  python/pyspark/mllib/tests.py            302
6 files changed, 1066 insertions(+), 131 deletions(-)
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e19f5d2aaa..e6f0953810 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,38 +15,86 @@
# limitations under the License.
#
-from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype
+import struct
+import numpy
+from numpy import ndarray, float64, int64, int32, array_equal, array
from pyspark import SparkContext, RDD
-import numpy as np
-
+from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
-import struct
-# Double vector format:
+"""
+Common utilities shared throughout MLlib, primarily for dealing with
+different data types. These include:
+- Serialization utilities to / from byte arrays that Java can handle
+- Serializers for other data types, like ALS Rating objects
+- Common methods for linear models
+- Methods to deal with the different vector types we support, such as
+ SparseVector and scipy.sparse matrices.
+"""
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+_have_scipy = False
+_scipy_issparse = None
+try:
+ import scipy.sparse
+ _have_scipy = True
+ _scipy_issparse = scipy.sparse.issparse
+except:
+ # No SciPy in environment, but that's okay
+ pass
+
+
+# Serialization functions to and from Scala. These use the following formats, understood
+# by the PythonMLLibAPI class in Scala:
+#
+# Dense double vector format:
+#
+# [1-byte 1] [4-byte length] [length*8 bytes of data]
#
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
+# Sparse double vector format:
+#
+# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
#
# Double matrix format:
#
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
+#
+# LabeledPoint format:
+#
+# [1-byte 4] [8-byte label] [dense or sparse vector]
#
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
-def _deserialize_byte_array(shape, ba, offset):
- """Wrapper around ndarray aliasing hack.
+
+DENSE_VECTOR_MAGIC = 1
+SPARSE_VECTOR_MAGIC = 2
+DENSE_MATRIX_MAGIC = 3
+LABELED_POINT_MAGIC = 4
+
+
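As an illustrative aside (not part of the patch), here is a minimal sketch of the dense and sparse byte layouts described above, built by hand with struct and NumPy; the concrete values are arbitrary examples.

    import struct
    from numpy import array

    # Dense format: [1-byte magic 1] [4-byte int32 length] [length * 8-byte float64],
    # machine-endian ("=" keeps native byte order with standard sizes).
    v = array([1.0, 2.0, 3.0])
    ba = bytearray(5 + 8 * len(v))
    ba[0] = 1                                      # DENSE_VECTOR_MAGIC
    struct.pack_into("=i", ba, 1, len(v))          # length at offset 1
    struct.pack_into("=%dd" % len(v), ba, 5, *v)   # values at offset 5
    assert struct.unpack_from("=i", ba, 1)[0] == 3
    assert struct.unpack_from("=3d", ba, 5) == (1.0, 2.0, 3.0)

    # Sparse format: [1-byte magic 2] [4-byte size] [4-byte nonzeros]
    #                [nonzeros * 4-byte int32 indices] [nonzeros * 8-byte float64 values]
    sp = bytearray(9 + 12 * 2)
    sp[0] = 2                                      # SPARSE_VECTOR_MAGIC
    struct.pack_into("=ii", sp, 1, 4, 2)           # size = 4, nonzeros = 2
    struct.pack_into("=2i", sp, 9, 1, 3)           # indices
    struct.pack_into("=2d", sp, 17, 3.0, 5.5)      # values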
+def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
+ """
+ Deserialize a numpy array of the given type from an offset in
+ bytearray ba, assigning it the given shape.
>>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
+ True
+ >>> x = array([1, 2, 3], dtype=int32)
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
True
"""
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
- order='C')
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
return ar.copy()
+
def _serialize_double_vector(v):
"""Serialize a double vector into a mutually understood format.
@@ -55,160 +103,231 @@ def _serialize_double_vector(v):
>>> array_equal(y, array([1.0, 2.0, 3.0]))
True
"""
- if type(v) != ndarray:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- """complex is only datatype that can't be converted to float64"""
- if issubdtype(v.dtype, complex):
+ v = _convert_vector(v)
+ if type(v) == ndarray:
+ return _serialize_dense_vector(v)
+ elif type(v) == SparseVector:
+ return _serialize_sparse_vector(v)
+ else:
raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- if v.dtype != float64:
- v = v.astype(float64)
+ "wanted ndarray or SparseVector" % type(v))
+
+
+def _serialize_dense_vector(v):
+ """Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
+ if v.dtype != float64:
+ if numpy.issubdtype(v.dtype, numpy.complex):
+ raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+ "wanted ndarray of float64" % v.dtype)
+ v = v.astype(float64)
length = v.shape[0]
- ba = bytearray(16 + 8*length)
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- header[0] = 1
- header[1] = length
- arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64")
- arr_mid[...] = v
+ ba = bytearray(5 + 8 * length)
+ ba[0] = DENSE_VECTOR_MAGIC
+ length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
+ length_bytes[0] = length
+ _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
+ return ba
+
+
+def _serialize_sparse_vector(v):
+ """Serialize a pyspark.mllib.linalg.SparseVector."""
+ nonzeros = len(v.indices)
+ ba = bytearray(9 + 12 * nonzeros)
+ ba[0] = SPARSE_VECTOR_MAGIC
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ header[0] = v.size
+ header[1] = nonzeros
+ _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
+ values_offset = 9 + 4 * nonzeros
+ _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
return ba
+
def _deserialize_double_vector(ba):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
>>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
True
+ >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
+ >>> s == _deserialize_double_vector(_serialize_double_vector(s))
+ True
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 16:
+ if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- if header[0] != 1:
+ if ba[0] == DENSE_VECTOR_MAGIC:
+ return _deserialize_dense_vector(ba)
+ elif ba[0] == SPARSE_VECTOR_MAGIC:
+ return _deserialize_sparse_vector(ba)
+ else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
- length = header[1]
- if len(ba) != 8*length + 16:
- raise TypeError("_deserialize_double_vector called on bytearray "
+
+
+def _deserialize_dense_vector(ba):
+ """Deserialize a dense vector into a numpy array."""
+ if len(ba) < 5:
+ raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
+ if len(ba) != 8 * length + 5:
+ raise TypeError("_deserialize_dense_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_numpy_array([length], ba, 5)
+
+
+def _deserialize_sparse_vector(ba):
+ """Deserialize a sparse vector into a MLlib SparseVector object."""
+ if len(ba) < 9:
+ raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ size = header[0]
+ nonzeros = header[1]
+ if len(ba) != 9 + 12 * nonzeros:
+ raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
- return _deserialize_byte_array([length], ba, 16)
+ indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
+ values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+ return SparseVector(int(size), indices, values)
+
def _serialize_double_matrix(m):
"""Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+ if (type(m) == ndarray and m.ndim == 2):
+ if m.dtype != float64:
+ if numpy.issubdtype(m.dtype, numpy.complex):
+ raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
+ "wanted ndarray of float64" % m.dtype)
+ m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
- ba = bytearray(24 + 8 * rows * cols)
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- header[0] = 2
- header[1] = rows
- header[2] = cols
- arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24,
- dtype="float64", order='C')
- arr_mid[...] = m
+ ba = bytearray(9 + 8 * rows * cols)
+ ba[0] = DENSE_MATRIX_MAGIC
+ lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
+ lengths[0] = rows
+ lengths[1] = cols
+ _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
return ba
else:
raise TypeError("_serialize_double_matrix called on a "
"non-double-matrix")
+
def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 24:
+ if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- if (header[0] != 2):
+ if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
- rows = header[1]
- cols = header[2]
- if (len(ba) != 8*rows*cols + 24):
+ lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ rows = lengths[0]
+ cols = lengths[1]
+ if (len(ba) != 8 * rows * cols + 9):
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong length")
- return _deserialize_byte_array([rows, cols], ba, 24)
+ return _deserialize_numpy_array([rows, cols], ba, 9)
+
+
+def _serialize_labeled_point(p):
+ """Serialize a LabeledPoint with a features vector of any type."""
+ from pyspark.mllib.regression import LabeledPoint
+ serialized_features = _serialize_double_vector(p.features)
+ header = bytearray(9)
+ header[0] = LABELED_POINT_MAGIC
+ header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
+ header_float[0] = p.label
+ return header + serialized_features
+
+
+def _copyto(array, buffer, offset, shape, dtype):
+ """
+ Copy the contents of a vector to a destination bytearray at the
+ given offset.
+
+ TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
+ we should benchmark that to see whether it provides a benefit.
+ """
+ temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
+ temp_array[...] = array
-def _linear_predictor_typecheck(x, coeffs):
- """Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until I actually implement bulk predict."""
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape == coeffs.shape:
- pass
- else:
- raise RuntimeError("Got array of %d elements; wanted %d"
- % (shape(x)[0], shape(coeffs)[0]))
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
- dataBytes.cache()
+ dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+
+# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
# _serialized_double_vectors
def _get_unmangled_double_vector_rdd(data):
return _get_unmangled_rdd(data, _serialize_double_vector)
-class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
- self._intercept = intercept
-class LinearRegressionModelBase(LinearModel):
- """A linear regression model.
+# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
+def _get_unmangled_labeled_point_rdd(data):
+ return _get_unmangled_rdd(data, _serialize_labeled_point)
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
- True
+
+# Common functions for dealing with and training linear models
+
+def _linear_predictor_typecheck(x, coeffs):
"""
- def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ Check that x is a one-dimensional vector of the right shape.
+ This is a temporary hackaround until we actually implement bulk predict.
+ """
+ x = _convert_vector(x)
+ if type(x) == ndarray:
+ if x.ndim == 1:
+ if x.shape != coeffs.shape:
+ raise RuntimeError("Got array of %d elements; wanted %d"
+ % (numpy.shape(x)[0], coeffs.shape[0]))
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif type(x) == SparseVector:
+ if x.size != coeffs.shape[0]:
+ raise RuntimeError("Got sparse vector of size %d; wanted %d"
+ % (x.size, coeffs.shape[0]))
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
+
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
- initial_weights = data.first()
- if type(initial_weights) != ndarray:
- raise TypeError("At least one data element has type "
- + type(initial_weights).__name__ + " which is not ndarray")
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = ones([initial_weights.shape[0] - 1])
+ initial_weights = _convert_vector(data.first().features)
+ if type(initial_weights) == ndarray:
+ if initial_weights.ndim != 1:
+ raise TypeError("At least one data element has "
+ + str(initial_weights.ndim) + " dimensions, which is not 1")
+ initial_weights = numpy.zeros([initial_weights.shape[0]])
+ elif type(initial_weights) == SparseVector:
+ initial_weights = numpy.zeros([initial_weights.size])
return initial_weights
+
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
if len(ans) != 2:
raise RuntimeError("JVM call result had unexpected length")
@@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])
+
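As an illustrative sketch (not part of the patch), this is how a training class is expected to call _regression_train_wrapper, mirroring the LinearRegressionWithSGD code later in this diff; the lambda is the train_func, and trainLinearRegressionModelWithSGD is the JVM stub it invokes.

    class LinearRegressionWithSGD(object):
        @classmethod
        def train(cls, data, iterations=100, step=1.0,
                  miniBatchFraction=1.0, initialWeights=None):
            """Train a linear regression model on an RDD of LabeledPoint."""
            sc = data.context
            # train_func receives the serialized RDD and serialized initial weights
            # and returns the JVM result: a serialized weight vector plus an intercept.
            train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
                d._jrdd, iterations, step, miniBatchFraction, i)
            return _regression_train_wrapper(sc, train_func, LinearRegressionModel,
                                             data, initialWeights)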
+# Functions for serializing ALS Rating objects and tuples
+
def _serialize_rating(r):
ba = bytearray(16)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@@ -227,11 +349,12 @@ def _serialize_rating(r):
intpart[0], intpart[1], doublepart[0] = r
return ba
+
class RatingDeserializer(Serializer):
def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)
- res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
+ res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4)
return int(res[0]), int(res[1]), res[2]
def load_stream(self, stream):
@@ -243,12 +366,86 @@ class RatingDeserializer(Serializer):
except EOFError:
return
+
def _serialize_tuple(t):
ba = bytearray(8)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
intpart[0], intpart[1] = t
return ba
+
+# Vector math functions that support all of our vector types
+
+def _convert_vector(vec):
+ """
+ Convert a vector to a format we support internally. This does
+ the following:
+
+ * For dense NumPy vectors (ndarray), returns them as is
+ * For our SparseVector class, returns that as is
+ * For Python lists, converts them to NumPy vectors
+ * For scipy.sparse.*_matrix column vectors, converts them to
+ our own SparseVector type.
+
+ This should be called before passing any data to our algorithms
+ or attempting to serialize it to Java.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec
+ elif type(vec) == list:
+ return array(vec, dtype=float64)
+ elif _have_scipy:
+ if _scipy_issparse(vec):
+ assert vec.shape[1] == 1, "Expected column vector"
+ csc = vec.tocsc()
+ return SparseVector(vec.shape[0], csc.indices, csc.data)
+ raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
+
+
+def _squared_distance(v1, v2):
+ """
+ Squared distance of two NumPy or sparse vectors.
+
+ >>> dense1 = array([1., 2.])
+ >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
+ >>> dense2 = array([2., 1.])
+ >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
+ >>> _squared_distance(dense1, dense2)
+ 2.0
+ >>> _squared_distance(dense1, sparse2)
+ 2.0
+ >>> _squared_distance(sparse1, dense2)
+ 2.0
+ >>> _squared_distance(sparse1, sparse2)
+ 2.0
+ """
+ v1 = _convert_vector(v1)
+ v2 = _convert_vector(v2)
+ if type(v1) == ndarray and type(v2) == ndarray:
+ diff = v1 - v2
+ return diff.dot(diff)
+ elif type(v1) == ndarray:
+ return v2.squared_distance(v1)
+ else:
+ return v1.squared_distance(v2)
+
+
+def _dot(vec, target):
+ """
+ Compute the dot product of a vector of the types we support
+ (Numpy array, list, SparseVector, or SciPy sparse) and a target
+ NumPy array that is either 1- or 2-dimensional. Equivalent to
+ calling numpy.dot of the two vectors, but for SciPy ones, we
+ have to transpose them because they're column vectors.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec.dot(target)
+ elif type(vec) == list:
+ return _convert_vector(vec).dot(target)
+ else:
+ return vec.transpose().dot(target)[0]
+
+
def _test():
import doctest
globs = globals().copy()
@@ -259,5 +456,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
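To tie the helpers in _common.py together, a short usage sketch (illustrative only, not part of the patch; the last part assumes SciPy is installed):

    from numpy import array
    from pyspark.mllib._common import _convert_vector, \
        _serialize_double_vector, _deserialize_double_vector
    from pyspark.mllib.linalg import SparseVector

    dense = array([1.0, 0.0, 3.0])
    sparse = SparseVector(3, [0, 2], [1.0, 3.0])

    # Round-trip through the byte formats described at the top of _common.py.
    assert (_deserialize_double_vector(_serialize_double_vector(dense)) == dense).all()
    assert _deserialize_double_vector(_serialize_double_vector(sparse)) == sparse

    # SciPy column vectors are converted to SparseVector before serialization.
    import scipy.sparse
    col = scipy.sparse.lil_matrix((3, 1))
    col[0, 0] = 1.0
    col[2, 0] = 3.0
    assert _convert_vector(col) == sparse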
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4..3a23e0801f 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
import numpy
-from numpy import array, dot, shape
+from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- LinearModel, _linear_predictor_typecheck
+ _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
+ >>> lrm.predict(array([0.0])) <= 0
+ True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm.predict(array([0.0, 1.0])) > 0
+ True
+ >>> lrm.predict(array([0.0, 0.0])) <= 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
+
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
- >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0, 0.0]),
+ ... LabeledPoint(0.0, [0.0, 1.0]),
+ ... LabeledPoint(1.0, [1.0, 0.0]),
+ ... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+ ... ]
+ >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+ >>> model.predict(SparseVector(2, {1: 1.0}))
+ 0.0
+ >>> model.predict(SparseVector(2, {0: 1.0}))
+ 1.0
"""
def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+ return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 30862918c3..f65088c917 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -19,37 +19,61 @@ from numpy import array, dot
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
+from pyspark.mllib.linalg import SparseVector
+
class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
- >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
- >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+ >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+ >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
+ True
+ >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
+ True
+ >>> model = KMeans.train(sc.parallelize(data), 2)
+ >>> sparse_data = [
+ ... SparseVector(3, {1: 1.0}),
+ ... SparseVector(3, {1: 1.1}),
+ ... SparseVector(3, {2: 1.0}),
+ ... SparseVector(3, {2: 1.1})
+ ... ]
+ >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+ >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
+ True
+ >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
+ True
+ >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
- >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+ >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
- >>> clusters = KMeans.train(sc.parallelize(data), 2)
+ >>> type(model.clusterCenters)
+ <type 'list'>
"""
- def __init__(self, centers_):
- self.centers = centers_
+ def __init__(self, centers):
+ self.centers = centers
+
+ @property
+ def clusterCenters(self):
+ """Get the cluster centers, represented as a list of NumPy arrays."""
+ return self.centers
def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
- best_distance = 1e75
- for i in range(0, self.centers.shape[0]):
- diff = x - self.centers[i]
- distance = sqrt(dot(diff, diff))
+ best_distance = float("inf")
+ for i in range(0, len(self.centers)):
+ distance = _squared_distance(x, self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
return best
+
class KMeans(object):
@classmethod
def train(cls, data, k, maxIterations=100, runs=1,
@@ -64,7 +88,9 @@ class KMeans(object):
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray")
- return KMeansModel(_deserialize_double_matrix(ans[0]))
+ matrix = _deserialize_double_matrix(ans[0])
+ return KMeansModel([row for row in matrix])
+
def _test():
import doctest
@@ -76,5 +102,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
new file mode 100644
index 0000000000..0aa3a51de7
--- /dev/null
+++ b/python/pyspark/mllib/linalg.py
@@ -0,0 +1,245 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+MLlib utilities for linear algebra. For dense vectors, MLlib
+uses the NumPy C{array} type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a L{SparseVector}
+object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+SciPy is available in their environment.
+"""
+
+from numpy import array, array_equal, ndarray, float64, int32
+
+
+class SparseVector(object):
+ """
+ A simple sparse vector class for passing data to MLlib. Users may
+ alternatively pass SciPy's C{scipy.sparse} data types.
+ """
+
+ def __init__(self, size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tuples,
+ or two sorted lists containing indices and values.
+
+ >>> print SparseVector(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ assert type(size) == int, "first argument must be an int"
+ self.size = size
+ assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
+ if len(args) == 1:
+ pairs = args[0]
+ if type(pairs) == dict:
+ pairs = pairs.items()
+ pairs = sorted(pairs)
+ self.indices = array([p[0] for p in pairs], dtype=int32)
+ self.values = array([p[1] for p in pairs], dtype=float64)
+ else:
+ assert len(args[0]) == len(args[1]), "index and value arrays not same length"
+ self.indices = array(args[0], dtype=int32)
+ self.values = array(args[1], dtype=float64)
+ for i in xrange(len(self.indices) - 1):
+ if self.indices[i] >= self.indices[i + 1]:
+ raise TypeError("indices array must be sorted")
+
+ def dot(self, other):
+ """
+ Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.dot(a)
+ 25.0
+ >>> a.dot(array([1., 2., 3., 4.]))
+ 22.0
+ >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+ >>> a.dot(b)
+ 0.0
+ >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ array([ 22., 22.])
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+ elif other.ndim == 2:
+ results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
+ return array(results)
+ else:
+ raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ result += self.values[i] * other.values[j]
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ i += 1
+ else:
+ j += 1
+ return result
+
+ def squared_distance(self, other):
+ """
+ Squared distance from a SparseVector or 1-dimensional NumPy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.squared_distance(a)
+ 0.0
+ >>> a.squared_distance(array([1., 2., 3., 4.]))
+ 11.0
+ >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+ >>> a.squared_distance(b)
+ 30.0
+ >>> b.squared_distance(a)
+ 30.0
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(other.shape[0]):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+ else:
+ raise Exception("Cannot call squared_distance with %d-dimensional array" %
+ other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ diff = self.values[i] - other.values[j]
+ result += diff * diff
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ result += self.values[i] * self.values[i]
+ i += 1
+ else:
+ result += other.values[j] * other.values[j]
+ j += 1
+ while i < len(self.indices):
+ result += self.values[i] * self.values[i]
+ i += 1
+ while j < len(other.indices):
+ result += other.values[j] * other.values[j]
+ j += 1
+ return result
+
+ def __str__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "[" + entries + "]"
+
+ def __repr__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "SparseVector({0}, {{{1}}})".format(self.size, entries)
+
+ def __eq__(self, other):
+ """
+ Test SparseVectors for equality.
+
+ >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v1 == v2
+ True
+ >>> v1 != v2
+ False
+ """
+
+ return (isinstance(other, self.__class__)
+ and other.size == self.size
+ and array_equal(other.indices, self.indices)
+ and array_equal(other.values, self.values))
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+
+class Vectors(object):
+ """
+ Factory methods for working with vectors. Note that dense vectors
+ are simply represented as NumPy array objects, so there is no need
+ to covert them for use in MLlib. For sparse vectors, the factory
+ methods in this class create an MLlib-compatible type, or users
+ can pass in SciPy's C{scipy.sparse} column vectors.
+ """
+
+ @staticmethod
+ def sparse(size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tuples,
+ or two sorted lists containing indices and values.
+
+ >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ return SparseVector(size, *args)
+
+ @staticmethod
+ def dense(elements):
+ """
+ Create a dense vector of 64-bit floats from a Python list. Always
+ returns a NumPy array.
+
+ >>> Vectors.dense([1, 2, 3])
+ array([ 1., 2., 3.])
+ """
+ return array(elements, dtype=float64)
+
+
+def _test():
+ import doctest
+ (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f6..266b31d3fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
# limitations under the License.
#
-from numpy import array, dot
+from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck
+ _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+ """
+ The features and labels of a data point.
+
+ @param label: Label for this data point.
+ @param features: Vector of features for this point (NumPy array, list,
+ pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+ """
+ def __init__(self, label, features):
+ self.label = label
+ if (type(features) == ndarray or type(features) == SparseVector
+ or (_have_scipy and _scipy_issparse(features))):
+ self.features = features
+ elif type(features) == list:
+ self.features = array(features)
+ else:
+ raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
+ """A linear model that has a vector of coefficients and an intercept."""
+ def __init__(self, weights, intercept):
+ self._coeff = weights
self._intercept = intercept
+ @property
+ def weights(self):
+ return self._coeff
+
+ @property
+ def intercept(self):
+ return self._intercept
+
+
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
+ >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+ True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ return _dot(x, self._coeff) + self._intercept
+
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
+
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
+
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
+
def _test():
import doctest
globs = globals().copy()
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
new file mode 100644
index 0000000000..d4771d779f
--- /dev/null
+++ b/python/pyspark/mllib/tests.py
@@ -0,0 +1,302 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fuller unit tests for Python MLlib.
+"""
+
+from numpy import array, array_equal
+import unittest
+
+from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
+ _deserialize_double_vector, _dot, _squared_distance
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.tests import PySparkTestCase
+
+
+_have_scipy = False
+try:
+ import scipy.sparse
+ _have_scipy = True
+except:
+ # No SciPy, but that's okay, we'll skip those tests
+ pass
+
+
+class VectorTests(unittest.TestCase):
+ def test_serialize(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ self.assertTrue(sv is _convert_vector(sv))
+ self.assertTrue(dv is _convert_vector(dv))
+ self.assertTrue(array_equal(dv, _convert_vector(lst)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(sv)))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(dv))))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(lst))))
+
+ def test_dot(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(sv, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
+ self.assertEquals(30.0, _dot(dv, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
+ self.assertEquals(30.0, _dot(lst, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+
+ def test_squared_distance(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [4, 3, 2, 1]
+ self.assertEquals(15.0, _squared_distance(sv, dv))
+ self.assertEquals(25.0, _squared_distance(sv, lst))
+ self.assertEquals(20.0, _squared_distance(dv, lst))
+ self.assertEquals(15.0, _squared_distance(dv, sv))
+ self.assertEquals(25.0, _squared_distance(lst, sv))
+ self.assertEquals(20.0, _squared_distance(lst, dv))
+ self.assertEquals(0.0, _squared_distance(sv, sv))
+ self.assertEquals(0.0, _squared_distance(dv, dv))
+ self.assertEquals(0.0, _squared_distance(lst, lst))
+
+
+class ListTests(PySparkTestCase):
+ """
+ Test MLlib algorithms on plain lists, to make sure they're passed through
+ as NumPy arrays.
+ """
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ [0, 1.1],
+ [0, 1.2],
+ [1.1, 0],
+ [1.2, 0],
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, [1, 0]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(0.0, [2, 0]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, [0, -1]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(-1.0, [0, -2]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+@unittest.skipIf(not _have_scipy, "SciPy not installed")
+class SciPyTests(PySparkTestCase):
+ """
+ Test both vector operations and MLlib algorithms with SciPy sparse matrices,
+ if SciPy is available.
+ """
+
+ def test_serialize(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ sv = SparseVector(4, {1: 1, 3: 2})
+ self.assertEquals(sv, _convert_vector(lil))
+ self.assertEquals(sv, _convert_vector(lil.tocsc()))
+ self.assertEquals(sv, _convert_vector(lil.tocoo()))
+ self.assertEquals(sv, _convert_vector(lil.tocsr()))
+ self.assertEquals(sv, _convert_vector(lil.todok()))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+
+ def test_dot(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(lil, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+
+ def test_squared_distance(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 3
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ self.assertEquals(15.0, _squared_distance(lil, dv))
+ self.assertEquals(15.0, _squared_distance(lil, sv))
+ self.assertEquals(15.0, _squared_distance(dv, lil))
+ self.assertEquals(15.0, _squared_distance(sv, lil))
+
+ def scipy_matrix(self, size, values):
+ """Create a column SciPy matrix from a dictionary of values"""
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((size, 1))
+ for key, value in values.items():
+ lil[key, 0] = value
+ return lil
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ self.scipy_matrix(3, {1: 1.0}),
+ self.scipy_matrix(3, {1: 1.1}),
+ self.scipy_matrix(3, {2: 1.0}),
+ self.scipy_matrix(3, {2: 1.1})
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+if __name__ == "__main__":
+ if not _have_scipy:
+ print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+ unittest.main()
+ if not _have_scipy:
+ print "NOTE: SciPy tests were skipped as it does not seem to be installed"