aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala3
-rw-r--r--python/pyspark/mllib/linalg/__init__.py32
-rw-r--r--python/pyspark/mllib/linalg/distributed.py270
3 files changed, 301 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 06b9c4ac67..b03b3ecde9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -188,7 +188,8 @@ class IndexedRowMatrix @Since("1.0.0") (
}
/**
- * Computes the Gramian matrix `A^T A`.
+ * Computes the Gramian matrix `A^T A`. Note that this cannot be
+ * computed on matrices with more than 65535 columns.
*/
@Since("1.0.0")
def computeGramianMatrix(): Matrix = {
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py
index 4cd7306edb..70509a6d9b 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -38,12 +38,14 @@ else:
import numpy as np
+from pyspark import since
from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
IntegerType, ByteType, BooleanType
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors',
- 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices']
+ 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices',
+ 'QRDecomposition']
if sys.version_info[:2] == (2, 7):
@@ -1235,6 +1237,34 @@ class Matrices(object):
return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
+class QRDecomposition(object):
+ """
+ .. note:: Experimental
+
+ Represents QR factors.
+ """
+ def __init__(self, Q, R):
+ self._Q = Q
+ self._R = R
+
+ @property
+ @since('2.0.0')
+ def Q(self):
+ """
+ An orthogonal matrix Q in a QR decomposition.
+ May be null if not computed.
+ """
+ return self._Q
+
+ @property
+ @since('2.0.0')
+ def R(self):
+ """
+ An upper triangular matrix R in a QR decomposition.
+ """
+ return self._R
+
+
def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 43cb0beef1..af34ce346b 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -26,9 +26,11 @@ if sys.version >= '3':
from py4j.java_gateway import JavaObject
-from pyspark import RDD
+from pyspark import RDD, since
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix
+from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
+from pyspark.mllib.stat import MultivariateStatisticalSummary
+from pyspark.storagelevel import StorageLevel
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
@@ -151,6 +153,156 @@ class RowMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def computeColumnSummaryStatistics(self):
+ """
+ Computes column-wise summary statistics.
+
+ :return: :class:`MultivariateStatisticalSummary` object
+ containing column-wise summary statistics.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+ >>> mat = RowMatrix(rows)
+
+ >>> colStats = mat.computeColumnSummaryStatistics()
+ >>> colStats.mean()
+ array([ 2.5, 3.5, 4.5])
+ """
+ java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
+ return MultivariateStatisticalSummary(java_col_stats)
+
+ @since('2.0.0')
+ def computeCovariance(self):
+ """
+ Computes the covariance matrix, treating each row as an
+ observation. Note that this cannot be computed on matrices
+ with more than 65535 columns.
+
+ >>> rows = sc.parallelize([[1, 2], [2, 1]])
+ >>> mat = RowMatrix(rows)
+
+ >>> mat.computeCovariance()
+ DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
+ """
+ return self._java_matrix_wrapper.call("computeCovariance")
+
+ @since('2.0.0')
+ def computeGramianMatrix(self):
+ """
+ Computes the Gramian matrix `A^T A`. Note that this cannot be
+ computed on matrices with more than 65535 columns.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+ >>> mat = RowMatrix(rows)
+
+ >>> mat.computeGramianMatrix()
+ DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+ """
+ return self._java_matrix_wrapper.call("computeGramianMatrix")
+
+ @since('2.0.0')
+ def columnSimilarities(self, threshold=0.0):
+ """
+ Compute similarities between columns of this matrix.
+
+ The threshold parameter is a trade-off knob between estimate
+ quality and computational cost.
+
+ The default threshold setting of 0 guarantees deterministically
+ correct results, but uses the brute-force approach of computing
+ normalized dot products.
+
+ Setting the threshold to positive values uses a sampling
+ approach and incurs strictly less computational cost than the
+ brute-force approach. However the similarities computed will
+ be estimates.
+
+ The sampling guarantees relative-error correctness for those
+ pairs of columns that have similarity greater than the given
+ similarity threshold.
+
+ To describe the guarantee, we set some notation:
+ * Let A be the smallest in magnitude non-zero element of
+ this matrix.
+ * Let B be the largest in magnitude non-zero element of
+ this matrix.
+ * Let L be the maximum number of non-zeros per row.
+
+ For example, for {0,1} matrices: A=B=1.
+ Another example, for the Netflix matrix: A=1, B=5
+
+ For those column pairs that are above the threshold, the
+ computed similarity is correct to within 20% relative error
+ with probability at least 1 - (0.981)^10/B^
+
+ The shuffle size is bounded by the *smaller* of the following
+ two expressions:
+
+ * O(n log(n) L / (threshold * A))
+ * O(m L^2^)
+
+ The latter is the cost of the brute-force approach, so for
+ non-zero thresholds, the cost is always cheaper than the
+ brute-force approach.
+
+ :param: threshold: Set to 0 for deterministic guaranteed
+ correctness. Similarities above this
+ threshold are estimated with the cost vs
+ estimate quality trade-off described above.
+ :return: An n x n sparse upper-triangular CoordinateMatrix of
+ cosine similarities between columns of this matrix.
+
+ >>> rows = sc.parallelize([[1, 2], [1, 5]])
+ >>> mat = RowMatrix(rows)
+
+ >>> sims = mat.columnSimilarities()
+ >>> sims.entries.first().value
+ 0.91914503...
+ """
+ java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
+ return CoordinateMatrix(java_sims_mat)
+
+ @since('2.0.0')
+ def tallSkinnyQR(self, computeQ=False):
+ """
+ Compute the QR decomposition of this RowMatrix.
+
+ The implementation is designed to optimize the QR decomposition
+ (factorization) for the RowMatrix of a tall and skinny shape.
+
+ Reference:
+ Paul G. Constantine, David F. Gleich. "Tall and skinny QR
+ factorizations in MapReduce architectures"
+ ([[http://dx.doi.org/10.1145/1996092.1996103]])
+
+ :param: computeQ: whether to computeQ
+ :return: QRDecomposition(Q: RowMatrix, R: Matrix), where
+ Q = None if computeQ = false.
+
+ >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
+ >>> mat = RowMatrix(rows)
+ >>> decomp = mat.tallSkinnyQR(True)
+ >>> Q = decomp.Q
+ >>> R = decomp.R
+
+ >>> # Test with absolute values
+ >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
+ >>> absQRows.collect()
+ [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
+
+ >>> # Test with absolute values
+ >>> abs(R.toArray()).tolist()
+ [[5.0, 10.0], [0.0, 1.0]]
+ """
+ decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
+ if computeQ:
+ java_Q = decomp.call("Q")
+ Q = RowMatrix(java_Q)
+ else:
+ Q = None
+ R = decomp.call("R")
+ return QRDecomposition(Q, R)
+
class IndexedRow(object):
"""
@@ -311,6 +463,21 @@ class IndexedRowMatrix(DistributedMatrix):
java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
return CoordinateMatrix(java_coordinate_matrix)
+ @since('2.0.0')
+ def computeGramianMatrix(self):
+ """
+ Computes the Gramian matrix `A^T A`. Note that this cannot be
+ computed on matrices with more than 65535 columns.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows)
+
+ >>> mat.computeGramianMatrix()
+ DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+ """
+ return self._java_matrix_wrapper.call("computeGramianMatrix")
+
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
@@ -514,6 +681,26 @@ class CoordinateMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def transpose(self):
+ """
+ Transpose this CoordinateMatrix.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(1, 0, 2),
+ ... MatrixEntry(2, 1, 3.7)])
+ >>> mat = CoordinateMatrix(entries)
+ >>> mat_transposed = mat.transpose()
+
+ >>> print(mat_transposed.numRows())
+ 2
+
+ >>> print(mat_transposed.numCols())
+ 3
+ """
+ java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+ return CoordinateMatrix(java_transposed_matrix)
+
def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
@@ -789,6 +976,33 @@ class BlockMatrix(DistributedMatrix):
"""
return self._java_matrix_wrapper.call("numCols")
+ @since('2.0.0')
+ def cache(self):
+ """
+ Caches the underlying RDD.
+ """
+ self._java_matrix_wrapper.call("cache")
+ return self
+
+ @since('2.0.0')
+ def persist(self, storageLevel):
+ """
+ Persists the underlying RDD with the specified storage level.
+ """
+ if not isinstance(storageLevel, StorageLevel):
+ raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
+ javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
+ self._java_matrix_wrapper.call("persist", javaStorageLevel)
+ return self
+
+ @since('2.0.0')
+ def validate(self):
+ """
+ Validates the block matrix info against the matrix data (`blocks`)
+ and throws an exception if any error is found.
+ """
+ self._java_matrix_wrapper.call("validate")
+
def add(self, other):
"""
Adds two block matrices together. The matrices must have the
@@ -822,6 +1036,41 @@ class BlockMatrix(DistributedMatrix):
java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+ @since('2.0.0')
+ def subtract(self, other):
+ """
+ Subtracts the given block matrix `other` from this block matrix:
+ `this - other`. The matrices must have the same size and
+ matching `rowsPerBlock` and `colsPerBlock` values. If one of
+ the sub matrix blocks that are being subtracted is a
+ SparseMatrix, the resulting sub matrix block will also be a
+ SparseMatrix, even if it is being subtracted from a DenseMatrix.
+ If two dense sub matrix blocks are subtracted, the output block
+ will also be a DenseMatrix.
+
+ >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
+ >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
+ >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
+ >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
+ >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
+ >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
+ >>> mat1 = BlockMatrix(blocks1, 3, 2)
+ >>> mat2 = BlockMatrix(blocks2, 3, 2)
+ >>> mat3 = BlockMatrix(blocks3, 3, 2)
+
+ >>> mat1.subtract(mat2).toLocalMatrix()
+ DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)
+
+ >>> mat2.subtract(mat3).toLocalMatrix()
+ DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
+ """
+ if not isinstance(other, BlockMatrix):
+ raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
+
+ other_java_block_matrix = other._java_matrix_wrapper._java_model
+ java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
+ return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+
def multiply(self, other):
"""
Left multiplies this BlockMatrix by `other`, another
@@ -857,6 +1106,23 @@ class BlockMatrix(DistributedMatrix):
java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+ @since('2.0.0')
+ def transpose(self):
+ """
+ Transpose this BlockMatrix. Returns a new BlockMatrix
+ instance sharing the same underlying data. Is a lazy operation.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+
+ >>> mat_transposed = mat.transpose()
+ >>> mat_transposed.toLocalMatrix()
+ DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
+ """
+ java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+ return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)
+
def toLocalMatrix(self):
"""
Collect the distributed matrix on the driver as a DenseMatrix.