diff options
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala | 176 | ||||
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala | 70 |
2 files changed, 246 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala new file mode 100644 index 0000000000..15c308b238 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.{ParamMap, Params} +import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Params for [[MaxAbsScaler]] and [[MaxAbsScalerModel]]. + */ +private[feature] trait MaxAbsScalerParams extends Params with HasInputCol with HasOutputCol { + + /** Validates and transforms the input schema. */ + protected def validateAndTransformSchema(schema: StructType): StructType = { + validateParams() + val inputType = schema($(inputCol)).dataType + require(inputType.isInstanceOf[VectorUDT], + s"Input column ${$(inputCol)} must be a vector column") + require(!schema.fieldNames.contains($(outputCol)), + s"Output column ${$(outputCol)} already exists.") + val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false) + StructType(outputFields) + } +} + +/** + * :: Experimental :: + * Rescale each feature individually to range [-1, 1] by dividing through the largest maximum + * absolute value in each feature. It does not shift/center the data, and thus does not destroy + * any sparsity. + */ +@Experimental +class MaxAbsScaler @Since("2.0.0") (override val uid: String) + extends Estimator[MaxAbsScalerModel] with MaxAbsScalerParams with DefaultParamsWritable { + + @Since("2.0.0") + def this() = this(Identifiable.randomUID("maxAbsScal")) + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def fit(dataset: DataFrame): MaxAbsScalerModel = { + transformSchema(dataset.schema, logging = true) + val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v } + val summary = Statistics.colStats(input) + val minVals = summary.min.toArray + val maxVals = summary.max.toArray + val n = minVals.length + val maxAbs = Array.tabulate(n) { i => math.max(math.abs(minVals(i)), math.abs(maxVals(i))) } + + copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this)) + } + + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema) + } + + override def copy(extra: ParamMap): MaxAbsScaler = defaultCopy(extra) +} + +@Since("1.6.0") +object MaxAbsScaler extends DefaultParamsReadable[MaxAbsScaler] { + + @Since("1.6.0") + override def load(path: String): MaxAbsScaler = super.load(path) +} + +/** + * :: Experimental :: + * Model fitted by [[MaxAbsScaler]]. + * + */ +@Experimental +class MaxAbsScalerModel private[ml] ( + override val uid: String, + val maxAbs: Vector) + extends Model[MaxAbsScalerModel] with MaxAbsScalerParams with MLWritable { + + import MaxAbsScalerModel._ + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame): DataFrame = { + transformSchema(dataset.schema, logging = true) + // TODO: this looks hack, we may have to handle sparse and dense vectors separately. + val maxAbsUnzero = Vectors.dense(maxAbs.toArray.map(x => if (x == 0) 1 else x)) + val reScale = udf { (vector: Vector) => + val brz = vector.toBreeze / maxAbsUnzero.toBreeze + Vectors.fromBreeze(brz) + } + dataset.withColumn($(outputCol), reScale(col($(inputCol)))) + } + + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema) + } + + override def copy(extra: ParamMap): MaxAbsScalerModel = { + val copied = new MaxAbsScalerModel(uid, maxAbs) + copyValues(copied, extra).setParent(parent) + } + + @Since("1.6.0") + override def write: MLWriter = new MaxAbsScalerModelWriter(this) +} + +@Since("1.6.0") +object MaxAbsScalerModel extends MLReadable[MaxAbsScalerModel] { + + private[MaxAbsScalerModel] + class MaxAbsScalerModelWriter(instance: MaxAbsScalerModel) extends MLWriter { + + private case class Data(maxAbs: Vector) + + override protected def saveImpl(path: String): Unit = { + DefaultParamsWriter.saveMetadata(instance, path, sc) + val data = new Data(instance.maxAbs) + val dataPath = new Path(path, "data").toString + sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + } + } + + private class MaxAbsScalerModelReader extends MLReader[MaxAbsScalerModel] { + + private val className = classOf[MaxAbsScalerModel].getName + + override def load(path: String): MaxAbsScalerModel = { + val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + val dataPath = new Path(path, "data").toString + val Row(maxAbs: Vector) = sqlContext.read.parquet(dataPath) + .select("maxAbs") + .head() + val model = new MaxAbsScalerModel(metadata.uid, maxAbs) + DefaultParamsReader.getAndSetParams(model, metadata) + model + } + } + + @Since("1.6.0") + override def read: MLReader[MaxAbsScalerModel] = new MaxAbsScalerModelReader + + @Since("1.6.0") + override def load(path: String): MaxAbsScalerModel = super.load(path) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala new file mode 100644 index 0000000000..e083d47136 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.feature + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row + +class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + test("MaxAbsScaler fit basic case") { + val data = Array( + Vectors.dense(1, 0, 100), + Vectors.dense(2, 0, 0), + Vectors.sparse(3, Array(0, 2), Array(-2, -100)), + Vectors.sparse(3, Array(0), Array(-1.5))) + + val expected: Array[Vector] = Array( + Vectors.dense(0.5, 0, 1), + Vectors.dense(1, 0, 0), + Vectors.sparse(3, Array(0, 2), Array(-1, -1)), + Vectors.sparse(3, Array(0), Array(-0.75))) + + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") + val scaler = new MaxAbsScaler() + .setInputCol("features") + .setOutputCol("scaled") + + val model = scaler.fit(df) + model.transform(df).select("expected", "scaled").collect() + .foreach { case Row(vector1: Vector, vector2: Vector) => + assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1") + } + + // copied model must have the same parent. + MLTestingUtils.checkCopy(model) + } + + test("MaxAbsScaler read/write") { + val t = new MaxAbsScaler() + .setInputCol("myInputCol") + .setOutputCol("myOutputCol") + testDefaultReadWrite(t) + } + + test("MaxAbsScalerModel read/write") { + val instance = new MaxAbsScalerModel( + "myMaxAbsScalerModel", Vectors.dense(1.0, 10.0)) + .setInputCol("myInputCol") + .setOutputCol("myOutputCol") + val newInstance = testDefaultReadWrite(instance) + assert(newInstance.maxAbs === instance.maxAbs) + } + +} |