diff options
author | MechCoder <manojkumarsivaraj334@gmail.com> | 2015-08-14 12:46:05 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-08-14 12:46:05 -0700 |
commit | ffa05c84fe75663fc33f3d954d1cb1e084ab3280 (patch) | |
tree | eb5599d7c52bf7ddcd72d340952bca83e169c2d0 /python/pyspark/streaming | |
parent | ece00566e4d5f38585f2810bef38e526cae7d25e (diff) | |
download | spark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.tar.gz spark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.tar.bz2 spark-ffa05c84fe75663fc33f3d954d1cb1e084ab3280.zip |
[SPARK-9828] [PYSPARK] Mutable values should not be default arguments
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #8110 from MechCoder/spark-9828.
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r-- | python/pyspark/streaming/kafka.py | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index 33dd596335..dc5b7fd878 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -35,7 +35,7 @@ def utf8_decoder(s): class KafkaUtils(object): @staticmethod - def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, + def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ @@ -52,6 +52,8 @@ class KafkaUtils(object): :param valueDecoder: A function used to decode value (default is utf8_decoder) :return: A DStream object """ + if kafkaParams is None: + kafkaParams = dict() kafkaParams.update({ "zookeeper.connect": zkQuorum, "group.id": groupId, @@ -77,7 +79,7 @@ class KafkaUtils(object): return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1]))) @staticmethod - def createDirectStream(ssc, topics, kafkaParams, fromOffsets={}, + def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ .. note:: Experimental @@ -105,6 +107,8 @@ class KafkaUtils(object): :param valueDecoder: A function used to decode value (default is utf8_decoder). :return: A DStream object """ + if fromOffsets is None: + fromOffsets = dict() if not isinstance(topics, list): raise TypeError("topics should be list") if not isinstance(kafkaParams, dict): @@ -129,7 +133,7 @@ class KafkaUtils(object): return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer) @staticmethod - def createRDD(sc, kafkaParams, offsetRanges, leaders={}, + def createRDD(sc, kafkaParams, offsetRanges, leaders=None, keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): """ .. note:: Experimental @@ -145,6 +149,8 @@ class KafkaUtils(object): :param valueDecoder: A function used to decode value (default is utf8_decoder) :return: A RDD object """ + if leaders is None: + leaders = dict() if not isinstance(kafkaParams, dict): raise TypeError("kafkaParams should be dict") if not isinstance(offsetRanges, list): |