aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dev/sparktestsupport/modules.py1
-rw-r--r--docs/mllib-data-types.md106
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala53
-rw-r--r--python/docs/pyspark.mllib.rst8
-rw-r--r--python/pyspark/mllib/common.py2
-rw-r--r--python/pyspark/mllib/linalg/distributed.py537
6 files changed, 704 insertions, 3 deletions
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 956dc81b62..a9717ff956 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -331,6 +331,7 @@ pyspark_mllib = Module(
"pyspark.mllib.feature",
"pyspark.mllib.fpm",
"pyspark.mllib.linalg.__init__",
+ "pyspark.mllib.linalg.distributed",
"pyspark.mllib.random",
"pyspark.mllib.recommendation",
"pyspark.mllib.regression",
diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md
index 3aa040046f..11033bf4f9 100644
--- a/docs/mllib-data-types.md
+++ b/docs/mllib-data-types.md
@@ -372,12 +372,37 @@ long m = mat.numRows();
long n = mat.numCols();
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+
+A [`RowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) can be
+created from an `RDD` of vectors.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import RowMatrix
+
+# Create an RDD of vectors.
+rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
+
+# Create a RowMatrix from an RDD of vectors.
+mat = RowMatrix(rows)
+
+# Get its size.
+m = mat.numRows() # 4
+n = mat.numCols() # 3
+
+# Get the rows as an RDD of vectors again.
+rowsRDD = mat.rows
+{% endhighlight %}
+</div>
+
</div>
### IndexedRowMatrix
An `IndexedRowMatrix` is similar to a `RowMatrix` but with meaningful row indices. It is backed by
-an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector.
+an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local
+vector.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -431,7 +456,48 @@ long n = mat.numCols();
// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();
{% endhighlight %}
-</div></div>
+</div>
+
+<div data-lang="python" markdown="1">
+
+An [`IndexedRowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRowMatrix)
+can be created from an `RDD` of `IndexedRow`s, where
+[`IndexedRow`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRow) is a
+wrapper over `(long, vector)`. An `IndexedRowMatrix` can be converted to a `RowMatrix` by dropping
+its row indices.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
+
+# Create an RDD of indexed rows.
+# - This can be done explicitly with the IndexedRow class:
+indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ IndexedRow(1, [4, 5, 6]),
+ IndexedRow(2, [7, 8, 9]),
+ IndexedRow(3, [10, 11, 12])])
+# - or by using (long, vector) tuples:
+indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
+ (2, [7, 8, 9]), (3, [10, 11, 12])])
+
+# Create an IndexedRowMatrix from an RDD of IndexedRows.
+mat = IndexedRowMatrix(indexedRows)
+
+# Get its size.
+m = mat.numRows() # 4
+n = mat.numCols() # 3
+
+# Get the rows as an RDD of IndexedRows.
+rowsRDD = mat.rows
+
+# Convert to a RowMatrix by dropping the row indices.
+rowMat = mat.toRowMatrix()
+
+# Convert to a CoordinateMatrix.
+coordinateMat = mat.toCoordinateMatrix()
+{% endhighlight %}
+</div>
+
+</div>
### CoordinateMatrix
@@ -495,6 +561,42 @@ long n = mat.numCols();
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
{% endhighlight %}
</div>
+
+<div data-lang="python" markdown="1">
+
+A [`CoordinateMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.CoordinateMatrix)
+can be created from an `RDD` of `MatrixEntry` entries, where
+[`MatrixEntry`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.MatrixEntry) is a
+wrapper over `(long, long, float)`. A `CoordinateMatrix` can be converted to a `RowMatrix` by
+calling `toRowMatrix`, or to an `IndexedRowMatrix` with sparse rows by calling `toIndexedRowMatrix`.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
+
+# Create an RDD of coordinate entries.
+# - This can be done explicitly with the MatrixEntry class:
+entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
+# - or using (long, long, float) tuples:
+entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
+
+# Create a CoordinateMatrix from an RDD of MatrixEntries.
+mat = CoordinateMatrix(entries)
+
+# Get its size.
+m = mat.numRows() # 3
+n = mat.numCols() # 2
+
+# Get the entries as an RDD of MatrixEntries.
+entriesRDD = mat.entries
+
+# Convert to a RowMatrix.
+rowMat = mat.toRowMatrix()
+
+# Convert to an IndexedRowMatrix.
+indexedRowMat = mat.toIndexedRowMatrix()
+{% endhighlight %}
+</div>
+
</div>
### BlockMatrix
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 6f080d32bb..d2b3fae381 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
import org.apache.spark.mllib.recommendation._
@@ -54,7 +55,7 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomFo
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -1096,6 +1097,56 @@ private[python] class PythonMLLibAPI extends Serializable {
Statistics.kolmogorovSmirnovTest(data, distName, paramsSeq: _*)
}
+ /**
+ * Wrapper around RowMatrix constructor.
+ *
+ * Unwraps the JavaRDD to its underlying RDD and forwards the dimensions unchanged.
+ *
+ * @param rows rows of the matrix, one vector per row
+ * @param numRows number of rows, passed straight to the RowMatrix constructor
+ * @param numCols number of columns, passed straight to the RowMatrix constructor
+ * @return the newly constructed RowMatrix
+ */
+ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = {
+ new RowMatrix(rows.rdd, numRows, numCols)
+ }
+
+ /**
+ * Wrapper around IndexedRowMatrix constructor.
+ *
+ * @param rows DataFrame whose rows are (index: Long, vector: Vector) pairs
+ * @param numRows number of rows, passed straight to the IndexedRowMatrix constructor
+ * @param numCols number of columns, passed straight to the IndexedRowMatrix constructor
+ * @return the newly constructed IndexedRowMatrix
+ */
+ def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = {
+ // We use DataFrames for serialization of IndexedRows from Python,
+ // so map each Row in the DataFrame back to an IndexedRow.
+ // The match has a single case: a Row of any other shape is not handled here.
+ val indexedRows = rows.map {
+ case Row(index: Long, vector: Vector) => IndexedRow(index, vector)
+ }
+ new IndexedRowMatrix(indexedRows, numRows, numCols)
+ }
+
+ /**
+ * Wrapper around CoordinateMatrix constructor.
+ *
+ * @param rows DataFrame whose rows are (i: Long, j: Long, value: Double) triples
+ * @param numRows number of rows, passed straight to the CoordinateMatrix constructor
+ * @param numCols number of columns, passed straight to the CoordinateMatrix constructor
+ * @return the newly constructed CoordinateMatrix
+ */
+ def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = {
+ // We use DataFrames for serialization of MatrixEntry entries from
+ // Python, so map each Row in the DataFrame back to a MatrixEntry.
+ // The match has a single case: a Row of any other shape is not handled here.
+ val entries = rows.map {
+ case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value)
+ }
+ new CoordinateMatrix(entries, numRows, numCols)
+ }
+
+ /**
+ * Return the rows of an IndexedRowMatrix.
+ *
+ * @param indexedRowMatrix the matrix whose rows are exported
+ * @return a DataFrame built from the matrix's RDD of IndexedRows
+ */
+ def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
+ // We use DataFrames for serialization of IndexedRows to Python,
+ // so return a DataFrame.
+ // A new SQLContext is created on every call, on top of the SparkContext of the rows RDD.
+ val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext)
+ sqlContext.createDataFrame(indexedRowMatrix.rows)
+ }
+
+ /**
+ * Return the entries of a CoordinateMatrix.
+ *
+ * @param coordinateMatrix the matrix whose entries are exported
+ * @return a DataFrame built from the matrix's RDD of MatrixEntry entries
+ */
+ def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = {
+ // We use DataFrames for serialization of MatrixEntry entries to
+ // Python, so return a DataFrame.
+ // A new SQLContext is created on every call, on top of the SparkContext of the entries RDD.
+ val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
+ sqlContext.createDataFrame(coordinateMatrix.entries)
+ }
}
/**
diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst
index 26ece4c2c3..2d54ab118b 100644
--- a/python/docs/pyspark.mllib.rst
+++ b/python/docs/pyspark.mllib.rst
@@ -46,6 +46,14 @@ pyspark.mllib.linalg module
:undoc-members:
:show-inheritance:
+pyspark.mllib.linalg.distributed module
+---------------------------------------
+
+.. automodule:: pyspark.mllib.linalg.distributed
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
pyspark.mllib.random module
---------------------------
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 855e85f571..a439a488de 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -73,6 +73,8 @@ def _py2java(sc, obj):
""" Convert Python object into Java """
if isinstance(obj, RDD):
obj = _to_java_object_rdd(obj)
+ elif isinstance(obj, DataFrame):
+ obj = obj._jdf
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
new file mode 100644
index 0000000000..666d833019
--- /dev/null
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -0,0 +1,537 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Package for distributed linear algebra.
+"""
+
+import sys
+
+if sys.version >= '3':
+ long = int
+
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD
+from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
+from pyspark.mllib.linalg import _convert_to_vector
+
+
+__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
+ 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']
+
+
+class DistributedMatrix(object):
+ """
+ .. note:: Experimental
+
+ Represents a distributed matrix backed by one or more RDDs.
+
+ This base class only declares the interface: both methods
+ below raise NotImplementedError and are overridden by the
+ concrete matrix classes in this module.
+
+ """
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ Must be overridden; this base implementation always raises
+ NotImplementedError.
+ """
+ raise NotImplementedError
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ Must be overridden; this base implementation always raises
+ NotImplementedError.
+ """
+ raise NotImplementedError
+
+
+class RowMatrix(DistributedMatrix):
+ """
+ .. note:: Experimental
+
+ Represents a row-oriented distributed Matrix with no meaningful
+ row indices.
+
+ :param rows: An RDD of vectors.
+ :param numRows: Number of rows in the matrix. A non-positive
+ value means unknown, at which point the number
+ of rows will be determined by the number of
+ records in the `rows` RDD.
+ :param numCols: Number of columns in the matrix. A non-positive
+ value means unknown, at which point the number
+ of columns will be determined by the size of
+ the first row.
+ :raises TypeError: If `rows` is not an RDD.
+ """
+ def __init__(self, rows, numRows=0, numCols=0):
+ """
+ Note: This docstring is not shown publicly.
+
+ Create a wrapper over a Java RowMatrix.
+
+ Publicly, we require that `rows` be an RDD. However, for
+ internal usage, `rows` can also be a Java RowMatrix
+ object, in which case we can wrap it directly. This
+ assists in clean matrix conversions.
+
+ Anything else raises a TypeError.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+ >>> mat = RowMatrix(rows)
+
+ >>> mat_diff = RowMatrix(rows)
+ >>> (mat_diff._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ False
+
+ >>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model)
+ >>> (mat_same._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ True
+ """
+ if isinstance(rows, RDD):
+ # Public path: pass every row through _convert_to_vector, then
+ # build the matrix on the JVM side.
+ rows = rows.map(_convert_to_vector)
+ java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols))
+ # Internal path: `rows` is already a Java RowMatrix, so wrap it
+ # as-is; numRows and numCols are not used in this case.
+ elif (isinstance(rows, JavaObject)
+ and rows.getClass().getSimpleName() == "RowMatrix"):
+ java_matrix = rows
+ else:
+ raise TypeError("rows should be an RDD of vectors, got %s" % type(rows))
+
+ self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+ @property
+ def rows(self):
+ """
+ Rows of the RowMatrix stored as an RDD of vectors.
+
+ The rows are requested from the wrapped Java matrix on every
+ access.
+
+ >>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
+ >>> rows = mat.rows
+ >>> rows.first()
+ DenseVector([1.0, 2.0, 3.0])
+ """
+ return self._java_matrix_wrapper.call("rows")
+
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
+ ... [7, 8, 9], [10, 11, 12]])
+
+ >>> mat = RowMatrix(rows)
+ >>> print(mat.numRows())
+ 4
+
+ >>> mat = RowMatrix(rows, 7, 6)
+ >>> print(mat.numRows())
+ 7
+ """
+ return self._java_matrix_wrapper.call("numRows")
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
+ ... [7, 8, 9], [10, 11, 12]])
+
+ >>> mat = RowMatrix(rows)
+ >>> print(mat.numCols())
+ 3
+
+ >>> mat = RowMatrix(rows, 7, 6)
+ >>> print(mat.numCols())
+ 6
+ """
+ return self._java_matrix_wrapper.call("numCols")
+
+
+class IndexedRow(object):
+ """
+ .. note:: Experimental
+
+ Represents a row of an IndexedRowMatrix.
+
+ Just a wrapper over a (long, vector) tuple. On construction the
+ index is coerced with `long` and the vector is passed through
+ `_convert_to_vector`.
+
+ :param index: The index for the given row.
+ :param vector: The row in the matrix at the given index.
+ """
+ def __init__(self, index, vector):
+ self.index = long(index)
+ self.vector = _convert_to_vector(vector)
+
+ def __repr__(self):
+ """Return a string of the form ``IndexedRow(index, vector)``."""
+ return "IndexedRow(%s, %s)" % (self.index, self.vector)
+
+
+def _convert_to_indexed_row(row):
+ """
+ Coerce `row` into an IndexedRow.
+
+ An IndexedRow is returned unchanged, and a 2-tuple is unpacked
+ into the IndexedRow constructor. Any other input, including a
+ list or a tuple of another length, raises a TypeError.
+ """
+ if isinstance(row, IndexedRow):
+ return row
+ elif isinstance(row, tuple) and len(row) == 2:
+ return IndexedRow(*row)
+ else:
+ raise TypeError("Cannot convert type %s into IndexedRow" % type(row))
+
+
+class IndexedRowMatrix(DistributedMatrix):
+ """
+ .. note:: Experimental
+
+ Represents a row-oriented distributed Matrix with indexed rows.
+
+ :param rows: An RDD of IndexedRows or (long, vector) tuples.
+ :param numRows: Number of rows in the matrix. A non-positive
+ value means unknown, at which point the number
+ of rows will be determined by the max row
+ index plus one.
+ :param numCols: Number of columns in the matrix. A non-positive
+ value means unknown, at which point the number
+ of columns will be determined by the size of
+ the first row.
+ :raises TypeError: If `rows` is not an RDD.
+ """
+ def __init__(self, rows, numRows=0, numCols=0):
+ """
+ Note: This docstring is not shown publicly.
+
+ Create a wrapper over a Java IndexedRowMatrix.
+
+ Publicly, we require that `rows` be an RDD. However, for
+ internal usage, `rows` can also be a Java IndexedRowMatrix
+ object, in which case we can wrap it directly. This
+ assists in clean matrix conversions.
+
+ Anything else raises a TypeError.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows)
+
+ >>> mat_diff = IndexedRowMatrix(rows)
+ >>> (mat_diff._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ False
+
+ >>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model)
+ >>> (mat_same._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ True
+ """
+ if isinstance(rows, RDD):
+ rows = rows.map(_convert_to_indexed_row)
+ # We use DataFrames for serialization of IndexedRows from
+ # Python, so first convert the RDD to a DataFrame on this
+ # side. This will convert each IndexedRow to a Row
+ # containing the 'index' and 'vector' values, which can
+ # both be easily serialized. We will convert back to
+ # IndexedRows on the Scala side.
+ java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
+ long(numRows), int(numCols))
+ # Internal path: `rows` is already a Java IndexedRowMatrix, so
+ # wrap it as-is; numRows and numCols are not used in this case.
+ elif (isinstance(rows, JavaObject)
+ and rows.getClass().getSimpleName() == "IndexedRowMatrix"):
+ java_matrix = rows
+ else:
+ raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples, "
+ "got %s" % type(rows))
+
+ self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+ @property
+ def rows(self):
+ """
+ Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.
+
+ >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6])]))
+ >>> rows = mat.rows
+ >>> rows.first()
+ IndexedRow(0, [1.0,2.0,3.0])
+ """
+ # We use DataFrames for serialization of IndexedRows from
+ # Java, so we first convert the RDD of rows to a DataFrame
+ # on the Scala/Java side. Then we map each Row in the
+ # DataFrame back to an IndexedRow on this side.
+ rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
+ rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
+ return rows
+
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6]),
+ ... IndexedRow(2, [7, 8, 9]),
+ ... IndexedRow(3, [10, 11, 12])])
+
+ >>> mat = IndexedRowMatrix(rows)
+ >>> print(mat.numRows())
+ 4
+
+ >>> mat = IndexedRowMatrix(rows, 7, 6)
+ >>> print(mat.numRows())
+ 7
+ """
+ return self._java_matrix_wrapper.call("numRows")
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(1, [4, 5, 6]),
+ ... IndexedRow(2, [7, 8, 9]),
+ ... IndexedRow(3, [10, 11, 12])])
+
+ >>> mat = IndexedRowMatrix(rows)
+ >>> print(mat.numCols())
+ 3
+
+ >>> mat = IndexedRowMatrix(rows, 7, 6)
+ >>> print(mat.numCols())
+ 6
+ """
+ return self._java_matrix_wrapper.call("numCols")
+
+ def toRowMatrix(self):
+ """
+ Convert this matrix to a RowMatrix.
+
+ The row indices are dropped: in the example below, rows
+ indexed 0 and 6 become the two rows of the result.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+ ... IndexedRow(6, [4, 5, 6])])
+ >>> mat = IndexedRowMatrix(rows).toRowMatrix()
+ >>> mat.rows.collect()
+ [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])]
+ """
+ java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
+ # Wrap the Java result directly via the internal constructor path.
+ return RowMatrix(java_row_matrix)
+
+ def toCoordinateMatrix(self):
+ """
+ Convert this matrix to a CoordinateMatrix.
+
+ In the example below, zeros stored in the rows come back as
+ explicit entries of the result.
+
+ >>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
+ ... IndexedRow(6, [0, 5])])
+ >>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
+ >>> mat.entries.take(3)
+ [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
+ """
+ java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
+ # Wrap the Java result directly via the internal constructor path.
+ return CoordinateMatrix(java_coordinate_matrix)
+
+
+class MatrixEntry(object):
+ """
+ .. note:: Experimental
+
+ Represents an entry of a CoordinateMatrix.
+
+ Just a wrapper over a (long, long, float) tuple. On construction
+ both indices are coerced with `long` and the value with `float`.
+
+ :param i: The row index of the matrix.
+ :param j: The column index of the matrix.
+ :param value: The (i, j)th entry of the matrix, as a float.
+ """
+ def __init__(self, i, j, value):
+ self.i = long(i)
+ self.j = long(j)
+ self.value = float(value)
+
+ def __repr__(self):
+ """Return a string of the form ``MatrixEntry(i, j, value)``."""
+ return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value)
+
+
+def _convert_to_matrix_entry(entry):
+ """
+ Coerce `entry` into a MatrixEntry.
+
+ A MatrixEntry is returned unchanged, and a 3-tuple is unpacked
+ into the MatrixEntry constructor. Any other input, including a
+ list or a tuple of another length, raises a TypeError.
+ """
+ if isinstance(entry, MatrixEntry):
+ return entry
+ elif isinstance(entry, tuple) and len(entry) == 3:
+ return MatrixEntry(*entry)
+ else:
+ raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry))
+
+
+class CoordinateMatrix(DistributedMatrix):
+ """
+ .. note:: Experimental
+
+ Represents a matrix in coordinate format.
+
+ :param entries: An RDD of MatrixEntry inputs or
+ (long, long, float) tuples.
+ :param numRows: Number of rows in the matrix. A non-positive
+ value means unknown, at which point the number
+ of rows will be determined by the max row
+ index plus one.
+ :param numCols: Number of columns in the matrix. A non-positive
+ value means unknown, at which point the number
+ of columns will be determined by the max column
+ index plus one.
+ :raises TypeError: If `entries` is not an RDD.
+ """
+ def __init__(self, entries, numRows=0, numCols=0):
+ """
+ Note: This docstring is not shown publicly.
+
+ Create a wrapper over a Java CoordinateMatrix.
+
+ Publicly, we require that `entries` be an RDD. However, for
+ internal usage, `entries` can also be a Java CoordinateMatrix
+ object, in which case we can wrap it directly. This
+ assists in clean matrix conversions.
+
+ Anything else raises a TypeError.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)])
+ >>> mat = CoordinateMatrix(entries)
+
+ >>> mat_diff = CoordinateMatrix(entries)
+ >>> (mat_diff._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ False
+
+ >>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model)
+ >>> (mat_same._java_matrix_wrapper._java_model ==
+ ... mat._java_matrix_wrapper._java_model)
+ True
+ """
+ if isinstance(entries, RDD):
+ entries = entries.map(_convert_to_matrix_entry)
+ # We use DataFrames for serialization of MatrixEntry entries
+ # from Python, so first convert the RDD to a DataFrame on
+ # this side. This will convert each MatrixEntry to a Row
+ # containing the 'i', 'j', and 'value' values, which can
+ # each be easily serialized. We will convert back to
+ # MatrixEntry inputs on the Scala side.
+ java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(),
+ long(numRows), long(numCols))
+ # Internal path: `entries` is already a Java CoordinateMatrix, so
+ # wrap it as-is; numRows and numCols are not used in this case.
+ elif (isinstance(entries, JavaObject)
+ and entries.getClass().getSimpleName() == "CoordinateMatrix"):
+ java_matrix = entries
+ else:
+ raise TypeError("entries should be an RDD of MatrixEntry entries or "
+ "(long, long, float) tuples, got %s" % type(entries))
+
+ self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+ @property
+ def entries(self):
+ """
+ Entries of the CoordinateMatrix stored as an RDD of
+ MatrixEntries.
+
+ >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)]))
+ >>> entries = mat.entries
+ >>> entries.first()
+ MatrixEntry(0, 0, 1.2)
+ """
+ # We use DataFrames for serialization of MatrixEntry entries
+ # from Java, so we first convert the RDD of entries to a
+ # DataFrame on the Scala/Java side. Then we map each Row in
+ # the DataFrame back to a MatrixEntry on this side.
+ entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
+ entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
+ return entries
+
+ def numRows(self):
+ """
+ Get or compute the number of rows.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(1, 0, 2),
+ ... MatrixEntry(2, 1, 3.7)])
+
+ >>> mat = CoordinateMatrix(entries)
+ >>> print(mat.numRows())
+ 3
+
+ >>> mat = CoordinateMatrix(entries, 7, 6)
+ >>> print(mat.numRows())
+ 7
+ """
+ return self._java_matrix_wrapper.call("numRows")
+
+ def numCols(self):
+ """
+ Get or compute the number of cols.
+
+ Delegates to the wrapped Java matrix. As the second example
+ shows, an explicitly supplied size is reported as given.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(1, 0, 2),
+ ... MatrixEntry(2, 1, 3.7)])
+
+ >>> mat = CoordinateMatrix(entries)
+ >>> print(mat.numCols())
+ 2
+
+ >>> mat = CoordinateMatrix(entries, 7, 6)
+ >>> print(mat.numCols())
+ 6
+ """
+ return self._java_matrix_wrapper.call("numCols")
+
+ def toRowMatrix(self):
+ """
+ Convert this matrix to a RowMatrix.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)])
+
+ >>> # This CoordinateMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, but the ensuing RowMatrix
+ >>> # will only have 2 rows since there are only entries on 2
+ >>> # unique rows.
+ >>> mat = CoordinateMatrix(entries).toRowMatrix()
+ >>> print(mat.numRows())
+ 2
+
+ >>> # This CoordinateMatrix will have 5 columns, due to the
+ >>> # highest column index being 4, and the ensuing RowMatrix
+ >>> # will have 5 columns as well.
+ >>> mat = CoordinateMatrix(entries).toRowMatrix()
+ >>> print(mat.numCols())
+ 5
+ """
+ java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
+ # Wrap the Java result directly via the internal constructor path.
+ return RowMatrix(java_row_matrix)
+
+ def toIndexedRowMatrix(self):
+ """
+ Convert this matrix to an IndexedRowMatrix.
+
+ >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+ ... MatrixEntry(6, 4, 2.1)])
+
+ >>> # This CoordinateMatrix will have 7 effective rows, due to
+ >>> # the highest row index being 6, and the ensuing
+ >>> # IndexedRowMatrix will have 7 rows as well.
+ >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
+ >>> print(mat.numRows())
+ 7
+
+ >>> # This CoordinateMatrix will have 5 columns, due to the
+ >>> # highest column index being 4, and the ensuing
+ >>> # IndexedRowMatrix will have 5 columns as well.
+ >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
+ >>> print(mat.numCols())
+ 5
+ """
+ java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
+ # Wrap the Java result directly via the internal constructor path.
+ return IndexedRowMatrix(java_indexed_row_matrix)
+
+
+def _test():
+ import doctest
+ from pyspark import SparkContext
+ from pyspark.sql import SQLContext
+ import pyspark.mllib.linalg.distributed
+ globs = pyspark.mllib.linalg.distributed.__dict__.copy()
+ globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
+ globs['sqlContext'] = SQLContext(globs['sc'])
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()