author     Xiangrui Meng <meng@databricks.com>    2015-02-15 20:29:26 -0800
committer  Xiangrui Meng <meng@databricks.com>    2015-02-15 20:29:26 -0800
commit     cd4a15366244657c4b7936abe5054754534366f2 (patch)
tree       fbee98a5031440c879705f2c7f9717b5d815c66e /python/pyspark
parent     836577b382695558f5c97d94ee725d0156ebfad2 (diff)
[SPARK-5769] Set params in constructors and in setParams in Python ML pipelines
This PR allows Python users to set params in constructors and in setParams, where we use the decorator `keyword_only` to force keyword arguments. The trade-off is discussed in the design doc of SPARK-4586.

Generated doc: ![screen shot 2015-02-12 at 3 06 58 am](https://cloud.githubusercontent.com/assets/829644/6166491/9cfcd06a-b265-11e4-99ea-473d866634fc.png)

CC: davies rxin

Author: Xiangrui Meng <meng@databricks.com>

Closes #4564 from mengxr/py-pipeline-kw and squashes the following commits:

fedf720 [Xiangrui Meng] use toDF
d565f2c [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into py-pipeline-kw
cbc15d3 [Xiangrui Meng] fix style
5032097 [Xiangrui Meng] update pipeline signature
950774e [Xiangrui Meng] simplify keyword_only and update constructor/setParams signatures
fdde5fc [Xiangrui Meng] fix style
c9384b8 [Xiangrui Meng] fix sphinx doc
8e59180 [Xiangrui Meng] add setParams and make constructors take params, where we force keyword args
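For orientation, a minimal sketch of the usage change this commit enables. It assumes a running SparkContext (`sc`) as in the doctests below; the old chained-setter form is kept only as a comment for comparison.

from pyspark.ml.classification import LogisticRegression

# Before this change, params were set through chained setters:
#     lr = LogisticRegression().setMaxIter(5).setRegParam(0.01)
# After this change, params can be passed as keyword arguments to the
# constructor or to setParams:
lr = LogisticRegression(maxIter=5, regParam=0.01)
lr.setParams(regParam=0.1)

# Positional arguments are rejected by the keyword_only decorator:
# lr.setParams(0.1)  # TypeError: Method setParams forces keyword arguments.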
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/ml/classification.py   44
-rw-r--r--  python/pyspark/ml/feature.py          72
-rw-r--r--  python/pyspark/ml/param/__init__.py    8
-rw-r--r--  python/pyspark/ml/pipeline.py         19
-rw-r--r--  python/pyspark/ml/util.py             15
5 files changed, 132 insertions, 26 deletions
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 6bd2aa8e47..b6de7493d7 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,\
HasRegParam
@@ -32,22 +32,46 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> from pyspark.sql import Row
>>> from pyspark.mllib.linalg import Vectors
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([ \
- Row(label=1.0, features=Vectors.dense(1.0)), \
- Row(label=0.0, features=Vectors.sparse(1, [], []))]))
- >>> lr = LogisticRegression() \
- .setMaxIter(5) \
- .setRegParam(0.01)
- >>> model = lr.fit(dataset)
- >>> test0 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.dense(-1.0))]))
+ >>> df = sc.parallelize([
+ ... Row(label=1.0, features=Vectors.dense(1.0)),
+ ... Row(label=0.0, features=Vectors.sparse(1, [], []))]).toDF()
+ >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+ >>> model = lr.fit(df)
+ >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
>>> print model.transform(test0).head().prediction
0.0
- >>> test1 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]))
+ >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
>>> print model.transform(test1).head().prediction
1.0
+ >>> lr.setParams("vector")
+ Traceback (most recent call last):
+ ...
+ TypeError: Method setParams forces keyword arguments.
"""
_java_class = "org.apache.spark.ml.classification.LogisticRegression"
+ @keyword_only
+ def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+ maxIter=100, regParam=0.1):
+ """
+ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxIter=100, regParam=0.1)
+ """
+ super(LogisticRegression, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+ maxIter=100, regParam=0.1):
+ """
+ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxIter=100, regParam=0.1)
+ Sets params for logistic regression.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
def _create_model(self, java_model):
return LogisticRegressionModel(java_model)
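The constructor/setParams pattern above generalizes to any estimator. A hedged sketch with an invented class (`MyEstimator` and its Java class name are hypothetical), reusing the shared param mixin imported in this file:

from pyspark.ml.param.shared import HasMaxIter
from pyspark.ml.util import keyword_only
from pyspark.ml.wrapper import JavaEstimator


class MyEstimator(JavaEstimator, HasMaxIter):

    _java_class = "org.example.ml.MyEstimator"  # hypothetical

    @keyword_only
    def __init__(self, maxIter=10):
        super(MyEstimator, self).__init__()
        # keyword_only stored the caller's kwargs on the wrapper function;
        # read them back and forward them to setParams.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, maxIter=10):
        kwargs = self.setParams._input_kwargs
        return self._set_params(**kwargs)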
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index e088acd0ca..f1ddbb478d 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -16,7 +16,7 @@
#
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
from pyspark.ml.wrapper import JavaTransformer
__all__ = ['Tokenizer', 'HashingTF']
@@ -29,18 +29,45 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
splits it by white spaces.
>>> from pyspark.sql import Row
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(text="a b c")]))
- >>> tokenizer = Tokenizer() \
- .setInputCol("text") \
- .setOutputCol("words")
- >>> print tokenizer.transform(dataset).head()
+ >>> df = sc.parallelize([Row(text="a b c")]).toDF()
+ >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ >>> print tokenizer.transform(df).head()
Row(text=u'a b c', words=[u'a', u'b', u'c'])
- >>> print tokenizer.transform(dataset, {tokenizer.outputCol: "tokens"}).head()
+ >>> # Change a parameter.
+ >>> print tokenizer.setParams(outputCol="tokens").transform(df).head()
Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+ >>> # Temporarily modify a parameter.
+ >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
+ Row(text=u'a b c', words=[u'a', u'b', u'c'])
+ >>> print tokenizer.transform(df).head()
+ Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+ >>> # Must use keyword arguments to specify params.
+ >>> tokenizer.setParams("text")
+ Traceback (most recent call last):
+ ...
+ TypeError: Method setParams forces keyword arguments.
"""
_java_class = "org.apache.spark.ml.feature.Tokenizer"
+ @keyword_only
+ def __init__(self, inputCol="input", outputCol="output"):
+ """
+ __init__(self, inputCol="input", outputCol="output")
+ """
+ super(Tokenizer, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, inputCol="input", outputCol="output"):
+ """
+ setParams(self, inputCol="input", outputCol="output")
+ Sets params for this Tokenizer.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
@inherit_doc
class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
@@ -49,20 +76,37 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
hashing trick.
>>> from pyspark.sql import Row
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(words=["a", "b", "c"])]))
- >>> hashingTF = HashingTF() \
- .setNumFeatures(10) \
- .setInputCol("words") \
- .setOutputCol("features")
- >>> print hashingTF.transform(dataset).head().features
+ >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF()
+ >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
+ >>> print hashingTF.transform(df).head().features
+ (10,[7,8,9],[1.0,1.0,1.0])
+ >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
(10,[7,8,9],[1.0,1.0,1.0])
>>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
- >>> print hashingTF.transform(dataset, params).head().vector
+ >>> print hashingTF.transform(df, params).head().vector
(5,[2,3,4],[1.0,1.0,1.0])
"""
_java_class = "org.apache.spark.ml.feature.HashingTF"
+ @keyword_only
+ def __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+ """
+ __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+ """
+ super(HashingTF, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+ """
+ setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+ Sets params for this HashingTF.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
if __name__ == "__main__":
import doctest
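As a hedged aside on the per-call override shown in the doctests above: the params dict passed to transform applies to that call only, which makes it convenient to sweep a setting without mutating the transformer. Here `df` is assumed to be the words DataFrame from the HashingTF doctest.

hashingTF = HashingTF(inputCol="words", outputCol="features")
for n in [5, 10, 20]:
    # numFeatures is overridden for this transform call only
    print hashingTF.transform(df, {hashingTF.numFeatures: n}).head().features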
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 5566792cea..e3a53dd780 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -80,3 +80,11 @@ class Params(Identifiable):
dummy = Params()
dummy.uid = "undefined"
return dummy
+
+ def _set_params(self, **kwargs):
+ """
+ Sets params.
+ """
+ for param, value in kwargs.iteritems():
+ self.paramMap[getattr(self, param)] = value
+ return self
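A hedged illustration of what `_set_params` does: each keyword is resolved to the `Param` attribute of the same name on the instance (via getattr) and the value is recorded in `paramMap`; note that `iteritems` ties this helper to Python 2. Roughly:

tokenizer = Tokenizer()
tokenizer._set_params(inputCol="text")
# is roughly equivalent to:
tokenizer.paramMap[tokenizer.inputCol] = "text"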
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 2d239f8c80..18d8a58f35 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -18,7 +18,7 @@
from abc import ABCMeta, abstractmethod
from pyspark.ml.param import Param, Params
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel']
@@ -89,10 +89,16 @@ class Pipeline(Estimator):
identity transformer.
"""
- def __init__(self):
+ @keyword_only
+ def __init__(self, stages=[]):
+ """
+ __init__(self, stages=[])
+ """
super(Pipeline, self).__init__()
#: Param for pipeline stages.
self.stages = Param(self, "stages", "pipeline stages")
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
def setStages(self, value):
"""
@@ -110,6 +116,15 @@ class Pipeline(Estimator):
if self.stages in self.paramMap:
return self.paramMap[self.stages]
+ @keyword_only
+ def setParams(self, stages=[]):
+ """
+ setParams(self, stages=[])
+ Sets params for Pipeline.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
def fit(self, dataset, params={}):
paramMap = self._merge_params(params)
stages = paramMap[self.stages]
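A hedged usage sketch of the new Pipeline keyword constructor, reusing the transformer and estimator examples from the diffs above (a live SparkContext is assumed):

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.pipeline import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=5, regParam=0.01)

# Stages can now be supplied directly in the constructor ...
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# ... or replaced later through setParams.
pipeline.setParams(stages=[tokenizer, hashingTF])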
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index b1caa84b63..81d3f0882b 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+from functools import wraps
import uuid
@@ -32,6 +33,20 @@ def inherit_doc(cls):
return cls
+def keyword_only(func):
+ """
+ A decorator that forces keyword arguments in the wrapped method
+ and saves actual input keyword arguments in `_input_kwargs`.
+ """
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ if len(args) > 1:
+ raise TypeError("Method %s forces keyword arguments." % func.__name__)
+ wrapper._input_kwargs = kwargs
+ return func(*args, **kwargs)
+ return wrapper
+
+
class Identifiable(object):
"""
Object with a unique ID.
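Finally, a self-contained, hedged demo of the decorator's behavior outside the ML classes (the `Greeter` class is invented for illustration):

from pyspark.ml.util import keyword_only


class Greeter(object):

    @keyword_only
    def setParams(self, greeting="hello"):
        # only the kwargs the caller actually passed are recorded
        return self.setParams._input_kwargs


g = Greeter()
print g.setParams(greeting="hi")  # {'greeting': 'hi'}
print g.setParams()               # {}
# g.setParams("hi")               # would raise TypeError: Method setParams forces keyword arguments.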