From f25a3ea8d3ee6972efb925826981918549deacaa Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 21 Apr 2016 16:50:09 -0700
Subject: [SPARK-14734][ML][MLLIB] Added asML, fromML methods for all
 spark.mllib Vector, Matrix types

## What changes were proposed in this pull request?

For maintaining wrappers around spark.mllib algorithms in spark.ml, it will be useful to have ```private[spark]``` methods for converting from one linear algebra representation to another. This PR adds ```asML```, ```fromML``` methods for all spark.mllib Vector and Matrix types.

## How was this patch tested?

Unit tests for all conversions.

Author: Joseph K. Bradley

Closes #12504 from jkbradley/linalg-conversions.
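To make the intended usage concrete, here is a minimal sketch of the round trip these methods enable. Because the methods are ```private[spark]```, the snippet must be compiled inside an org.apache.spark package; the package and object names below are hypothetical:

```scala
package org.apache.spark.example  // hypothetical; must live under org.apache.spark for private[spark] access

import org.apache.spark.mllib.{linalg => oldlinalg}
import org.apache.spark.ml.{linalg => newlinalg}

object LinalgConversionSketch {
  // spark.mllib -> spark.ml: asML is defined on every Vector and Matrix subtype.
  val oldVec: oldlinalg.Vector = oldlinalg.Vectors.dense(1.0, 2.0, 3.5)
  val newVec: newlinalg.Vector = oldVec.asML

  // spark.ml -> spark.mllib: fromML lives on the companion objects and
  // pattern-matches on the dense/sparse subtype.
  val roundTripVec: oldlinalg.Vector = oldlinalg.Vectors.fromML(newVec)

  // The same pair of conversions exists for matrices.
  val oldMat: oldlinalg.Matrix = oldlinalg.Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
  val newMat: newlinalg.Matrix = oldMat.asML
  val roundTripMat: oldlinalg.Matrix = oldlinalg.Matrices.fromML(newMat)
}
```

Note that ```fromML``` is a method on the companion objects (Vectors, Matrices, and their dense/sparse subtypes), while ```asML``` is an instance method on every Vector and Matrix.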
---
 .../org/apache/spark/mllib/linalg/Matrices.scala   | 35 ++++++++++++++++++-
 .../org/apache/spark/mllib/linalg/Vectors.scala    | 33 ++++++++++++++++++
 .../apache/spark/mllib/linalg/MatricesSuite.scala  | 39 +++++++++++++++++++++-
 .../apache/spark/mllib/linalg/VectorsSuite.scala   | 30 +++++++++++++++++
 project/MimaExcludes.scala                         |  4 +++
 5 files changed, 139 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 8c09b69b3c..bb5d6d9d51 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.{ArrayBuffer, ArrayBuilder => MArrayBuilder, Has
 import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 
-import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -158,6 +159,12 @@ sealed trait Matrix extends Serializable {
    */
   @Since("1.5.0")
   def numActives: Int
+
+  /**
+   * Convert this matrix to the new mllib-local representation.
+   * This does NOT copy the data; it copies references.
+   */
+  private[spark] def asML: newlinalg.Matrix
 }
 
 private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
@@ -419,6 +426,10 @@ class DenseMatrix @Since("1.3.0") (
       }
     }
   }
+
+  private[spark] override def asML: newlinalg.DenseMatrix = {
+    new newlinalg.DenseMatrix(numRows, numCols, values, isTransposed)
+  }
 }
 
 /**
@@ -515,6 +526,11 @@ object DenseMatrix {
     }
     matrix
   }
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(m: newlinalg.DenseMatrix): DenseMatrix = {
+    new DenseMatrix(m.numRows, m.numCols, m.values, m.isTransposed)
+  }
 }
 
 /**
@@ -721,6 +737,10 @@ class SparseMatrix @Since("1.3.0") (
       }
     }
   }
+
+  private[spark] override def asML: newlinalg.SparseMatrix = {
+    new newlinalg.SparseMatrix(numRows, numCols, colPtrs, rowIndices, values, isTransposed)
+  }
 }
 
 /**
@@ -895,6 +915,11 @@ object SparseMatrix {
       SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1)))
     }
   }
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(m: newlinalg.SparseMatrix): SparseMatrix = {
+    new SparseMatrix(m.numRows, m.numCols, m.colPtrs, m.rowIndices, m.values, m.isTransposed)
+  }
 }
 
 /**
@@ -1177,4 +1202,12 @@ object Matrices {
       SparseMatrix.fromCOO(numRows, numCols, entries)
     }
   }
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(m: newlinalg.Matrix): Matrix = m match {
+    case dm: newlinalg.DenseMatrix =>
+      DenseMatrix.fromML(dm)
+    case sm: newlinalg.SparseMatrix =>
+      SparseMatrix.fromML(sm)
+  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 5812cdde2c..5ec83e8d5c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{AlphaComponent, Since}
+import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.NumericParser
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
@@ -180,6 +181,12 @@ sealed trait Vector extends Serializable {
    */
   @Since("1.6.0")
   def toJson: String
+
+  /**
+   * Convert this vector to the new mllib-local representation.
+   * This does NOT copy the data; it copies references.
+   */
+  private[spark] def asML: newlinalg.Vector
 }
 
 /**
@@ -573,6 +580,14 @@ object Vectors {
 
   /** Max number of nonzero entries used in computing hash code. */
   private[linalg] val MAX_HASH_NNZ = 128
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(v: newlinalg.Vector): Vector = v match {
+    case dv: newlinalg.DenseVector =>
+      DenseVector.fromML(dv)
+    case sv: newlinalg.SparseVector =>
+      SparseVector.fromML(sv)
+  }
 }
 
 /**
@@ -686,6 +701,10 @@ class DenseVector @Since("1.0.0") (
     val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
     compact(render(jValue))
   }
+
+  private[spark] override def asML: newlinalg.DenseVector = {
+    new newlinalg.DenseVector(values)
+  }
 }
 
 @Since("1.3.0")
@@ -694,6 +713,11 @@ object DenseVector {
   /** Extracts the value array from a dense vector. */
   @Since("1.3.0")
   def unapply(dv: DenseVector): Option[Array[Double]] = Some(dv.values)
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(v: newlinalg.DenseVector): DenseVector = {
+    new DenseVector(v.values)
+  }
 }
 
 /**
@@ -882,6 +906,10 @@ class SparseVector @Since("1.0.0") (
       ("values" -> values.toSeq)
     compact(render(jValue))
   }
+
+  private[spark] override def asML: newlinalg.SparseVector = {
+    new newlinalg.SparseVector(size, indices, values)
+  }
 }
 
 @Since("1.3.0")
@@ -889,4 +917,9 @@ object SparseVector {
   @Since("1.3.0")
   def unapply(sv: SparseVector): Option[(Int, Array[Int], Array[Double])] =
     Some((sv.size, sv.indices, sv.values))
+
+  /** Convert new linalg type to spark.mllib type. Light copy; only copies references */
+  private[spark] def fromML(v: newlinalg.SparseVector): SparseVector = {
+    new SparseVector(v.size, v.indices, v.values)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index e289724cda..b7df02e6c0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -19,12 +19,14 @@ package org.apache.spark.mllib.linalg
 
 import java.util.Random
 
+import scala.collection.mutable.{Map => MutableMap}
+
 import breeze.linalg.{CSCMatrix, Matrix => BM}
 import org.mockito.Mockito.when
 import org.scalatest.mock.MockitoSugar._
-import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
 
 class MatricesSuite extends SparkFunSuite {
@@ -523,4 +525,39 @@ class MatricesSuite extends SparkFunSuite {
       assert(m.transpose.colIter.toSeq === rows)
     }
   }
+
+  test("conversions between new local linalg and mllib linalg") {
+    val dm: DenseMatrix = new DenseMatrix(3, 2, Array(0.0, 0.0, 1.0, 0.0, 2.0, 3.5))
+    val sm: SparseMatrix = dm.toSparse
+    val sm0: Matrix = sm.asInstanceOf[Matrix]
+    val dm0: Matrix = dm.asInstanceOf[Matrix]
+
+    def compare(oldM: Matrix, newM: newlinalg.Matrix): Unit = {
+      assert(oldM.toArray === newM.toArray)
+      assert(oldM.numCols === newM.numCols)
+      assert(oldM.numRows === newM.numRows)
+    }
+
+    val newSM: newlinalg.SparseMatrix = sm.asML
+    val newDM: newlinalg.DenseMatrix = dm.asML
+    val newSM0: newlinalg.Matrix = sm0.asML
+    val newDM0: newlinalg.Matrix = dm0.asML
+    assert(newSM0.isInstanceOf[newlinalg.SparseMatrix])
+    assert(newDM0.isInstanceOf[newlinalg.DenseMatrix])
+    compare(sm, newSM)
+    compare(dm, newDM)
+    compare(sm0, newSM0)
+    compare(dm0, newDM0)
+
+    val oldSM: SparseMatrix = SparseMatrix.fromML(newSM)
+    val oldDM: DenseMatrix = DenseMatrix.fromML(newDM)
+    val oldSM0: Matrix = Matrices.fromML(newSM0)
+    val oldDM0: Matrix = Matrices.fromML(newDM0)
+    assert(oldSM0.isInstanceOf[SparseMatrix])
+    assert(oldDM0.isInstanceOf[DenseMatrix])
+    compare(oldSM, newSM)
+    compare(oldDM, newDM)
+    compare(oldSM0, newSM0)
+    compare(oldDM0, newDM0)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index e5567492a2..a7c1a07604 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -24,6 +24,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
 
 class VectorsSuite extends SparkFunSuite with Logging {
@@ -392,4 +393,33 @@ class VectorsSuite extends SparkFunSuite with Logging {
       assert(u === v, "toJson/fromJson should preserve vector values.")
     }
   }
+
+  test("conversions between new local linalg and mllib linalg") {
+    val dv: DenseVector = new DenseVector(Array(1.0, 2.0, 3.5))
+    val sv: SparseVector = new SparseVector(5, Array(1, 2, 4), Array(1.1, 2.2, 4.4))
+    val sv0: Vector = sv.asInstanceOf[Vector]
+    val dv0: Vector = dv.asInstanceOf[Vector]
+
+    val newSV: newlinalg.SparseVector = sv.asML
+    val newDV: newlinalg.DenseVector = dv.asML
+    val newSV0: newlinalg.Vector = sv0.asML
+    val newDV0: newlinalg.Vector = dv0.asML
+    assert(newSV0.isInstanceOf[newlinalg.SparseVector])
+    assert(newDV0.isInstanceOf[newlinalg.DenseVector])
+    assert(sv.toArray === newSV.toArray)
+    assert(dv.toArray === newDV.toArray)
+    assert(sv0.toArray === newSV0.toArray)
+    assert(dv0.toArray === newDV0.toArray)
+
+    val oldSV: SparseVector = SparseVector.fromML(newSV)
+    val oldDV: DenseVector = DenseVector.fromML(newDV)
+    val oldSV0: Vector = Vectors.fromML(newSV0)
+    val oldDV0: Vector = Vectors.fromML(newDV0)
+    assert(oldSV0.isInstanceOf[SparseVector])
+    assert(oldDV0.isInstanceOf[DenseVector])
+    assert(oldSV.toArray === newSV.toArray)
+    assert(oldDV.toArray === newDV.toArray)
+    assert(oldSV0.toArray === newSV0.toArray)
+    assert(oldDV0.toArray === newDV0.toArray)
+  }
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9b2a966aaf..c98a39dc0c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -659,6 +659,10 @@ object MimaExcludes {
       // [SPARK-14407] Hides HadoopFsRelation related data source API into execution package
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriter"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriterFactory")
+    ) ++ Seq(
+      // SPARK-14734: Add conversions between mllib and ml Vector, Matrix types
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.asML"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix.asML")
     ) ++ Seq(
       // SPARK-14704: Create accumulators in TaskMetrics
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),
-- 
cgit v1.2.3
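One caveat worth spelling out about the "light copy" wording in the new doc comments: since only array references are copied, a converted object shares its backing arrays with the original, so mutating those arrays is visible through both. A minimal sketch (hypothetical, and again only compilable under an org.apache.spark package because asML is private[spark]):

```scala
import org.apache.spark.mllib.{linalg => oldlinalg}

val values = Array(1.0, 2.0, 3.5)
val oldVec = new oldlinalg.DenseVector(values)
val newVec = oldVec.asML  // copies the reference to `values`, not the data

values(0) = 99.0          // the write is visible through both representations
assert(oldVec(0) == 99.0)
assert(newVec(0) == 99.0)
```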