aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/linalg.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-19 15:01:11 -0700
committerXiangrui Meng <meng@databricks.com>2014-09-19 15:01:11 -0700
commitfce5e251d636c788cda91345867e0294280c074d (patch)
tree4bded23a826bcfeb02deef73bd735cf0a05d4ee7 /python/pyspark/mllib/linalg.py
parenta03e5b81e91d9d792b6a2e01d1505394ea303dd8 (diff)
downloadspark-fce5e251d636c788cda91345867e0294280c074d.tar.gz
spark-fce5e251d636c788cda91345867e0294280c074d.tar.bz2
spark-fce5e251d636c788cda91345867e0294280c074d.zip
[SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib
Currently, we serialize the data between JVM and Python case by case manually, this cannot scale to support so many APIs in MLlib. This patch will try to address this problem by serialize the data using pickle protocol, using Pyrolite library to serialize/deserialize in JVM. Pickle protocol can be easily extended to support customized class. All the modules are refactored to use this protocol. Known issues: There will be some performance regression (both CPU and memory, the serialized data increased) Author: Davies Liu <davies.liu@gmail.com> Closes #2378 from davies/pickle_mllib and squashes the following commits: dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into pickle_mllib 810f97f [Davies Liu] fix equal of matrix 032cd62 [Davies Liu] add more type check and conversion for user_product bd738ab [Davies Liu] address comments e431377 [Davies Liu] fix cache of rdd, refactor 19d0967 [Davies Liu] refactor Picklers 2511e76 [Davies Liu] cleanup 1fccf1a [Davies Liu] address comments a2cc855 [Davies Liu] fix tests 9ceff73 [Davies Liu] test size of serialized Rating 44e0551 [Davies Liu] fix cache a379a81 [Davies Liu] fix pickle array in python2.7 df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib 154d141 [Davies Liu] fix autobatchedpickler 44736d7 [Davies Liu] speed up pickling array in Python 2.7 e1d1bfc [Davies Liu] refactor 708dc02 [Davies Liu] fix tests 9dcfb63 [Davies Liu] fix style 88034f0 [Davies Liu] rafactor, address comments 46a501e [Davies Liu] choose batch size automatically df19464 [Davies Liu] memorize the module and class name during pickleing f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib 722dd96 [Davies Liu] cleanup _common.py 0ee1525 [Davies Liu] remove outdated tests b02e34f [Davies Liu] remove _common.py 84c721d [Davies Liu] Merge branch 'master' into pickle_mllib 4d7963e [Davies Liu] remove muanlly serialization 6d26b03 [Davies Liu] fix tests c383544 [Davies Liu] classification f2a0856 [Davies Liu] mllib/regression d9f691f [Davies Liu] mllib/util cccb8b1 [Davies Liu] mllib/tree 8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib aa2287e [Davies Liu] random f1544c4 [Davies Liu] refactor clustering 52d1350 [Davies Liu] use new protocol in mllib/stat b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation f44f771 [Davies Liu] enable tests about array 3908f5c [Davies Liu] Merge branch 'master' into pickle c77c87b [Davies Liu] cleanup debugging code 60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
Diffstat (limited to 'python/pyspark/mllib/linalg.py')
-rw-r--r--python/pyspark/mllib/linalg.py256
1 files changed, 219 insertions, 37 deletions
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index e69051c104..0a5dcaac55 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -23,14 +23,148 @@ object from MLlib or pass SciPy C{scipy.sparse} column vectors if
SciPy is available in their environment.
"""
-import numpy
-from numpy import array, array_equal, ndarray, float64, int32
+import sys
+import array
+import copy_reg
+import numpy as np
-__all__ = ['SparseVector', 'Vectors']
+__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
-class SparseVector(object):
+if sys.version_info[:2] == (2, 7):
+ # speed up pickling array in Python 2.7
+ def fast_pickle_array(ar):
+ return array.array, (ar.typecode, ar.tostring())
+ copy_reg.pickle(array.array, fast_pickle_array)
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+try:
+ import scipy.sparse
+ _have_scipy = True
+except:
+ # No SciPy in environment, but that's okay
+ _have_scipy = False
+
+
+def _convert_to_vector(l):
+ if isinstance(l, Vector):
+ return l
+ elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+ return DenseVector(l)
+ elif _have_scipy and scipy.sparse.issparse(l):
+ assert l.shape[1] == 1, "Expected column vector"
+ csc = l.tocsc()
+ return SparseVector(l.shape[0], csc.indices, csc.data)
+ else:
+ raise TypeError("Cannot convert type %s into Vector" % type(l))
+
+
+class Vector(object):
+ """
+ Abstract class for DenseVector and SparseVector
+ """
+ def toArray(self):
+ """
+ Convert the vector into an numpy.ndarray
+ :return: numpy.ndarray
+ """
+ raise NotImplementedError
+
+
+class DenseVector(Vector):
+ def __init__(self, ar):
+ if not isinstance(ar, array.array):
+ ar = array.array('d', ar)
+ self.array = ar
+
+ def __reduce__(self):
+ return DenseVector, (self.array,)
+
+ def dot(self, other):
+ """
+ Compute the dot product of two Vectors. We support
+ (Numpy array, list, SparseVector, or SciPy sparse)
+ and a target NumPy array that is either 1- or 2-dimensional.
+ Equivalent to calling numpy.dot of the two vectors.
+
+ >>> dense = DenseVector(array.array('d', [1., 2.]))
+ >>> dense.dot(dense)
+ 5.0
+ >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
+ 4.0
+ >>> dense.dot(range(1, 3))
+ 5.0
+ >>> dense.dot(np.array(range(1, 3)))
+ 5.0
+ """
+ if isinstance(other, SparseVector):
+ return other.dot(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return other.transpose().dot(self.toArray())[0]
+ elif isinstance(other, Vector):
+ return np.dot(self.toArray(), other.toArray())
+ else:
+ return np.dot(self.toArray(), other)
+
+ def squared_distance(self, other):
+ """
+ Squared distance of two Vectors.
+
+ >>> dense1 = DenseVector(array.array('d', [1., 2.]))
+ >>> dense1.squared_distance(dense1)
+ 0.0
+ >>> dense2 = np.array([2., 1.])
+ >>> dense1.squared_distance(dense2)
+ 2.0
+ >>> dense3 = [2., 1.]
+ >>> dense1.squared_distance(dense3)
+ 2.0
+ >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
+ >>> dense1.squared_distance(sparse1)
+ 2.0
+ """
+ if isinstance(other, SparseVector):
+ return other.squared_distance(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return _convert_to_vector(other).squared_distance(self)
+
+ if isinstance(other, Vector):
+ other = other.toArray()
+ elif not isinstance(other, np.ndarray):
+ other = np.array(other)
+ diff = self.toArray() - other
+ return np.dot(diff, diff)
+
+ def toArray(self):
+ return np.array(self.array)
+
+ def __getitem__(self, item):
+ return self.array[item]
+
+ def __len__(self):
+ return len(self.array)
+
+ def __str__(self):
+ return "[" + ",".join([str(v) for v in self.array]) + "]"
+
+ def __repr__(self):
+ return "DenseVector(%r)" % self.array
+
+ def __eq__(self, other):
+ return isinstance(other, DenseVector) and self.array == other.array
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __getattr__(self, item):
+ return getattr(self.array, item)
+
+
+class SparseVector(Vector):
"""
A simple sparse vector class for passing data to MLlib. Users may
@@ -61,16 +195,19 @@ class SparseVector(object):
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
- self.indices = array([p[0] for p in pairs], dtype=int32)
- self.values = array([p[1] for p in pairs], dtype=float64)
+ self.indices = array.array('i', [p[0] for p in pairs])
+ self.values = array.array('d', [p[1] for p in pairs])
else:
assert len(args[0]) == len(args[1]), "index and value arrays not same length"
- self.indices = array(args[0], dtype=int32)
- self.values = array(args[1], dtype=float64)
+ self.indices = array.array('i', args[0])
+ self.values = array.array('d', args[1])
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")
+ def __reduce__(self):
+ return (SparseVector, (self.size, self.indices, self.values))
+
def dot(self, other):
"""
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
@@ -78,15 +215,15 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
- >>> a.dot(array([1., 2., 3., 4.]))
+ >>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
- >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ >>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
- if type(other) == ndarray:
+ if type(other) == np.ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
@@ -94,10 +231,17 @@ class SparseVector(object):
return result
elif other.ndim == 2:
results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
- return array(results)
+ return np.array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
- else:
+
+ elif type(other) in (array.array, DenseVector):
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -110,6 +254,8 @@ class SparseVector(object):
else:
j += 1
return result
+ else:
+ return self.dot(_convert_to_vector(other))
def squared_distance(self, other):
"""
@@ -118,7 +264,9 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
- >>> a.squared_distance(array([1., 2., 3., 4.]))
+ >>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
+ 11.0
+ >>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
@@ -126,22 +274,22 @@ class SparseVector(object):
>>> b.squared_distance(a)
30.0
"""
- if type(other) == ndarray:
- if other.ndim == 1:
- result = 0.0
- j = 0 # index into our own array
- for i in xrange(other.shape[0]):
- if j < len(self.indices) and self.indices[j] == i:
- diff = self.values[j] - other[i]
- result += diff * diff
- j += 1
- else:
- result += other[i] * other[i]
- return result
- else:
+ if type(other) in (list, array.array, DenseVector, np.array, np.ndarray):
+ if type(other) is np.array and other.ndim != 1:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
- else:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(len(other)):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -163,16 +311,21 @@ class SparseVector(object):
result += other.values[j] * other.values[j]
j += 1
return result
+ else:
+ return self.squared_distance(_convert_to_vector(other))
def toArray(self):
"""
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
- arr = numpy.zeros(self.size)
+ arr = np.zeros((self.size,), dtype=np.float64)
for i in xrange(self.indices.size):
arr[self.indices[i]] = self.values[i]
return arr
+ def __len__(self):
+ return self.size
+
def __str__(self):
inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
vals = "[" + ",".join([str(v) for v in self.values]) + "]"
@@ -198,8 +351,8 @@ class SparseVector(object):
return (isinstance(other, self.__class__)
and other.size == self.size
- and array_equal(other.indices, self.indices)
- and array_equal(other.values, self.values))
+ and other.indices == self.indices
+ and other.values == self.values)
def __ne__(self, other):
return not self.__eq__(other)
@@ -242,9 +395,9 @@ class Vectors(object):
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
- array([ 1., 2., 3.])
+ DenseVector(array('d', [1.0, 2.0, 3.0]))
"""
- return array(elements, dtype=float64)
+ return DenseVector(elements)
@staticmethod
def stringify(vector):
@@ -257,10 +410,39 @@ class Vectors(object):
>>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
'[0.0,1.0]'
"""
- if type(vector) == SparseVector:
- return str(vector)
- else:
- return "[" + ",".join([str(v) for v in vector]) + "]"
+ return str(vector)
+
+
+class Matrix(object):
+ """ the Matrix """
+ def __init__(self, nRow, nCol):
+ self.nRow = nRow
+ self.nCol = nCol
+
+ def toArray(self):
+ raise NotImplementedError
+
+
+class DenseMatrix(Matrix):
+ def __init__(self, nRow, nCol, values):
+ Matrix.__init__(self, nRow, nCol)
+ assert len(values) == nRow * nCol
+ self.values = values
+
+ def __reduce__(self):
+ return DenseMatrix, (self.nRow, self.nCol, self.values)
+
+ def toArray(self):
+ """
+ Return an numpy.ndarray
+
+ >>> arr = array.array('d', [float(i) for i in range(4)])
+ >>> m = DenseMatrix(2, 2, arr)
+ >>> m.toArray()
+ array([[ 0., 1.],
+ [ 2., 3.]])
+ """
+ return np.ndarray((self.nRow, self.nCol), np.float64, buffer=self.values.tostring())
def _test():