aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorDB Tsai <dbtsai@alpinenow.com>2014-08-03 21:39:21 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-03 21:39:21 -0700
commitae58aea2d1435b5bb011e68127e1bcddc2edf5b2 (patch)
treeca1a5c60fa45714f8429aed9f96f719c553e92bc /mllib/src/main
parent5507dd8e18fbb52d5e0c64a767103b2418cb09c6 (diff)
downloadspark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.tar.gz
spark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.tar.bz2
spark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.zip
SPARK-2272 [MLlib] Feature scaling which standardizes the range of independent variables or features of data
Feature scaling is a method used to standardize the range of independent variables or features of data. In data processing, it is generally performed during the data preprocessing step. In this work, a trait called `VectorTransformer` is defined for generic transformation on a vector. It contains one method to be implemented, `transform` which applies transformation on a vector. There are two implementations of `VectorTransformer` now, and they all can be easily extended with PMML transformation support. 1) `StandardScaler` - Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set. 2) `Normalizer` - Normalizes samples individually to unit L^n norm Author: DB Tsai <dbtsai@alpinenow.com> Closes #1207 from dbtsai/dbtsai-feature-scaling and squashes the following commits: 78c15d3 [DB Tsai] Alpine Data Labs
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala76
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala119
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala51
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala2
4 files changed, 247 insertions, 1 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
new file mode 100644
index 0000000000..ea9fd0a80d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+
+/**
+ * :: DeveloperApi ::
+ * Normalizes samples individually to unit L^p^ norm
+ *
+ * For any 1 <= p < Double.PositiveInfinity, normalizes samples using
+ * sum(abs(vector).^p^)^(1/p)^ as norm.
+ *
+ * For p = Double.PositiveInfinity, max(abs(vector)) will be used as norm for normalization.
+ *
+ * @param p Normalization in L^p^ space, p = 2 by default.
+ */
+@DeveloperApi
+class Normalizer(p: Double) extends VectorTransformer {
+
+ def this() = this(2)
+
+ require(p >= 1.0)
+
+ /**
+ * Applies unit length normalization on a vector.
+ *
+ * @param vector vector to be normalized.
+ * @return normalized vector. If the norm of the input is zero, it will return the input vector.
+ */
+ override def transform(vector: Vector): Vector = {
+ var norm = vector.toBreeze.norm(p)
+
+ if (norm != 0.0) {
+ // For dense vector, we've to allocate new memory for new output vector.
+ // However, for sparse vector, the `index` array will not be changed,
+ // so we can re-use it to save memory.
+ vector.toBreeze match {
+ case dv: BDV[Double] => Vectors.fromBreeze(dv :/ norm)
+ case sv: BSV[Double] =>
+ val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
+ var i = 0
+ while (i < output.data.length) {
+ output.data(i) /= norm
+ i += 1
+ }
+ Vectors.fromBreeze(output)
+ case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ } else {
+ // Since the norm is zero, return the input vector object itself.
+ // Note that it's safe since we always assume that the data in RDD
+ // should be immutable.
+ vector
+ }
+ }
+
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
new file mode 100644
index 0000000000..cc2d7579c2
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.rdd.RDDFunctions._
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.rdd.RDD
+
+/**
+ * :: DeveloperApi ::
+ * Standardizes features by removing the mean and scaling to unit variance using column summary
+ * statistics on the samples in the training set.
+ *
+ * @param withMean False by default. Centers the data with mean before scaling. It will build a
+ * dense output, so this does not work on sparse input and will raise an exception.
+ * @param withStd True by default. Scales the data to unit standard deviation.
+ */
+@DeveloperApi
+class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
+
+ def this() = this(false, true)
+
+ require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")
+
+ private var mean: BV[Double] = _
+ private var factor: BV[Double] = _
+
+ /**
+ * Computes the mean and variance and stores as a model to be used for later scaling.
+ *
+ * @param data The data used to compute the mean and variance to build the transformation model.
+ * @return This StandardScalar object.
+ */
+ def fit(data: RDD[Vector]): this.type = {
+ val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
+ (aggregator, data) => aggregator.add(data),
+ (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+
+ mean = summary.mean.toBreeze
+ factor = summary.variance.toBreeze
+ require(mean.length == factor.length)
+
+ var i = 0
+ while (i < factor.length) {
+ factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
+ i += 1
+ }
+
+ this
+ }
+
+ /**
+ * Applies standardization transformation on a vector.
+ *
+ * @param vector Vector to be standardized.
+ * @return Standardized vector. If the variance of a column is zero, it will return default `0.0`
+ * for the column with zero variance.
+ */
+ override def transform(vector: Vector): Vector = {
+ if (mean == null || factor == null) {
+ throw new IllegalStateException(
+ "Haven't learned column summary statistics yet. Call fit first.")
+ }
+
+ require(vector.size == mean.length)
+
+ if (withMean) {
+ vector.toBreeze match {
+ case dv: BDV[Double] =>
+ val output = vector.toBreeze.copy
+ var i = 0
+ while (i < output.length) {
+ output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0)
+ i += 1
+ }
+ Vectors.fromBreeze(output)
+ case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ } else if (withStd) {
+ vector.toBreeze match {
+ case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor)
+ case sv: BSV[Double] =>
+ // For sparse vector, the `index` array inside sparse vector object will not be changed,
+ // so we can re-use it to save memory.
+ val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
+ var i = 0
+ while (i < output.data.length) {
+ output.data(i) *= factor(output.index(i))
+ i += 1
+ }
+ Vectors.fromBreeze(output)
+ case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ } else {
+ // Note that it's safe since we always assume that the data in RDD should be immutable.
+ vector
+ }
+ }
+
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
new file mode 100644
index 0000000000..415a845332
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.rdd.RDD
+
+/**
+ * :: DeveloperApi ::
+ * Trait for transformation of a vector
+ */
+@DeveloperApi
+trait VectorTransformer extends Serializable {
+
+ /**
+ * Applies transformation on a vector.
+ *
+ * @param vector vector to be transformed.
+ * @return transformed vector.
+ */
+ def transform(vector: Vector): Vector
+
+ /**
+ * Applies transformation on an RDD[Vector].
+ *
+ * @param data RDD[Vector] to be transformed.
+ * @return transformed RDD[Vector].
+ */
+ def transform(data: RDD[Vector]): RDD[Vector] = {
+ // Later in #1498 , all RDD objects are sent via broadcasting instead of akka.
+ // So it should be no longer necessary to explicitly broadcast `this` object.
+ data.map(x => this.transform(x))
+ }
+
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 58c1322757..45486b2c7d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
import java.util.Arrays
-import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}