author     Xusen Yin <yinxusen@gmail.com>             2016-04-15 12:58:38 -0700
committer  Joseph K. Bradley <joseph@databricks.com>  2016-04-15 12:58:38 -0700
commit     90b46e014a60069bd18754b02fce056d8f4d1b3e
tree       926544baf9126f7ec1d95fbe1405e45bb2ac90d4 /python
parent     129f2f455da982ec9fab593299fa4021b62827eb
[SPARK-7861][ML] PySpark OneVsRest
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-7861

Add PySpark OneVsRest. I implemented it in Python since it is a meta-pipeline.

## How was this patch tested?

Tested with doctests.

Author: Xusen Yin <yinxusen@gmail.com>

Closes #12124 from yinxusen/SPARK-14306-7861.
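For context, a minimal usage sketch of the new API. The toy data and hyperparameters are taken from the doctests and unit tests in this diff; the `sqlContext` handle is assumed to exist (as in the PySpark test harness):

```python
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, OneVsRest

# Three-class toy dataset with "label" and "features" columns.
df = sqlContext.createDataFrame([
    (0.0, Vectors.dense(1.0, 0.8)),
    (1.0, Vectors.sparse(2, [], [])),
    (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"])

# The base classifier must expose a rawPrediction column; per the note on
# setClassifier below, only LogisticRegression and NaiveBayes are supported now.
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)

model = ovr.fit(df)         # trains one binary model per class
model.transform(df).show()  # adds a "prediction" column with the winning class index
```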
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/ml/classification.py | 224
-rw-r--r--  python/pyspark/ml/tests.py          |  32
2 files changed, 249 insertions, 7 deletions
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 7051798485..089316729c 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -15,18 +15,21 @@
# limitations under the License.
#
+import operator
import warnings
-from pyspark import since
-from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
-from pyspark.ml.param import TypeConverters
+from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import *
from pyspark.ml.regression import (
RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels)
+from pyspark.ml.util import *
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.wrapper import JavaWrapper
from pyspark.mllib.common import inherit_doc
from pyspark.sql import DataFrame
-
+from pyspark.sql.functions import udf, when
+from pyspark.sql.types import ArrayType, DoubleType
+from pyspark.storagelevel import StorageLevel
__all__ = ['LogisticRegression', 'LogisticRegressionModel',
'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary',
@@ -35,7 +38,8 @@ __all__ = ['LogisticRegression', 'LogisticRegressionModel',
'GBTClassifier', 'GBTClassificationModel',
'RandomForestClassifier', 'RandomForestClassificationModel',
'NaiveBayes', 'NaiveBayesModel',
- 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel']
+ 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
+ 'OneVsRest', 'OneVsRestModel']
@inherit_doc
@@ -1156,6 +1160,214 @@ class MultilayerPerceptronClassificationModel(JavaModel, JavaMLWritable, JavaMLReadable):
return self._call_java("weights")
+@inherit_doc
+class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol):
+ """
+ Reduction of Multiclass Classification to Binary Classification.
+ Performs reduction using one against all strategy.
+ For a multiclass classification with k classes, train k models (one per class).
+ Each example is scored against all k models and the model with highest score
+ is picked to label the example.
+
+ >>> from pyspark.sql import Row
+ >>> from pyspark.mllib.linalg import Vectors
+ >>> df = sc.parallelize([
+ ... Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
+ ... Row(label=1.0, features=Vectors.sparse(2, [], [])),
+ ... Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
+ >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+ >>> ovr = OneVsRest(classifier=lr)
+ >>> model = ovr.fit(df)
+ >>> [x.coefficients for x in model.models]
+ [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), DenseVector([-4.5855, 6.1785])]
+ >>> [x.intercept for x in model.models]
+ [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115]
+ >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
+ >>> model.transform(test0).head().prediction
+ 1.0
+ >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
+ >>> model.transform(test1).head().prediction
+ 0.0
+ >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
+ >>> model.transform(test2).head().prediction
+ 2.0
+
+ .. versionadded:: 2.0.0
+ """
+
+ classifier = Param(Params._dummy(), "classifier", "base binary classifier")
+
+ @keyword_only
+ def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+ classifier=None):
+ """
+ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ classifier=None)
+ """
+ super(OneVsRest, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self._set(**kwargs)
+
+ @keyword_only
+ @since("2.0.0")
+ def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None):
+ """
+        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None)
+ Sets params for OneVsRest.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.0.0")
+ def setClassifier(self, value):
+ """
+ Sets the value of :py:attr:`classifier`.
+
+ .. note:: Only LogisticRegression and NaiveBayes are supported now.
+ """
+ self._set(classifier=value)
+ return self
+
+ @since("2.0.0")
+ def getClassifier(self):
+ """
+ Gets the value of classifier or its default value.
+ """
+ return self.getOrDefault(self.classifier)
+
+ def _fit(self, dataset):
+ labelCol = self.getLabelCol()
+ featuresCol = self.getFeaturesCol()
+ predictionCol = self.getPredictionCol()
+ classifier = self.getClassifier()
+ assert isinstance(classifier, HasRawPredictionCol),\
+ "Classifier %s doesn't extend from HasRawPredictionCol." % type(classifier)
+
+ numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + 1
+
+ multiclassLabeled = dataset.select(labelCol, featuresCol)
+
+ # persist if underlying dataset is not persistent.
+ handlePersistence = \
+ dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False)
+ if handlePersistence:
+ multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
+
+ def trainSingleClass(index):
+ binaryLabelCol = "mc2b$" + str(index)
+ trainingDataset = multiclassLabeled.withColumn(
+ binaryLabelCol,
+ when(multiclassLabeled[labelCol] == float(index), 1.0).otherwise(0.0))
+ paramMap = dict([(classifier.labelCol, binaryLabelCol),
+ (classifier.featuresCol, featuresCol),
+ (classifier.predictionCol, predictionCol)])
+ return classifier.fit(trainingDataset, paramMap)
+
+ # TODO: Parallel training for all classes.
+ models = [trainSingleClass(i) for i in range(numClasses)]
+
+ if handlePersistence:
+ multiclassLabeled.unpersist()
+
+ return self._copyValues(OneVsRestModel(models=models))
+
+ @since("2.0.0")
+ def copy(self, extra=None):
+ """
+ Creates a copy of this instance with a randomly generated uid
+ and some extra params. This creates a deep copy of the embedded paramMap,
+ and copies the embedded and extra parameters over.
+
+ :param extra: Extra parameters to copy to the new instance
+ :return: Copy of this instance
+ """
+ if extra is None:
+ extra = dict()
+ newOvr = Params.copy(self, extra)
+ if self.isSet(self.classifier):
+ newOvr.setClassifier(self.getClassifier().copy(extra))
+ return newOvr
+
+
+class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol):
+ """
+ Model fitted by OneVsRest.
+ This stores the models resulting from training k binary classifiers: one for each class.
+ Each example is scored against all k models, and the model with the highest score
+ is picked to label the example.
+
+ .. versionadded:: 2.0.0
+ """
+
+ def __init__(self, models):
+ super(OneVsRestModel, self).__init__()
+ self.models = models
+
+ def _transform(self, dataset):
+ # determine the input columns: these need to be passed through
+ origCols = dataset.columns
+
+ # add an accumulator column to store predictions of all the models
+ accColName = "mbc$acc" + str(uuid.uuid4())
+ initUDF = udf(lambda _: [], ArrayType(DoubleType()))
+ newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]]))
+
+ # persist if underlying dataset is not persistent.
+ handlePersistence = \
+ dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False)
+ if handlePersistence:
+ newDataset.persist(StorageLevel.MEMORY_AND_DISK)
+
+ # update the accumulator column with the result of prediction of models
+ aggregatedDataset = newDataset
+ for index, model in enumerate(self.models):
+ rawPredictionCol = model._call_java("getRawPredictionCol")
+ columns = origCols + [rawPredictionCol, accColName]
+
+ # add temporary column to store intermediate scores and update
+ tmpColName = "mbc$tmp" + str(uuid.uuid4())
+ updateUDF = udf(
+ lambda predictions, prediction: predictions + [prediction.tolist()[1]],
+ ArrayType(DoubleType()))
+ transformedDataset = model.transform(aggregatedDataset).select(*columns)
+ updatedDataset = transformedDataset.withColumn(
+ tmpColName,
+ updateUDF(transformedDataset[accColName], transformedDataset[rawPredictionCol]))
+ newColumns = origCols + [tmpColName]
+
+ # switch out the intermediate column with the accumulator column
+ aggregatedDataset = updatedDataset\
+ .select(*newColumns).withColumnRenamed(tmpColName, accColName)
+
+ if handlePersistence:
+ newDataset.unpersist()
+
+ # output the index of the classifier with highest confidence as prediction
+ labelUDF = udf(
+ lambda predictions: float(max(enumerate(predictions), key=operator.itemgetter(1))[0]),
+ DoubleType())
+
+ # output label and label metadata as prediction
+ return aggregatedDataset.withColumn(
+ self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName)
+
+ @since("2.0.0")
+ def copy(self, extra=None):
+ """
+ Creates a copy of this instance with a randomly generated uid
+ and some extra params. This creates a deep copy of the embedded paramMap,
+ and copies the embedded and extra parameters over.
+
+ :param extra: Extra parameters to copy to the new instance
+ :return: Copy of this instance
+ """
+ if extra is None:
+ extra = dict()
+ newModel = Params.copy(self, extra)
+ newModel.models = [model.copy(extra) for model in self.models]
+ return newModel
+
+
if __name__ == "__main__":
import doctest
import pyspark.ml.classification
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 85ad949c5a..d595eff5b4 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -43,7 +43,7 @@ import tempfile
import numpy as np
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
-from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
+from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, OneVsRest
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import *
@@ -850,6 +850,36 @@ class TrainingSummaryTest(PySparkTestCase):
self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
+class OneVsRestTests(PySparkTestCase):
+
+ def test_copy(self):
+ sqlContext = SQLContext(self.sc)
+ df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
+ ["label", "features"])
+ lr = LogisticRegression(maxIter=5, regParam=0.01)
+ ovr = OneVsRest(classifier=lr)
+ ovr1 = ovr.copy({lr.maxIter: 10})
+ self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
+ self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
+ model = ovr.fit(df)
+ model1 = model.copy({model.predictionCol: "indexed"})
+ self.assertEqual(model1.getPredictionCol(), "indexed")
+
+ def test_output_columns(self):
+ sqlContext = SQLContext(self.sc)
+ df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
+ ["label", "features"])
+ lr = LogisticRegression(maxIter=5, regParam=0.01)
+ ovr = OneVsRest(classifier=lr)
+ model = ovr.fit(df)
+ output = model.transform(df)
+ self.assertEqual(output.columns, ["label", "features", "prediction"])
+
+
class HashingTFTest(PySparkTestCase):
def test_apply_binary_term_freqs(self):