aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming
diff options
context:
space:
mode:
authorMechCoder <manojkumarsivaraj334@gmail.com>2015-08-14 12:46:05 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-14 12:46:05 -0700
commitffa05c84fe75663fc33f3d954d1cb1e084ab3280 (patch)
treeeb5599d7c52bf7ddcd72d340952bca83e169c2d0 /python/pyspark/streaming
parentece00566e4d5f38585f2810bef38e526cae7d25e (diff)
downloadspark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.tar.gz
spark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.tar.bz2
spark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.zip
[SPARK-9828] [PYSPARK] Mutable values should not be default arguments
Author: MechCoder <manojkumarsivaraj334@gmail.com> Closes #8110 from MechCoder/spark-9828.
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--python/pyspark/streaming/kafka.py12
1 files changed, 9 insertions, 3 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 33dd596335..dc5b7fd878 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -35,7 +35,7 @@ def utf8_decoder(s):
class KafkaUtils(object):
@staticmethod
- def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
+ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
@@ -52,6 +52,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
+ if kafkaParams is None:
+ kafkaParams = dict()
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -77,7 +79,7 @@ class KafkaUtils(object):
return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
@staticmethod
- def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
+ def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -105,6 +107,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder).
:return: A DStream object
"""
+ if fromOffsets is None:
+ fromOffsets = dict()
if not isinstance(topics, list):
raise TypeError("topics should be list")
if not isinstance(kafkaParams, dict):
@@ -129,7 +133,7 @@ class KafkaUtils(object):
return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)
@staticmethod
- def createRDD(sc, kafkaParams, offsetRanges, leaders={},
+ def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
.. note:: Experimental
@@ -145,6 +149,8 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A RDD object
"""
+ if leaders is None:
+ leaders = dict()
if not isinstance(kafkaParams, dict):
raise TypeError("kafkaParams should be dict")
if not isinstance(offsetRanges, list):