aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Dusenberry <mwdusenb@us.ibm.com>2016-04-27 19:48:05 +0200
committerNick Pentreath <nickp@za.ibm.com>2016-04-27 19:48:05 +0200
commit607f50341c8d86f0034f3aae69a55f25d55a012e (patch)
treed6fd08b8855cdaecde4137a7d0d3153aa678e149
parenta234cc61465bbefafd9e69c1cabe9aaaf968a91f (diff)
downloadspark-607f50341c8d86f0034f3aae69a55f25d55a012e.tar.gz
spark-607f50341c8d86f0034f3aae69a55f25d55a012e.tar.bz2
spark-607f50341c8d86f0034f3aae69a55f25d55a012e.zip
[SPARK-9656][MLLIB][PYTHON] Add missing methods to PySpark's Distributed Linear Algebra Classes
This PR adds the remaining group of methods to PySpark's distributed linear algebra classes as follows: * `RowMatrix` <sup>**[1]**</sup> 1. `computeGramianMatrix` 2. `computeCovariance` 3. `computeColumnSummaryStatistics` 4. `columnSimilarities` 5. `tallSkinnyQR` <sup>**[2]**</sup> * `IndexedRowMatrix` <sup>**[3]**</sup> 1. `computeGramianMatrix` * `CoordinateMatrix` 1. `transpose` * `BlockMatrix` 1. `validate` 2. `cache` 3. `persist` 4. `transpose` **[1]**: Note: `multiply`, `computeSVD`, and `computePrincipalComponents` are already part of PR #7963 for SPARK-6227. **[2]**: Implementing `tallSkinnyQR` uncovered a bug with our PySpark `RowMatrix` constructor. As discussed on the dev list [here](http://apache-spark-developers-list.1001551.n3.nabble.com/K-Means-And-Class-Tags-td10038.html), there appears to be an issue with type erasure with RDDs coming from Java, and by extension from PySpark. Although we are attempting to construct a `RowMatrix` from an `RDD[Vector]` in [PythonMLlibAPI](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1115), the `Vector` type is erased, resulting in an `RDD[Object]`. Thus, when calling Scala's `tallSkinnyQR` from PySpark, we get a Java `ClassCastException` in which an `Object` cannot be cast to a Spark `Vector`. As noted in the aforementioned dev list thread, this issue was also encountered with `DecisionTrees`, and the fix involved an explicit `retag` of the RDD with a `Vector` type. Thus, this PR currently contains that fix applied to the `createRowMatrix` helper function in `PythonMLlibAPI`. `IndexedRowMatrix` and `CoordinateMatrix` do not appear to have this issue likely due to their related helper functions in `PythonMLlibAPI` creating the RDDs explicitly from DataFrames with pattern matching, thus preserving the types. However, this fix may be out of scope for this single PR, and it may be better suited in a separate JIRA/PR. Therefore, I have marked this PR as WIP and am open to discussion. **[3]**: Note: `multiply` and `computeSVD` are already part of PR #7963 for SPARK-6227. Author: Mike Dusenberry <mwdusenb@us.ibm.com> Closes #9441 from dusenberrymw/SPARK-9656_Add_Missing_Methods_to_PySpark_Distributed_Linear_Algebra.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala3
-rw-r--r--python/pyspark/mllib/linalg/__init__.py32
-rw-r--r--python/pyspark/mllib/linalg/distributed.py270
3 files changed, 301 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 06b9c4ac67..b03b3ecde9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -188,7 +188,8 @@ class IndexedRowMatrix @Since("1.0.0") (
}
/**
- * Computes the Gramian matrix `A^T A`.
+ * Computes the Gramian matrix `A^T A`. Note that this cannot be
+ * computed on matrices with more than 65535 columns.
*/
@Since("1.0.0")
def computeGramianMatrix(): Matrix = {
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index 4cd7306edb..70509a6d9b 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -38,12 +38,14 @@ else:
import numpy as np
+from pyspark import since
from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
IntegerType, ByteType, BooleanType
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors',
- 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices']
+ 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices',
+ 'QRDecomposition']
if sys.version_info[:2] == (2, 7):
@@ -1235,6 +1237,34 @@ class Matrices(object):
return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
+class QRDecomposition(object):
+ """
+ .. note:: Experimental
+
+ Represents QR factors.
+ """
+ def __init__(self, Q, R):
+ self._Q = Q
+ self._R = R
+
+ @property
+ @since('2.0.0')
+ def Q(self):
+ """
+ An orthogonal matrix Q in a QR decomposition.
+ May be null if not computed.
+ """
+ return self._Q
+
+ @property
+ @since('2.0.0')
+ def R(self):
+ """
+ An upper triangular matrix R in a QR decomposition.
+ """
+ return self._R
+
+
def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 43cb0beef1..af34ce346b 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -26,9 +26,11 @@ if sys.version >= '3':
from py4j.java_gateway import JavaObject
-from pyspark import RDD
+from pyspark import RDD, since
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix
+from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
+from pyspark.mllib.stat import MultivariateStatisticalSummary
+from pyspark.storagelevel import StorageLevel
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
@@ -151,6 +153,156 @@ class RowMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def computeColumnSummaryStatistics(self):
+ """
+ Computes column-wise summary statistics.
+
+ :return: :class:`MultivariateStatisticalSummary` object
+ containing column-wise summary statistics.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+ >>> mat = RowMatrix(rows)
+
+ >>> colStats = mat.computeColumnSummaryStatistics()
+ >>> colStats.mean()
+ array([ 2.5, 3.5, 4.5])
+ """
+ java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
+ return MultivariateStatisticalSummary(java_col_stats)
+
+ @since('2.0.0')
+ def computeCovariance(self):
+ """
+ Computes the covariance matrix, treating each row as an
+ observation. Note that this cannot be computed on matrices
+ with more than 65535 columns.
+
+ >>> rows = sc.parallelize([[1, 2], [2, 1]])
+ >>> mat = RowMatrix(rows)
+
+ >>> mat.computeCovariance()
+ DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
+ """
+ return self._java_matrix_wrapper.call("computeCovariance")
+
+ @since('2.0.0')
+ def computeGramianMatrix(self):
+ """
+ Computes the Gramian matrix `A^T A`. Note that this cannot be
+ computed on matrices with more than 65535 columns.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+ >>> mat = RowMatrix(rows)
+
+ >>> mat.computeGramianMatrix()
+ DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+ """
+ return self._java_matrix_wrapper.call("computeGramianMatrix")
+
+ @since('2.0.0')
+ def columnSimilarities(self, threshold=0.0):
+ """
+ Compute similarities between columns of this matrix.
+
+ The threshold parameter is a trade-off knob between estimate
+ quality and computational cost.
+
+ The default threshold setting of 0 guarantees deterministically
+ correct results, but uses the brute-force approach of computing
+ normalized dot products.
+
+ Setting the threshold to positive values uses a sampling
+ approach and incurs strictly less computational cost than the
+ brute-force approach. However the similarities computed will
+ be estimates.
+
+ The sampling guarantees relative-error correctness for those
+ pairs of columns that have similarity greater than the given
+ similarity threshold.
+
+ To describe the guarantee, we set some notation:
+ * Let A be the smallest in magnitude non-zero element of
+ this matrix.
+ * Let B be the largest in magnitude non-zero element of
+ this matrix.
+ * Let L be the maximum number of non-zeros per row.
+
+ For example, for {0,1} matrices: A=B=1.
+ Another example, for the Netflix matrix: A=1, B=5
+
+ For those column pairs that are above the threshold, the
+ computed similarity is correct to within 20% relative error
+ with probability at least 1 - (0.981)^10/B^
+
+ The shuffle size is bounded by the *smaller* of the following
+ two expressions:
+
+ * O(n log(n) L / (threshold * A))
+ * O(m L^2^)
+
+ The latter is the cost of the brute-force approach, so for
+ non-zero thresholds, the cost is always cheaper than the
+ brute-force approach.
+
+ :param: threshold: Set to 0 for deterministic guaranteed
+ correctness. Similarities above this
+ threshold are estimated with the cost vs
+ estimate quality trade-off described above.
+ :return: An n x n sparse upper-triangular CoordinateMatrix of
+ cosine similarities between columns of this matrix.
+
+ >>> rows = sc.parallelize([[1, 2], [1, 5]])
+ >>> mat = RowMatrix(rows)
+
+ >>> sims = mat.columnSimilarities()
+ >>> sims.entries.first().value
+ 0.91914503...
+ """
+ java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
+ return CoordinateMatrix(java_sims_mat)
+
+ @since('2.0.0')
+ def tallSkinnyQR(self, computeQ=False):
+ """
+ Compute the QR decomposition of this RowMatrix.
+
+ The implementation is designed to optimize the QR decomposition
+ (factorization) for the RowMatrix of a tall and skinny shape.
+
+ Reference:
+ Paul G. Constantine, David F. Gleich. "Tall and skinny QR
+ factorizations in MapReduce architectures"
+ ([[http://dx.doi.org/10.1145/1996092.1996103]])
+
+ :param: computeQ: whether to computeQ
+ :return: QRDecomposition(Q: RowMatrix, R: Matrix), where
+ Q = None if computeQ = false.
+
+ >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
+ >>> mat = RowMatrix(rows)
+ >>> decomp = mat.tallSkinnyQR(True)
+ >>> Q = decomp.Q
+ >>> R = decomp.R
+
+ >>> # Test with absolute values
+ >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
+ >>> absQRows.collect()
+ [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
+
+ >>> # Test with absolute values
+ >>> abs(R.toArray()).tolist()
+ [[5.0, 10.0], [0.0, 1.0]]
+ """
+ decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
+ if computeQ:
+ java_Q = decomp.call("Q")
+ Q = RowMatrix(java_Q)
+ else:
+ Q = None
+ R = decomp.call("R")
+ return QRDecomposition(Q, R)
+
class IndexedRow(object):
"""
@@ -311,6 +463,21 @@ class IndexedRowMatrix(DistributedMatrix):
java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
return CoordinateMatrix(java_coordinate_matrix)
+ @since('2.0.0')
+ def computeGramianMatrix(self):
+ """
+ Computes the Gramian matrix `A^T A`. Note that this cannot be
+ computed on matrices with more than 65535 columns.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows)
+
+ >>> mat.computeGramianMatrix()
+ DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+ """
+ return self._java_matrix_wrapper.call("computeGramianMatrix")
+
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
@@ -514,6 +681,26 @@ class CoordinateMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def transpose(self):
+ """
+ Transpose this CoordinateMatrix.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(1, 0, 2),
+ ... MatrixEntry(2, 1, 3.7)])
+ >>> mat = CoordinateMatrix(entries)
+ >>> mat_transposed = mat.transpose()
+
+ >>> print(mat_transposed.numRows())
+ 2
+
+ >>> print(mat_transposed.numCols())
+ 3
+ """
+ java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+ return CoordinateMatrix(java_transposed_matrix)
+
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
@@ -789,6 +976,33 @@ class BlockMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def cache(self):
+ """
+ Caches the underlying RDD.
+ """
+ self._java_matrix_wrapper.call("cache")
+ return self
+
+ @since('2.0.0')
+ def persist(self, storageLevel):
+ """
+ Persists the underlying RDD with the specified storage level.
+ """
+ if not isinstance(storageLevel, StorageLevel):
+ raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
+ javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
+ self._java_matrix_wrapper.call("persist", javaStorageLevel)
+ return self
+
+ @since('2.0.0')
+ def validate(self):
+ """
+ Validates the block matrix info against the matrix data (`blocks`)
+ and throws an exception if any error is found.
+ """
+ self._java_matrix_wrapper.call("validate")
+
def add(self, other):
"""
Adds two block matrices together. The matrices must have the
@@ -822,6 +1036,41 @@ class BlockMatrix(DistributedMatrix):
java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+ @since('2.0.0')
+ def subtract(self, other):
+ """
+ Subtracts the given block matrix `other` from this block matrix:
+ `this - other`. The matrices must have the same size and
+ matching `rowsPerBlock` and `colsPerBlock` values. If one of
+ the sub matrix blocks that are being subtracted is a
+ SparseMatrix, the resulting sub matrix block will also be a
+ SparseMatrix, even if it is being subtracted from a DenseMatrix.
+ If two dense sub matrix blocks are subtracted, the output block
+ will also be a DenseMatrix.
+
+ >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
+ >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
+ >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
+ >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
+ >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
+ >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
+ >>> mat1 = BlockMatrix(blocks1, 3, 2)
+ >>> mat2 = BlockMatrix(blocks2, 3, 2)
+ >>> mat3 = BlockMatrix(blocks3, 3, 2)
+
+ >>> mat1.subtract(mat2).toLocalMatrix()
+ DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)
+
+ >>> mat2.subtract(mat3).toLocalMatrix()
+ DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
+ """
+ if not isinstance(other, BlockMatrix):
+ raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
+
+ other_java_block_matrix = other._java_matrix_wrapper._java_model
+ java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
+ return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+
def multiply(self, other):
"""
Left multiplies this BlockMatrix by `other`, another
@@ -857,6 +1106,23 @@ class BlockMatrix(DistributedMatrix):
java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+ @since('2.0.0')
+ def transpose(self):
+ """
+ Transpose this BlockMatrix. Returns a new BlockMatrix
+ instance sharing the same underlying data. Is a lazy operation.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+
+ >>> mat_transposed = mat.transpose()
+ >>> mat_transposed.toLocalMatrix()
+ DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
+ """
+ java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+ return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)
+
def toLocalMatrix(self):
"""
Collect the distributed matrix on the driver as a DenseMatrix.