From 54c0789a05a783ce90e0e9848079be442a82966b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 6 Aug 2015 13:29:31 -0700 Subject: [SPARK-9493] [ML] add featureIndex to handle vector features in IsotonicRegression This PR contains the following changes: * add `featureIndex` to handle vector features (in order to chain isotonic regression easily with output from logistic regression * make getter/setter names consistent with params * remove inheritance from Regressor because it is tricky to handle both `DoubleType` and `VectorType` * simplify test data generation jkbradley zapletal-martin Author: Xiangrui Meng Closes #7952 from mengxr/SPARK-9493 and squashes the following commits: 8818ac3 [Xiangrui Meng] address comments 05e2216 [Xiangrui Meng] address comments 8d08090 [Xiangrui Meng] add featureIndex to handle vector features make getter/setter names consistent with params remove inheritance from Regressor --- .../spark/ml/regression/IsotonicRegression.scala | 202 +++++++++++++++------ .../ml/regression/IsotonicRegressionSuite.scala | 82 +++++---- 2 files changed, 194 insertions(+), 90 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 4ece8cf8cf..f570590960 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -17,44 +17,113 @@ package org.apache.spark.ml.regression +import org.apache.spark.Logging import org.apache.spark.annotation.Experimental -import org.apache.spark.ml.PredictorParams -import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam} -import org.apache.spark.ml.util.{SchemaUtils, Identifiable} -import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression} -import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasPredictionCol} +import org.apache.spark.ml.util.{Identifiable, SchemaUtils} +import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors} +import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression, IsotonicRegressionModel => MLlibIsotonicRegressionModel} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{DoubleType, DataType} -import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.{col, lit, udf} +import org.apache.spark.sql.types.{DoubleType, StructType} import org.apache.spark.storage.StorageLevel /** * Params for isotonic regression. */ -private[regression] trait IsotonicRegressionParams extends PredictorParams { +private[regression] trait IsotonicRegressionBase extends Params with HasFeaturesCol + with HasLabelCol with HasPredictionCol with Logging { /** - * Param for weight column name. - * TODO: Move weightCol to sharedParams. - * + * Param for weight column name (default: none). * @group param */ + // TODO: Move weightCol to sharedParams. final val weightCol: Param[String] = - new Param[String](this, "weightCol", "weight column name") + new Param[String](this, "weightCol", + "weight column name. If this is not set or empty, we treat all instance weights as 1.0.") /** @group getParam */ final def getWeightCol: String = $(weightCol) /** - * Param for isotonic parameter. - * Isotonic (increasing) or antitonic (decreasing) sequence. + * Param for whether the output sequence should be isotonic/increasing (true) or + * antitonic/decreasing (false). * @group param */ final val isotonic: BooleanParam = - new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic (decreasing) sequence") + new BooleanParam(this, "isotonic", + "whether the output sequence should be isotonic/increasing (true) or" + + "antitonic/decreasing (false)") /** @group getParam */ - final def getIsotonicParam: Boolean = $(isotonic) + final def getIsotonic: Boolean = $(isotonic) + + /** + * Param for the index of the feature if [[featuresCol]] is a vector column (default: `0`), no + * effect otherwise. + * @group param + */ + final val featureIndex: IntParam = new IntParam(this, "featureIndex", + "The index of the feature if featuresCol is a vector column, no effect otherwise.") + + /** @group getParam */ + final def getFeatureIndex: Int = $(featureIndex) + + setDefault(isotonic -> true, featureIndex -> 0) + + /** Checks whether the input has weight column. */ + protected[ml] def hasWeightCol: Boolean = { + isDefined(weightCol) && $(weightCol) != "" + } + + /** + * Extracts (label, feature, weight) from input dataset. + */ + protected[ml] def extractWeightedLabeledPoints( + dataset: DataFrame): RDD[(Double, Double, Double)] = { + val f = if (dataset.schema($(featuresCol)).dataType.isInstanceOf[VectorUDT]) { + val idx = $(featureIndex) + val extract = udf { v: Vector => v(idx) } + extract(col($(featuresCol))) + } else { + col($(featuresCol)) + } + val w = if (hasWeightCol) { + col($(weightCol)) + } else { + lit(1.0) + } + dataset.select(col($(labelCol)), f, w) + .map { case Row(label: Double, feature: Double, weights: Double) => + (label, feature, weights) + } + } + + /** + * Validates and transforms input schema. + * @param schema input schema + * @param fitting whether this is in fitting or prediction + * @return output schema + */ + protected[ml] def validateAndTransformSchema( + schema: StructType, + fitting: Boolean): StructType = { + if (fitting) { + SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType) + if (hasWeightCol) { + SchemaUtils.checkColumnType(schema, $(weightCol), DoubleType) + } else { + logInfo("The weight column is not defined. Treat all instance weights as 1.0.") + } + } + val featuresType = schema($(featuresCol)).dataType + require(featuresType == DoubleType || featuresType.isInstanceOf[VectorUDT]) + SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) + } } /** @@ -67,52 +136,46 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams { * Uses [[org.apache.spark.mllib.regression.IsotonicRegression]]. */ @Experimental -class IsotonicRegression(override val uid: String) - extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel] - with IsotonicRegressionParams { +class IsotonicRegression(override val uid: String) extends Estimator[IsotonicRegressionModel] + with IsotonicRegressionBase { def this() = this(Identifiable.randomUID("isoReg")) - /** - * Set the isotonic parameter. - * Default is true. - * @group setParam - */ - def setIsotonicParam(value: Boolean): this.type = set(isotonic, value) - setDefault(isotonic -> true) + /** @group setParam */ + def setLabelCol(value: String): this.type = set(labelCol, value) - /** - * Set weight column param. - * Default is weight. - * @group setParam - */ - def setWeightParam(value: String): this.type = set(weightCol, value) - setDefault(weightCol -> "weight") + /** @group setParam */ + def setFeaturesCol(value: String): this.type = set(featuresCol, value) - override private[ml] def featuresDataType: DataType = DoubleType + /** @group setParam */ + def setPredictionCol(value: String): this.type = set(predictionCol, value) - override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) + /** @group setParam */ + def setIsotonic(value: Boolean): this.type = set(isotonic, value) - private[this] def extractWeightedLabeledPoints( - dataset: DataFrame): RDD[(Double, Double, Double)] = { + /** @group setParam */ + def setWeightCol(value: String): this.type = set(weightCol, value) - dataset.select($(labelCol), $(featuresCol), $(weightCol)) - .map { case Row(label: Double, features: Double, weights: Double) => - (label, features, weights) - } - } + /** @group setParam */ + def setFeatureIndex(value: Int): this.type = set(featureIndex, value) - override protected def train(dataset: DataFrame): IsotonicRegressionModel = { - SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType) + override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) + + override def fit(dataset: DataFrame): IsotonicRegressionModel = { + validateAndTransformSchema(dataset.schema, fitting = true) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic)) - val parentModel = isotonicRegression.run(instances) + val oldModel = isotonicRegression.run(instances) - new IsotonicRegressionModel(uid, parentModel) + copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this)) + } + + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema, fitting = true) } } @@ -123,22 +186,49 @@ class IsotonicRegression(override val uid: String) * * For detailed rules see [[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]]. * - * @param parentModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]] - * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]]. + * @param oldModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]] + * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]]. */ +@Experimental class IsotonicRegressionModel private[ml] ( override val uid: String, - private[ml] val parentModel: MLlibIsotonicRegressionModel) - extends RegressionModel[Double, IsotonicRegressionModel] - with IsotonicRegressionParams { + private val oldModel: MLlibIsotonicRegressionModel) + extends Model[IsotonicRegressionModel] with IsotonicRegressionBase { - override def featuresDataType: DataType = DoubleType + /** @group setParam */ + def setFeaturesCol(value: String): this.type = set(featuresCol, value) - override protected def predict(features: Double): Double = { - parentModel.predict(features) - } + /** @group setParam */ + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + def setFeatureIndex(value: Int): this.type = set(featureIndex, value) + + /** Boundaries in increasing order for which predictions are known. */ + def boundaries: Vector = Vectors.dense(oldModel.boundaries) + + /** + * Predictions associated with the boundaries at the same index, monotone because of isotonic + * regression. + */ + def predictions: Vector = Vectors.dense(oldModel.predictions) override def copy(extra: ParamMap): IsotonicRegressionModel = { - copyValues(new IsotonicRegressionModel(uid, parentModel), extra) + copyValues(new IsotonicRegressionModel(uid, oldModel), extra) + } + + override def transform(dataset: DataFrame): DataFrame = { + val predict = dataset.schema($(featuresCol)).dataType match { + case DoubleType => + udf { feature: Double => oldModel.predict(feature) } + case _: VectorUDT => + val idx = $(featureIndex) + udf { features: Vector => oldModel.predict(features(idx)) } + } + dataset.withColumn($(predictionCol), predict(col($(featuresCol)))) + } + + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema, fitting = false) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index 66e4b170ba..c0ab00b68a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -19,57 +19,46 @@ package org.apache.spark.ml.regression import org.apache.spark.SparkFunSuite import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.types.{DoubleType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { - private val schema = StructType( - Array( - StructField("label", DoubleType), - StructField("features", DoubleType), - StructField("weight", DoubleType))) - - private val predictionSchema = StructType(Array(StructField("features", DoubleType))) - private def generateIsotonicInput(labels: Seq[Double]): DataFrame = { - val data = Seq.tabulate(labels.size)(i => Row(labels(i), i.toDouble, 1d)) - val parallelData = sc.parallelize(data) - - sqlContext.createDataFrame(parallelData, schema) + sqlContext.createDataFrame( + labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) } + ).toDF("label", "features", "weight") } private def generatePredictionInput(features: Seq[Double]): DataFrame = { - val data = Seq.tabulate(features.size)(i => Row(features(i))) - - val parallelData = sc.parallelize(data) - sqlContext.createDataFrame(parallelData, predictionSchema) + sqlContext.createDataFrame(features.map(Tuple1.apply)) + .toDF("features") } test("isotonic regression predictions") { val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18)) - val trainer = new IsotonicRegression().setIsotonicParam(true) + val ir = new IsotonicRegression().setIsotonic(true) - val model = trainer.fit(dataset) + val model = ir.fit(dataset) val predictions = model .transform(dataset) - .select("prediction").map { - case Row(pred) => pred + .select("prediction").map { case Row(pred) => + pred }.collect() assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18)) - assert(model.parentModel.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8)) - assert(model.parentModel.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) - assert(model.parentModel.isotonic) + assert(model.boundaries === Vectors.dense(0, 1, 3, 4, 5, 6, 7, 8)) + assert(model.predictions === Vectors.dense(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)) + assert(model.getIsotonic) } test("antitonic regression predictions") { val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1)) - val trainer = new IsotonicRegression().setIsotonicParam(false) + val ir = new IsotonicRegression().setIsotonic(false) - val model = trainer.fit(dataset) + val model = ir.fit(dataset) val features = generatePredictionInput(Seq(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0)) val predictions = model @@ -94,9 +83,10 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val ir = new IsotonicRegression() assert(ir.getLabelCol === "label") assert(ir.getFeaturesCol === "features") - assert(ir.getWeightCol === "weight") assert(ir.getPredictionCol === "prediction") - assert(ir.getIsotonicParam === true) + assert(!ir.isDefined(ir.weightCol)) + assert(ir.getIsotonic) + assert(ir.getFeatureIndex === 0) val model = ir.fit(dataset) model.transform(dataset) @@ -105,21 +95,22 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(model.getLabelCol === "label") assert(model.getFeaturesCol === "features") - assert(model.getWeightCol === "weight") assert(model.getPredictionCol === "prediction") - assert(model.getIsotonicParam === true) + assert(!model.isDefined(model.weightCol)) + assert(model.getIsotonic) + assert(model.getFeatureIndex === 0) assert(model.hasParent) } test("set parameters") { val isotonicRegression = new IsotonicRegression() - .setIsotonicParam(false) - .setWeightParam("w") + .setIsotonic(false) + .setWeightCol("w") .setFeaturesCol("f") .setLabelCol("l") .setPredictionCol("p") - assert(isotonicRegression.getIsotonicParam === false) + assert(!isotonicRegression.getIsotonic) assert(isotonicRegression.getWeightCol === "w") assert(isotonicRegression.getFeaturesCol === "f") assert(isotonicRegression.getLabelCol === "l") @@ -130,7 +121,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val dataset = generateIsotonicInput(Seq(1, 2, 3)) intercept[IllegalArgumentException] { - new IsotonicRegression().setWeightParam("w").fit(dataset) + new IsotonicRegression().setWeightCol("w").fit(dataset) } intercept[IllegalArgumentException] { @@ -145,4 +136,27 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { new IsotonicRegression().fit(dataset).setFeaturesCol("f").transform(dataset) } } + + test("vector features column with feature index") { + val dataset = sqlContext.createDataFrame(Seq( + (4.0, Vectors.dense(0.0, 1.0)), + (3.0, Vectors.dense(0.0, 2.0)), + (5.0, Vectors.sparse(2, Array(1), Array(3.0)))) + ).toDF("label", "features") + + val ir = new IsotonicRegression() + .setFeatureIndex(1) + + val model = ir.fit(dataset) + + val features = generatePredictionInput(Seq(2.0, 3.0, 4.0, 5.0)) + + val predictions = model + .transform(features) + .select("prediction").map { + case Row(pred) => pred + }.collect() + + assert(predictions === Array(3.5, 5.0, 5.0, 5.0)) + } } -- cgit v1.2.3