author     Yanbo Liang <ybliang8@gmail.com>       2015-06-30 12:23:48 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-06-30 12:23:48 -0700
commit     c1befd780c3defc843baa75097de7ec427d3f8ca (patch)
tree       533a8b2b0d4ef5a1fefcf7539aedeea31e96b8ee /mllib
parent     1e1f339976641af4cc87d4010db57c3b600f91af (diff)
[SPARK-8664] [ML] Add PCA transformer
Add PCA transformer for the ML pipeline.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #7065 from yanboliang/spark-8664 and squashes the following commits:

4afae45 [Yanbo Liang] address comments
e9effd7 [Yanbo Liang] Add PCA transformer
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala      | 130
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala   |   2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala |  64
3 files changed, 195 insertions(+), 1 deletion(-)
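
For context, a minimal usage sketch of the new estimator, assuming a DataFrame df with a vector column "features" (mirroring the test suite below):

  import org.apache.spark.ml.feature.PCA

  // Fit a PCA model on the "features" column, then append each vector's
  // projection onto the top 3 principal components as "pca_features".
  val pca = new PCA()
    .setInputCol("features")
    .setOutputCol("pca_features")
    .setK(3)
  val model = pca.fit(df)              // computes the principal components
  val projected = model.transform(df)  // adds the "pca_features" column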
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
new file mode 100644
index 0000000000..2d3bb680cf
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.mllib.feature
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Params for [[PCA]] and [[PCAModel]].
+ */
+private[feature] trait PCAParams extends Params with HasInputCol with HasOutputCol {
+
+ /**
+ * The number of principal components.
+ * @group param
+ */
+ final val k: IntParam = new IntParam(this, "k", "the number of principal components")
+
+ /** @group getParam */
+ def getK: Int = $(k)
+
+}
+
+/**
+ * :: Experimental ::
+ * PCA trains a model to project vectors to a lower-dimensional space of principal components.
+ */
+@Experimental
+class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams {
+
+ def this() = this(Identifiable.randomUID("pca"))
+
+ /** @group setParam */
+ def setInputCol(value: String): this.type = set(inputCol, value)
+
+ /** @group setParam */
+ def setOutputCol(value: String): this.type = set(outputCol, value)
+
+ /** @group setParam */
+ def setK(value: Int): this.type = set(k, value)
+
+ /**
+ * Computes a [[PCAModel]] that contains the principal components of the input vectors.
+ */
+ override def fit(dataset: DataFrame): PCAModel = {
+ transformSchema(dataset.schema, logging = true)
+    val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+ val pca = new feature.PCA(k = $(k))
+ val pcaModel = pca.fit(input)
+ copyValues(new PCAModel(uid, pcaModel).setParent(this))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ val inputType = schema($(inputCol)).dataType
+ require(inputType.isInstanceOf[VectorUDT],
+ s"Input column ${$(inputCol)} must be a vector column")
+ require(!schema.fieldNames.contains($(outputCol)),
+ s"Output column ${$(outputCol)} already exists.")
+ val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
+ StructType(outputFields)
+ }
+
+ override def copy(extra: ParamMap): PCA = defaultCopy(extra)
+}
+
+/**
+ * :: Experimental ::
+ * Model fitted by [[PCA]].
+ */
+@Experimental
+class PCAModel private[ml] (
+ override val uid: String,
+ pcaModel: feature.PCAModel)
+ extends Model[PCAModel] with PCAParams {
+
+ /** @group setParam */
+ def setInputCol(value: String): this.type = set(inputCol, value)
+
+ /** @group setParam */
+ def setOutputCol(value: String): this.type = set(outputCol, value)
+
+ /**
+   * Transform a vector using the computed principal components.
+   * NOTE: Vectors to be transformed must have the same length as
+   * the source vectors given to [[PCA.fit()]].
+ */
+ override def transform(dataset: DataFrame): DataFrame = {
+ transformSchema(dataset.schema, logging = true)
+ val pcaOp = udf { pcaModel.transform _ }
+ dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ val inputType = schema($(inputCol)).dataType
+ require(inputType.isInstanceOf[VectorUDT],
+ s"Input column ${$(inputCol)} must be a vector column")
+ require(!schema.fieldNames.contains($(outputCol)),
+ s"Output column ${$(outputCol)} already exists.")
+ val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
+ StructType(outputFields)
+ }
+
+ override def copy(extra: ParamMap): PCAModel = {
+ val copied = new PCAModel(uid, pcaModel)
+ copyValues(copied, extra)
+ }
+}
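
The estimator above is a thin DataFrame wrapper around the existing RDD-based implementation in org.apache.spark.mllib.feature. A sketch of that underlying API, which fit() and transform() delegate to, assuming a SparkContext sc (vectors borrowed from the test data below):

  import org.apache.spark.mllib.feature.PCA
  import org.apache.spark.mllib.linalg.Vectors

  // fit() extracts the input column as an RDD[Vector] and hands it to
  // the mllib estimator; transform() applies the resulting model per row.
  val input = sc.parallelize(Seq(
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)))
  val oldModel = new PCA(k = 2).fit(input)   // mllib.feature.PCAModel
  val projected = oldModel.transform(input)  // RDD of 2-dimensional vectors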
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index 4e01e402b4..2a66263d8b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -68,7 +68,7 @@ class PCA(val k: Int) {
* @param k number of principal components.
* @param pc a principal components Matrix. Each column is one principal component.
*/
-class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer {
+class PCAModel private[spark] (val k: Int, val pc: DenseMatrix) extends VectorTransformer {
/**
* Transform a vector by computed Principal Components.
*
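
The constructor's visibility is widened from private[mllib] to private[spark] so that code elsewhere under org.apache.spark, such as the new ml.feature.PCAModel wrapper and its test suite, can construct a mllib PCAModel directly. A sketch of what this enables (the matrix is the one used in PCASuite below):

  import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
  import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices}

  // From org.apache.spark.ml.feature (outside the mllib package), this
  // constructor call now compiles; it was previously private[mllib].
  val pc = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
  val oldModel = new OldPCAModel(2, pc)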
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
new file mode 100644
index 0000000000..d0ae36b28c
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vector, Vectors}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
+import org.apache.spark.sql.Row
+
+class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
+
+ test("params") {
+ ParamsSuite.checkParams(new PCA)
+ val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
+ val model = new PCAModel("pca", new OldPCAModel(2, mat))
+ ParamsSuite.checkParams(model)
+ }
+
+ test("pca") {
+ val data = Array(
+ Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
+ Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+ Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+ )
+
+ val dataRDD = sc.parallelize(data, 2)
+
+ val mat = new RowMatrix(dataRDD)
+ val pc = mat.computePrincipalComponents(3)
+ val expected = mat.multiply(pc).rows
+
+ val df = sqlContext.createDataFrame(dataRDD.zip(expected)).toDF("features", "expected")
+
+ val pca = new PCA()
+ .setInputCol("features")
+ .setOutputCol("pca_features")
+ .setK(3)
+ .fit(df)
+
+ pca.transform(df).select("pca_features", "expected").collect().foreach {
+ case Row(x: Vector, y: Vector) =>
+      assert(x ~== y absTol 1e-5, "Transformed vector differs from the expected vector.")
+ }
+ }
+}