aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Dusenberry <mwdusenb@us.ibm.com>2015-08-05 07:40:50 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-05 07:40:50 -0700
commit34dcf10104460816382908b2b8eeb6c925e862bf (patch)
treea4767e939dd4a33e2d3eda23696ffcf03f55c4fa
parent519cf6d3f764a977770266784d6902fe205a070f (diff)
downloadspark-34dcf10104460816382908b2b8eeb6c925e862bf.tar.gz
spark-34dcf10104460816382908b2b8eeb6c925e862bf.tar.bz2
spark-34dcf10104460816382908b2b8eeb6c925e862bf.zip
[SPARK-6486] [MLLIB] [PYTHON] Add BlockMatrix to PySpark.
mengxr This adds the `BlockMatrix` to PySpark. I have the conversions to `IndexedRowMatrix` and `CoordinateMatrix` ready as well, so once PR #7554 is completed (which relies on PR #7746), this PR can be finished. Author: Mike Dusenberry <mwdusenb@us.ibm.com> Closes #7761 from dusenberrymw/SPARK-6486_Add_BlockMatrix_to_PySpark and squashes the following commits: 27195c2 [Mike Dusenberry] Adding one more check to _convert_to_matrix_block_tuple, and a few minor documentation changes. ae50883 [Mike Dusenberry] Minor update: BlockMatrix should inherit from DistributedMatrix. b8acc1c [Mike Dusenberry] Moving BlockMatrix to pyspark.mllib.linalg.distributed, updating the logic to match that of the other distributed matrices, adding conversions, and adding documentation. c014002 [Mike Dusenberry] Using properties for better documentation. 3bda6ab [Mike Dusenberry] Adding documentation. 8fb3095 [Mike Dusenberry] Small cleanup. e17af2e [Mike Dusenberry] Adding BlockMatrix to PySpark.
-rw-r--r--docs/mllib-data-types.md41
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala25
-rw-r--r--python/pyspark/mllib/linalg/distributed.py328
3 files changed, 388 insertions, 6 deletions
diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md
index 11033bf4f9..f0e8d54956 100644
--- a/docs/mllib-data-types.md
+++ b/docs/mllib-data-types.md
@@ -494,6 +494,9 @@ rowMat = mat.toRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
+
+# Convert to a BlockMatrix.
+blockMat = mat.toBlockMatrix()
{% endhighlight %}
</div>
@@ -594,6 +597,9 @@ rowMat = mat.toRowMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
+
+# Convert to a BlockMatrix.
+blockMat = mat.toBlockMatrix()
{% endhighlight %}
</div>
@@ -661,4 +667,39 @@ matA.validate();
BlockMatrix ata = matA.transpose().multiply(matA);
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+
+A [`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix)
+can be created from an `RDD` of sub-matrix blocks, where a sub-matrix block is a
+`((blockRowIndex, blockColIndex), sub-matrix)` tuple.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Matrices
+from pyspark.mllib.linalg.distributed import BlockMatrix
+
+# Create an RDD of sub-matrix blocks.
+blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+
+# Create a BlockMatrix from an RDD of sub-matrix blocks.
+mat = BlockMatrix(blocks, 3, 2)
+
+# Get its size.
+m = mat.numRows() # 6
+n = mat.numCols() # 2
+
+# Get the blocks as an RDD of sub-matrix blocks.
+blocksRDD = mat.blocks
+
+# Convert to a LocalMatrix.
+localMat = mat.toLocalMatrix()
+
+# Convert to an IndexedRowMatrix.
+indexedRowMat = mat.toIndexedRowMatrix()
+
+# Convert to a CoordinateMatrix.
+coordinateMat = mat.toCoordinateMatrix()
+{% endhighlight %}
+</div>
</div>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2b3fae381..f585aacd45 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1129,6 +1129,21 @@ private[python] class PythonMLLibAPI extends Serializable {
}
/**
+ * Wrapper around BlockMatrix constructor.
+ */
+ def createBlockMatrix(blocks: DataFrame, rowsPerBlock: Int, colsPerBlock: Int,
+ numRows: Long, numCols: Long): BlockMatrix = {
+ // We use DataFrames for serialization of sub-matrix blocks from
+ // Python, so map each Row in the DataFrame back to a
+ // ((blockRowIndex, blockColIndex), sub-matrix) tuple.
+ val blockTuples = blocks.map {
+ case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) =>
+ ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix)
+ }
+ new BlockMatrix(blockTuples, rowsPerBlock, colsPerBlock, numRows, numCols)
+ }
+
+ /**
* Return the rows of an IndexedRowMatrix.
*/
def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
@@ -1147,6 +1162,16 @@ private[python] class PythonMLLibAPI extends Serializable {
val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
sqlContext.createDataFrame(coordinateMatrix.entries)
}
+
+ /**
+ * Return the sub-matrix blocks of a BlockMatrix.
+ */
+ def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = {
+ // We use DataFrames for serialization of sub-matrix blocks to
+ // Python, so return a DataFrame.
+ val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext)
+ sqlContext.createDataFrame(blockMatrix.blocks)
+ }
}
/**
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
index 666d833019..aec407de90 100644
--- a/python/pyspark/mllib/linalg/distributed.py
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -28,11 +28,12 @@ from py4j.java_gateway import JavaObject
from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, Matrix
__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
- 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']
+ 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix',
+ 'BlockMatrix']
class DistributedMatrix(object):
@@ -322,6 +323,35 @@ class IndexedRowMatrix(DistributedMatrix):
java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
return CoordinateMatrix(java_coordinate_matrix)
+ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+ """
+ Convert this matrix to a BlockMatrix.
+
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(6, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows).toBlockMatrix()
+
+ >>> # This IndexedRowMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, and the ensuing
+ >>> # BlockMatrix will have 7 rows as well.
+ >>> print(mat.numRows())
+ 7
+
+ >>> print(mat.numCols())
+ 3
+ """
+ java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+ rowsPerBlock,
+ colsPerBlock)
+ return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
class MatrixEntry(object):
"""
@@ -476,19 +506,18 @@ class CoordinateMatrix(DistributedMatrix):
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, but the ensuing RowMatrix
>>> # will only have 2 rows since there are only entries on 2
>>> # unique rows.
- >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> print(mat.numRows())
2
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing RowMatrix
>>> # will have 5 columns as well.
- >>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> print(mat.numCols())
5
"""
@@ -501,33 +530,320 @@ class CoordinateMatrix(DistributedMatrix):
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # IndexedRowMatrix will have 7 rows as well.
- >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # IndexedRowMatrix will have 5 columns as well.
- >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> print(mat.numCols())
5
"""
java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
return IndexedRowMatrix(java_indexed_row_matrix)
+ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024):
+ """
+ Convert this matrix to a BlockMatrix.
+
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries).toBlockMatrix()
+
+ >>> # This CoordinateMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, and the ensuing
+ >>> # BlockMatrix will have 7 rows as well.
+ >>> print(mat.numRows())
+ 7
+
+ >>> # This CoordinateMatrix will have 5 columns, due to the
+ >>> # highest column index being 4, and the ensuing
+ >>> # BlockMatrix will have 5 columns as well.
+ >>> print(mat.numCols())
+ 5
+ """
+ java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix",
+ rowsPerBlock,
+ colsPerBlock)
+ return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock)
+
+
+def _convert_to_matrix_block_tuple(block):
+ if (isinstance(block, tuple) and len(block) == 2
+ and isinstance(block[0], tuple) and len(block[0]) == 2
+ and isinstance(block[1], Matrix)):
+ blockRowIndex = int(block[0][0])
+ blockColIndex = int(block[0][1])
+ subMatrix = block[1]
+ return ((blockRowIndex, blockColIndex), subMatrix)
+ else:
+ raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block))
+
+
+class BlockMatrix(DistributedMatrix):
+ """
+ .. note:: Experimental
+
+ Represents a distributed matrix in blocks of local matrices.
+
+ :param blocks: An RDD of sub-matrix blocks
+ ((blockRowIndex, blockColIndex), sub-matrix) that
+ form this distributed matrix. If multiple blocks
+ with the same index exist, the results for
+ operations like add and multiply will be
+ unpredictable.
+ :param rowsPerBlock: Number of rows that make up each block.
+ The blocks forming the final rows are not
+ required to have the given number of rows.
+ :param colsPerBlock: Number of columns that make up each block.
+ The blocks forming the final columns are not
+ required to have the given number of columns.
+ :param numRows: Number of rows of this matrix. If the supplied
+ value is less than or equal to zero, the number
+ of rows will be calculated when `numRows` is
+ invoked.
+ :param numCols: Number of columns of this matrix. If the supplied
+ value is less than or equal to zero, the number
+ of columns will be calculated when `numCols` is
+ invoked.
+ """
+ def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0):
+ """
+ Note: This docstring is not shown publicly.
+
+ Create a wrapper over a Java BlockMatrix.
+
+ Publicly, we require that `blocks` be an RDD. However, for
+ internal usage, `blocks` can also be a Java BlockMatrix
+ object, in which case we can wrap it directly. This
+ assists in clean matrix conversions.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+
+ >>> mat_diff = BlockMatrix(blocks, 3, 2)
+ >>> (mat_diff._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ False
+
+ >>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2)
+ >>> (mat_same._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ True
+ """
+ if isinstance(blocks, RDD):
+ blocks = blocks.map(_convert_to_matrix_block_tuple)
+ # We use DataFrames for serialization of sub-matrix blocks
+ # from Python, so first convert the RDD to a DataFrame on
+ # this side. This will convert each sub-matrix block
+ # tuple to a Row containing the 'blockRowIndex',
+ # 'blockColIndex', and 'subMatrix' values, which can
+ # each be easily serialized. We will convert back to
+ # ((blockRowIndex, blockColIndex), sub-matrix) tuples on
+ # the Scala side.
+ java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(),
+ int(rowsPerBlock), int(colsPerBlock),
+ long(numRows), long(numCols))
+ elif (isinstance(blocks, JavaObject)
+ and blocks.getClass().getSimpleName() == "BlockMatrix"):
+ java_matrix = blocks
+ else:
+ raise TypeError("blocks should be an RDD of sub-matrix blocks as "
+ "((int, int), matrix) tuples, got %s" % type(blocks))
+
+ self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+ @property
+ def blocks(self):
+ """
+ The RDD of sub-matrix blocks
+ ((blockRowIndex, blockColIndex), sub-matrix) that form this
+ distributed matrix.
+
+ >>> mat = BlockMatrix(
+ ... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
+ >>> blocks = mat.blocks
+ >>> blocks.first()
+ ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
+
+ """
+ # We use DataFrames for serialization of sub-matrix blocks
+ # from Java, so we first convert the RDD of blocks to a
+ # DataFrame on the Scala/Java side. Then we map each Row in
+ # the DataFrame back to a sub-matrix block on this side.
+ blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
+ blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
+ return blocks
+
+ @property
+ def rowsPerBlock(self):
+ """
+ Number of rows that make up each block.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.rowsPerBlock
+ 3
+ """
+ return self._java_matrix_wrapper.call("rowsPerBlock")
+
+ @property
+ def colsPerBlock(self):
+ """
+ Number of columns that make up each block.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.colsPerBlock
+ 2
+ """
+ return self._java_matrix_wrapper.call("colsPerBlock")
+
+ @property
+ def numRowBlocks(self):
+ """
+ Number of rows of blocks in the BlockMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.numRowBlocks
+ 2
+ """
+ return self._java_matrix_wrapper.call("numRowBlocks")
+
+ @property
+ def numColBlocks(self):
+ """
+ Number of columns of blocks in the BlockMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> mat.numColBlocks
+ 1
+ """
+ return self._java_matrix_wrapper.call("numColBlocks")
+
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> print(mat.numRows())
+ 6
+
+ >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+ >>> print(mat.numRows())
+ 7
+ """
+ return self._java_matrix_wrapper.call("numRows")
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+
+ >>> mat = BlockMatrix(blocks, 3, 2)
+ >>> print(mat.numCols())
+ 2
+
+ >>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
+ >>> print(mat.numCols())
+ 6
+ """
+ return self._java_matrix_wrapper.call("numCols")
+
+ def toLocalMatrix(self):
+ """
+ Collect the distributed matrix on the driver as a DenseMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
+
+ >>> # This BlockMatrix will have 6 effective rows, due to
+ >>> # having two sub-matrix blocks stacked, each with 3 rows.
+ >>> # The ensuing DenseMatrix will also have 6 rows.
+ >>> print(mat.numRows)
+ 6
+
+ >>> # This BlockMatrix will have 2 effective columns, due to
+ >>> # having two sub-matrix blocks stacked, each with 2
+ >>> # columns. The ensuing DenseMatrix will also have 2 columns.
+ >>> print(mat.numCols)
+ 2
+ """
+ return self._java_matrix_wrapper.call("toLocalMatrix")
+
+ def toIndexedRowMatrix(self):
+ """
+ Convert this matrix to an IndexedRowMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+ ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+ >>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
+
+ >>> # This BlockMatrix will have 6 effective rows, due to
+ >>> # having two sub-matrix blocks stacked, each with 3 rows.
+ >>> # The ensuing IndexedRowMatrix will also have 6 rows.
+ >>> print(mat.numRows())
+ 6
+
+ >>> # This BlockMatrix will have 2 effective columns, due to
+ >>> # having two sub-matrix blocks stacked, each with 2 columns.
+ >>> # The ensuing IndexedRowMatrix will also have 2 columns.
+ >>> print(mat.numCols())
+ 2
+ """
+ java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
+ return IndexedRowMatrix(java_indexed_row_matrix)
+
+ def toCoordinateMatrix(self):
+ """
+ Convert this matrix to a CoordinateMatrix.
+
+ >>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
+ ... ((1, 0), Matrices.dense(1, 2, [7, 8]))])
+ >>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
+ >>> mat.entries.take(3)
+ [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)]
+ """
+ java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
+ return CoordinateMatrix(java_coordinate_matrix)
+
def _test():
import doctest
from pyspark import SparkContext
from pyspark.sql import SQLContext
+ from pyspark.mllib.linalg import Matrices
import pyspark.mllib.linalg.distributed
globs = pyspark.mllib.linalg.distributed.__dict__.copy()
globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
globs['sqlContext'] = SQLContext(globs['sc'])
+ globs['Matrices'] = Matrices
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count: