author     MechCoder <manojkumarsivaraj334@gmail.com>   2015-04-09 23:10:13 -0700
committer  Xiangrui Meng <meng@databricks.com>          2015-04-09 23:10:13 -0700
commit     e2360810f50de77f79d372cc9b46db117d451cfc (patch)
tree       7a0d651141c4149fe36dd2276c66f5ab2214185f /python
parent     b5c51c8df480f1a82a82e4d597d8eea631bffb4e (diff)
[SPARK-6577] [MLlib] [PySpark] SparseMatrix should be supported in PySpark
Support for SparseMatrix in PySpark.

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #5355 from MechCoder/spark-6577 and squashes the following commits:

7492190 [MechCoder] More readable code for densifying
ea2c54b [MechCoder] Check bounds for indexing
454ef2c [MechCoder] Made the following changes: 1. Used convert_to_array for array conversion. 2. Used F order for toArray. 3. Minor improvements in speed.
db76caf [MechCoder] Add support for CSR matrix
29653e7 [MechCoder] Renamed indices to rowIndices and indptr to colPtrs
b6384fe [MechCoder] [SPARK-6577] SparseMatrix should be supported in PySpark
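For orientation before reading the diff, a minimal usage sketch of the API this patch adds (class and method names are taken from the diff below; this assumes a PySpark build that already includes the change):

    from pyspark.mllib.linalg import Matrices, SparseMatrix

    # 3x4 CSC matrix; column j holds values[colPtrs[j]:colPtrs[j+1]]
    sm = Matrices.sparse(3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
    print(sm[1, 0])        # 1.0, a stored entry
    print(sm[0, 1])        # 0.0, an implicit zero
    print(sm.toArray())    # dense numpy array, column-major
    dm = sm.toDense()      # DenseMatrix round trip
    print(dm.toSparse().values.tolist())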
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/mllib/linalg.py  110
-rw-r--r--  python/pyspark/mllib/tests.py    52
2 files changed, 154 insertions(+), 8 deletions(-)
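One detail worth noting before the diff: the new classes pickle their numpy arrays as raw byte strings (see _convert_to_array and __reduce__ below). A standalone sketch of that round trip, assuming Python 2 semantics where str acts as a byte buffer:

    import numpy as np

    values = np.array([1.0, 2.0, 4.0, 5.0])
    buf = values.tostring()                       # raw float64 bytes (tobytes() in newer numpy)
    restored = np.frombuffer(buf, dtype=np.float64)
    assert (restored == values).all()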
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 51c1490b16..a80320c52d 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -640,6 +640,15 @@ class Matrix(object):
         """
         raise NotImplementedError

+    @staticmethod
+    def _convert_to_array(array_like, dtype):
+        """
+        Convert Matrix attributes which are array-like or buffer to array.
+        """
+        if isinstance(array_like, basestring):
+            return np.frombuffer(array_like, dtype=dtype)
+        return np.asarray(array_like, dtype=dtype)
+

 class DenseMatrix(Matrix):
     """
@@ -647,13 +656,8 @@ class DenseMatrix(Matrix):
     """
     def __init__(self, numRows, numCols, values):
         Matrix.__init__(self, numRows, numCols)
-        if isinstance(values, basestring):
-            values = np.frombuffer(values, dtype=np.float64)
-        elif not isinstance(values, np.ndarray):
-            values = np.array(values, dtype=np.float64)
+        values = self._convert_to_array(values, np.float64)
         assert len(values) == numRows * numCols
-        if values.dtype != np.float64:
-            values.astype(np.float64)
         self.values = values

     def __reduce__(self):
@@ -670,6 +674,17 @@ class DenseMatrix(Matrix):
         """
         return self.values.reshape((self.numRows, self.numCols), order='F')

+    def toSparse(self):
+        """Convert to SparseMatrix"""
+        indices = np.nonzero(self.values)[0]
+        colCounts = np.bincount(indices / self.numRows)
+        colPtrs = np.cumsum(np.hstack(
+            (0, colCounts, np.zeros(self.numCols - colCounts.size))))
+        values = self.values[indices]
+        rowIndices = indices % self.numRows
+
+        return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values)
+
     def __getitem__(self, indices):
         i, j = indices
         if i < 0 or i >= self.numRows:
@@ -687,6 +702,82 @@ class DenseMatrix(Matrix):
                 all(self.values == other.values))


+class SparseMatrix(Matrix):
+    """Sparse Matrix stored in CSC format."""
+    def __init__(self, numRows, numCols, colPtrs, rowIndices, values,
+                 isTransposed=False):
+        Matrix.__init__(self, numRows, numCols)
+        self.isTransposed = isTransposed
+        self.colPtrs = self._convert_to_array(colPtrs, np.int32)
+        self.rowIndices = self._convert_to_array(rowIndices, np.int32)
+        self.values = self._convert_to_array(values, np.float64)
+
+        if self.isTransposed:
+            if self.colPtrs.size != numRows + 1:
+                raise ValueError("Expected colPtrs of size %d, got %d."
+                                 % (numRows + 1, self.colPtrs.size))
+        else:
+            if self.colPtrs.size != numCols + 1:
+                raise ValueError("Expected colPtrs of size %d, got %d."
+                                 % (numCols + 1, self.colPtrs.size))
+        if self.rowIndices.size != self.values.size:
+            raise ValueError("Expected rowIndices of length %d, got %d."
+                             % (self.values.size, self.rowIndices.size))
+
+    def __reduce__(self):
+        return SparseMatrix, (
+            self.numRows, self.numCols, self.colPtrs.tostring(),
+            self.rowIndices.tostring(), self.values.tostring(),
+            self.isTransposed)
+
+    def __getitem__(self, indices):
+        i, j = indices
+        if i < 0 or i >= self.numRows:
+            raise ValueError("Row index %d is out of range [0, %d)"
+                             % (i, self.numRows))
+        if j < 0 or j >= self.numCols:
+            raise ValueError("Column index %d is out of range [0, %d)"
+                             % (j, self.numCols))
+
+        # If a CSR matrix is given, then the row index should be searched
+        # for in colPtrs, and the column index should be searched for in the
+        # corresponding slice obtained from rowIndices.
+        if self.isTransposed:
+            j, i = i, j
+
+        colStart = self.colPtrs[j]
+        colEnd = self.colPtrs[j + 1]
+        nz = self.rowIndices[colStart: colEnd]
+        ind = np.searchsorted(nz, i) + colStart
+        if ind < colEnd and self.rowIndices[ind] == i:
+            return self.values[ind]
+        else:
+            return 0.0
+
+    def toArray(self):
+        """
+        Return a numpy.ndarray
+        """
+        A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order='F')
+        for k in xrange(self.colPtrs.size - 1):
+            startptr = self.colPtrs[k]
+            endptr = self.colPtrs[k + 1]
+            if self.isTransposed:
+                A[k, self.rowIndices[startptr:endptr]] = self.values[startptr:endptr]
+            else:
+                A[self.rowIndices[startptr:endptr], k] = self.values[startptr:endptr]
+        return A
+
+    def toDense(self):
+        densevals = np.reshape(
+            self.toArray(), (self.numRows * self.numCols), order='F')
+        return DenseMatrix(self.numRows, self.numCols, densevals)
+
+    # TODO: More efficient implementation:
+    def __eq__(self, other):
+        return np.all(self.toArray() == other.toArray())
+
+
 class Matrices(object):
     @staticmethod
     def dense(numRows, numCols, values):
@@ -695,6 +786,13 @@ class Matrices(object):
         """
         return DenseMatrix(numRows, numCols, values)

+    @staticmethod
+    def sparse(numRows, numCols, colPtrs, rowIndices, values):
+        """
+        Create a SparseMatrix
+        """
+        return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
+

 def _test():
     import doctest
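For readers unfamiliar with the compressed sparse column (CSC) layout used by SparseMatrix above: column j owns the slice values[colPtrs[j]:colPtrs[j+1]], and rowIndices holds the row of each stored value. A standalone NumPy sketch (independent of the patch) that expands the 3x4 matrix used in the tests below:

    import numpy as np

    colPtrs = np.array([0, 2, 2, 4, 4])      # column j spans values[colPtrs[j]:colPtrs[j+1]]
    rowIndices = np.array([1, 2, 1, 2])      # row of each stored value
    values = np.array([1.0, 2.0, 4.0, 5.0])

    dense = np.zeros((3, 4))
    for j in range(4):                        # walk columns
        for k in range(colPtrs[j], colPtrs[j + 1]):
            dense[rowIndices[k], j] = values[k]
    # dense == [[0, 0, 0, 0], [1, 0, 4, 0], [2, 0, 5, 0]]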
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 61ef398487..3b40158c12 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -24,7 +24,7 @@ import sys
 import tempfile
 import array as pyarray

-from numpy import array, array_equal
+from numpy import array, array_equal, zeros
 from py4j.protocol import Py4JJavaError

 if sys.version_info[:2] <= (2, 6):
@@ -38,7 +38,7 @@ else:

 from pyspark.mllib.common import _to_java_object_rdd
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
-    DenseMatrix, Vectors, Matrices
+    DenseMatrix, SparseMatrix, Vectors, Matrices
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
@@ -144,6 +144,54 @@ class VectorTests(PySparkTestCase):
             for j in range(2):
                 self.assertEquals(mat[i, j], expected[i][j])

+    def test_sparse_matrix(self):
+        # Test sparse matrix creation.
+        sm1 = SparseMatrix(
+            3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
+        self.assertEquals(sm1.numRows, 3)
+        self.assertEquals(sm1.numCols, 4)
+        self.assertEquals(sm1.colPtrs.tolist(), [0, 2, 2, 4, 4])
+        self.assertEquals(sm1.rowIndices.tolist(), [1, 2, 1, 2])
+        self.assertEquals(sm1.values.tolist(), [1.0, 2.0, 4.0, 5.0])
+
+        # Test indexing
+        expected = [
+            [0, 0, 0, 0],
+            [1, 0, 4, 0],
+            [2, 0, 5, 0]]
+
+        for i in range(3):
+            for j in range(4):
+                self.assertEquals(expected[i][j], sm1[i, j])
+        self.assertTrue(array_equal(sm1.toArray(), expected))
+
+        # Test conversion to dense and sparse.
+        smnew = sm1.toDense().toSparse()
+        self.assertEquals(sm1.numRows, smnew.numRows)
+        self.assertEquals(sm1.numCols, smnew.numCols)
+        self.assertTrue(array_equal(sm1.colPtrs, smnew.colPtrs))
+        self.assertTrue(array_equal(sm1.rowIndices, smnew.rowIndices))
+        self.assertTrue(array_equal(sm1.values, smnew.values))
+
+        sm1t = SparseMatrix(
+            3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0],
+            isTransposed=True)
+        self.assertEquals(sm1t.numRows, 3)
+        self.assertEquals(sm1t.numCols, 4)
+        self.assertEquals(sm1t.colPtrs.tolist(), [0, 2, 3, 5])
+        self.assertEquals(sm1t.rowIndices.tolist(), [0, 1, 2, 0, 2])
+        self.assertEquals(sm1t.values.tolist(), [3.0, 2.0, 4.0, 9.0, 8.0])
+
+        expected = [
+            [3, 2, 0, 0],
+            [0, 0, 4, 0],
+            [9, 0, 8, 0]]
+
+        for i in range(3):
+            for j in range(4):
+                self.assertEquals(expected[i][j], sm1t[i, j])
+        self.assertTrue(array_equal(sm1t.toArray(), expected))
+

 class ListTests(PySparkTestCase):
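When isTransposed=True, the same three arrays are interpreted row-major (CSR): colPtrs acts as row pointers and rowIndices holds column indices, which is why __getitem__ swaps i and j before searching. A standalone sketch expanding sm1t from the test above:

    import numpy as np

    rowPtrs = np.array([0, 2, 3, 5])          # the constructor's colPtrs argument
    colIndices = np.array([0, 1, 2, 0, 2])    # the constructor's rowIndices argument
    values = np.array([3.0, 2.0, 4.0, 9.0, 8.0])

    dense = np.zeros((3, 4))
    for i in range(3):                         # walk rows
        dense[i, colIndices[rowPtrs[i]:rowPtrs[i + 1]]] = values[rowPtrs[i]:rowPtrs[i + 1]]
    # dense == [[3, 2, 0, 0], [0, 0, 4, 0], [9, 0, 8, 0]]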