aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2015-08-06 13:29:31 -0700
committerJoseph K. Bradley <joseph@databricks.com>2015-08-06 13:29:31 -0700
commit54c0789a05a783ce90e0e9848079be442a82966b (patch)
tree1108e74753dca0409b4e75f481556224a8d4cde0 /mllib
parent1f62f104c7a2aeac625b17d9e5ac62f1f10a2b21 (diff)
downloadspark-54c0789a05a783ce90e0e9848079be442a82966b.tar.gz
spark-54c0789a05a783ce90e0e9848079be442a82966b.tar.bz2
spark-54c0789a05a783ce90e0e9848079be442a82966b.zip
[SPARK-9493] [ML] add featureIndex to handle vector features in IsotonicRegression
This PR contains the following changes: * add `featureIndex` to handle vector features (in order to chain isotonic regression easily with output from logistic regression * make getter/setter names consistent with params * remove inheritance from Regressor because it is tricky to handle both `DoubleType` and `VectorType` * simplify test data generation jkbradley zapletal-martin Author: Xiangrui Meng <meng@databricks.com> Closes #7952 from mengxr/SPARK-9493 and squashes the following commits: 8818ac3 [Xiangrui Meng] address comments 05e2216 [Xiangrui Meng] address comments 8d08090 [Xiangrui Meng] add featureIndex to handle vector features make getter/setter names consistent with params remove inheritance from Regressor
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala202
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala82
2 files changed, 194 insertions, 90 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 4ece8cf8cf..f570590960 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -17,44 +17,113 @@
package org.apache.spark.ml.regression
+import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.PredictorParams
-import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam}
-import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
-import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
-import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasPredictionCol}
+import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors}
+import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression, IsotonicRegressionModel => MLlibIsotonicRegressionModel}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DoubleType, DataType}
-import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types.{DoubleType, StructType}
import org.apache.spark.storage.StorageLevel
/**
* Params for isotonic regression.
*/
-private[regression] trait IsotonicRegressionParams extends PredictorParams {
+private[regression] trait IsotonicRegressionBase extends Params with HasFeaturesCol
+ with HasLabelCol with HasPredictionCol with Logging {
/**
- * Param for weight column name.
- * TODO: Move weightCol to sharedParams.
- *
+ * Param for weight column name (default: none).
* @group param
*/
+ // TODO: Move weightCol to sharedParams.
final val weightCol: Param[String] =
- new Param[String](this, "weightCol", "weight column name")
+ new Param[String](this, "weightCol",
+ "weight column name. If this is not set or empty, we treat all instance weights as 1.0.")
/** @group getParam */
final def getWeightCol: String = $(weightCol)
/**
- * Param for isotonic parameter.
- * Isotonic (increasing) or antitonic (decreasing) sequence.
+ * Param for whether the output sequence should be isotonic/increasing (true) or
+ * antitonic/decreasing (false).
* @group param
*/
final val isotonic: BooleanParam =
- new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic (decreasing) sequence")
+ new BooleanParam(this, "isotonic",
+ "whether the output sequence should be isotonic/increasing (true) or" +
+ "antitonic/decreasing (false)")
/** @group getParam */
- final def getIsotonicParam: Boolean = $(isotonic)
+ final def getIsotonic: Boolean = $(isotonic)
+
+ /**
+ * Param for the index of the feature if [[featuresCol]] is a vector column (default: `0`), no
+ * effect otherwise.
+ * @group param
+ */
+ final val featureIndex: IntParam = new IntParam(this, "featureIndex",
+ "The index of the feature if featuresCol is a vector column, no effect otherwise.")
+
+ /** @group getParam */
+ final def getFeatureIndex: Int = $(featureIndex)
+
+ setDefault(isotonic -> true, featureIndex -> 0)
+
+ /** Checks whether the input has weight column. */
+ protected[ml] def hasWeightCol: Boolean = {
+ isDefined(weightCol) && $(weightCol) != ""
+ }
+
+ /**
+ * Extracts (label, feature, weight) from input dataset.
+ */
+ protected[ml] def extractWeightedLabeledPoints(
+ dataset: DataFrame): RDD[(Double, Double, Double)] = {
+ val f = if (dataset.schema($(featuresCol)).dataType.isInstanceOf[VectorUDT]) {
+ val idx = $(featureIndex)
+ val extract = udf { v: Vector => v(idx) }
+ extract(col($(featuresCol)))
+ } else {
+ col($(featuresCol))
+ }
+ val w = if (hasWeightCol) {
+ col($(weightCol))
+ } else {
+ lit(1.0)
+ }
+ dataset.select(col($(labelCol)), f, w)
+ .map { case Row(label: Double, feature: Double, weights: Double) =>
+ (label, feature, weights)
+ }
+ }
+
+ /**
+ * Validates and transforms input schema.
+ * @param schema input schema
+ * @param fitting whether this is in fitting or prediction
+ * @return output schema
+ */
+ protected[ml] def validateAndTransformSchema(
+ schema: StructType,
+ fitting: Boolean): StructType = {
+ if (fitting) {
+ SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
+ if (hasWeightCol) {
+ SchemaUtils.checkColumnType(schema, $(weightCol), DoubleType)
+ } else {
+ logInfo("The weight column is not defined. Treat all instance weights as 1.0.")
+ }
+ }
+ val featuresType = schema($(featuresCol)).dataType
+ require(featuresType == DoubleType || featuresType.isInstanceOf[VectorUDT])
+ SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType)
+ }
}
/**
@@ -67,52 +136,46 @@ private[regression] trait IsotonicRegressionParams extends PredictorParams {
* Uses [[org.apache.spark.mllib.regression.IsotonicRegression]].
*/
@Experimental
-class IsotonicRegression(override val uid: String)
- extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel]
- with IsotonicRegressionParams {
+class IsotonicRegression(override val uid: String) extends Estimator[IsotonicRegressionModel]
+ with IsotonicRegressionBase {
def this() = this(Identifiable.randomUID("isoReg"))
- /**
- * Set the isotonic parameter.
- * Default is true.
- * @group setParam
- */
- def setIsotonicParam(value: Boolean): this.type = set(isotonic, value)
- setDefault(isotonic -> true)
+ /** @group setParam */
+ def setLabelCol(value: String): this.type = set(labelCol, value)
- /**
- * Set weight column param.
- * Default is weight.
- * @group setParam
- */
- def setWeightParam(value: String): this.type = set(weightCol, value)
- setDefault(weightCol -> "weight")
+ /** @group setParam */
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
- override private[ml] def featuresDataType: DataType = DoubleType
+ /** @group setParam */
+ def setPredictionCol(value: String): this.type = set(predictionCol, value)
- override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+ /** @group setParam */
+ def setIsotonic(value: Boolean): this.type = set(isotonic, value)
- private[this] def extractWeightedLabeledPoints(
- dataset: DataFrame): RDD[(Double, Double, Double)] = {
+ /** @group setParam */
+ def setWeightCol(value: String): this.type = set(weightCol, value)
- dataset.select($(labelCol), $(featuresCol), $(weightCol))
- .map { case Row(label: Double, features: Double, weights: Double) =>
- (label, features, weights)
- }
- }
+ /** @group setParam */
+ def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
- override protected def train(dataset: DataFrame): IsotonicRegressionModel = {
- SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType)
+ override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+
+ override def fit(dataset: DataFrame): IsotonicRegressionModel = {
+ validateAndTransformSchema(dataset.schema, fitting = true)
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val instances = extractWeightedLabeledPoints(dataset)
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
- val parentModel = isotonicRegression.run(instances)
+ val oldModel = isotonicRegression.run(instances)
- new IsotonicRegressionModel(uid, parentModel)
+ copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema, fitting = true)
}
}
@@ -123,22 +186,49 @@ class IsotonicRegression(override val uid: String)
*
* For detailed rules see [[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]].
*
- * @param parentModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
- * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]].
+ * @param oldModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
+ * model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]].
*/
+@Experimental
class IsotonicRegressionModel private[ml] (
override val uid: String,
- private[ml] val parentModel: MLlibIsotonicRegressionModel)
- extends RegressionModel[Double, IsotonicRegressionModel]
- with IsotonicRegressionParams {
+ private val oldModel: MLlibIsotonicRegressionModel)
+ extends Model[IsotonicRegressionModel] with IsotonicRegressionBase {
- override def featuresDataType: DataType = DoubleType
+ /** @group setParam */
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
- override protected def predict(features: Double): Double = {
- parentModel.predict(features)
- }
+ /** @group setParam */
+ def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+ /** @group setParam */
+ def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
+
+ /** Boundaries in increasing order for which predictions are known. */
+ def boundaries: Vector = Vectors.dense(oldModel.boundaries)
+
+ /**
+ * Predictions associated with the boundaries at the same index, monotone because of isotonic
+ * regression.
+ */
+ def predictions: Vector = Vectors.dense(oldModel.predictions)
override def copy(extra: ParamMap): IsotonicRegressionModel = {
- copyValues(new IsotonicRegressionModel(uid, parentModel), extra)
+ copyValues(new IsotonicRegressionModel(uid, oldModel), extra)
+ }
+
+ override def transform(dataset: DataFrame): DataFrame = {
+ val predict = dataset.schema($(featuresCol)).dataType match {
+ case DoubleType =>
+ udf { feature: Double => oldModel.predict(feature) }
+ case _: VectorUDT =>
+ val idx = $(featureIndex)
+ udf { features: Vector => oldModel.predict(features(idx)) }
+ }
+ dataset.withColumn($(predictionCol), predict(col($(featuresCol))))
+ }
+
+ override def transformSchema(schema: StructType): StructType = {
+ validateAndTransformSchema(schema, fitting = false)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index 66e4b170ba..c0ab00b68a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -19,57 +19,46 @@ package org.apache.spark.ml.regression
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}
class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
- private val schema = StructType(
- Array(
- StructField("label", DoubleType),
- StructField("features", DoubleType),
- StructField("weight", DoubleType)))
-
- private val predictionSchema = StructType(Array(StructField("features", DoubleType)))
-
private def generateIsotonicInput(labels: Seq[Double]): DataFrame = {
- val data = Seq.tabulate(labels.size)(i => Row(labels(i), i.toDouble, 1d))
- val parallelData = sc.parallelize(data)
-
- sqlContext.createDataFrame(parallelData, schema)
+ sqlContext.createDataFrame(
+ labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
+ ).toDF("label", "features", "weight")
}
private def generatePredictionInput(features: Seq[Double]): DataFrame = {
- val data = Seq.tabulate(features.size)(i => Row(features(i)))
-
- val parallelData = sc.parallelize(data)
- sqlContext.createDataFrame(parallelData, predictionSchema)
+ sqlContext.createDataFrame(features.map(Tuple1.apply))
+ .toDF("features")
}
test("isotonic regression predictions") {
val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18))
- val trainer = new IsotonicRegression().setIsotonicParam(true)
+ val ir = new IsotonicRegression().setIsotonic(true)
- val model = trainer.fit(dataset)
+ val model = ir.fit(dataset)
val predictions = model
.transform(dataset)
- .select("prediction").map {
- case Row(pred) => pred
+ .select("prediction").map { case Row(pred) =>
+ pred
}.collect()
assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
- assert(model.parentModel.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
- assert(model.parentModel.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
- assert(model.parentModel.isotonic)
+ assert(model.boundaries === Vectors.dense(0, 1, 3, 4, 5, 6, 7, 8))
+ assert(model.predictions === Vectors.dense(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
+ assert(model.getIsotonic)
}
test("antitonic regression predictions") {
val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1))
- val trainer = new IsotonicRegression().setIsotonicParam(false)
+ val ir = new IsotonicRegression().setIsotonic(false)
- val model = trainer.fit(dataset)
+ val model = ir.fit(dataset)
val features = generatePredictionInput(Seq(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0))
val predictions = model
@@ -94,9 +83,10 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
val ir = new IsotonicRegression()
assert(ir.getLabelCol === "label")
assert(ir.getFeaturesCol === "features")
- assert(ir.getWeightCol === "weight")
assert(ir.getPredictionCol === "prediction")
- assert(ir.getIsotonicParam === true)
+ assert(!ir.isDefined(ir.weightCol))
+ assert(ir.getIsotonic)
+ assert(ir.getFeatureIndex === 0)
val model = ir.fit(dataset)
model.transform(dataset)
@@ -105,21 +95,22 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(model.getLabelCol === "label")
assert(model.getFeaturesCol === "features")
- assert(model.getWeightCol === "weight")
assert(model.getPredictionCol === "prediction")
- assert(model.getIsotonicParam === true)
+ assert(!model.isDefined(model.weightCol))
+ assert(model.getIsotonic)
+ assert(model.getFeatureIndex === 0)
assert(model.hasParent)
}
test("set parameters") {
val isotonicRegression = new IsotonicRegression()
- .setIsotonicParam(false)
- .setWeightParam("w")
+ .setIsotonic(false)
+ .setWeightCol("w")
.setFeaturesCol("f")
.setLabelCol("l")
.setPredictionCol("p")
- assert(isotonicRegression.getIsotonicParam === false)
+ assert(!isotonicRegression.getIsotonic)
assert(isotonicRegression.getWeightCol === "w")
assert(isotonicRegression.getFeaturesCol === "f")
assert(isotonicRegression.getLabelCol === "l")
@@ -130,7 +121,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
val dataset = generateIsotonicInput(Seq(1, 2, 3))
intercept[IllegalArgumentException] {
- new IsotonicRegression().setWeightParam("w").fit(dataset)
+ new IsotonicRegression().setWeightCol("w").fit(dataset)
}
intercept[IllegalArgumentException] {
@@ -145,4 +136,27 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
new IsotonicRegression().fit(dataset).setFeaturesCol("f").transform(dataset)
}
}
+
+ test("vector features column with feature index") {
+ val dataset = sqlContext.createDataFrame(Seq(
+ (4.0, Vectors.dense(0.0, 1.0)),
+ (3.0, Vectors.dense(0.0, 2.0)),
+ (5.0, Vectors.sparse(2, Array(1), Array(3.0))))
+ ).toDF("label", "features")
+
+ val ir = new IsotonicRegression()
+ .setFeatureIndex(1)
+
+ val model = ir.fit(dataset)
+
+ val features = generatePredictionInput(Seq(2.0, 3.0, 4.0, 5.0))
+
+ val predictions = model
+ .transform(features)
+ .select("prediction").map {
+ case Row(pred) => pred
+ }.collect()
+
+ assert(predictions === Array(3.5, 5.0, 5.0, 5.0))
+ }
}