-rw-r--r--   python/pyspark/ml/evaluation.py       4
-rw-r--r--   python/pyspark/ml/param/__init__.py  26
-rw-r--r--   python/pyspark/ml/pipeline.py         4
-rw-r--r--   python/pyspark/ml/tuning.py           8
-rw-r--r--   python/pyspark/rdd.py                 5
-rw-r--r--   python/pyspark/sql/readwriter.py      8
-rw-r--r--   python/pyspark/statcounter.py         4
-rw-r--r--   python/pyspark/streaming/kafka.py    12
8 files changed, 50 insertions, 21 deletions
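
Every hunk below applies the same fix: a mutable default argument ({} or []) is replaced with None, and a fresh object is built inside the function body. A minimal sketch of the pitfall, using hypothetical names rather than code from this patch:

    # Python evaluates default values once, at function definition time,
    # so every call that omits `acc` shares the same dict object.
    def tag(value, acc={}):            # buggy: one dict for all calls
        acc[value] = True
        return acc

    tag("a")    # {'a': True}
    tag("b")    # {'a': True, 'b': True} -- 'a' leaked in from the first call

    # The pattern used throughout this diff: default to None, then build
    # a fresh object inside the body.
    def tag_fixed(value, acc=None):
        if acc is None:
            acc = dict()
        acc[value] = True
        return acc

    tag_fixed("a")   # {'a': True}
    tag_fixed("b")   # {'b': True}
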
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 2734092575..e23ce053ba 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -46,7 +46,7 @@ class Evaluator(Params):
"""
raise NotImplementedError()
- def evaluate(self, dataset, params={}):
+ def evaluate(self, dataset, params=None):
"""
Evaluates the output with optional parameters.
@@ -56,6 +56,8 @@ class Evaluator(Params):
params
:return: metric
"""
+ if params is None:
+ params = dict()
if isinstance(params, dict):
if params:
return self.copy(params)._evaluate(dataset)
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 7845536161..eeeac49b21 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -60,14 +60,16 @@ class Params(Identifiable):
__metaclass__ = ABCMeta
- #: internal param map for user-supplied values param map
- _paramMap = {}
+ def __init__(self):
+ super(Params, self).__init__()
+ #: internal param map for user-supplied values param map
+ self._paramMap = {}
- #: internal param map for default values
- _defaultParamMap = {}
+ #: internal param map for default values
+ self._defaultParamMap = {}
- #: value returned by :py:func:`params`
- _params = None
+ #: value returned by :py:func:`params`
+ self._params = None
@property
def params(self):
@@ -155,7 +157,7 @@ class Params(Identifiable):
else:
return self._defaultParamMap[param]
- def extractParamMap(self, extra={}):
+ def extractParamMap(self, extra=None):
"""
Extracts the embedded default param values and user-supplied
values, and then merges them with extra values from input into
@@ -165,12 +167,14 @@ class Params(Identifiable):
:param extra: extra param values
:return: merged param map
"""
+ if extra is None:
+ extra = dict()
paramMap = self._defaultParamMap.copy()
paramMap.update(self._paramMap)
paramMap.update(extra)
return paramMap
- def copy(self, extra={}):
+ def copy(self, extra=None):
"""
Creates a copy of this instance with the same uid and some
extra params. The default implementation creates a
@@ -181,6 +185,8 @@ class Params(Identifiable):
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
+ if extra is None:
+ extra = dict()
that = copy.copy(self)
that._paramMap = self.extractParamMap(extra)
return that
@@ -233,7 +239,7 @@ class Params(Identifiable):
self._defaultParamMap[getattr(self, param)] = value
return self
- def _copyValues(self, to, extra={}):
+ def _copyValues(self, to, extra=None):
"""
Copies param values from this instance to another instance for
params shared by them.
@@ -241,6 +247,8 @@ class Params(Identifiable):
:param extra: extra params to be copied
:return: the target instance with param values copied
"""
+ if extra is None:
+ extra = dict()
paramMap = self.extractParamMap(extra)
for p in self.params:
if p in paramMap and to.hasParam(p.name):
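
The Params hunk above is the one change not about a default argument: _paramMap, _defaultParamMap, and _params move from class attributes into __init__. A class-level mutable attribute is shared by every instance of the class; a simplified stand-in (not the real Params class) shows the difference:

    class Shared(object):
        _paramMap = {}                 # one dict shared by the whole class

    a, b = Shared(), Shared()
    a._paramMap["maxIter"] = 10        # mutates the class-level dict
    b._paramMap                        # {'maxIter': 10} -- b sees a's value

    class PerInstance(object):
        def __init__(self):
            self._paramMap = {}        # fresh dict per instance

    c, d = PerInstance(), PerInstance()
    c._paramMap["maxIter"] = 10
    d._paramMap                        # {} -- instances stay isolated
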
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 9889f56cac..13cf2b0f7b 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -141,7 +141,7 @@ class Pipeline(Estimator):
@keyword_only
def __init__(self, stages=None):
"""
- __init__(self, stages=[])
+ __init__(self, stages=None)
"""
if stages is None:
stages = []
@@ -170,7 +170,7 @@ class Pipeline(Estimator):
@keyword_only
def setParams(self, stages=None):
"""
- setParams(self, stages=[])
+ setParams(self, stages=None)
Sets params for Pipeline.
"""
if stages is None:
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0bf988fd72..dcfee6a317 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -227,7 +227,9 @@ class CrossValidator(Estimator):
bestModel = est.fit(dataset, epm[bestIndex])
return CrossValidatorModel(bestModel)
- def copy(self, extra={}):
+ def copy(self, extra=None):
+ if extra is None:
+ extra = dict()
newCV = Params.copy(self, extra)
if self.isSet(self.estimator):
newCV.setEstimator(self.getEstimator().copy(extra))
@@ -250,7 +252,7 @@ class CrossValidatorModel(Model):
def _transform(self, dataset):
return self.bestModel.transform(dataset)
- def copy(self, extra={}):
+ def copy(self, extra=None):
"""
Creates a copy of this instance with a randomly generated uid
and some extra params. This copies the underlying bestModel,
@@ -259,6 +261,8 @@ class CrossValidatorModel(Model):
:param extra: Extra parameters to copy to the new instance
:return: Copy of this instance
"""
+ if extra is None:
+ extra = dict()
return CrossValidatorModel(self.bestModel.copy(extra))
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fa8e0a0574..9ef60a7e2c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -700,7 +700,7 @@ class RDD(object):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
@ignore_unicode_prefix
- def pipe(self, command, env={}, checkCode=False):
+ def pipe(self, command, env=None, checkCode=False):
"""
Return an RDD created by piping elements to a forked external process.
@@ -709,6 +709,9 @@ class RDD(object):
:param checkCode: whether or not to check the return value of the shell command.
"""
+ if env is None:
+ env = dict()
+
def func(iterator):
pipe = Popen(
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bf6ac084bb..78247c8fa7 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -182,7 +182,7 @@ class DataFrameReader(object):
@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
- predicates=None, properties={}):
+ predicates=None, properties=None):
"""
Construct a :class:`DataFrame` representing the database table accessible
via JDBC URL `url` named `table` and connection `properties`.
@@ -208,6 +208,8 @@ class DataFrameReader(object):
should be included.
:return: a DataFrame
"""
+ if properties is None:
+ properties = dict()
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
@@ -427,7 +429,7 @@ class DataFrameWriter(object):
self._jwrite.orc(path)
@since(1.4)
- def jdbc(self, url, table, mode=None, properties={}):
+ def jdbc(self, url, table, mode=None, properties=None):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.
.. note:: Don't create too many partitions in parallel on a large cluster;\
@@ -445,6 +447,8 @@ class DataFrameWriter(object):
arbitrary string tag/value. Normally at least a
"user" and "password" property should be included.
"""
+ if properties is None:
+ properties = dict()
jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 944fa414b0..0fee3b2096 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -30,7 +30,9 @@ except ImportError:
class StatCounter(object):
- def __init__(self, values=[]):
+ def __init__(self, values=None):
+ if values is None:
+ values = list()
self.n = 0 # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)
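
The StatCounter change is the list flavour of the same pattern: values=[] is built once at definition time. The toy class below (hypothetical, and deliberately mutating its argument, which the real StatCounter need not do) exaggerates the failure mode that the None default rules out:

    class Acc(object):
        def __init__(self, values=[]):     # buggy: shared list
            values.append("seen")
            self.values = values

    Acc().values    # ['seen']
    Acc().values    # ['seen', 'seen'] -- the default list keeps growing

    class AccFixed(object):
        def __init__(self, values=None):
            if values is None:
                values = list()            # fresh list per construction
            values.append("seen")
            self.values = values
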
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 33dd596335..dc5b7fd878 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -35,7 +35,7 @@ def utf8_decoder(s):
class KafkaUtils(object):
@staticmethod
- def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
+ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
@@ -52,6 +52,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
+ if kafkaParams is None:
+ kafkaParams = dict()
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -77,7 +79,7 @@ class KafkaUtils(object):
return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
@staticmethod
- def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
+ def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -105,6 +107,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder).
:return: A DStream object
"""
+ if fromOffsets is None:
+ fromOffsets = dict()
if not isinstance(topics, list):
raise TypeError("topics should be list")
if not isinstance(kafkaParams, dict):
@@ -129,7 +133,7 @@ class KafkaUtils(object):
return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)
@staticmethod
- def createRDD(sc, kafkaParams, offsetRanges, leaders={},
+ def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -145,6 +149,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A RDD object
"""
+ if leaders is None:
+ leaders = dict()
if not isinstance(kafkaParams, dict):
raise TypeError("kafkaParams should be dict")
if not isinstance(offsetRanges, list):
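
The KafkaUtils.createStream hunk shows the default dict being mutated directly: kafkaParams.update(...) writes zookeeper.connect and group.id into whatever dict was passed in, which with kafkaParams={} is the single shared default. A stand-in sketch (not the real API) of why that is surprising:

    def create_stream(zk_quorum, group_id, kafka_params={}):   # buggy
        kafka_params.update({
            "zookeeper.connect": zk_quorum,
            "group.id": group_id,
        })
        return kafka_params

    d1 = create_stream("zk-a:2181", "g1")
    d2 = create_stream("zk-b:2181", "g2")
    d1 is d2           # True -- both calls got the same shared dict
    d1["group.id"]     # 'g2' -- the second call rewrote the first call's result

With kafkaParams=None and a dict() built per call, as in the patch, each invocation starts from a clean parameter map.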