diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/mllib/linalg/__init__.py | 32 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg/distributed.py | 270 |
2 files changed, 299 insertions, 3 deletions
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 4cd7306edb..70509a6d9b 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -38,12 +38,14 @@ else: import numpy as np +from pyspark import since from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', - 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices'] + 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices', + 'QRDecomposition'] if sys.version_info[:2] == (2, 7): @@ -1235,6 +1237,34 @@ class Matrices(object): return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values) +class QRDecomposition(object): + """ + .. note:: Experimental + + Represents QR factors. + """ + def __init__(self, Q, R): + self._Q = Q + self._R = R + + @property + @since('2.0.0') + def Q(self): + """ + An orthogonal matrix Q in a QR decomposition. + May be null if not computed. + """ + return self._Q + + @property + @since('2.0.0') + def R(self): + """ + An upper triangular matrix R in a QR decomposition. + """ + return self._R + + def _test(): import doctest (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index 43cb0beef1..af34ce346b 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -26,9 +26,11 @@ if sys.version >= '3': from py4j.java_gateway import JavaObject -from pyspark import RDD +from pyspark import RDD, since from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import _convert_to_vector, Matrix +from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition +from pyspark.mllib.stat import MultivariateStatisticalSummary +from pyspark.storagelevel import StorageLevel __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow', @@ -151,6 +153,156 @@ class RowMatrix(DistributedMatrix): """ return self._java_matrix_wrapper.call("numCols") + @since('2.0.0') + def computeColumnSummaryStatistics(self): + """ + Computes column-wise summary statistics. + + :return: :class:`MultivariateStatisticalSummary` object + containing column-wise summary statistics. + + >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]]) + >>> mat = RowMatrix(rows) + + >>> colStats = mat.computeColumnSummaryStatistics() + >>> colStats.mean() + array([ 2.5, 3.5, 4.5]) + """ + java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics") + return MultivariateStatisticalSummary(java_col_stats) + + @since('2.0.0') + def computeCovariance(self): + """ + Computes the covariance matrix, treating each row as an + observation. Note that this cannot be computed on matrices + with more than 65535 columns. + + >>> rows = sc.parallelize([[1, 2], [2, 1]]) + >>> mat = RowMatrix(rows) + + >>> mat.computeCovariance() + DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0) + """ + return self._java_matrix_wrapper.call("computeCovariance") + + @since('2.0.0') + def computeGramianMatrix(self): + """ + Computes the Gramian matrix `A^T A`. Note that this cannot be + computed on matrices with more than 65535 columns. + + >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]]) + >>> mat = RowMatrix(rows) + + >>> mat.computeGramianMatrix() + DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0) + """ + return self._java_matrix_wrapper.call("computeGramianMatrix") + + @since('2.0.0') + def columnSimilarities(self, threshold=0.0): + """ + Compute similarities between columns of this matrix. + + The threshold parameter is a trade-off knob between estimate + quality and computational cost. + + The default threshold setting of 0 guarantees deterministically + correct results, but uses the brute-force approach of computing + normalized dot products. + + Setting the threshold to positive values uses a sampling + approach and incurs strictly less computational cost than the + brute-force approach. However the similarities computed will + be estimates. + + The sampling guarantees relative-error correctness for those + pairs of columns that have similarity greater than the given + similarity threshold. + + To describe the guarantee, we set some notation: + * Let A be the smallest in magnitude non-zero element of + this matrix. + * Let B be the largest in magnitude non-zero element of + this matrix. + * Let L be the maximum number of non-zeros per row. + + For example, for {0,1} matrices: A=B=1. + Another example, for the Netflix matrix: A=1, B=5 + + For those column pairs that are above the threshold, the + computed similarity is correct to within 20% relative error + with probability at least 1 - (0.981)^10/B^ + + The shuffle size is bounded by the *smaller* of the following + two expressions: + + * O(n log(n) L / (threshold * A)) + * O(m L^2^) + + The latter is the cost of the brute-force approach, so for + non-zero thresholds, the cost is always cheaper than the + brute-force approach. + + :param: threshold: Set to 0 for deterministic guaranteed + correctness. Similarities above this + threshold are estimated with the cost vs + estimate quality trade-off described above. + :return: An n x n sparse upper-triangular CoordinateMatrix of + cosine similarities between columns of this matrix. + + >>> rows = sc.parallelize([[1, 2], [1, 5]]) + >>> mat = RowMatrix(rows) + + >>> sims = mat.columnSimilarities() + >>> sims.entries.first().value + 0.91914503... + """ + java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold)) + return CoordinateMatrix(java_sims_mat) + + @since('2.0.0') + def tallSkinnyQR(self, computeQ=False): + """ + Compute the QR decomposition of this RowMatrix. + + The implementation is designed to optimize the QR decomposition + (factorization) for the RowMatrix of a tall and skinny shape. + + Reference: + Paul G. Constantine, David F. Gleich. "Tall and skinny QR + factorizations in MapReduce architectures" + ([[http://dx.doi.org/10.1145/1996092.1996103]]) + + :param: computeQ: whether to computeQ + :return: QRDecomposition(Q: RowMatrix, R: Matrix), where + Q = None if computeQ = false. + + >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]]) + >>> mat = RowMatrix(rows) + >>> decomp = mat.tallSkinnyQR(True) + >>> Q = decomp.Q + >>> R = decomp.R + + >>> # Test with absolute values + >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist()) + >>> absQRows.collect() + [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]] + + >>> # Test with absolute values + >>> abs(R.toArray()).tolist() + [[5.0, 10.0], [0.0, 1.0]] + """ + decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ)) + if computeQ: + java_Q = decomp.call("Q") + Q = RowMatrix(java_Q) + else: + Q = None + R = decomp.call("R") + return QRDecomposition(Q, R) + class IndexedRow(object): """ @@ -311,6 +463,21 @@ class IndexedRowMatrix(DistributedMatrix): java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities") return CoordinateMatrix(java_coordinate_matrix) + @since('2.0.0') + def computeGramianMatrix(self): + """ + Computes the Gramian matrix `A^T A`. Note that this cannot be + computed on matrices with more than 65535 columns. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(1, [4, 5, 6])]) + >>> mat = IndexedRowMatrix(rows) + + >>> mat.computeGramianMatrix() + DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0) + """ + return self._java_matrix_wrapper.call("computeGramianMatrix") + def toRowMatrix(self): """ Convert this matrix to a RowMatrix. @@ -514,6 +681,26 @@ class CoordinateMatrix(DistributedMatrix): """ return self._java_matrix_wrapper.call("numCols") + @since('2.0.0') + def transpose(self): + """ + Transpose this CoordinateMatrix. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(1, 0, 2), + ... MatrixEntry(2, 1, 3.7)]) + >>> mat = CoordinateMatrix(entries) + >>> mat_transposed = mat.transpose() + + >>> print(mat_transposed.numRows()) + 2 + + >>> print(mat_transposed.numCols()) + 3 + """ + java_transposed_matrix = self._java_matrix_wrapper.call("transpose") + return CoordinateMatrix(java_transposed_matrix) + def toRowMatrix(self): """ Convert this matrix to a RowMatrix. @@ -789,6 +976,33 @@ class BlockMatrix(DistributedMatrix): """ return self._java_matrix_wrapper.call("numCols") + @since('2.0.0') + def cache(self): + """ + Caches the underlying RDD. + """ + self._java_matrix_wrapper.call("cache") + return self + + @since('2.0.0') + def persist(self, storageLevel): + """ + Persists the underlying RDD with the specified storage level. + """ + if not isinstance(storageLevel, StorageLevel): + raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel)) + javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel) + self._java_matrix_wrapper.call("persist", javaStorageLevel) + return self + + @since('2.0.0') + def validate(self): + """ + Validates the block matrix info against the matrix data (`blocks`) + and throws an exception if any error is found. + """ + self._java_matrix_wrapper.call("validate") + def add(self, other): """ Adds two block matrices together. The matrices must have the @@ -822,6 +1036,41 @@ class BlockMatrix(DistributedMatrix): java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix) return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock) + @since('2.0.0') + def subtract(self, other): + """ + Subtracts the given block matrix `other` from this block matrix: + `this - other`. The matrices must have the same size and + matching `rowsPerBlock` and `colsPerBlock` values. If one of + the sub matrix blocks that are being subtracted is a + SparseMatrix, the resulting sub matrix block will also be a + SparseMatrix, even if it is being subtracted from a DenseMatrix. + If two dense sub matrix blocks are subtracted, the output block + will also be a DenseMatrix. + + >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2]) + >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]) + >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3]) + >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)]) + >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)]) + >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)]) + >>> mat1 = BlockMatrix(blocks1, 3, 2) + >>> mat2 = BlockMatrix(blocks2, 3, 2) + >>> mat3 = BlockMatrix(blocks3, 3, 2) + + >>> mat1.subtract(mat2).toLocalMatrix() + DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0) + + >>> mat2.subtract(mat3).toLocalMatrix() + DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0) + """ + if not isinstance(other, BlockMatrix): + raise TypeError("Other should be a BlockMatrix, got %s" % type(other)) + + other_java_block_matrix = other._java_matrix_wrapper._java_model + java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix) + return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock) + def multiply(self, other): """ Left multiplies this BlockMatrix by `other`, another @@ -857,6 +1106,23 @@ class BlockMatrix(DistributedMatrix): java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix) return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock) + @since('2.0.0') + def transpose(self): + """ + Transpose this BlockMatrix. Returns a new BlockMatrix + instance sharing the same underlying data. Is a lazy operation. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + + >>> mat_transposed = mat.transpose() + >>> mat_transposed.toLocalMatrix() + DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0) + """ + java_transposed_matrix = self._java_matrix_wrapper.call("transpose") + return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock) + def toLocalMatrix(self): """ Collect the distributed matrix on the driver as a DenseMatrix. |