-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala       176
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala   70
2 files changed, 246 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
new file mode 100644
index 0000000000..15c308b238
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.param.{ParamMap, Params}
+import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Params for [[MaxAbsScaler]] and [[MaxAbsScalerModel]].
+ */
+private[feature] trait MaxAbsScalerParams extends Params with HasInputCol with HasOutputCol {
+
+ /** Validates and transforms the input schema. */
+ protected def validateAndTransformSchema(schema: StructType): StructType = {
+ validateParams()
+ val inputType = schema($(inputCol)).dataType
+ require(inputType.isInstanceOf[VectorUDT],
+ s"Input column ${$(inputCol)} must be a vector column")
+ require(!schema.fieldNames.contains($(outputCol)),
+ s"Output column ${$(outputCol)} already exists.")
+ val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
+ StructType(outputFields)
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Rescale each feature individually to the range [-1, 1] by dividing through the maximum
+ * absolute value of each feature. It does not shift/center the data, and thus does not destroy
+ * any sparsity.
+ */
+@Experimental
+class MaxAbsScaler @Since("2.0.0") (override val uid: String)
+ extends Estimator[MaxAbsScalerModel] with MaxAbsScalerParams with DefaultParamsWritable {
+
+ @Since("2.0.0")
+ def this() = this(Identifiable.randomUID("maxAbsScal"))
+
+ /** @group setParam */
+ def setInputCol(value: String): this.type = set(inputCol, value)
+
+ /** @group setParam */
+ def setOutputCol(value: String): this.type = set(outputCol, value)
+
+ override def fit(dataset: DataFrame): MaxAbsScalerModel = {
+ transformSchema(dataset.schema, logging = true)
+ val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+ val summary = Statistics.colStats(input)
+ val minVals = summary.min.toArray
+ val maxVals = summary.max.toArray
+ val n = minVals.length
+ val maxAbs = Array.tabulate(n) { i => math.max(math.abs(minVals(i)), math.abs(maxVals(i))) }
+
+ copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema)
+ }
+
+ override def copy(extra: ParamMap): MaxAbsScaler = defaultCopy(extra)
+}
+
+@Since("1.6.0")
+object MaxAbsScaler extends DefaultParamsReadable[MaxAbsScaler] {
+
+ @Since("1.6.0")
+ override def load(path: String): MaxAbsScaler = super.load(path)
+}
+
+/**
+ * :: Experimental ::
+ * Model fitted by [[MaxAbsScaler]].
+ *
+ */
+@Experimental
+class MaxAbsScalerModel private[ml] (
+ override val uid: String,
+ val maxAbs: Vector)
+ extends Model[MaxAbsScalerModel] with MaxAbsScalerParams with MLWritable {
+
+ import MaxAbsScalerModel._
+
+ /** @group setParam */
+ def setInputCol(value: String): this.type = set(inputCol, value)
+
+ /** @group setParam */
+ def setOutputCol(value: String): this.type = set(outputCol, value)
+
+ override def transform(dataset: DataFrame): DataFrame = {
+ transformSchema(dataset.schema, logging = true)
+    // TODO: this looks hacky; we may have to handle sparse and dense vectors separately.
+ val maxAbsUnzero = Vectors.dense(maxAbs.toArray.map(x => if (x == 0) 1 else x))
+ val reScale = udf { (vector: Vector) =>
+ val brz = vector.toBreeze / maxAbsUnzero.toBreeze
+ Vectors.fromBreeze(brz)
+ }
+ dataset.withColumn($(outputCol), reScale(col($(inputCol))))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema)
+ }
+
+ override def copy(extra: ParamMap): MaxAbsScalerModel = {
+ val copied = new MaxAbsScalerModel(uid, maxAbs)
+ copyValues(copied, extra).setParent(parent)
+ }
+
+  @Since("2.0.0")
+ override def write: MLWriter = new MaxAbsScalerModelWriter(this)
+}
+
+@Since("1.6.0")
+object MaxAbsScalerModel extends MLReadable[MaxAbsScalerModel] {
+
+ private[MaxAbsScalerModel]
+ class MaxAbsScalerModelWriter(instance: MaxAbsScalerModel) extends MLWriter {
+
+ private case class Data(maxAbs: Vector)
+
+ override protected def saveImpl(path: String): Unit = {
+ DefaultParamsWriter.saveMetadata(instance, path, sc)
+      val data = Data(instance.maxAbs)
+ val dataPath = new Path(path, "data").toString
+ sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ }
+ }
+
+ private class MaxAbsScalerModelReader extends MLReader[MaxAbsScalerModel] {
+
+ private val className = classOf[MaxAbsScalerModel].getName
+
+ override def load(path: String): MaxAbsScalerModel = {
+ val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+ val dataPath = new Path(path, "data").toString
+ val Row(maxAbs: Vector) = sqlContext.read.parquet(dataPath)
+ .select("maxAbs")
+ .head()
+ val model = new MaxAbsScalerModel(metadata.uid, maxAbs)
+ DefaultParamsReader.getAndSetParams(model, metadata)
+ model
+ }
+ }
+
+  @Since("2.0.0")
+ override def read: MLReader[MaxAbsScalerModel] = new MaxAbsScalerModelReader
+
+  @Since("2.0.0")
+ override def load(path: String): MaxAbsScalerModel = super.load(path)
+}
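For context, a minimal usage sketch of the new estimator (not part of the patch). It assumes the same Spark 1.x-era sqlContext that the test suite below relies on; the column names and feature values here are illustrative only.

    import org.apache.spark.ml.feature.MaxAbsScaler
    import org.apache.spark.mllib.linalg.Vectors

    // Illustrative input: a small DataFrame of feature vectors.
    val df = sqlContext.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.1, -8.0)),
      (1, Vectors.dense(2.0, 1.0, -4.0)),
      (2, Vectors.dense(4.0, 10.0, 8.0))
    )).toDF("id", "features")

    val scaler = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")

    // fit() computes the per-feature max absolute values, here (4.0, 10.0, 8.0);
    // transform() divides each feature by them, so row 0 becomes (0.25, 0.01, -1.0).
    val model = scaler.fit(df)
    model.transform(df).show()
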
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
new file mode 100644
index 0000000000..e083d47136
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+
+class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+ test("MaxAbsScaler fit basic case") {
+ val data = Array(
+ Vectors.dense(1, 0, 100),
+ Vectors.dense(2, 0, 0),
+ Vectors.sparse(3, Array(0, 2), Array(-2, -100)),
+ Vectors.sparse(3, Array(0), Array(-1.5)))
+
+ val expected: Array[Vector] = Array(
+ Vectors.dense(0.5, 0, 1),
+ Vectors.dense(1, 0, 0),
+ Vectors.sparse(3, Array(0, 2), Array(-1, -1)),
+ Vectors.sparse(3, Array(0), Array(-0.75)))
+
+ val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected")
+ val scaler = new MaxAbsScaler()
+ .setInputCol("features")
+ .setOutputCol("scaled")
+
+ val model = scaler.fit(df)
+ model.transform(df).select("expected", "scaled").collect()
+ .foreach { case Row(vector1: Vector, vector2: Vector) =>
+      assert(vector1.equals(vector2), s"MaxAbsScaler test error: $vector2 should be $vector1")
+ }
+
+ // copied model must have the same parent.
+ MLTestingUtils.checkCopy(model)
+ }
+
+ test("MaxAbsScaler read/write") {
+ val t = new MaxAbsScaler()
+ .setInputCol("myInputCol")
+ .setOutputCol("myOutputCol")
+ testDefaultReadWrite(t)
+ }
+
+ test("MaxAbsScalerModel read/write") {
+ val instance = new MaxAbsScalerModel(
+ "myMaxAbsScalerModel", Vectors.dense(1.0, 10.0))
+ .setInputCol("myInputCol")
+ .setOutputCol("myOutputCol")
+ val newInstance = testDefaultReadWrite(instance)
+ assert(newInstance.maxAbs === instance.maxAbs)
+ }
+
+}
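
Continuing the usage sketch above, the MLWritable/MLReadable plumbing added in MaxAbsScalerModel can also be exercised directly; the save path here is a placeholder.

    // Sketch only: round-trip a fitted model through write/load.
    val path = "/tmp/max-abs-scaler-model"  // placeholder location
    model.write.overwrite().save(path)
    val restored = MaxAbsScalerModel.load(path)
    // The reloaded model carries the same per-feature max absolute values.
    assert(restored.maxAbs == model.maxAbs)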