#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Package for distributed linear algebra.
"""

import sys

# Python 3 has no separate `long` type; alias it so that the `long(...)`
# conversions below work on both major versions.  `sys.version_info` is
# compared numerically rather than comparing the `sys.version` string.
if sys.version_info[0] >= 3:
    long = int

from py4j.java_gateway import JavaObject

from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector


__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
           'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']


class DistributedMatrix(object):
    """
    .. note:: Experimental

    Represents a distributively stored matrix backed by one or
    more RDDs.

    """
    def numRows(self):
        """Get or compute the number of rows."""
        raise NotImplementedError

    def numCols(self):
        """Get or compute the number of cols."""
        raise NotImplementedError


class RowMatrix(DistributedMatrix):
    """
    .. note:: Experimental

    Represents a row-oriented distributed Matrix with no meaningful
    row indices.

    :param rows: An RDD of vectors.
    :param numRows: Number of rows in the matrix. A non-positive
                    value means unknown, at which point the number
                    of rows will be determined by the number of
                    records in the `rows` RDD.
    :param numCols: Number of columns in the matrix. A non-positive
                    value means unknown, at which point the number
                    of columns will be determined by the size of
                    the first row.
    """
    def __init__(self, rows, numRows=0, numCols=0):
        """
        Note: This docstring is not shown publicly.

        Create a wrapper over a Java RowMatrix.

        Publicly, we require that `rows` be an RDD.  However, for
        internal usage, `rows` can also be a Java RowMatrix
        object, in which case we can wrap it directly.  This
        assists in clean matrix conversions.

        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
        >>> mat = RowMatrix(rows)

        >>> mat_diff = RowMatrix(rows)
        >>> (mat_diff._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        False

        >>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model)
        >>> (mat_same._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        True
        """
        if isinstance(rows, RDD):
            rows = rows.map(_convert_to_vector)
            java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols))
        elif (isinstance(rows, JavaObject)
              and rows.getClass().getSimpleName() == "RowMatrix"):
            # Internal path: an existing Java RowMatrix is wrapped as-is.
            java_matrix = rows
        else:
            raise TypeError("rows should be an RDD of vectors, got %s" % type(rows))

        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)

    @property
    def rows(self):
        """
        Rows of the RowMatrix stored as an RDD of vectors.

        >>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
        >>> rows = mat.rows
        >>> rows.first()
        DenseVector([1.0, 2.0, 3.0])
        """
        return self._java_matrix_wrapper.call("rows")

    def numRows(self):
        """
        Get or compute the number of rows.

        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
        ...                        [7, 8, 9], [10, 11, 12]])

        >>> mat = RowMatrix(rows)
        >>> print(mat.numRows())
        4

        >>> mat = RowMatrix(rows, 7, 6)
        >>> print(mat.numRows())
        7
        """
        return self._java_matrix_wrapper.call("numRows")

    def numCols(self):
        """
        Get or compute the number of cols.

        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
        ...                        [7, 8, 9], [10, 11, 12]])

        >>> mat = RowMatrix(rows)
        >>> print(mat.numCols())
        3

        >>> mat = RowMatrix(rows, 7, 6)
        >>> print(mat.numCols())
        6
        """
        return self._java_matrix_wrapper.call("numCols")


class IndexedRow(object):
    """
    .. note:: Experimental

    Represents a row of an IndexedRowMatrix.

    Just a wrapper over a (long, vector) tuple.

    :param index: The index for the given row.
    :param vector: The row in the matrix at the given index.
    """
    def __init__(self, index, vector):
        self.index = long(index)
        self.vector = _convert_to_vector(vector)

    def __repr__(self):
        """Return a string of the form ``IndexedRow(index, vector)``."""
        return "IndexedRow(%s, %s)" % (self.index, self.vector)


def _convert_to_indexed_row(row):
    """
    Coerce `row` into an IndexedRow.

    An IndexedRow is returned unchanged, and a 2-tuple is unpacked
    into the IndexedRow constructor.  Anything else raises TypeError.
    """
    if isinstance(row, IndexedRow):
        return row
    elif isinstance(row, tuple) and len(row) == 2:
        return IndexedRow(*row)
    else:
        raise TypeError("Cannot convert type %s into IndexedRow" % type(row))


class IndexedRowMatrix(DistributedMatrix):
    """
    .. note:: Experimental

    Represents a row-oriented distributed Matrix with indexed rows.

    :param rows: An RDD of IndexedRows or (long, vector) tuples.
    :param numRows: Number of rows in the matrix. A non-positive
                    value means unknown, at which point the number
                    of rows will be determined by the max row
                    index plus one.
    :param numCols: Number of columns in the matrix. A non-positive
                    value means unknown, at which point the number
                    of columns will be determined by the size of
                    the first row.
    """
    def __init__(self, rows, numRows=0, numCols=0):
        """
        Note: This docstring is not shown publicly.

        Create a wrapper over a Java IndexedRowMatrix.

        Publicly, we require that `rows` be an RDD.  However, for
        internal usage, `rows` can also be a Java IndexedRowMatrix
        object, in which case we can wrap it directly.  This
        assists in clean matrix conversions.

        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                        IndexedRow(1, [4, 5, 6])])
        >>> mat = IndexedRowMatrix(rows)

        >>> mat_diff = IndexedRowMatrix(rows)
        >>> (mat_diff._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        False

        >>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model)
        >>> (mat_same._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        True
        """
        if isinstance(rows, RDD):
            rows = rows.map(_convert_to_indexed_row)
            # We use DataFrames for serialization of IndexedRows from
            # Python, so first convert the RDD to a DataFrame on this
            # side. This will convert each IndexedRow to a Row
            # containing the 'index' and 'vector' values, which can
            # both be easily serialized.  We will convert back to
            # IndexedRows on the Scala side.
            java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
                                        long(numRows), int(numCols))
        elif (isinstance(rows, JavaObject)
              and rows.getClass().getSimpleName() == "IndexedRowMatrix"):
            # Internal path: an existing Java IndexedRowMatrix is wrapped as-is.
            java_matrix = rows
        else:
            raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples, "
                            "got %s" % type(rows))

        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)

    @property
    def rows(self):
        """
        Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.

        >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                                        IndexedRow(1, [4, 5, 6])]))
        >>> rows = mat.rows
        >>> rows.first()
        IndexedRow(0, [1.0,2.0,3.0])
        """
        # We use DataFrames for serialization of IndexedRows from
        # Java, so we first convert the RDD of rows to a DataFrame
        # on the Scala/Java side. Then we map each Row in the
        # DataFrame back to an IndexedRow on this side.
        rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
        # Map over the DataFrame's underlying RDD explicitly rather than
        # relying on the `DataFrame.map` alias, which later PySpark
        # releases no longer provide.
        rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
        return rows

    def numRows(self):
        """
        Get or compute the number of rows.

        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                        IndexedRow(1, [4, 5, 6]),
        ...                        IndexedRow(2, [7, 8, 9]),
        ...                        IndexedRow(3, [10, 11, 12])])

        >>> mat = IndexedRowMatrix(rows)
        >>> print(mat.numRows())
        4

        >>> mat = IndexedRowMatrix(rows, 7, 6)
        >>> print(mat.numRows())
        7
        """
        return self._java_matrix_wrapper.call("numRows")

    def numCols(self):
        """
        Get or compute the number of cols.

        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                        IndexedRow(1, [4, 5, 6]),
        ...                        IndexedRow(2, [7, 8, 9]),
        ...                        IndexedRow(3, [10, 11, 12])])

        >>> mat = IndexedRowMatrix(rows)
        >>> print(mat.numCols())
        3

        >>> mat = IndexedRowMatrix(rows, 7, 6)
        >>> print(mat.numCols())
        6
        """
        return self._java_matrix_wrapper.call("numCols")

    def toRowMatrix(self):
        """
        Convert this matrix to a RowMatrix.

        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                        IndexedRow(6, [4, 5, 6])])
        >>> mat = IndexedRowMatrix(rows).toRowMatrix()
        >>> mat.rows.collect()
        [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])]
        """
        java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
        return RowMatrix(java_row_matrix)

    def toCoordinateMatrix(self):
        """
        Convert this matrix to a CoordinateMatrix.

        >>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
        ...                        IndexedRow(6, [0, 5])])
        >>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
        >>> mat.entries.take(3)
        [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
        """
        java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
        return CoordinateMatrix(java_coordinate_matrix)


class MatrixEntry(object):
    """
    .. note:: Experimental

    Represents an entry of a CoordinateMatrix.

    Just a wrapper over a (long, long, float) tuple.

    :param i: The row index of the matrix.
    :param j: The column index of the matrix.
    :param value: The (i, j)th entry of the matrix, as a float.
    """
    def __init__(self, i, j, value):
        self.i = long(i)
        self.j = long(j)
        self.value = float(value)

    def __repr__(self):
        """Return a string of the form ``MatrixEntry(i, j, value)``."""
        return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value)


def _convert_to_matrix_entry(entry):
    """
    Coerce `entry` into a MatrixEntry.

    A MatrixEntry is returned unchanged, and a 3-tuple is unpacked
    into the MatrixEntry constructor.  Anything else raises TypeError.
    """
    if isinstance(entry, MatrixEntry):
        return entry
    elif isinstance(entry, tuple) and len(entry) == 3:
        return MatrixEntry(*entry)
    else:
        raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry))


class CoordinateMatrix(DistributedMatrix):
    """
    .. note:: Experimental

    Represents a matrix in coordinate format.

    :param entries: An RDD of MatrixEntry inputs or
                    (long, long, float) tuples.
    :param numRows: Number of rows in the matrix. A non-positive
                    value means unknown, at which point the number
                    of rows will be determined by the max row
                    index plus one.
    :param numCols: Number of columns in the matrix. A non-positive
                    value means unknown, at which point the number
                    of columns will be determined by the max column
                    index plus one.
    """
    def __init__(self, entries, numRows=0, numCols=0):
        """
        Note: This docstring is not shown publicly.

        Create a wrapper over a Java CoordinateMatrix.

        Publicly, we require that `entries` be an RDD.  However, for
        internal usage, `entries` can also be a Java CoordinateMatrix
        object, in which case we can wrap it directly.  This
        assists in clean matrix conversions.

        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                           MatrixEntry(6, 4, 2.1)])
        >>> mat = CoordinateMatrix(entries)

        >>> mat_diff = CoordinateMatrix(entries)
        >>> (mat_diff._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        False

        >>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model)
        >>> (mat_same._java_matrix_wrapper._java_model ==
        ...  mat._java_matrix_wrapper._java_model)
        True
        """
        if isinstance(entries, RDD):
            entries = entries.map(_convert_to_matrix_entry)
            # We use DataFrames for serialization of MatrixEntry entries
            # from Python, so first convert the RDD to a DataFrame on
            # this side. This will convert each MatrixEntry to a Row
            # containing the 'i', 'j', and 'value' values, which can
            # each be easily serialized. We will convert back to
            # MatrixEntry inputs on the Scala side.
            java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(),
                                        long(numRows), long(numCols))
        elif (isinstance(entries, JavaObject)
              and entries.getClass().getSimpleName() == "CoordinateMatrix"):
            # Internal path: an existing Java CoordinateMatrix is wrapped as-is.
            java_matrix = entries
        else:
            raise TypeError("entries should be an RDD of MatrixEntry entries or "
                            "(long, long, float) tuples, got %s" % type(entries))

        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)

    @property
    def entries(self):
        """
        Entries of the CoordinateMatrix stored as an RDD of
        MatrixEntries.

        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                                        MatrixEntry(6, 4, 2.1)]))
        >>> entries = mat.entries
        >>> entries.first()
        MatrixEntry(0, 0, 1.2)
        """
        # We use DataFrames for serialization of MatrixEntry entries
        # from Java, so we first convert the RDD of entries to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a MatrixEntry on this side.
        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
        # Map over the DataFrame's underlying RDD explicitly rather than
        # relying on the `DataFrame.map` alias, which later PySpark
        # releases no longer provide.
        entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
        return entries

    def numRows(self):
        """
        Get or compute the number of rows.

        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                           MatrixEntry(1, 0, 2),
        ...                           MatrixEntry(2, 1, 3.7)])

        >>> mat = CoordinateMatrix(entries)
        >>> print(mat.numRows())
        3

        >>> mat = CoordinateMatrix(entries, 7, 6)
        >>> print(mat.numRows())
        7
        """
        return self._java_matrix_wrapper.call("numRows")

    def numCols(self):
        """
        Get or compute the number of cols.

        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                           MatrixEntry(1, 0, 2),
        ...                           MatrixEntry(2, 1, 3.7)])

        >>> mat = CoordinateMatrix(entries)
        >>> print(mat.numCols())
        2

        >>> mat = CoordinateMatrix(entries, 7, 6)
        >>> print(mat.numCols())
        6
        """
        return self._java_matrix_wrapper.call("numCols")

    def toRowMatrix(self):
        """
        Convert this matrix to a RowMatrix.

        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                           MatrixEntry(6, 4, 2.1)])

        >>> # This CoordinateMatrix will have 7 effective rows, due to
        >>> # the highest row index being 6, but the ensuing RowMatrix
        >>> # will only have 2 rows since there are only entries on 2
        >>> # unique rows.
        >>> mat = CoordinateMatrix(entries).toRowMatrix()
        >>> print(mat.numRows())
        2

        >>> # This CoordinateMatrix will have 5 columns, due to the
        >>> # highest column index being 4, and the ensuing RowMatrix
        >>> # will have 5 columns as well.
        >>> mat = CoordinateMatrix(entries).toRowMatrix()
        >>> print(mat.numCols())
        5
        """
        java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
        return RowMatrix(java_row_matrix)

    def toIndexedRowMatrix(self):
        """
        Convert this matrix to an IndexedRowMatrix.

        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                           MatrixEntry(6, 4, 2.1)])

        >>> # This CoordinateMatrix will have 7 effective rows, due to
        >>> # the highest row index being 6, and the ensuing
        >>> # IndexedRowMatrix will have 7 rows as well.
        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
        >>> print(mat.numRows())
        7

        >>> # This CoordinateMatrix will have 5 columns, due to the
        >>> # highest column index being 4, and the ensuing
        >>> # IndexedRowMatrix will have 5 columns as well.
        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
        >>> print(mat.numCols())
        5
        """
        java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
        return IndexedRowMatrix(java_indexed_row_matrix)


def _test():
    """
    Run this module's doctests against a local SparkContext.

    Exits the interpreter with status -1 if any doctest fails.
    """
    import doctest
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import pyspark.mllib.linalg.distributed
    globs = pyspark.mllib.linalg.distributed.__dict__.copy()
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    try:
        # The SQLContext must exist before the doctests run because the
        # constructors above call `RDD.toDF()`.
        globs['sqlContext'] = SQLContext(globs['sc'])
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always stop the context, even when setup or the test run raises.
        globs['sc'].stop()
    if failure_count:
        # `sys.exit` rather than the `site`-provided `exit`, which is not
        # guaranteed to be defined.
        sys.exit(-1)


if __name__ == "__main__":
    _test()