path: root/mllib/src/test
author     DB Tsai <dbtsai@alpinenow.com>        2014-08-03 21:39:21 -0700
committer  Xiangrui Meng <meng@databricks.com>   2014-08-03 21:39:21 -0700
commit     ae58aea2d1435b5bb011e68127e1bcddc2edf5b2 (patch)
tree       ca1a5c60fa45714f8429aed9f96f719c553e92bc /mllib/src/test
parent     5507dd8e18fbb52d5e0c64a767103b2418cb09c6 (diff)
download   spark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.tar.gz
           spark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.tar.bz2
           spark-ae58aea2d1435b5bb011e68127e1bcddc2edf5b2.zip
SPARK-2272 [MLlib] Feature scaling which standardizes the range of independent variables or features of data
Feature scaling is a method used to standardize the range of independent variables or features of data. In data processing, it is generally performed during the data preprocessing step.

In this work, a trait called `VectorTransformer` is defined for generic transformations on a vector. It contains one method to be implemented, `transform`, which applies the transformation to a vector. There are two implementations of `VectorTransformer` so far, and both can easily be extended with PMML transformation support.

1) `StandardScaler` - Standardizes features by removing the mean and scaling to unit variance, using column summary statistics on the samples in the training set.
2) `Normalizer` - Normalizes samples individually to unit L^n norm.

Author: DB Tsai <dbtsai@alpinenow.com>

Closes #1207 from dbtsai/dbtsai-feature-scaling and squashes the following commits:

78c15d3 [DB Tsai] Alpine Data Labs
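As a rough usage sketch (not part of this patch; the SparkContext setup and object name are illustrative), the two transformers are driven the same way the test suites below drive them: `Normalizer` transforms directly, while `StandardScaler` is first fit on the training RDD and then used to transform:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.{Normalizer, StandardScaler}
import org.apache.spark.mllib.linalg.Vectors

// Illustrative driver only; mirrors how the suites below call the new API.
object FeatureScalingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "feature-scaling-example")

    val data = sc.parallelize(Seq(
      Vectors.dense(-2.0, 2.3, 0.0),
      Vectors.dense(0.0, -1.0, -3.0),
      Vectors.dense(3.8, 0.0, 1.9)))

    // Normalizer: scale each sample to unit L^p norm (p = 2 by default).
    val normalizer = new Normalizer()
    val normalized = normalizer.transform(data)

    // StandardScaler: learn column means/variances, then standardize.
    val scaler = new StandardScaler(withMean = true, withStd = true)
    scaler.fit(data)
    val standardized = scaler.transform(data)

    normalized.collect().foreach(println)
    standardized.collect().foreach(println)
    sc.stop()
  }
}
```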
Diffstat (limited to 'mllib/src/test')
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala      120
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala  200
2 files changed, 320 insertions, 0 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
new file mode 100644
index 0000000000..fb76dccfdf
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+
+class NormalizerSuite extends FunSuite with LocalSparkContext {
+
+ val data = Array(
+ Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+ Vectors.dense(0.0, 0.0, 0.0),
+ Vectors.dense(0.6, -1.1, -3.0),
+ Vectors.sparse(3, Seq((1, 0.91), (2, 3.2))),
+ Vectors.sparse(3, Seq((0, 5.7), (1, 0.72), (2, 2.7))),
+ Vectors.sparse(3, Seq())
+ )
+
+ lazy val dataRDD = sc.parallelize(data, 3)
+
+ test("Normalization using L1 distance") {
+ val l1Normalizer = new Normalizer(1)
+
+ val data1 = data.map(l1Normalizer.transform)
+ val data1RDD = l1Normalizer.transform(dataRDD)
+
+ assert((data, data1, data1RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after normalization.")
+
+ assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+
+ assert(data1(0).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
+ assert(data1(2).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
+ assert(data1(3).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
+ assert(data1(4).toBreeze.norm(1) ~== 1.0 absTol 1E-5)
+
+ assert(data1(0) ~== Vectors.sparse(3, Seq((0, -0.465116279), (1, 0.53488372))) absTol 1E-5)
+ assert(data1(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(data1(2) ~== Vectors.dense(0.12765957, -0.23404255, -0.63829787) absTol 1E-5)
+ assert(data1(3) ~== Vectors.sparse(3, Seq((1, 0.22141119), (2, 0.7785888))) absTol 1E-5)
+ assert(data1(4) ~== Vectors.dense(0.625, 0.07894737, 0.29605263) absTol 1E-5)
+ assert(data1(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
+ }
+
+ test("Normalization using L2 distance") {
+ val l2Normalizer = new Normalizer()
+
+ val data2 = data.map(l2Normalizer.transform)
+ val data2RDD = l2Normalizer.transform(dataRDD)
+
+ assert((data, data2, data2RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after normalization.")
+
+ assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+
+ assert(data2(0).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
+ assert(data2(2).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
+ assert(data2(3).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
+ assert(data2(4).toBreeze.norm(2) ~== 1.0 absTol 1E-5)
+
+ assert(data2(0) ~== Vectors.sparse(3, Seq((0, -0.65617871), (1, 0.75460552))) absTol 1E-5)
+ assert(data2(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(data2(2) ~== Vectors.dense(0.184549876, -0.3383414, -0.922749378) absTol 1E-5)
+ assert(data2(3) ~== Vectors.sparse(3, Seq((1, 0.27352993), (2, 0.96186349))) absTol 1E-5)
+ assert(data2(4) ~== Vectors.dense(0.897906166, 0.113419726, 0.42532397) absTol 1E-5)
+ assert(data2(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
+ }
+
+ test("Normalization using L^Inf distance.") {
+ val lInfNormalizer = new Normalizer(Double.PositiveInfinity)
+
+ val dataInf = data.map(lInfNormalizer.transform)
+ val dataInfRDD = lInfNormalizer.transform(dataRDD)
+
+ assert((data, dataInf, dataInfRDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after normalization.")
+
+ assert((dataInf, dataInfRDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+
+ assert(dataInf(0).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
+ assert(dataInf(2).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
+ assert(dataInf(3).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
+ assert(dataInf(4).toArray.map(Math.abs).max ~== 1.0 absTol 1E-5)
+
+ assert(dataInf(0) ~== Vectors.sparse(3, Seq((0, -0.86956522), (1, 1.0))) absTol 1E-5)
+ assert(dataInf(1) ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(dataInf(2) ~== Vectors.dense(0.2, -0.36666667, -1.0) absTol 1E-5)
+ assert(dataInf(3) ~== Vectors.sparse(3, Seq((1, 0.284375), (2, 1.0))) absTol 1E-5)
+ assert(dataInf(4) ~== Vectors.dense(1.0, 0.12631579, 0.473684211) absTol 1E-5)
+ assert(dataInf(5) ~== Vectors.sparse(3, Seq()) absTol 1E-5)
+ }
+
+}
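The assertions above boil down to checking that every nonzero vector ends up with unit L^p norm while zero vectors pass through unchanged. A minimal sketch of that computation on a plain Array[Double] (illustrative only; the real Normalizer works on MLlib Vectors and preserves their sparse or dense representation):

```scala
// Illustrative p-norm normalization; not the MLlib implementation.
def normalize(values: Array[Double], p: Double): Array[Double] = {
  val norm =
    if (p == Double.PositiveInfinity) values.map(math.abs).max
    else math.pow(values.map(v => math.pow(math.abs(v), p)).sum, 1.0 / p)
  // Zero vectors have norm 0 and are left untouched, matching the suite's assertions.
  if (norm == 0.0) values else values.map(_ / norm)
}

// normalize(Array(0.6, -1.1, -3.0), 1) ~> Array(0.12766, -0.23404, -0.63830), as asserted above.
```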
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
new file mode 100644
index 0000000000..5a9be923a8
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.mllib.rdd.RDDFunctions._
+import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, MultivariateOnlineSummarizer}
+import org.apache.spark.rdd.RDD
+
+class StandardScalerSuite extends FunSuite with LocalSparkContext {
+
+ private def computeSummary(data: RDD[Vector]): MultivariateStatisticalSummary = {
+ data.treeAggregate(new MultivariateOnlineSummarizer)(
+ (aggregator, data) => aggregator.add(data),
+ (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+ }
+
+ test("Standardization with dense input") {
+ val data = Array(
+ Vectors.dense(-2.0, 2.3, 0),
+ Vectors.dense(0.0, -1.0, -3.0),
+ Vectors.dense(0.0, -5.1, 0.0),
+ Vectors.dense(3.8, 0.0, 1.9),
+ Vectors.dense(1.7, -0.6, 0.0),
+ Vectors.dense(0.0, 1.9, 0.0)
+ )
+
+ val dataRDD = sc.parallelize(data, 3)
+
+ val standardizer1 = new StandardScaler(withMean = true, withStd = true)
+ val standardizer2 = new StandardScaler()
+ val standardizer3 = new StandardScaler(withMean = true, withStd = false)
+
+ withClue("Using a standardizer before fitting the model should throw exception.") {
+ intercept[IllegalStateException] {
+ data.map(standardizer1.transform)
+ }
+ }
+
+ standardizer1.fit(dataRDD)
+ standardizer2.fit(dataRDD)
+ standardizer3.fit(dataRDD)
+
+ val data1 = data.map(standardizer1.transform)
+ val data2 = data.map(standardizer2.transform)
+ val data3 = data.map(standardizer3.transform)
+
+ val data1RDD = standardizer1.transform(dataRDD)
+ val data2RDD = standardizer2.transform(dataRDD)
+ val data3RDD = standardizer3.transform(dataRDD)
+
+ val summary = computeSummary(dataRDD)
+ val summary1 = computeSummary(data1RDD)
+ val summary2 = computeSummary(data2RDD)
+ val summary3 = computeSummary(data3RDD)
+
+ assert((data, data1, data1RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after standardization.")
+
+ assert((data, data2, data2RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after standardization.")
+
+ assert((data, data3, data3RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after standardization.")
+
+ assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+ assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+
+ assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
+
+ assert(summary2.mean !~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(summary2.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
+
+ assert(summary3.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(summary3.variance ~== summary.variance absTol 1E-5)
+
+ assert(data1(0) ~== Vectors.dense(-1.31527964, 1.023470449, 0.11637768424) absTol 1E-5)
+ assert(data1(3) ~== Vectors.dense(1.637735298, 0.156973995, 1.32247368462) absTol 1E-5)
+ assert(data2(4) ~== Vectors.dense(0.865538862, -0.22604255, 0.0) absTol 1E-5)
+ assert(data2(5) ~== Vectors.dense(0.0, 0.71580142, 0.0) absTol 1E-5)
+ assert(data3(1) ~== Vectors.dense(-0.58333333, -0.58333333, -2.8166666666) absTol 1E-5)
+ assert(data3(5) ~== Vectors.dense(-0.58333333, 2.316666666, 0.18333333333) absTol 1E-5)
+ }
+
+
+ test("Standardization with sparse input") {
+ val data = Array(
+ Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+ Vectors.sparse(3, Seq((1, -1.0), (2, -3.0))),
+ Vectors.sparse(3, Seq((1, -5.1))),
+ Vectors.sparse(3, Seq((0, 3.8), (2, 1.9))),
+ Vectors.sparse(3, Seq((0, 1.7), (1, -0.6))),
+ Vectors.sparse(3, Seq((1, 1.9)))
+ )
+
+ val dataRDD = sc.parallelize(data, 3)
+
+ val standardizer1 = new StandardScaler(withMean = true, withStd = true)
+ val standardizer2 = new StandardScaler()
+ val standardizer3 = new StandardScaler(withMean = true, withStd = false)
+
+ standardizer1.fit(dataRDD)
+ standardizer2.fit(dataRDD)
+ standardizer3.fit(dataRDD)
+
+ val data2 = data.map(standardizer2.transform)
+
+ withClue("Standardization with mean can not be applied on sparse input.") {
+ intercept[IllegalArgumentException] {
+ data.map(standardizer1.transform)
+ }
+ }
+
+ withClue("Standardization with mean can not be applied on sparse input.") {
+ intercept[IllegalArgumentException] {
+ data.map(standardizer3.transform)
+ }
+ }
+
+ val data2RDD = standardizer2.transform(dataRDD)
+
+ val summary2 = computeSummary(data2RDD)
+
+ assert((data, data2, data2RDD.collect()).zipped.forall {
+ case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
+ case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
+ case _ => false
+ }, "The vector type should be preserved after standardization.")
+
+ assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5))
+
+ assert(summary2.mean !~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5)
+ assert(summary2.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5)
+
+ assert(data2(4) ~== Vectors.sparse(3, Seq((0, 0.865538862), (1, -0.22604255))) absTol 1E-5)
+ assert(data2(5) ~== Vectors.sparse(3, Seq((1, 0.71580142))) absTol 1E-5)
+ }
+
+ test("Standardization with constant input") {
+ // When the input data is all constant, the variance is zero. The standardization against
+ // zero variance is not well-defined, but we decide to just set it into zero here.
+ val data = Array(
+ Vectors.dense(2.0),
+ Vectors.dense(2.0),
+ Vectors.dense(2.0)
+ )
+
+ val dataRDD = sc.parallelize(data, 2)
+
+ val standardizer1 = new StandardScaler(withMean = true, withStd = true)
+ val standardizer2 = new StandardScaler(withMean = true, withStd = false)
+ val standardizer3 = new StandardScaler(withMean = false, withStd = true)
+
+ standardizer1.fit(dataRDD)
+ standardizer2.fit(dataRDD)
+ standardizer3.fit(dataRDD)
+
+ val data1 = data.map(standardizer1.transform)
+ val data2 = data.map(standardizer2.transform)
+ val data3 = data.map(standardizer3.transform)
+
+ assert(data1.forall(_.toArray.forall(_ == 0.0)),
+ "The variance is zero, so the transformed result should be 0.0")
+ assert(data2.forall(_.toArray.forall(_ == 0.0)),
+ "The variance is zero, so the transformed result should be 0.0")
+ assert(data3.forall(_.toArray.forall(_ == 0.0)),
+ "The variance is zero, so the transformed result should be 0.0")
+ }
+
+}
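For reference, the per-element transformation these suites exercise is the usual z-score, with the zero-variance convention documented in the constant-input test: when a column's variance is zero, the scaled value is defined as 0.0 rather than dividing by zero. A minimal per-element sketch (illustrative only; the real StandardScaler works from MultivariateOnlineSummarizer statistics and has a separate path for sparse input):

```scala
// Illustrative standardization of a single element; not the MLlib code path.
def standardize(x: Double, mean: Double, variance: Double,
                withMean: Boolean, withStd: Boolean): Double = {
  val centered = if (withMean) x - mean else x
  if (withStd) {
    // Zero variance: result is defined as 0.0, as the constant-input test asserts.
    if (variance == 0.0) 0.0 else centered / math.sqrt(variance)
  } else {
    centered
  }
}
```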