about | summary | refs | log | tree | commit | diff
path: root/python
diff options
context:
space:
mode:
author Mike Dusenberry <mwdusenb@us.ibm.com> 2015-08-05 07:40:50 -0700
committer Xiangrui Meng <meng@databricks.com> 2015-08-05 07:40:50 -0700
commit 34dcf10104460816382908b2b8eeb6c925e862bf (patch)
tree a4767e939dd4a33e2d3eda23696ffcf03f55c4fa /python
parent 519cf6d3f764a977770266784d6902fe205a070f (diff)
download spark-34dcf10104460816382908b2b8eeb6c925e862bf.tar.gz
spark-34dcf10104460816382908b2b8eeb6c925e862bf.tar.bz2
spark-34dcf10104460816382908b2b8eeb6c925e862bf.zip
[SPARK-6486] [MLLIB] [PYTHON] Add BlockMatrix to PySpark.
mengxr This adds the `BlockMatrix` to PySpark. I have the conversions to `IndexedRowMatrix` and `CoordinateMatrix` ready as well, so once PR #7554 is completed (which relies on PR #7746), this PR can be finished.

Author: Mike Dusenberry <mwdusenb@us.ibm.com>

Closes #7761 from dusenberrymw/SPARK-6486_Add_BlockMatrix_to_PySpark and squashes the following commits:

