aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/mqtt.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/mqtt.py')
-rw-r--r--python/pyspark/streaming/mqtt.py13
1 files changed, 5 insertions, 8 deletions
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 388e9526ba..8848a70c75 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ class MQTTUtils(object):
:param storageLevel: RDD storage level.
:return: A DStream object
"""
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-
try:
- helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
- helper = helperClass.newInstance()
- jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
- except Py4JJavaError as e:
- if 'ClassNotFoundException' in str(e.java_exception):
+ helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
MQTTUtils._printErrorMsg(ssc.sparkContext)
raise
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
return DStream(jstream, ssc, UTF8Deserializer())
@staticmethod