-rw-r--r--  examples/src/main/python/ml/simple_text_classification_pipeline.py | 44
-rw-r--r--  python/docs/conf.py | 4
-rw-r--r--  python/pyspark/ml/classification.py | 44
-rw-r--r--  python/pyspark/ml/feature.py | 72
-rw-r--r--  python/pyspark/ml/param/__init__.py | 8
-rw-r--r--  python/pyspark/ml/pipeline.py | 19
-rw-r--r--  python/pyspark/ml/util.py | 15
7 files changed, 153 insertions, 53 deletions
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c7df3d7b74..b4d9355b68 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -36,43 +36,33 @@ if __name__ == "__main__":
sqlCtx = SQLContext(sc)
# Prepare training documents, which are labeled.
- LabeledDocument = Row('id', 'text', 'label')
- training = sqlCtx.inferSchema(
- sc.parallelize([(0L, "a b c d e spark", 1.0),
- (1L, "b d", 0.0),
- (2L, "spark f g h", 1.0),
- (3L, "hadoop mapreduce", 0.0)])
- .map(lambda x: LabeledDocument(*x)))
+ LabeledDocument = Row("id", "text", "label")
+ training = sc.parallelize([(0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0)]) \
+ .map(lambda x: LabeledDocument(*x)).toDF()
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
- tokenizer = Tokenizer() \
- .setInputCol("text") \
- .setOutputCol("words")
- hashingTF = HashingTF() \
- .setInputCol(tokenizer.getOutputCol()) \
- .setOutputCol("features")
- lr = LogisticRegression() \
- .setMaxIter(10) \
- .setRegParam(0.01)
- pipeline = Pipeline() \
- .setStages([tokenizer, hashingTF, lr])
+ tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+ lr = LogisticRegression(maxIter=10, regParam=0.01)
+ pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled.
- Document = Row('id', 'text')
- test = sqlCtx.inferSchema(
- sc.parallelize([(4L, "spark i j k"),
- (5L, "l m n"),
- (6L, "mapreduce spark"),
- (7L, "apache hadoop")])
- .map(lambda x: Document(*x)))
+ Document = Row("id", "text")
+ test = sc.parallelize([(4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")]) \
+ .map(lambda x: Document(*x)).toDF()
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
- prediction.registerTempTable("prediction")
- selected = sqlCtx.sql("SELECT id, text, prediction from prediction")
+ selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print row
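
The example now builds each stage with keyword arguments instead of chained setters, and converts RDDs of Rows with toDF() rather than sqlCtx.inferSchema(). A minimal sketch of the same pattern in isolation, assuming a running SparkContext `sc` and that an SQLContext has already been created (which is what makes RDD.toDF() available):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row
    from pyspark.ml.feature import Tokenizer

    sc = SparkContext(appName="KeywordParamsSketch")
    sqlCtx = SQLContext(sc)  # creating an SQLContext enables RDD.toDF()

    df = sc.parallelize([Row(id=0, text="a b c")]).toDF()
    tokenizer = Tokenizer(inputCol="text", outputCol="words")  # params as keywords
    print tokenizer.transform(df).head().words
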
diff --git a/python/docs/conf.py b/python/docs/conf.py
index b00dce95d6..cbbf7ffb08 100644
--- a/python/docs/conf.py
+++ b/python/docs/conf.py
@@ -97,6 +97,10 @@ pygments_style = 'sphinx'
# If true, keep warnings as "system message" paragraphs in the built documents.
#keep_warnings = False
+# -- Options for autodoc --------------------------------------------------
+
+# Look at the first line of the docstring for function and method signatures.
+autodoc_docstring_signature = True
# -- Options for HTML output ----------------------------------------------
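
autodoc_docstring_signature makes Sphinx take the documented signature from the first line of a docstring. That matters here because functools.wraps copies the docstring but not the argument spec, so introspection on methods wrapped by keyword_only (added in util.py below) only sees *args, **kwargs. The decorated methods therefore repeat their real signature on the first docstring line; an illustrative sketch of the convention:

    @keyword_only
    def setParams(self, inputCol="input", outputCol="output"):
        """
        setParams(self, inputCol="input", outputCol="output")
        Sets params for this stage.
        """
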
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 6bd2aa8e47..b6de7493d7 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter,\
HasRegParam
@@ -32,22 +32,46 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
>>> from pyspark.sql import Row
>>> from pyspark.mllib.linalg import Vectors
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([ \
- Row(label=1.0, features=Vectors.dense(1.0)), \
- Row(label=0.0, features=Vectors.sparse(1, [], []))]))
- >>> lr = LogisticRegression() \
- .setMaxIter(5) \
- .setRegParam(0.01)
- >>> model = lr.fit(dataset)
- >>> test0 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.dense(-1.0))]))
+ >>> df = sc.parallelize([
+ ... Row(label=1.0, features=Vectors.dense(1.0)),
+ ... Row(label=0.0, features=Vectors.sparse(1, [], []))]).toDF()
+ >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+ >>> model = lr.fit(df)
+ >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
>>> print model.transform(test0).head().prediction
0.0
- >>> test1 = sqlCtx.inferSchema(sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]))
+ >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
>>> print model.transform(test1).head().prediction
1.0
+ >>> lr.setParams("vector")
+ Traceback (most recent call last):
+ ...
+ TypeError: Method setParams forces keyword arguments.
"""
_java_class = "org.apache.spark.ml.classification.LogisticRegression"
+ @keyword_only
+ def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+ maxIter=100, regParam=0.1):
+ """
+ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxIter=100, regParam=0.1)
+ """
+ super(LogisticRegression, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+ maxIter=100, regParam=0.1):
+ """
+ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+ maxIter=100, regParam=0.1)
+ Sets params for logistic regression.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
def _create_model(self, java_model):
return LogisticRegressionModel(java_model)
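
With this pattern, __init__ reads the keyword arguments the caller actually passed from _input_kwargs (recorded by the keyword_only decorator) and forwards them to setParams, so defaults that were not explicitly given are never written into the param map. A minimal usage sketch, assuming a DataFrame `df` with label and features columns:

    lr = LogisticRegression(maxIter=5, regParam=0.01)  # set params at construction
    lr.setParams(regParam=0.1)                         # returns lr with regParam updated
    model = lr.fit(df)
    # lr.setParams(0.1) would raise:
    # TypeError: Method setParams forces keyword arguments.
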
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index e088acd0ca..f1ddbb478d 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -16,7 +16,7 @@
#
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
from pyspark.ml.wrapper import JavaTransformer
__all__ = ['Tokenizer', 'HashingTF']
@@ -29,18 +29,45 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
splits it by white spaces.
>>> from pyspark.sql import Row
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(text="a b c")]))
- >>> tokenizer = Tokenizer() \
- .setInputCol("text") \
- .setOutputCol("words")
- >>> print tokenizer.transform(dataset).head()
+ >>> df = sc.parallelize([Row(text="a b c")]).toDF()
+ >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ >>> print tokenizer.transform(df).head()
Row(text=u'a b c', words=[u'a', u'b', u'c'])
- >>> print tokenizer.transform(dataset, {tokenizer.outputCol: "tokens"}).head()
+ >>> # Change a parameter.
+ >>> print tokenizer.setParams(outputCol="tokens").transform(df).head()
Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+ >>> # Temporarily modify a parameter.
+ >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
+ Row(text=u'a b c', words=[u'a', u'b', u'c'])
+ >>> print tokenizer.transform(df).head()
+ Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
+ >>> # Must use keyword arguments to specify params.
+ >>> tokenizer.setParams("text")
+ Traceback (most recent call last):
+ ...
+ TypeError: Method setParams forces keyword arguments.
"""
_java_class = "org.apache.spark.ml.feature.Tokenizer"
+ @keyword_only
+ def __init__(self, inputCol="input", outputCol="output"):
+ """
+ __init__(self, inputCol="input", outputCol="output")
+ """
+ super(Tokenizer, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, inputCol="input", outputCol="output"):
+ """
+ setParams(self, inputCol="input", outputCol="output")
+ Sets params for this Tokenizer.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
@inherit_doc
class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
@@ -49,20 +76,37 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
hashing trick.
>>> from pyspark.sql import Row
- >>> dataset = sqlCtx.inferSchema(sc.parallelize([Row(words=["a", "b", "c"])]))
- >>> hashingTF = HashingTF() \
- .setNumFeatures(10) \
- .setInputCol("words") \
- .setOutputCol("features")
- >>> print hashingTF.transform(dataset).head().features
+ >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF()
+ >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
+ >>> print hashingTF.transform(df).head().features
+ (10,[7,8,9],[1.0,1.0,1.0])
+ >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
(10,[7,8,9],[1.0,1.0,1.0])
>>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
- >>> print hashingTF.transform(dataset, params).head().vector
+ >>> print hashingTF.transform(df, params).head().vector
(5,[2,3,4],[1.0,1.0,1.0])
"""
_java_class = "org.apache.spark.ml.feature.HashingTF"
+ @keyword_only
+ def __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+ """
+ __init__(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+ """
+ super(HashingTF, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
+ """
+ setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output")
+ Sets params for this HashingTF.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
if __name__ == "__main__":
import doctest
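
The Tokenizer doctest above exercises the three ways a param can now be supplied: at construction, persistently via setParams, and per call through the extra-params dict accepted by transform. Summarized as a sketch, assuming a DataFrame `df` with a text column:

    tokenizer = Tokenizer(inputCol="text", outputCol="words")   # at construction
    tokenizer.setParams(outputCol="tokens")                     # persistent change
    tokenizer.transform(df, {tokenizer.outputCol: "words"})     # override for this call only
    tokenizer.transform(df)                                     # back to "tokens"
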
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 5566792cea..e3a53dd780 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -80,3 +80,11 @@ class Params(Identifiable):
dummy = Params()
dummy.uid = "undefined"
return dummy
+
+ def _set_params(self, **kwargs):
+ """
+ Sets params.
+ """
+ for param, value in kwargs.iteritems():
+ self.paramMap[getattr(self, param)] = value
+ return self
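
_set_params looks up each keyword name as a Param attribute of the same name on the instance and stores the value in paramMap, so a misspelled name fails with an AttributeError rather than being silently ignored. Roughly, as an illustration:

    lr = LogisticRegression()
    lr._set_params(maxIter=5, regParam=0.01)
    # is equivalent to:
    # lr.paramMap[lr.maxIter] = 5
    # lr.paramMap[lr.regParam] = 0.01
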
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 2d239f8c80..18d8a58f35 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -18,7 +18,7 @@
from abc import ABCMeta, abstractmethod
from pyspark.ml.param import Param, Params
-from pyspark.ml.util import inherit_doc
+from pyspark.ml.util import inherit_doc, keyword_only
__all__ = ['Estimator', 'Transformer', 'Pipeline', 'PipelineModel']
@@ -89,10 +89,16 @@ class Pipeline(Estimator):
identity transformer.
"""
- def __init__(self):
+ @keyword_only
+ def __init__(self, stages=[]):
+ """
+ __init__(self, stages=[])
+ """
super(Pipeline, self).__init__()
#: Param for pipeline stages.
self.stages = Param(self, "stages", "pipeline stages")
+ kwargs = self.__init__._input_kwargs
+ self.setParams(**kwargs)
def setStages(self, value):
"""
@@ -110,6 +116,15 @@ class Pipeline(Estimator):
if self.stages in self.paramMap:
return self.paramMap[self.stages]
+ @keyword_only
+ def setParams(self, stages=[]):
+ """
+ setParams(self, stages=[])
+ Sets params for Pipeline.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set_params(**kwargs)
+
def fit(self, dataset, params={}):
paramMap = self._merge_params(params)
stages = paramMap[self.stages]
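
Pipeline keeps its setStages/getStages accessors, so stages can be supplied either at construction or afterwards; both forms below are equivalent (a sketch, reusing the stages from the example above):

    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    # equivalent to:
    pipeline = Pipeline().setStages([tokenizer, hashingTF, lr])
    model = pipeline.fit(training)
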
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index b1caa84b63..81d3f0882b 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+from functools import wraps
import uuid
@@ -32,6 +33,20 @@ def inherit_doc(cls):
return cls
+def keyword_only(func):
+ """
+ A decorator that forces keyword arguments in the wrapped method
+ and saves actual input keyword arguments in `_input_kwargs`.
+ """
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ if len(args) > 1:
+ raise TypeError("Method %s forces keyword arguments." % func.__name__)
+ wrapper._input_kwargs = kwargs
+ return func(*args, **kwargs)
+ return wrapper
+
+
class Identifiable(object):
"""
Object with a unique ID.
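
keyword_only records the keyword arguments that were actually passed on the wrapper function itself, where the wrapped method can read them back as <method>._input_kwargs; any positional argument beyond self is rejected. A standalone sketch (the Example class and its params are illustrative only, not part of the patch):

    from pyspark.ml.util import keyword_only

    class Example(object):
        @keyword_only
        def setParams(self, inputCol="input", outputCol="output"):
            return self.setParams._input_kwargs  # only what the caller passed

    e = Example()
    print e.setParams(outputCol="tokens")  # {'outputCol': 'tokens'}
    e.setParams("text")                    # TypeError: Method setParams forces keyword arguments.
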