author      Xiangrui Meng <meng@databricks.com>          2015-08-06 13:29:31 -0700
committer   Joseph K. Bradley <joseph@databricks.com>    2015-08-06 13:29:31 -0700
commit      54c0789a05a783ce90e0e9848079be442a82966b (patch)
tree        1108e74753dca0409b4e75f481556224a8d4cde0 /mllib/src/main
parent      1f62f104c7a2aeac625b17d9e5ac62f1f10a2b21 (diff)
[SPARK-9493] [ML] add featureIndex to handle vector features in IsotonicRegression
This PR contains the following changes:

* add `featureIndex` to handle vector features (in order to chain isotonic regression easily with output from logistic regression)
* make getter/setter names consistent with params
* remove inheritance from Regressor because it is tricky to handle both `DoubleType` and `VectorType`
* simplify test data generation

jkbradley zapletal-martin

Author: Xiangrui Meng <meng@databricks.com>

Closes #7952 from mengxr/SPARK-9493 and squashes the following commits:

8818ac3 [Xiangrui Meng] address comments
05e2216 [Xiangrui Meng] address comments
8d08090 [Xiangrui Meng] add featureIndex to handle vector features; make getter/setter names consistent with params; remove inheritance from Regressor
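A minimal usage sketch of the new API, not part of the patch itself: it chains the "probability" vector column produced by logistic regression into isotonic regression via featureIndex. The toy data, the column/output names ("label", "calibrated"), and the pre-existing sqlContext are assumptions for illustration only.

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.regression.IsotonicRegression
    import org.apache.spark.mllib.linalg.Vectors

    // Assumed toy training data: (label, features).
    val training = sqlContext.createDataFrame(Seq(
      (0.0, Vectors.dense(0.0, 1.1, 0.1)),
      (1.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Logistic regression appends a "probability" vector column.
    val scored = new LogisticRegression().fit(training).transform(training)

    // Calibrate the positive-class probability with isotonic regression;
    // featureIndex picks one slot of the vector column, no manual extraction needed.
    val ir = new IsotonicRegression()
      .setFeaturesCol("probability")
      .setFeatureIndex(1)
      .setLabelCol("label")
      .setPredictionCol("calibrated")
    val irModel = ir.fit(scored)
    irModel.transform(scored).select("label", "calibrated").show()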
Diffstat (limited to 'mllib/src/main')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala | 202
1 file changed, 146 insertions(+), 56 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 4ece8cf8cf..f570590960 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -17,44 +17,113 @@
package org.apache.spark.ml.regression
+import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.PredictorParams
-import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam}
-import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
-import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
-import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasPredictionCol}
+import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors}
+import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression, IsotonicRegressionModel => MLlibIsotonicRegressionModel}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DoubleType, DataType}
-import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types.{DoubleType, StructType}
import org.apache.spark.storage.StorageLevel
/**
* Params for isotonic regression.
*/
-private[regression] trait IsotonicRegressionParams extends PredictorParams {
+private[regression] trait IsotonicRegressionBase extends Params with HasFeaturesCol
+ with HasLabelCol with HasPredictionCol with Logging {
/**
- * Param for weight column name.
- * TODO: Move weightCol to sharedParams.
- *
+ * Param for weight column name (default: none).
* @group param
*/
+ // TODO: Move weightCol to sharedParams.
final val weightCol: Param[String] =
- new Param[String](this, "weightCol", "weight column name")
+ new Param[String](this, "weightCol",
+ "weight column name. If this is not set or empty, we treat all instance weights as 1.0.")
/** @group getParam */
final def getWeightCol: String = $(weightCol)
/**
- * Param for isotonic parameter.
- * Isotonic (increasing) or antitonic (decreasing) sequence.
+ * Param for whether the output sequence should be isotonic/increasing (true) or
+ * antitonic/decreasing (false).
* @group param
*/
final val isotonic: BooleanParam =
- new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic (decreasing) sequence")
+ new BooleanParam(this, "isotonic",
+ "whether the output sequence should be isotonic/increasing (true) or" +
+ "antitonic/decreasing (false)")
/** @group getParam */
- final def getIsotonicParam: Boolean = $(isotonic)
+ final def getIsotonic: Boolean = $(isotonic)
+
+ /**
+ * Param for the index of the feature if [[featuresCol]] is a vector column (default: `0`), no
+ * effect otherwise.
+ * @group param
+ */
+ final val featureIndex: IntParam = new IntParam(this, "featureIndex",
+ "The index of the feature if featuresCol is a vector column, no effect otherwise.")
+
+ /** @group getParam */
+ final def getFeatureIndex: Int = $(featureIndex)
+
+ setDefault(isotonic -> true, featureIndex -> 0)
+
+ /** Checks whether the input has weight column. */
+ protected[ml] def hasWeightCol: Boolean = {
+ isDefined(weightCol) && $(weightCol) != ""
+ }
+
+ /**
+ * Extracts (label, feature, weight) from input dataset.
+ */
+ protected[ml] def extractWeightedLabeledPoints(
+ dataset: DataFrame): RDD[(Double, Double, Double)] = {
+ val f = if (dataset.schema($(featuresCol)).dataType.isInstanceOf[VectorUDT]) {
+ val idx = $(featureIndex)
+ val extract = udf { v: Vector => v(idx) }
+ extract(col($(featuresCol)))
+ } else {
+ col($(featuresCol))
+ }
+ val w = if (hasWeightCol) {
+ col($(weightCol))
+ } else {
+ lit(1.0)
+ }
+ dataset.select(col($(labelCol)), f, w)
+ .map { case Row(label: Double, feature: Double, weights: Double) =>
+ (label, feature, weights)
+ }
+ }
+
+ /**
+ * Validates and transforms input schema.
+ * @param schema input schema
+ * @param fitting whether this is in fitting or prediction
+ * @return output schema
+ */
+ protected[ml] def validateAndTransformSchema(
+ schema: StructType,
+ fitting: Boolean): StructType = {
+ if (fitting) {
+ SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
+ if (hasWeightCol) {
+ SchemaUtils.checkColumnType(schema, $(weightCol), DoubleType)
+ } else {
+ logInfo("The weight column is not defined. Treat all instance weights as 1.0.")
+ }
+ }
+ val featuresType = schema($(featuresCol)).dataType
+ require(featuresType == DoubleType || featuresType.isInstanceOf[VectorUDT])
+ SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType)
+ }
}
/**
@@ -67,52 +136,46 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams {
* Uses [[org.apache.spark.mllib.regression.IsotonicRegression]].
*/
@Experimental
-class IsotonicRegression(override val uid: String)
- extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel]
- with IsotonicRegressionParams {
+class IsotonicRegression(override val uid: String) extends Estimator[IsotonicRegressionModel]
+ with IsotonicRegressionBase {
def this() = this(Identifiable.randomUID("isoReg"))
- /**
- * Set the isotonic parameter.
- * Default is true.
- * @group setParam
- */
- def setIsotonicParam(value: Boolean): this.type = set(isotonic, value)
- setDefault(isotonic -> true)
+ /** @group setParam */
+ def setLabelCol(value: String): this.type = set(labelCol, value)
- /**
- * Set weight column param.
- * Default is weight.
- * @group setParam
- */
- def setWeightParam(value: String): this.type = set(weightCol, value)
- setDefault(weightCol -> "weight")
+ /** @group setParam */
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
- override private[ml] def featuresDataType: DataType = DoubleType
+ /** @group setParam */
+ def setPredictionCol(value: String): this.type = set(predictionCol, value)
- override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+ /** @group setParam */
+ def setIsotonic(value: Boolean): this.type = set(isotonic, value)
- private[this] def extractWeightedLabeledPoints(
- dataset: DataFrame): RDD[(Double, Double, Double)] = {
+ /** @group setParam */
+ def setWeightCol(value: String): this.type = set(weightCol, value)
- dataset.select($(labelCol), $(featuresCol), $(weightCol))
- .map { case Row(label: Double, features: Double, weights: Double) =>
- (label, features, weights)
- }
- }
+ /** @group setParam */
+ def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
- override protected def train(dataset: DataFrame): IsotonicRegressionModel = {
- SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType)
+ override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+
+ override def fit(dataset: DataFrame): IsotonicRegressionModel = {
+ validateAndTransformSchema(dataset.schema, fitting = true)
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val instances = extractWeightedLabeledPoints(dataset)
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
- val parentModel = isotonicRegression.run(instances)
+ val oldModel = isotonicRegression.run(instances)
- new IsotonicRegressionModel(uid, parentModel)
+ copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema, fitting = true)
}
}
@@ -123,22 +186,49 @@ class IsotonicRegression(override val uid: String)
*
* For detailed rules see [[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]].
*
- * @param parentModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
- * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]].
+ * @param oldModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
+ * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]].
*/
+@Experimental
class IsotonicRegressionModel private[ml] (
override val uid: String,
- private[ml] val parentModel: MLlibIsotonicRegressionModel)
- extends RegressionModel[Double, IsotonicRegressionModel]
- with IsotonicRegressionParams {
+ private val oldModel: MLlibIsotonicRegressionModel)
+ extends Model[IsotonicRegressionModel] with IsotonicRegressionBase {
- override def featuresDataType: DataType = DoubleType
+ /** @group setParam */
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
- override protected def predict(features: Double): Double = {
- parentModel.predict(features)
- }
+ /** @group setParam */
+ def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+ /** @group setParam */
+ def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
+
+ /** Boundaries in increasing order for which predictions are known. */
+ def boundaries: Vector = Vectors.dense(oldModel.boundaries)
+
+ /**
+ * Predictions associated with the boundaries at the same index, monotone because of isotonic
+ * regression.
+ */
+ def predictions: Vector = Vectors.dense(oldModel.predictions)
override def copy(extra: ParamMap): IsotonicRegressionModel = {
- copyValues(new IsotonicRegressionModel(uid, parentModel), extra)
+ copyValues(new IsotonicRegressionModel(uid, oldModel), extra)
+ }
+
+ override def transform(dataset: DataFrame): DataFrame = {
+ val predict = dataset.schema($(featuresCol)).dataType match {
+ case DoubleType =>
+ udf { feature: Double => oldModel.predict(feature) }
+ case _: VectorUDT =>
+ val idx = $(featureIndex)
+ udf { features: Vector => oldModel.predict(features(idx)) }
+ }
+ dataset.withColumn($(predictionCol), predict(col($(featuresCol))))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema, fitting = false)
}
}
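
For completeness, a sketch of the plain DoubleType path that the new Estimator/Model pair still supports, plus the new boundaries/predictions accessors on the fitted model. The toy data and the existing sqlContext are illustrative assumptions; the exact fitted values depend on the pool-adjacent-violators solution.

    import org.apache.spark.ml.regression.IsotonicRegression

    // Assumed toy data with a Double features column; featureIndex (default 0) has no effect here.
    val df = sqlContext.createDataFrame(Seq(
      (1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (1.0, 4.0), (6.0, 5.0), (17.0, 6.0), (16.0, 7.0)
    )).toDF("label", "features")

    val model = new IsotonicRegression().setIsotonic(true).fit(df)

    // Piecewise-linear model: increasing boundaries and the monotone values fitted at them.
    println(model.boundaries)
    println(model.predictions)

    // transform() appends a "prediction" column computed by oldModel.predict on each feature.
    model.transform(df).show()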