aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-04-15 20:33:24 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-15 20:33:24 -0700
commit63ca581d9c84176549b1ea0a1d8d7c0cca982acc (patch)
tree9dc5f04a355117578e31b4b431d34da075b34ea3 /python/pyspark
parent8517911efb89aade61c8b8c54fee216dae9a4b4f (diff)
downloadspark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.gz
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.bz2
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.zip
[WIP] SPARK-1430: Support sparse data in Python MLlib
This PR adds a SparseVector class in PySpark and updates all the regression, classification and clustering algorithms and models to support sparse data, similar to MLlib. I chose to add this class because SciPy is quite difficult to install in many environments (more so than NumPy), but I plan to add support for SciPy sparse vectors later too, and make the methods work transparently on objects of either type. On the Scala side, we keep Python sparse vectors sparse and pass them to MLlib. We always return dense vectors from our models. Some to-do items left: - [x] Support SciPy's scipy.sparse matrix objects when SciPy is available. We can easily add a function to convert these to our own SparseVector. - [x] MLlib currently uses a vector with one extra column on the left to represent what we call LabeledPoint in Scala. Do we really want this? It may get annoying once you deal with sparse data since you must add/subtract 1 to each feature index when training. We can remove this API in 1.0 and use tuples for labeling. - [x] Explain how to use these in the Python MLlib docs. CC @mengxr, @joshrosen Author: Matei Zaharia <matei@databricks.com> Closes #341 from mateiz/py-ml-update and squashes the following commits: d52e763 [Matei Zaharia] Remove no-longer-needed slice code and handle review comments ea5a25a [Matei Zaharia] Fix remaining uses of copyto() after merge b9f97a3 [Matei Zaharia] Fix test 1e1bd0f [Matei Zaharia] Add MLlib logistic regression example in Python 88bc01f [Matei Zaharia] Clean up inheritance of LinearModel in Python, and expose its parametrs 37ab747 [Matei Zaharia] Fix some examples and docs due to changes in MLlib API da0f27e [Matei Zaharia] Added a MLlib K-means example and updated docs to discuss sparse data c48e85a [Matei Zaharia] Added some tests for passing lists as input, and added mllib/tests.py to run-tests script. a07ba10 [Matei Zaharia] Fix some typos and calculation of initial weights 74eefe7 [Matei Zaharia] Added LabeledPoint class in Python 889dde8 [Matei Zaharia] Support scipy.sparse matrices in all our algorithms and models ab244d1 [Matei Zaharia] Allow SparseVectors to be initialized using a dict a5d6426 [Matei Zaharia] Add linalg.py to run-tests script 0e7a3d8 [Matei Zaharia] Keep vectors sparse in Java when reading LabeledPoints eaee759 [Matei Zaharia] Update regression, classification and clustering models for sparse data 2abbb44 [Matei Zaharia] Further work to get linear models working with sparse data 154f45d [Matei Zaharia] Update docs, name some magic values 881fef7 [Matei Zaharia] Added a sparse vector in Python and made Java-Python format more compact
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/mllib/_common.py396
-rw-r--r--python/pyspark/mllib/classification.py75
-rw-r--r--python/pyspark/mllib/clustering.py51
-rw-r--r--python/pyspark/mllib/linalg.py245
-rw-r--r--python/pyspark/mllib/regression.py128
-rw-r--r--python/pyspark/mllib/tests.py302
6 files changed, 1066 insertions, 131 deletions
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e19f5d2aaa..e6f0953810 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,38 +15,86 @@
# limitations under the License.
#
-from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype
+import struct
+import numpy
+from numpy import ndarray, float64, int64, int32, array_equal, array
from pyspark import SparkContext, RDD
-import numpy as np
-
+from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
-import struct
-# Double vector format:
+"""
+Common utilities shared throughout MLlib, primarily for dealing with
+different data types. These include:
+- Serialization utilities to / from byte arrays that Java can handle
+- Serializers for other data types, like ALS Rating objects
+- Common methods for linear models
+- Methods to deal with the different vector types we support, such as
+ SparseVector and scipy.sparse matrices.
+"""
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+_have_scipy = False
+_scipy_issparse = None
+try:
+ import scipy.sparse
+ _have_scipy = True
+ _scipy_issparse = scipy.sparse.issparse
+except:
+ # No SciPy in environment, but that's okay
+ pass
+
+
+# Serialization functions to and from Scala. These use the following formats, understood
+# by the PythonMLLibAPI class in Scala:
+#
+# Dense double vector format:
+#
+# [1-byte 1] [4-byte length] [length*8 bytes of data]
#
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
+# Sparse double vector format:
+#
+# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
#
# Double matrix format:
#
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
+#
+# LabeledPoint format:
+#
+# [1-byte 4] [8-byte label] [dense or sparse vector]
#
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
-def _deserialize_byte_array(shape, ba, offset):
- """Wrapper around ndarray aliasing hack.
+
+DENSE_VECTOR_MAGIC = 1
+SPARSE_VECTOR_MAGIC = 2
+DENSE_MATRIX_MAGIC = 3
+LABELED_POINT_MAGIC = 4
+
+
+def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
+ """
+ Deserialize a numpy array of the given type from an offset in
+ bytearray ba, assigning it the given shape.
>>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
+ True
+ >>> x = array([1, 2, 3], dtype=int32)
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
True
"""
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
- order='C')
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
return ar.copy()
+
def _serialize_double_vector(v):
"""Serialize a double vector into a mutually understood format.
@@ -55,160 +103,231 @@ def _serialize_double_vector(v):
>>> array_equal(y, array([1.0, 2.0, 3.0]))
True
"""
- if type(v) != ndarray:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- """complex is only datatype that can't be converted to float64"""
- if issubdtype(v.dtype, complex):
+ v = _convert_vector(v)
+ if type(v) == ndarray:
+ return _serialize_dense_vector(v)
+ elif type(v) == SparseVector:
+ return _serialize_sparse_vector(v)
+ else:
raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- if v.dtype != float64:
- v = v.astype(float64)
+ "wanted ndarray or SparseVector" % type(v))
+
+
+def _serialize_dense_vector(v):
+ """Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
+ if v.dtype != float64:
+ if numpy.issubdtype(v.dtype, numpy.complex):
+ raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+ "wanted ndarray of float64" % v.dtype)
+ v = v.astype(float64)
length = v.shape[0]
- ba = bytearray(16 + 8*length)
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- header[0] = 1
- header[1] = length
- arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64")
- arr_mid[...] = v
+ ba = bytearray(5 + 8 * length)
+ ba[0] = DENSE_VECTOR_MAGIC
+ length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
+ length_bytes[0] = length
+ _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
+ return ba
+
+
+def _serialize_sparse_vector(v):
+ """Serialize a pyspark.mllib.linalg.SparseVector."""
+ nonzeros = len(v.indices)
+ ba = bytearray(9 + 12 * nonzeros)
+ ba[0] = SPARSE_VECTOR_MAGIC
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ header[0] = v.size
+ header[1] = nonzeros
+ _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
+ values_offset = 9 + 4 * nonzeros
+ _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
return ba
+
def _deserialize_double_vector(ba):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
>>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
True
+ >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
+ >>> s == _deserialize_double_vector(_serialize_double_vector(s))
+ True
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 16:
+ if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- if header[0] != 1:
+ if ba[0] == DENSE_VECTOR_MAGIC:
+ return _deserialize_dense_vector(ba)
+ elif ba[0] == SPARSE_VECTOR_MAGIC:
+ return _deserialize_sparse_vector(ba)
+ else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
- length = header[1]
- if len(ba) != 8*length + 16:
- raise TypeError("_deserialize_double_vector called on bytearray "
+
+
+def _deserialize_dense_vector(ba):
+ """Deserialize a dense vector into a numpy array."""
+ if len(ba) < 5:
+ raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
+ if len(ba) != 8 * length + 5:
+ raise TypeError("_deserialize_dense_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_numpy_array([length], ba, 5)
+
+
+def _deserialize_sparse_vector(ba):
+ """Deserialize a sparse vector into a MLlib SparseVector object."""
+ if len(ba) < 9:
+ raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ size = header[0]
+ nonzeros = header[1]
+ if len(ba) != 9 + 12 * nonzeros:
+ raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
- return _deserialize_byte_array([length], ba, 16)
+ indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
+ values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+ return SparseVector(int(size), indices, values)
+
def _serialize_double_matrix(m):
"""Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+ if (type(m) == ndarray and m.ndim == 2):
+ if m.dtype != float64:
+ if numpy.issubdtype(m.dtype, numpy.complex):
+ raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
+ "wanted ndarray of float64" % m.dtype)
+ m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
- ba = bytearray(24 + 8 * rows * cols)
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- header[0] = 2
- header[1] = rows
- header[2] = cols
- arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24,
- dtype="float64", order='C')
- arr_mid[...] = m
+ ba = bytearray(9 + 8 * rows * cols)
+ ba[0] = DENSE_MATRIX_MAGIC
+ lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
+ lengths[0] = rows
+ lengths[1] = cols
+ _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
return ba
else:
raise TypeError("_serialize_double_matrix called on a "
"non-double-matrix")
+
def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 24:
+ if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- if (header[0] != 2):
+ if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
- rows = header[1]
- cols = header[2]
- if (len(ba) != 8*rows*cols + 24):
+ lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ rows = lengths[0]
+ cols = lengths[1]
+ if (len(ba) != 8 * rows * cols + 9):
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong length")
- return _deserialize_byte_array([rows, cols], ba, 24)
+ return _deserialize_numpy_array([rows, cols], ba, 9)
+
+
+def _serialize_labeled_point(p):
+ """Serialize a LabeledPoint with a features vector of any type."""
+ from pyspark.mllib.regression import LabeledPoint
+ serialized_features = _serialize_double_vector(p.features)
+ header = bytearray(9)
+ header[0] = LABELED_POINT_MAGIC
+ header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
+ header_float[0] = p.label
+ return header + serialized_features
+
+
+def _copyto(array, buffer, offset, shape, dtype):
+ """
+ Copy the contents of a vector to a destination bytearray at the
+ given offset.
+
+ TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
+ we should benchmark that to see whether it provides a benefit.
+ """
+ temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
+ temp_array[...] = array
-def _linear_predictor_typecheck(x, coeffs):
- """Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until I actually implement bulk predict."""
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape == coeffs.shape:
- pass
- else:
- raise RuntimeError("Got array of %d elements; wanted %d"
- % (shape(x)[0], shape(coeffs)[0]))
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
- dataBytes.cache()
+ dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+
+# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
# _serialized_double_vectors
def _get_unmangled_double_vector_rdd(data):
return _get_unmangled_rdd(data, _serialize_double_vector)
-class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
- self._intercept = intercept
-class LinearRegressionModelBase(LinearModel):
- """A linear regression model.
+# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
+def _get_unmangled_labeled_point_rdd(data):
+ return _get_unmangled_rdd(data, _serialize_labeled_point)
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
- True
+
+# Common functions for dealing with and training linear models
+
+def _linear_predictor_typecheck(x, coeffs):
"""
- def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ Check that x is a one-dimensional vector of the right shape.
+ This is a temporary hackaround until we actually implement bulk predict.
+ """
+ x = _convert_vector(x)
+ if type(x) == ndarray:
+ if x.ndim == 1:
+ if x.shape != coeffs.shape:
+ raise RuntimeError("Got array of %d elements; wanted %d"
+ % (numpy.shape(x)[0], coeffs.shape[0]))
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif type(x) == SparseVector:
+ if x.size != coeffs.shape[0]:
+ raise RuntimeError("Got sparse vector of size %d; wanted %d"
+ % (x.size, coeffs.shape[0]))
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
+
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
- initial_weights = data.first()
- if type(initial_weights) != ndarray:
- raise TypeError("At least one data element has type "
- + type(initial_weights).__name__ + " which is not ndarray")
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = ones([initial_weights.shape[0] - 1])
+ initial_weights = _convert_vector(data.first().features)
+ if type(initial_weights) == ndarray:
+ if initial_weights.ndim != 1:
+ raise TypeError("At least one data element has "
+ + initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights = numpy.zeros([initial_weights.shape[0]])
+ elif type(initial_weights) == SparseVector:
+ initial_weights = numpy.zeros([initial_weights.size])
return initial_weights
+
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
if len(ans) != 2:
raise RuntimeError("JVM call result had unexpected length")
@@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])
+
+# Functions for serializing ALS Rating objects and tuples
+
def _serialize_rating(r):
ba = bytearray(16)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@@ -227,11 +349,12 @@ def _serialize_rating(r):
intpart[0], intpart[1], doublepart[0] = r
return ba
+
class RatingDeserializer(Serializer):
def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)
- res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
+ res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4)
return int(res[0]), int(res[1]), res[2]
def load_stream(self, stream):
@@ -243,12 +366,86 @@ class RatingDeserializer(Serializer):
except EOFError:
return
+
def _serialize_tuple(t):
ba = bytearray(8)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
intpart[0], intpart[1] = t
return ba
+
+# Vector math functions that support all of our vector types
+
+def _convert_vector(vec):
+ """
+ Convert a vector to a format we support internally. This does
+ the following:
+
+ * For dense NumPy vectors (ndarray), returns them as is
+ * For our SparseVector class, returns that as is
+ * For Python lists, converts them to NumPy vectors
+ * For scipy.sparse.*_matrix column vectors, converts them to
+ our own SparseVector type.
+
+ This should be called before passing any data to our algorithms
+ or attempting to serialize it to Java.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec
+ elif type(vec) == list:
+ return array(vec, dtype=float64)
+ elif _have_scipy:
+ if _scipy_issparse(vec):
+ assert vec.shape[1] == 1, "Expected column vector"
+ csc = vec.tocsc()
+ return SparseVector(vec.shape[0], csc.indices, csc.data)
+ raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
+
+
+def _squared_distance(v1, v2):
+ """
+ Squared distance of two NumPy or sparse vectors.
+
+ >>> dense1 = array([1., 2.])
+ >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
+ >>> dense2 = array([2., 1.])
+ >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
+ >>> _squared_distance(dense1, dense2)
+ 2.0
+ >>> _squared_distance(dense1, sparse2)
+ 2.0
+ >>> _squared_distance(sparse1, dense2)
+ 2.0
+ >>> _squared_distance(sparse1, sparse2)
+ 2.0
+ """
+ v1 = _convert_vector(v1)
+ v2 = _convert_vector(v2)
+ if type(v1) == ndarray and type(v2) == ndarray:
+ diff = v1 - v2
+ return diff.dot(diff)
+ elif type(v1) == ndarray:
+ return v2.squared_distance(v1)
+ else:
+ return v1.squared_distance(v2)
+
+
+def _dot(vec, target):
+ """
+ Compute the dot product of a vector of the types we support
+ (Numpy array, list, SparseVector, or SciPy sparse) and a target
+ NumPy array that is either 1- or 2-dimensional. Equivalent to
+ calling numpy.dot of the two vectors, but for SciPy ones, we
+ have to transpose them because they're column vectors.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec.dot(target)
+ elif type(vec) == list:
+ return _convert_vector(vec).dot(target)
+ else:
+ return vec.transpose().dot(target)[0]
+
+
def _test():
import doctest
globs = globals().copy()
@@ -259,5 +456,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4..3a23e0801f 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
import numpy
-from numpy import array, dot, shape
+from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- LinearModel, _linear_predictor_typecheck
+ _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
+ >>> lrm.predict(array([0.0])) <= 0
+ True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm.predict(array([0.0, 1.0])) > 0
+ True
+ >>> lrm.predict(array([0.0, 0.0])) <= 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
+
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
- >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0, 0.0]),
+ ... LabeledPoint(0.0, [0.0, 1.0]),
+ ... LabeledPoint(1.0, [1.0, 0.0]),
+ ... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+ ... ]
+ >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+ >>> model.predict(SparseVector(2, {1: 1.0}))
+ 0.0
+ >>> model.predict(SparseVector(2, {0: 1.0}))
+ 1.0
"""
def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+ return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 30862918c3..f65088c917 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -19,37 +19,61 @@ from numpy import array, dot
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
+from pyspark.mllib.linalg import SparseVector
+
class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
- >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
- >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+ >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+ >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
+ True
+ >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
+ True
+ >>> model = KMeans.train(sc.parallelize(data), 2)
+ >>> sparse_data = [
+ ... SparseVector(3, {1: 1.0}),
+ ... SparseVector(3, {1: 1.1}),
+ ... SparseVector(3, {2: 1.0}),
+ ... SparseVector(3, {2: 1.1})
+ ... ]
+ >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+ >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
+ True
+ >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
+ True
+ >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
- >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+ >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
- >>> clusters = KMeans.train(sc.parallelize(data), 2)
+ >>> type(model.clusterCenters)
+ <type 'list'>
"""
- def __init__(self, centers_):
- self.centers = centers_
+ def __init__(self, centers):
+ self.centers = centers
+
+ @property
+ def clusterCenters(self):
+ """Get the cluster centers, represented as a list of NumPy arrays."""
+ return self.centers
def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
- best_distance = 1e75
- for i in range(0, self.centers.shape[0]):
- diff = x - self.centers[i]
- distance = sqrt(dot(diff, diff))
+ best_distance = float("inf")
+ for i in range(0, len(self.centers)):
+ distance = _squared_distance(x, self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
return best
+
class KMeans(object):
@classmethod
def train(cls, data, k, maxIterations=100, runs=1,
@@ -64,7 +88,9 @@ class KMeans(object):
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray")
- return KMeansModel(_deserialize_double_matrix(ans[0]))
+ matrix = _deserialize_double_matrix(ans[0])
+ return KMeansModel([row for row in matrix])
+
def _test():
import doctest
@@ -76,5 +102,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
new file mode 100644
index 0000000000..0aa3a51de7
--- /dev/null
+++ b/python/pyspark/mllib/linalg.py
@@ -0,0 +1,245 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+MLlib utilities for linear algebra. For dense vectors, MLlib
+uses the NumPy C{array} type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a L{SparseVector}
+object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+SciPy is available in their environment.
+"""
+
+from numpy import array, array_equal, ndarray, float64, int32
+
+
+class SparseVector(object):
+ """
+ A simple sparse vector class for passing data to MLlib. Users may
+ alternatively pass SciPy's {scipy.sparse} data types.
+ """
+
+ def __init__(self, size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tupes,
+ or two sorted lists containing indices and values.
+
+ >>> print SparseVector(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ assert type(size) == int, "first argument must be an int"
+ self.size = size
+ assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
+ if len(args) == 1:
+ pairs = args[0]
+ if type(pairs) == dict:
+ pairs = pairs.items()
+ pairs = sorted(pairs)
+ self.indices = array([p[0] for p in pairs], dtype=int32)
+ self.values = array([p[1] for p in pairs], dtype=float64)
+ else:
+ assert len(args[0]) == len(args[1]), "index and value arrays not same length"
+ self.indices = array(args[0], dtype=int32)
+ self.values = array(args[1], dtype=float64)
+ for i in xrange(len(self.indices) - 1):
+ if self.indices[i] >= self.indices[i + 1]:
+ raise TypeError("indices array must be sorted")
+
+ def dot(self, other):
+ """
+ Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.dot(a)
+ 25.0
+ >>> a.dot(array([1., 2., 3., 4.]))
+ 22.0
+ >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+ >>> a.dot(b)
+ 0.0
+ >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ array([ 22., 22.])
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+ elif other.ndim == 2:
+ results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
+ return array(results)
+ else:
+ raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ result += self.values[i] * other.values[j]
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ i += 1
+ else:
+ j += 1
+ return result
+
+ def squared_distance(self, other):
+ """
+ Squared distance from a SparseVector or 1-dimensional NumPy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.squared_distance(a)
+ 0.0
+ >>> a.squared_distance(array([1., 2., 3., 4.]))
+ 11.0
+ >>> b = SparseVector(4, [2, 4], [1.0, 2.0])
+ >>> a.squared_distance(b)
+ 30.0
+ >>> b.squared_distance(a)
+ 30.0
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(other.shape[0]):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+ else:
+ raise Exception("Cannot call squared_distance with %d-dimensional array" %
+ other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ diff = self.values[i] - other.values[j]
+ result += diff * diff
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ result += self.values[i] * self.values[i]
+ i += 1
+ else:
+ result += other.values[j] * other.values[j]
+ j += 1
+ while i < len(self.indices):
+ result += self.values[i] * self.values[i]
+ i += 1
+ while j < len(other.indices):
+ result += other.values[j] * other.values[j]
+ j += 1
+ return result
+
+ def __str__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "[" + entries + "]"
+
+ def __repr__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "SparseVector({0}, {{{1}}})".format(self.size, entries)
+
+ def __eq__(self, other):
+ """
+ Test SparseVectors for equality.
+
+ >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v1 == v2
+ True
+ >>> v1 != v2
+ False
+ """
+
+ return (isinstance(other, self.__class__)
+ and other.size == self.size
+ and array_equal(other.indices, self.indices)
+ and array_equal(other.values, self.values))
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+
+class Vectors(object):
+ """
+ Factory methods for working with vectors. Note that dense vectors
+ are simply represented as NumPy array objects, so there is no need
+ to covert them for use in MLlib. For sparse vectors, the factory
+ methods in this class create an MLlib-compatible type, or users
+ can pass in SciPy's C{scipy.sparse} column vectors.
+ """
+
+ @staticmethod
+ def sparse(size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tupes,
+ or two sorted lists containing indices and values.
+
+ >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ return SparseVector(size, *args)
+
+ @staticmethod
+ def dense(elements):
+ """
+ Create a dense vector of 64-bit floats from a Python list. Always
+ returns a NumPy array.
+
+ >>> Vectors.dense([1, 2, 3])
+ array([ 1., 2., 3.])
+ """
+ return array(elements, dtype=float64)
+
+
+def _test():
+ import doctest
+ (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f6..266b31d3fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
# limitations under the License.
#
-from numpy import array, dot
+from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck
+ _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+ """
+ The features and labels of a data point.
+
+ @param label: Label for this data point.
+ @param features: Vector of features for this point (NumPy array, list,
+ pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+ """
+ def __init__(self, label, features):
+ self.label = label
+ if (type(features) == ndarray or type(features) == SparseVector
+ or (_have_scipy and _scipy_issparse(features))):
+ self.features = features
+ elif type(features) == list:
+ self.features = array(features)
+ else:
+ raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
+ """A linear model that has a vector of coefficients and an intercept."""
+ def __init__(self, weights, intercept):
+ self._coeff = weights
self._intercept = intercept
+ @property
+ def weights(self):
+ return self._coeff
+
+ @property
+ def intercept(self):
+ return self._intercept
+
+
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
+ >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+ True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ return _dot(x, self._coeff) + self._intercept
+
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
+
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
+
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
+
def _test():
import doctest
globs = globals().copy()
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
new file mode 100644
index 0000000000..d4771d779f
--- /dev/null
+++ b/python/pyspark/mllib/tests.py
@@ -0,0 +1,302 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fuller unit tests for Python MLlib.
+"""
+
+from numpy import array, array_equal
+import unittest
+
+from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
+ _deserialize_double_vector, _dot, _squared_distance
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.tests import PySparkTestCase
+
+
+_have_scipy = False
+try:
+ import scipy.sparse
+ _have_scipy = True
+except:
+ # No SciPy, but that's okay, we'll skip those tests
+ pass
+
+
+class VectorTests(unittest.TestCase):
+ def test_serialize(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ self.assertTrue(sv is _convert_vector(sv))
+ self.assertTrue(dv is _convert_vector(dv))
+ self.assertTrue(array_equal(dv, _convert_vector(lst)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(sv)))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(dv))))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(lst))))
+
+ def test_dot(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(sv, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
+ self.assertEquals(30.0, _dot(dv, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
+ self.assertEquals(30.0, _dot(lst, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+
+ def test_squared_distance(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [4, 3, 2, 1]
+ self.assertEquals(15.0, _squared_distance(sv, dv))
+ self.assertEquals(25.0, _squared_distance(sv, lst))
+ self.assertEquals(20.0, _squared_distance(dv, lst))
+ self.assertEquals(15.0, _squared_distance(dv, sv))
+ self.assertEquals(25.0, _squared_distance(lst, sv))
+ self.assertEquals(20.0, _squared_distance(lst, dv))
+ self.assertEquals(0.0, _squared_distance(sv, sv))
+ self.assertEquals(0.0, _squared_distance(dv, dv))
+ self.assertEquals(0.0, _squared_distance(lst, lst))
+
+
+class ListTests(PySparkTestCase):
+ """
+ Test MLlib algorithms on plain lists, to make sure they're passed through
+ as NumPy arrays.
+ """
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ [0, 1.1],
+ [0, 1.2],
+ [1.1, 0],
+ [1.2, 0],
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, [1, 0]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(0.0, [2, 0]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, [0, -1]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(-1.0, [0, -2]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+@unittest.skipIf(not _have_scipy, "SciPy not installed")
+class SciPyTests(PySparkTestCase):
+ """
+ Test both vector operations and MLlib algorithms with SciPy sparse matrices,
+ if SciPy is available.
+ """
+
+ def test_serialize(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ sv = SparseVector(4, {1: 1, 3: 2})
+ self.assertEquals(sv, _convert_vector(lil))
+ self.assertEquals(sv, _convert_vector(lil.tocsc()))
+ self.assertEquals(sv, _convert_vector(lil.tocoo()))
+ self.assertEquals(sv, _convert_vector(lil.tocsr()))
+ self.assertEquals(sv, _convert_vector(lil.todok()))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+
+ def test_dot(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(lil, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+
+ def test_squared_distance(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 3
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ self.assertEquals(15.0, _squared_distance(lil, dv))
+ self.assertEquals(15.0, _squared_distance(lil, sv))
+ self.assertEquals(15.0, _squared_distance(dv, lil))
+ self.assertEquals(15.0, _squared_distance(sv, lil))
+
+ def scipy_matrix(self, size, values):
+ """Create a column SciPy matrix from a dictionary of values"""
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((size, 1))
+ for key, value in values.items():
+ lil[key, 0] = value
+ return lil
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ self.scipy_matrix(3, {1: 1.0}),
+ self.scipy_matrix(3, {1: 1.1}),
+ self.scipy_matrix(3, {2: 1.0}),
+ self.scipy_matrix(3, {2: 1.1})
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+if __name__ == "__main__":
+ if not _have_scipy:
+ print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+ unittest.main()
+ if not _have_scipy:
+ print "NOTE: SciPy tests were skipped as it does not seem to be installed"