aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/kafka.py')
-rw-r--r--python/pyspark/streaming/kafka.py7
1 files changed, 2 insertions, 5 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 7a7b6e1d9a..8d610d6569 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,8 +15,7 @@
# limitations under the License.
#
-from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
+from py4j.java_gateway import Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -57,8 +56,6 @@ class KafkaUtils(object):
})
if not isinstance(topics, dict):
raise TypeError("topics should be dict")
- jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
- jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
try:
@@ -66,7 +63,7 @@ class KafkaUtils(object):
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helperClass.newInstance()
- jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+ jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):