27195c2 [Mike Dusenberry] Adding one more check to _convert_to_matrix_block_tuple, and a few minor documentation changes.
ae50883 [Mike Dusenberry] Minor update: BlockMatrix should inherit from DistributedMatrix.
b8acc1c [Mike Dusenberry] Moving BlockMatrix to pyspark.mllib.linalg.distributed, updating the logic to match that of the other distributed matrices, adding conversions, and adding documentation.
c014002 [Mike Dusenberry] Using properties for better documentation.
3bda6ab [Mike Dusenberry] Adding documentation.
8fb3095 [Mike Dusenberry] Small cleanup.
e17af2e [Mike Dusenberry] Adding BlockMatrix to PySpark.
Diffstat (limited to 'python')
-rw-r--r-- python/pyspark/mllib/linalg/distributed.py 328
1 files changed, 322 insertions, 6 deletions
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 666d833019..aec407de90 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -28,11 +28,12 @@ from py4j.java_gateway import JavaObject
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, Matrix
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
- 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']
+ 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
+ 'BlockMatrix']
class DistributedMatrix(object):
@@ -322,6 +323,35 @@ class IndexedRowMatrix(DistributedMatrix):
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
+ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+ """
+ Convert this matrix to a BlockMatrix.
+
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(6, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows).toBlockMatrix()
+
+ >>> # This IndexedRowMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, and the ensuing
+ >>> # BlockMatrix will have 7 rows as well.
+ >>> print(mat.numRows())
+ 7
+
+ >>> print(mat.numCols())
+ 3
+ """
+ java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+ rowsPerBlock,
+ colsPerBlock)
+ return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
class MatrixEntry(object):
"""
@@ -476,19 +506,18 @@ class CoordinateMatrix(DistributedMatrix):
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, but the ensuing RowMatrix
>>> # will only have 2 rows since there are only entries on 2
>>> # unique rows.
- >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> print(mat.numRows())
2
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing RowMatrix
>>> # will have 5 columns as well.
- >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> print(mat.numCols())
5
"""
@@ -501,33 +530,320 @@ class CoordinateMatrix(DistributedMatrix):
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # IndexedRowMatrix will have 7 rows as well.
- >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # IndexedRowMatrix will have 5 columns as well.
- >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> print(mat.numCols())
5
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
+ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+ """
+ Convert this matrix to a BlockMatrix.
+
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toBlockMatrix()
+
+ >>> # This CoordinateMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, and the ensuing
+ >>> # BlockMatrix will have 7 rows as well.
+ >>> print(mat.numRows())
+ 7
+
+ >>> # This CoordinateMatrix will have 5 columns, due to the
+ >>> # highest column index being 4, and the ensuing
+ >>> # BlockMatrix will have 5 columns as well.
+ >>> print(mat.numCols())
+ 5
+ """
+ java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+ rowsPerBlock,
+ colsPerBlock)
+ return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
+
+def _convert_to_matrix_block_tuple(block):
+ if (isinstance(block, tuple) and len(block) == 2
+ and isinstance(block[0], tuple) and len(block[0]) == 2
+ and isinstance(block[1], Matrix)):
+ blockRowIndex = int(block[0][0])
+ blockColIndex = int(block[0][1])
+ subMatrix = block[1]
+ return ((blockRowIndex, blockColIndex), subMatrix)
+ else:
+ raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block))
+
+
+class BlockMatrix(DistributedMatrix):
+ """
+ .. note:: Experimental
+
+ Represents a distributed matrix in blocks of local matrices.
+
+ :param blocks: An RDD of sub-matrix blocks
+ ((blockRowIndex, blockColIndex), sub-matrix) that
+ form this distributed matrix. If multiple blocks
+ with the same index exist, the results for
+ operations like add and multiply will be
+ unpredictable.
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+ :param numRows: Number of rows of this matrix. If the supplied
+ value is less than or equal to zero, the number
+ of rows will be calculated when `numRows` is
+ invoked.
+ :param numCols: Number of columns of this matrix. If the supplied
+ value is less than or equal to zero, the number
+ of columns will be calculated when `numCols` is
+ invoked.
+ """
+ def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0):
+ """
+ Note: This docstring is not shown publicly.
+
+ Create a wrapper over a Java BlockMatrix.
+
+ Publicly, we require that `blocks` be an RDD. However, for
+ internal usage, `blocks` can also be a Java BlockMatrix
+ object, in which case we can wrap it directly. This
+ assists in clean matrix conversions.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+
+ >>> mat_diff = BlockMatrix(blocks, 3, 2)
+ >>> (mat_diff._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ False
+
+ >>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2)
+ >>> (mat_same._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ True
+ """
+ if isinstance(blocks, RDD):
+ blocks = blocks.map(_convert_to_matrix_block_tuple)
+ # We use DataFrames for serialization of sub-matrix blocks
+ # from Python, so first convert the RDD to a DataFrame on
+ # this side. This will convert each sub-matrix block
+ # tuple to a Row containing the 'blockRowIndex',
+ # 'blockColIndex', and 'subMatrix' values, which can
+ # each be easily serialized. We will convert back to
+ # ((blockRowIndex, blockColIndex), sub-matrix) tuples on
+ # the Scala side.
+ java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(),
+ int(rowsPerBlock), int(colsPerBlock),
+ long(numRows), long(numCols))
+ elif (isinstance(blocks, JavaObject)
+ and blocks.getClass().getSimpleName() == "BlockMatrix"):
+ java_matrix = blocks
+ else:
+ raise TypeError("blocks should be an RDD of sub-matrix blocks as "
+ "((int, int), matrix) tuples, got %s" % type(blocks))
+
+ self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+ @property
+ def blocks(self):
+ """
+ The RDD of sub-matrix blocks
+ ((blockRowIndex, blockColIndex), sub-matrix) that form this
+ distributed matrix.
+
+ >>> mat = BlockMatrix(
+ ... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
+ >>> blocks = mat.blocks
+ >>> blocks.first()
+ ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
+
+ """
+ # We use DataFrames for serialization of sub-matrix blocks
+ # from Java, so we first convert the RDD of blocks to a
+ # DataFrame on the Scala/Java side. Then we map each Row in
+ # the DataFrame back to a sub-matrix block on this side.
+ blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
+ blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+ return blocks
+
+ @property
+ def rowsPerBlock(self):
+ """
+ Number of rows that make up each block.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.rowsPerBlock
+ 3
+ """
+ return self._java_matrix_wrapper.call("rowsPerBlock")
+
+ @property
+ def colsPerBlock(self):
+ """
+ Number of columns that make up each block.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.colsPerBlock
+ 2
+ """
+ return self._java_matrix_wrapper.call("colsPerBlock")
+
+ @property
+ def numRowBlocks(self):
+ """
+ Number of rows of blocks in the BlockMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.numRowBlocks
+ 2
+ """
+ return self._java_matrix_wrapper.call("numRowBlocks")
+
+ @property
+ def numColBlocks(self):
+ """
+ Number of columns of blocks in the BlockMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.numColBlocks
+ 1
+ """
+ return self._java_matrix_wrapper.call("numColBlocks")
+
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> print(mat.numRows())
+ 6
+
+ >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+ >>> print(mat.numRows())
+ 7
+ """
+ return self._java_matrix_wrapper.call("numRows")
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> print(mat.numCols())
+ 2
+
+ >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+ >>> print(mat.numCols())
+ 6
+ """
+ return self._java_matrix_wrapper.call("numCols")
+
+ def toLocalMatrix(self):
+ """
+ Collect the distributed matrix on the driver as a DenseMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
+
+ >>> # This BlockMatrix will have 6 effective rows, due to
+ >>> # having two sub-matrix blocks stacked, each with 3 rows.
+ >>> # The ensuing DenseMatrix will also have 6 rows.
+ >>> print(mat.numRows)
+ 6
+
+ >>> # This BlockMatrix will have 2 effective columns, due to
+ >>> # having two sub-matrix blocks stacked, each with 2
+ >>> # columns. The ensuing DenseMatrix will also have 2 columns.
+ >>> print(mat.numCols)
+ 2
+ """
+ return self._java_matrix_wrapper.call("toLocalMatrix")
+
+ def toIndexedRowMatrix(self):
+ """
+ Convert this matrix to an IndexedRowMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
+
+ >>> # This BlockMatrix will have 6 effective rows, due to
+ >>> # having two sub-matrix blocks stacked, each with 3 rows.
+ >>> # The ensuing IndexedRowMatrix will also have 6 rows.
+ >>> print(mat.numRows())
+ 6
+
+ >>> # This BlockMatrix will have 2 effective columns, due to
+ >>> # having two sub-matrix blocks stacked, each with 2 columns.
+ >>> # The ensuing IndexedRowMatrix will also have 2 columns.
+ >>> print(mat.numCols())
+ 2
+ """
+ java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
+ return IndexedRowMatrix(java_indexed_row_matrix)
+
+ def toCoordinateMatrix(self):
+ """
+ Convert this matrix to a CoordinateMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
+ ... ((1, 0), Matrices.dense(1, 2, [7, 8]))])
+ >>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
+ >>> mat.entries.take(3)
+ [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)]
+ """
+ java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
+ return CoordinateMatrix(java_coordinate_matrix)
+
def _test():
import doctest
from pyspark import SparkContext
from pyspark.sql import SQLContext
+ from pyspark.mllib.linalg import Matrices
import pyspark.mllib.linalg.distributed
globs = pyspark.mllib.linalg.distributed.__dict__.copy()
globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
globs['sqlContext'] = SQLContext(globs['sc'])
+ globs['Matrices'] = Matrices
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count: