diff options
Diffstat (limited to 'mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala')
-rw-r--r-- | mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala | 274 |
1 files changed, 242 insertions, 32 deletions
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala index d9ffdeb797..07f3bc2728 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala @@ -44,6 +44,12 @@ sealed trait Matrix extends Serializable { @Since("2.0.0") val isTransposed: Boolean = false + /** Indicates whether the values backing this matrix are arranged in column major order. */ + private[ml] def isColMajor: Boolean = !isTransposed + + /** Indicates whether the values backing this matrix are arranged in row major order. */ + private[ml] def isRowMajor: Boolean = isTransposed + /** Converts to a dense array in column major. */ @Since("2.0.0") def toArray: Array[Double] = { @@ -148,7 +154,8 @@ sealed trait Matrix extends Serializable { * and column indices respectively with the type `Int`, and the final parameter is the * corresponding value in the matrix with type `Double`. */ - private[spark] def foreachActive(f: (Int, Int, Double) => Unit) + @Since("2.2.0") + def foreachActive(f: (Int, Int, Double) => Unit): Unit /** * Find the number of non-zero active values. @@ -161,6 +168,116 @@ sealed trait Matrix extends Serializable { */ @Since("2.0.0") def numActives: Int + + /** + * Converts this matrix to a sparse matrix. + * + * @param colMajor Whether the values of the resulting sparse matrix should be in column major + * or row major order. If `false`, resulting matrix will be row major. + */ + private[ml] def toSparseMatrix(colMajor: Boolean): SparseMatrix + + /** + * Converts this matrix to a sparse matrix in column major order. + */ + @Since("2.2.0") + def toSparseColMajor: SparseMatrix = toSparseMatrix(colMajor = true) + + /** + * Converts this matrix to a sparse matrix in row major order. + */ + @Since("2.2.0") + def toSparseRowMajor: SparseMatrix = toSparseMatrix(colMajor = false) + + /** + * Converts this matrix to a sparse matrix while maintaining the layout of the current matrix. + */ + @Since("2.2.0") + def toSparse: SparseMatrix = toSparseMatrix(colMajor = isColMajor) + + /** + * Converts this matrix to a dense matrix. + * + * @param colMajor Whether the values of the resulting dense matrix should be in column major + * or row major order. If `false`, resulting matrix will be row major. + */ + private[ml] def toDenseMatrix(colMajor: Boolean): DenseMatrix + + /** + * Converts this matrix to a dense matrix while maintaining the layout of the current matrix. + */ + @Since("2.2.0") + def toDense: DenseMatrix = toDenseMatrix(colMajor = isColMajor) + + /** + * Converts this matrix to a dense matrix in row major order. + */ + @Since("2.2.0") + def toDenseRowMajor: DenseMatrix = toDenseMatrix(colMajor = false) + + /** + * Converts this matrix to a dense matrix in column major order. + */ + @Since("2.2.0") + def toDenseColMajor: DenseMatrix = toDenseMatrix(colMajor = true) + + /** + * Returns a matrix in dense or sparse column major format, whichever uses less storage. + */ + @Since("2.2.0") + def compressedColMajor: Matrix = { + if (getDenseSizeInBytes <= getSparseSizeInBytes(colMajor = true)) { + this.toDenseColMajor + } else { + this.toSparseColMajor + } + } + + /** + * Returns a matrix in dense or sparse row major format, whichever uses less storage. + */ + @Since("2.2.0") + def compressedRowMajor: Matrix = { + if (getDenseSizeInBytes <= getSparseSizeInBytes(colMajor = false)) { + this.toDenseRowMajor + } else { + this.toSparseRowMajor + } + } + + /** + * Returns a matrix in dense column major, dense row major, sparse row major, or sparse column + * major format, whichever uses less storage. When dense representation is optimal, it maintains + * the current layout order. + */ + @Since("2.2.0") + def compressed: Matrix = { + val cscSize = getSparseSizeInBytes(colMajor = true) + val csrSize = getSparseSizeInBytes(colMajor = false) + if (getDenseSizeInBytes <= math.min(cscSize, csrSize)) { + // dense matrix size is the same for column major and row major, so maintain current layout + this.toDense + } else if (cscSize <= csrSize) { + this.toSparseColMajor + } else { + this.toSparseRowMajor + } + } + + /** Gets the size of the dense representation of this `Matrix`. */ + private[ml] def getDenseSizeInBytes: Long = { + Matrices.getDenseSize(numCols, numRows) + } + + /** Gets the size of the minimal sparse representation of this `Matrix`. */ + private[ml] def getSparseSizeInBytes(colMajor: Boolean): Long = { + val nnz = numNonzeros + val numPtrs = if (colMajor) numCols + 1L else numRows + 1L + Matrices.getSparseSize(nnz, numPtrs) + } + + /** Gets the current size in bytes of this `Matrix`. Useful for testing */ + private[ml] def getSizeInBytes: Long } /** @@ -258,7 +375,7 @@ class DenseMatrix @Since("2.0.0") ( override def transpose: DenseMatrix = new DenseMatrix(numCols, numRows, values, !isTransposed) - private[spark] override def foreachActive(f: (Int, Int, Double) => Unit): Unit = { + override def foreachActive(f: (Int, Int, Double) => Unit): Unit = { if (!isTransposed) { // outer loop over columns var j = 0 @@ -291,31 +408,49 @@ class DenseMatrix @Since("2.0.0") ( override def numActives: Int = values.length /** - * Generate a `SparseMatrix` from the given `DenseMatrix`. The new matrix will have isTransposed - * set to false. + * Generate a `SparseMatrix` from the given `DenseMatrix`. + * + * @param colMajor Whether the resulting `SparseMatrix` values will be in column major order. */ - @Since("2.0.0") - def toSparse: SparseMatrix = { - val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble - val colPtrs: Array[Int] = new Array[Int](numCols + 1) - val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt - var nnz = 0 - var j = 0 - while (j < numCols) { - var i = 0 - while (i < numRows) { - val v = values(index(i, j)) - if (v != 0.0) { - rowIndices += i - spVals += v - nnz += 1 + private[ml] override def toSparseMatrix(colMajor: Boolean): SparseMatrix = { + if (!colMajor) this.transpose.toSparseColMajor.transpose + else { + val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble + val colPtrs: Array[Int] = new Array[Int](numCols + 1) + val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt + var nnz = 0 + var j = 0 + while (j < numCols) { + var i = 0 + while (i < numRows) { + val v = values(index(i, j)) + if (v != 0.0) { + rowIndices += i + spVals += v + nnz += 1 + } + i += 1 } - i += 1 + j += 1 + colPtrs(j) = nnz } - j += 1 - colPtrs(j) = nnz + new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) + } + } + + /** + * Generate a `DenseMatrix` from this `DenseMatrix`. + * + * @param colMajor Whether the resulting `DenseMatrix` values will be in column major order. + */ + private[ml] override def toDenseMatrix(colMajor: Boolean): DenseMatrix = { + if (isRowMajor && colMajor) { + new DenseMatrix(numRows, numCols, this.toArray, isTransposed = false) + } else if (isColMajor && !colMajor) { + new DenseMatrix(numRows, numCols, this.transpose.toArray, isTransposed = true) + } else { + this } - new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) } override def colIter: Iterator[Vector] = { @@ -331,6 +466,8 @@ class DenseMatrix @Since("2.0.0") ( } } } + + private[ml] def getSizeInBytes: Long = Matrices.getDenseSize(numCols, numRows) } /** @@ -560,7 +697,7 @@ class SparseMatrix @Since("2.0.0") ( override def transpose: SparseMatrix = new SparseMatrix(numCols, numRows, colPtrs, rowIndices, values, !isTransposed) - private[spark] override def foreachActive(f: (Int, Int, Double) => Unit): Unit = { + override def foreachActive(f: (Int, Int, Double) => Unit): Unit = { if (!isTransposed) { var j = 0 while (j < numCols) { @@ -587,18 +724,67 @@ class SparseMatrix @Since("2.0.0") ( } } + override def numNonzeros: Int = values.count(_ != 0) + + override def numActives: Int = values.length + /** - * Generate a `DenseMatrix` from the given `SparseMatrix`. The new matrix will have isTransposed - * set to false. + * Generate a `SparseMatrix` from this `SparseMatrix`, removing explicit zero values if they + * exist. + * + * @param colMajor Whether or not the resulting `SparseMatrix` values are in column major + * order. */ - @Since("2.0.0") - def toDense: DenseMatrix = { - new DenseMatrix(numRows, numCols, toArray) + private[ml] override def toSparseMatrix(colMajor: Boolean): SparseMatrix = { + if (isColMajor && !colMajor) { + // it is col major and we want row major, use breeze to remove explicit zeros + val breezeTransposed = asBreeze.asInstanceOf[BSM[Double]].t + Matrices.fromBreeze(breezeTransposed).transpose.asInstanceOf[SparseMatrix] + } else if (isRowMajor && colMajor) { + // it is row major and we want col major, use breeze to remove explicit zeros + val breezeTransposed = asBreeze.asInstanceOf[BSM[Double]] + Matrices.fromBreeze(breezeTransposed).asInstanceOf[SparseMatrix] + } else { + val nnz = numNonzeros + if (nnz != numActives) { + // remove explicit zeros + val rr = new Array[Int](nnz) + val vv = new Array[Double](nnz) + val numPtrs = if (isRowMajor) numRows else numCols + val cc = new Array[Int](numPtrs + 1) + var nzIdx = 0 + var j = 0 + while (j < numPtrs) { + var idx = colPtrs(j) + val idxEnd = colPtrs(j + 1) + cc(j) = nzIdx + while (idx < idxEnd) { + if (values(idx) != 0.0) { + vv(nzIdx) = values(idx) + rr(nzIdx) = rowIndices(idx) + nzIdx += 1 + } + idx += 1 + } + j += 1 + } + cc(j) = nnz + new SparseMatrix(numRows, numCols, cc, rr, vv, isTransposed = isTransposed) + } else { + this + } + } } - override def numNonzeros: Int = values.count(_ != 0) - - override def numActives: Int = values.length + /** + * Generate a `DenseMatrix` from the given `SparseMatrix`. + * + * @param colMajor Whether the resulting `DenseMatrix` values are in column major order. + */ + private[ml] override def toDenseMatrix(colMajor: Boolean): DenseMatrix = { + if (colMajor) new DenseMatrix(numRows, numCols, this.toArray) + else new DenseMatrix(numRows, numCols, this.transpose.toArray, isTransposed = true) + } override def colIter: Iterator[Vector] = { if (isTransposed) { @@ -631,6 +817,8 @@ class SparseMatrix @Since("2.0.0") ( } } } + + private[ml] def getSizeInBytes: Long = Matrices.getSparseSize(numActives, colPtrs.length) } /** @@ -1079,4 +1267,26 @@ object Matrices { SparseMatrix.fromCOO(numRows, numCols, entries) } } + + private[ml] def getSparseSize(numActives: Long, numPtrs: Long): Long = { + /* + Sparse matrices store two int arrays, one double array, two ints, and one boolean: + 8 * values.length + 4 * rowIndices.length + 4 * colPtrs.length + arrayHeader * 3 + 2 * 4 + 1 + */ + val doubleBytes = java.lang.Double.BYTES + val intBytes = java.lang.Integer.BYTES + val arrayHeader = 12L + doubleBytes * numActives + intBytes * numActives + intBytes * numPtrs + arrayHeader * 3L + 9L + } + + private[ml] def getDenseSize(numCols: Long, numRows: Long): Long = { + /* + Dense matrices store one double array, two ints, and one boolean: + 8 * values.length + arrayHeader + 2 * 4 + 1 + */ + val doubleBytes = java.lang.Double.BYTES + val arrayHeader = 12L + doubleBytes * numCols * numRows + arrayHeader + 9L + } + } |