aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/kafka.py')
-rw-r--r--python/pyspark/streaming/kafka.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index f083ed149e..7a7b6e1d9a 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -67,10 +67,10 @@ class KafkaUtils(object):
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
- except Py4JJavaError, e:
+ except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
- print """
+ print("""
________________________________________________________________________________________________
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
@@ -88,8 +88,8 @@ ________________________________________________________________________________
________________________________________________________________________________________________
-""" % (ssc.sparkContext.version, ssc.sparkContext.version)
+""" % (ssc.sparkContext.version, ssc.sparkContext.version))
raise e
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
- return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
+ return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))