diff options
-rw-r--r-- | dev/sparktestsupport/modules.py | 1 | ||||
-rw-r--r-- | docs/mllib-data-types.md | 106 | ||||
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 53 | ||||
-rw-r--r-- | python/docs/pyspark.mllib.rst | 8 | ||||
-rw-r--r-- | python/pyspark/mllib/common.py | 2 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg/distributed.py | 537 |
6 files changed, 704 insertions, 3 deletions
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 956dc81b62..a9717ff956 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -331,6 +331,7 @@ pyspark_mllib = Module( "pyspark.mllib.feature", "pyspark.mllib.fpm", "pyspark.mllib.linalg.__init__", + "pyspark.mllib.linalg.distributed", "pyspark.mllib.random", "pyspark.mllib.recommendation", "pyspark.mllib.regression", diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index 3aa040046f..11033bf4f9 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -372,12 +372,37 @@ long m = mat.numRows(); long n = mat.numCols(); {% endhighlight %} </div> + +<div data-lang="python" markdown="1"> + +A [`RowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) can be +created from an `RDD` of vectors. + +{% highlight python %} +from pyspark.mllib.linalg.distributed import RowMatrix + +# Create an RDD of vectors. +rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]) + +# Create a RowMatrix from an RDD of vectors. +mat = RowMatrix(rows) + +# Get its size. +m = mat.numRows() # 4 +n = mat.numCols() # 3 + +# Get the rows as an RDD of vectors again. +rowsRDD = mat.rows +{% endhighlight %} +</div> + </div> ### IndexedRowMatrix An `IndexedRowMatrix` is similar to a `RowMatrix` but with meaningful row indices. It is backed by -an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector. +an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local +vector. <div class="codetabs"> <div data-lang="scala" markdown="1"> @@ -431,7 +456,48 @@ long n = mat.numCols(); // Drop its row indices. 
RowMatrix rowMat = mat.toRowMatrix(); {% endhighlight %} -</div></div> +</div> + +<div data-lang="python" markdown="1"> + +An [`IndexedRowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRowMatrix) +can be created from an `RDD` of `IndexedRow`s, where +[`IndexedRow`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRow) is a +wrapper over `(long, vector)`. An `IndexedRowMatrix` can be converted to a `RowMatrix` by dropping +its row indices. + +{% highlight python %} +from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix + +# Create an RDD of indexed rows. +# - This can be done explicitly with the IndexedRow class: +indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + IndexedRow(1, [4, 5, 6]), + IndexedRow(2, [7, 8, 9]), + IndexedRow(3, [10, 11, 12])]) +# - or by using (long, vector) tuples: +indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), + (2, [7, 8, 9]), (3, [10, 11, 12])]) + +# Create an IndexedRowMatrix from an RDD of IndexedRows. +mat = IndexedRowMatrix(indexedRows) + +# Get its size. +m = mat.numRows() # 4 +n = mat.numCols() # 3 + +# Get the rows as an RDD of IndexedRows. +rowsRDD = mat.rows + +# Convert to a RowMatrix by dropping the row indices. +rowMat = mat.toRowMatrix() + +# Convert to a CoordinateMatrix. +coordinateMat = mat.toCoordinateMatrix() +{% endhighlight %} +</div> + +</div> ### CoordinateMatrix @@ -495,6 +561,42 @@ long n = mat.numCols(); IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix(); {% endhighlight %} </div> + +<div data-lang="python" markdown="1"> + +A [`CoordinateMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.CoordinateMatrix) +can be created from an `RDD` of `MatrixEntry` entries, where +[`MatrixEntry`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.MatrixEntry) is a +wrapper over `(long, long, float)`. 
A `CoordinateMatrix` can be converted to a `RowMatrix` by +calling `toRowMatrix`, or to an `IndexedRowMatrix` with sparse rows by calling `toIndexedRowMatrix`. + +{% highlight python %} +from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry + +# Create an RDD of coordinate entries. +# - This can be done explicitly with the MatrixEntry class: +entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)]) +# - or using (long, long, float) tuples: +entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)]) + +# Create a CoordinateMatrix from an RDD of MatrixEntries. +mat = CoordinateMatrix(entries) + +# Get its size. +m = mat.numRows() # 3 +n = mat.numCols() # 2 + +# Get the entries as an RDD of MatrixEntries. +entriesRDD = mat.entries + +# Convert to a RowMatrix. +rowMat = mat.toRowMatrix() + +# Convert to an IndexedRowMatrix. +indexedRowMat = mat.toIndexedRowMatrix() +{% endhighlight %} +</div> + </div> ### BlockMatrix diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 6f080d32bb..d2b3fae381 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -37,6 +37,7 @@ import org.apache.spark.mllib.evaluation.RankingMetrics import org.apache.spark.mllib.feature._ import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel} import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed._ import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.random.{RandomRDDs => RG} import org.apache.spark.mllib.recommendation._ @@ -54,7 +55,7 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomFo import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.util.LinearDataGenerator import
org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -1096,6 +1097,56 @@ private[python] class PythonMLLibAPI extends Serializable { Statistics.kolmogorovSmirnovTest(data, distName, paramsSeq: _*) } + /** + * Wrapper around RowMatrix constructor. + */ + def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { + new RowMatrix(rows.rdd, numRows, numCols) + } + + /** + * Wrapper around IndexedRowMatrix constructor. + */ + def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = { + // We use DataFrames for serialization of IndexedRows from Python, + // so map each Row in the DataFrame back to an IndexedRow. + val indexedRows = rows.map { + case Row(index: Long, vector: Vector) => IndexedRow(index, vector) + } + new IndexedRowMatrix(indexedRows, numRows, numCols) + } + + /** + * Wrapper around CoordinateMatrix constructor. + */ + def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = { + // We use DataFrames for serialization of MatrixEntry entries from + // Python, so map each Row in the DataFrame back to a MatrixEntry. + val entries = rows.map { + case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value) + } + new CoordinateMatrix(entries, numRows, numCols) + } + + /** + * Return the rows of an IndexedRowMatrix. + */ + def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = { + // We use DataFrames for serialization of IndexedRows to Python, + // so return a DataFrame. + val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext) + sqlContext.createDataFrame(indexedRowMatrix.rows) + } + + /** + * Return the entries of a CoordinateMatrix. 
+ */ + def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = { + // We use DataFrames for serialization of MatrixEntry entries to + // Python, so return a DataFrame. + val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) + sqlContext.createDataFrame(coordinateMatrix.entries) + } } /** diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index 26ece4c2c3..2d54ab118b 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -46,6 +46,14 @@ pyspark.mllib.linalg module :undoc-members: :show-inheritance: +pyspark.mllib.linalg.distributed module +--------------------------------------- + +.. automodule:: pyspark.mllib.linalg.distributed + :members: + :undoc-members: + :show-inheritance: + pyspark.mllib.random module --------------------------- diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index 855e85f571..a439a488de 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -73,6 +73,8 @@ def _py2java(sc, obj): """ Convert Python object into Java """ if isinstance(obj, RDD): obj = _to_java_object_rdd(obj) + elif isinstance(obj, DataFrame): + obj = obj._jdf elif isinstance(obj, SparkContext): obj = obj._jsc elif isinstance(obj, list): diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py new file mode 100644 index 0000000000..666d833019 --- /dev/null +++ b/python/pyspark/mllib/linalg/distributed.py @@ -0,0 +1,537 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Package for distributed linear algebra. +""" + +import sys + +if sys.version >= '3': + long = int + +from py4j.java_gateway import JavaObject + +from pyspark import RDD +from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper +from pyspark.mllib.linalg import _convert_to_vector + + +__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow', + 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix'] + + +class DistributedMatrix(object): + """ + .. note:: Experimental + + Represents a distributively stored matrix backed by one or + more RDDs. + + """ + def numRows(self): + """Get or compute the number of rows.""" + raise NotImplementedError + + def numCols(self): + """Get or compute the number of cols.""" + raise NotImplementedError + + +class RowMatrix(DistributedMatrix): + """ + .. note:: Experimental + + Represents a row-oriented distributed Matrix with no meaningful + row indices. + + :param rows: An RDD of vectors. + :param numRows: Number of rows in the matrix. A non-positive + value means unknown, at which point the number + of rows will be determined by the number of + records in the `rows` RDD. + :param numCols: Number of columns in the matrix. A non-positive + value means unknown, at which point the number + of columns will be determined by the size of + the first row. + """ + def __init__(self, rows, numRows=0, numCols=0): + """ + Note: This docstring is not shown publicly. + + Create a wrapper over a Java RowMatrix. + + Publicly, we require that `rows` be an RDD. 
However, for + internal usage, `rows` can also be a Java RowMatrix + object, in which case we can wrap it directly. This + assists in clean matrix conversions. + + >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]]) + >>> mat = RowMatrix(rows) + + >>> mat_diff = RowMatrix(rows) + >>> (mat_diff._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + False + + >>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model) + >>> (mat_same._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + True + """ + if isinstance(rows, RDD): + rows = rows.map(_convert_to_vector) + java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols)) + elif (isinstance(rows, JavaObject) + and rows.getClass().getSimpleName() == "RowMatrix"): + java_matrix = rows + else: + raise TypeError("rows should be an RDD of vectors, got %s" % type(rows)) + + self._java_matrix_wrapper = JavaModelWrapper(java_matrix) + + @property + def rows(self): + """ + Rows of the RowMatrix stored as an RDD of vectors. + + >>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]])) + >>> rows = mat.rows + >>> rows.first() + DenseVector([1.0, 2.0, 3.0]) + """ + return self._java_matrix_wrapper.call("rows") + + def numRows(self): + """ + Get or compute the number of rows. + + >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6], + ... [7, 8, 9], [10, 11, 12]]) + + >>> mat = RowMatrix(rows) + >>> print(mat.numRows()) + 4 + + >>> mat = RowMatrix(rows, 7, 6) + >>> print(mat.numRows()) + 7 + """ + return self._java_matrix_wrapper.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6], + ... [7, 8, 9], [10, 11, 12]]) + + >>> mat = RowMatrix(rows) + >>> print(mat.numCols()) + 3 + + >>> mat = RowMatrix(rows, 7, 6) + >>> print(mat.numCols()) + 6 + """ + return self._java_matrix_wrapper.call("numCols") + + +class IndexedRow(object): + """ + .. 
note:: Experimental + + Represents a row of an IndexedRowMatrix. + + Just a wrapper over a (long, vector) tuple. + + :param index: The index for the given row. + :param vector: The row in the matrix at the given index. + """ + def __init__(self, index, vector): + self.index = long(index) + self.vector = _convert_to_vector(vector) + + def __repr__(self): + return "IndexedRow(%s, %s)" % (self.index, self.vector) + + +def _convert_to_indexed_row(row): + if isinstance(row, IndexedRow): + return row + elif isinstance(row, tuple) and len(row) == 2: + return IndexedRow(*row) + else: + raise TypeError("Cannot convert type %s into IndexedRow" % type(row)) + + +class IndexedRowMatrix(DistributedMatrix): + """ + .. note:: Experimental + + Represents a row-oriented distributed Matrix with indexed rows. + + :param rows: An RDD of IndexedRows or (long, vector) tuples. + :param numRows: Number of rows in the matrix. A non-positive + value means unknown, at which point the number + of rows will be determined by the max row + index plus one. + :param numCols: Number of columns in the matrix. A non-positive + value means unknown, at which point the number + of columns will be determined by the size of + the first row. + """ + def __init__(self, rows, numRows=0, numCols=0): + """ + Note: This docstring is not shown publicly. + + Create a wrapper over a Java IndexedRowMatrix. + + Publicly, we require that `rows` be an RDD. However, for + internal usage, `rows` can also be a Java IndexedRowMatrix + object, in which case we can wrap it directly. This + assists in clean matrix conversions. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(1, [4, 5, 6])]) + >>> mat = IndexedRowMatrix(rows) + + >>> mat_diff = IndexedRowMatrix(rows) + >>> (mat_diff._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + False + + >>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model) + >>> (mat_same._java_matrix_wrapper._java_model == + ... 
mat._java_matrix_wrapper._java_model) + True + """ + if isinstance(rows, RDD): + rows = rows.map(_convert_to_indexed_row) + # We use DataFrames for serialization of IndexedRows from + # Python, so first convert the RDD to a DataFrame on this + # side. This will convert each IndexedRow to a Row + # containing the 'index' and 'vector' values, which can + # both be easily serialized. We will convert back to + # IndexedRows on the Scala side. + java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(), + long(numRows), int(numCols)) + elif (isinstance(rows, JavaObject) + and rows.getClass().getSimpleName() == "IndexedRowMatrix"): + java_matrix = rows + else: + raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples, " + "got %s" % type(rows)) + + self._java_matrix_wrapper = JavaModelWrapper(java_matrix) + + @property + def rows(self): + """ + Rows of the IndexedRowMatrix stored as an RDD of IndexedRows. + + >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(1, [4, 5, 6])])) + >>> rows = mat.rows + >>> rows.first() + IndexedRow(0, [1.0,2.0,3.0]) + """ + # We use DataFrames for serialization of IndexedRows from + # Java, so we first convert the RDD of rows to a DataFrame + # on the Scala/Java side. Then we map each Row in the + # DataFrame back to an IndexedRow on this side. + rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model) + rows = rows_df.map(lambda row: IndexedRow(row[0], row[1])) + return rows + + def numRows(self): + """ + Get or compute the number of rows. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(1, [4, 5, 6]), + ... IndexedRow(2, [7, 8, 9]), + ... 
IndexedRow(3, [10, 11, 12])]) + + >>> mat = IndexedRowMatrix(rows) + >>> print(mat.numRows()) + 4 + + >>> mat = IndexedRowMatrix(rows, 7, 6) + >>> print(mat.numRows()) + 7 + """ + return self._java_matrix_wrapper.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(1, [4, 5, 6]), + ... IndexedRow(2, [7, 8, 9]), + ... IndexedRow(3, [10, 11, 12])]) + + >>> mat = IndexedRowMatrix(rows) + >>> print(mat.numCols()) + 3 + + >>> mat = IndexedRowMatrix(rows, 7, 6) + >>> print(mat.numCols()) + 6 + """ + return self._java_matrix_wrapper.call("numCols") + + def toRowMatrix(self): + """ + Convert this matrix to a RowMatrix. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(6, [4, 5, 6])]) + >>> mat = IndexedRowMatrix(rows).toRowMatrix() + >>> mat.rows.collect() + [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])] + """ + java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix") + return RowMatrix(java_row_matrix) + + def toCoordinateMatrix(self): + """ + Convert this matrix to a CoordinateMatrix. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 0]), + ... IndexedRow(6, [0, 5])]) + >>> mat = IndexedRowMatrix(rows).toCoordinateMatrix() + >>> mat.entries.take(3) + [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)] + """ + java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix") + return CoordinateMatrix(java_coordinate_matrix) + + +class MatrixEntry(object): + """ + .. note:: Experimental + + Represents an entry of a CoordinateMatrix. + + Just a wrapper over a (long, long, float) tuple. + + :param i: The row index of the matrix. + :param j: The column index of the matrix. + :param value: The (i, j)th entry of the matrix, as a float. 
+ """ + def __init__(self, i, j, value): + self.i = long(i) + self.j = long(j) + self.value = float(value) + + def __repr__(self): + return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value) + + +def _convert_to_matrix_entry(entry): + if isinstance(entry, MatrixEntry): + return entry + elif isinstance(entry, tuple) and len(entry) == 3: + return MatrixEntry(*entry) + else: + raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry)) + + +class CoordinateMatrix(DistributedMatrix): + """ + .. note:: Experimental + + Represents a matrix in coordinate format. + + :param entries: An RDD of MatrixEntry inputs or + (long, long, float) tuples. + :param numRows: Number of rows in the matrix. A non-positive + value means unknown, at which point the number + of rows will be determined by the max row + index plus one. + :param numCols: Number of columns in the matrix. A non-positive + value means unknown, at which point the number + of columns will be determined by the max column + index plus one. + """ + def __init__(self, entries, numRows=0, numCols=0): + """ + Note: This docstring is not shown publicly. + + Create a wrapper over a Java CoordinateMatrix. + + Publicly, we require that `entries` be an RDD. However, for + internal usage, `entries` can also be a Java CoordinateMatrix + object, in which case we can wrap it directly. This + assists in clean matrix conversions. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries) + + >>> mat_diff = CoordinateMatrix(entries) + >>> (mat_diff._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + False + + >>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model) + >>> (mat_same._java_matrix_wrapper._java_model == + ... 
mat._java_matrix_wrapper._java_model) + True + """ + if isinstance(entries, RDD): + entries = entries.map(_convert_to_matrix_entry) + # We use DataFrames for serialization of MatrixEntry entries + # from Python, so first convert the RDD to a DataFrame on + # this side. This will convert each MatrixEntry to a Row + # containing the 'i', 'j', and 'value' values, which can + # each be easily serialized. We will convert back to + # MatrixEntry inputs on the Scala side. + java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(), + long(numRows), long(numCols)) + elif (isinstance(entries, JavaObject) + and entries.getClass().getSimpleName() == "CoordinateMatrix"): + java_matrix = entries + else: + raise TypeError("entries should be an RDD of MatrixEntry entries or " + "(long, long, float) tuples, got %s" % type(entries)) + + self._java_matrix_wrapper = JavaModelWrapper(java_matrix) + + @property + def entries(self): + """ + Entries of the CoordinateMatrix stored as an RDD of + MatrixEntries. + + >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)])) + >>> entries = mat.entries + >>> entries.first() + MatrixEntry(0, 0, 1.2) + """ + # We use DataFrames for serialization of MatrixEntry entries + # from Java, so we first convert the RDD of entries to a + # DataFrame on the Scala/Java side. Then we map each Row in + # the DataFrame back to a MatrixEntry on this side. + entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model) + entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2])) + return entries + + def numRows(self): + """ + Get or compute the number of rows. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(1, 0, 2), + ... 
MatrixEntry(2, 1, 3.7)]) + + >>> mat = CoordinateMatrix(entries) + >>> print(mat.numRows()) + 3 + + >>> mat = CoordinateMatrix(entries, 7, 6) + >>> print(mat.numRows()) + 7 + """ + return self._java_matrix_wrapper.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(1, 0, 2), + ... MatrixEntry(2, 1, 3.7)]) + + >>> mat = CoordinateMatrix(entries) + >>> print(mat.numCols()) + 2 + + >>> mat = CoordinateMatrix(entries, 7, 6) + >>> print(mat.numCols()) + 6 + """ + return self._java_matrix_wrapper.call("numCols") + + def toRowMatrix(self): + """ + Convert this matrix to a RowMatrix. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)]) + + >>> # This CoordinateMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, but the ensuing RowMatrix + >>> # will only have 2 rows since there are only entries on 2 + >>> # unique rows. + >>> mat = CoordinateMatrix(entries).toRowMatrix() + >>> print(mat.numRows()) + 2 + + >>> # This CoordinateMatrix will have 5 columns, due to the + >>> # highest column index being 4, and the ensuing RowMatrix + >>> # will have 5 columns as well. + >>> mat = CoordinateMatrix(entries).toRowMatrix() + >>> print(mat.numCols()) + 5 + """ + java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix") + return RowMatrix(java_row_matrix) + + def toIndexedRowMatrix(self): + """ + Convert this matrix to an IndexedRowMatrix. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)]) + + >>> # This CoordinateMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, and the ensuing + >>> # IndexedRowMatrix will have 7 rows as well. 
+ >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() + >>> print(mat.numRows()) + 7 + + >>> # This CoordinateMatrix will have 5 columns, due to the + >>> # highest column index being 4, and the ensuing + >>> # IndexedRowMatrix will have 5 columns as well. + >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() + >>> print(mat.numCols()) + 5 + """ + java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix") + return IndexedRowMatrix(java_indexed_row_matrix) + + +def _test(): + import doctest + from pyspark import SparkContext + from pyspark.sql import SQLContext + import pyspark.mllib.linalg.distributed + globs = pyspark.mllib.linalg.distributed.__dict__.copy() + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + globs['sqlContext'] = SQLContext(globs['sc']) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + +if __name__ == "__main__": + _test() |