From 9689b663a2a4947ad60795321c770052f3c637f1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Apr 2014 23:01:15 -0700 Subject: [SPARK-1390] Refactoring of matrices backed by RDDs This is to refactor interfaces for matrices backed by RDDs. It would be better if we have a clear separation of local matrices and those backed by RDDs. Right now, we have 1. `org.apache.spark.mllib.linalg.SparseMatrix`, which is a wrapper over an RDD of matrix entries, i.e., coordinate list format. 2. `org.apache.spark.mllib.linalg.TallSkinnyDenseMatrix`, which is a wrapper over RDD[Array[Double]], i.e. row-oriented format. We will see naming collision when we introduce local `SparseMatrix`, and the name `TallSkinnyDenseMatrix` is not exact if we switch to `RDD[Vector]` from `RDD[Array[Double]]`. It would be better to have "RDD" in the class name to suggest that operations may trigger jobs. The proposed names are (all under `org.apache.spark.mllib.linalg.rdd`): 1. `RDDMatrix`: trait for matrices backed by one or more RDDs 2. `CoordinateRDDMatrix`: wrapper of `RDD[(Long, Long, Double)]` 3. `RowRDDMatrix`: wrapper of `RDD[Vector]` whose rows do not have special ordering 4. `IndexedRowRDDMatrix`: wrapper of `RDD[(Long, Vector)]` whose rows are associated with indices The current code also introduces local matrices. Author: Xiangrui Meng Closes #296 from mengxr/mat and squashes the following commits: 24d8294 [Xiangrui Meng] fix for groupBy returning Iterable bfc2b26 [Xiangrui Meng] merge master 8e4f1f5 [Xiangrui Meng] Merge branch 'master' into mat 0135193 [Xiangrui Meng] address Reza's comments 03cd7e1 [Xiangrui Meng] add pca/gram to IndexedRowMatrix add toBreeze to DistributedMatrix for test simplify tests b177ff1 [Xiangrui Meng] address Matei's comments be119fe [Xiangrui Meng] rename m/n to numRows/numCols for local matrix add tests for matrices b881506 [Xiangrui Meng] rename SparkPCA/SVD to TallSkinnyPCA/SVD e7d0d4a [Xiangrui Meng] move IndexedRDDMatrixRow to IndexedRowRDDMatrix 0d1491c [Xiangrui Meng] fix test errors a85262a [Xiangrui Meng] rename RDDMatrixRow to IndexedRDDMatrixRow b8b6ac3 [Xiangrui Meng] Remove old code 4cf679c [Xiangrui Meng] port pca to RowRDDMatrix, and add multiply and covariance 7836e2f [Xiangrui Meng] initial refactoring of matrices backed by RDDs --- .../org/apache/spark/examples/mllib/SparkPCA.scala | 51 ----------------- .../org/apache/spark/examples/mllib/SparkSVD.scala | 59 -------------------- .../spark/examples/mllib/TallSkinnyPCA.scala | 64 ++++++++++++++++++++++ .../spark/examples/mllib/TallSkinnySVD.scala | 64 ++++++++++++++++++++++ 4 files changed, 128 insertions(+), 110 deletions(-) delete mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala (limited to 'examples/src/main/scala') diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala deleted file mode 100644 index d4e08c5e12..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkPCA.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.mllib - -import org.apache.spark.SparkContext -import org.apache.spark.mllib.linalg.PCA -import org.apache.spark.mllib.linalg.MatrixEntry -import org.apache.spark.mllib.linalg.SparseMatrix -import org.apache.spark.mllib.util._ - - -/** - * Compute PCA of an example matrix. - */ -object SparkPCA { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: SparkPCA m n") - System.exit(1) - } - val sc = new SparkContext(args(0), "PCA", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - - val m = args(2).toInt - val n = args(3).toInt - - // Make example matrix - val data = Array.tabulate(m, n) { (a, b) => - (a + 2).toDouble * (b + 1) / (1 + a + b) } - - // recover top principal component - val coeffs = new PCA().setK(1).compute(sc.makeRDD(data)) - - println("top principal component = " + coeffs.mkString(", ")) - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala deleted file mode 100644 index 2933cec497..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.mllib - -import org.apache.spark.SparkContext -import org.apache.spark.mllib.linalg.SVD -import org.apache.spark.mllib.linalg.MatrixEntry -import org.apache.spark.mllib.linalg.SparseMatrix - -/** - * Compute SVD of an example matrix - * Input file should be comma separated, 1 indexed of the form - * i,j,value - * Where i is the column, j the row, and value is the matrix entry - * - * For example input file, see: - * mllib/data/als/test.data (example is 4 x 4) - */ -object SparkSVD { - def main(args: Array[String]) { - if (args.length != 4) { - System.err.println("Usage: SparkSVD m n") - System.exit(1) - } - val sc = new SparkContext(args(0), "SVD", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - - // Load and parse the data file - val data = sc.textFile(args(1)).map { line => - val parts = line.split(',') - MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble) - } - val m = args(2).toInt - val n = args(3).toInt - - // recover largest singular vector - val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n)) - val u = decomposed.U.data - val s = decomposed.S.data - val v = decomposed.V.data - - println("singular values = " + s.collect().mkString) - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala new file mode 100644 index 0000000000..a177435e60 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.linalg.Vectors + +/** + * Compute the principal components of a tall-and-skinny matrix, whose rows are observations. + * + * The input matrix must be stored in row-oriented dense format, one line per row with its entries + * separated by space. For example, + * {{{ + * 0.5 1.0 + * 2.0 3.0 + * 4.0 5.0 + * }}} + * represents a 3-by-2 matrix, whose first row is (0.5, 1.0). + */ +object TallSkinnyPCA { + def main(args: Array[String]) { + if (args.length != 2) { + System.err.println("Usage: TallSkinnyPCA ") + System.exit(1) + } + + val conf = new SparkConf() + .setMaster(args(0)) + .setAppName("TallSkinnyPCA") + .setSparkHome(System.getenv("SPARK_HOME")) + .setJars(SparkContext.jarOfClass(this.getClass)) + val sc = new SparkContext(conf) + + // Load and parse the data file. + val rows = sc.textFile(args(1)).map { line => + val values = line.split(' ').map(_.toDouble) + Vectors.dense(values) + } + val mat = new RowMatrix(rows) + + // Compute principal components. + val pc = mat.computePrincipalComponents(mat.numCols().toInt) + + println("Principal components are:\n" + pc) + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala new file mode 100644 index 0000000000..49d09692c8 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.linalg.Vectors + +/** + * Compute the singular value decomposition (SVD) of a tall-and-skinny matrix. + * + * The input matrix must be stored in row-oriented dense format, one line per row with its entries + * separated by space. For example, + * {{{ + * 0.5 1.0 + * 2.0 3.0 + * 4.0 5.0 + * }}} + * represents a 3-by-2 matrix, whose first row is (0.5, 1.0). + */ +object TallSkinnySVD { + def main(args: Array[String]) { + if (args.length != 2) { + System.err.println("Usage: TallSkinnySVD ") + System.exit(1) + } + + val conf = new SparkConf() + .setMaster(args(0)) + .setAppName("TallSkinnySVD") + .setSparkHome(System.getenv("SPARK_HOME")) + .setJars(SparkContext.jarOfClass(this.getClass)) + val sc = new SparkContext(conf) + + // Load and parse the data file. + val rows = sc.textFile(args(1)).map { line => + val values = line.split(' ').map(_.toDouble) + Vectors.dense(values) + } + val mat = new RowMatrix(rows) + + // Compute SVD. + val svd = mat.computeSVD(mat.numCols().toInt) + + println("Singular values are " + svd.s) + + sc.stop() + } +} -- cgit v1.2.3