diff options
Diffstat (limited to 'python/pyspark/streaming/mqtt.py')
-rw-r--r-- | python/pyspark/streaming/mqtt.py | 13 |
1 files changed, 5 insertions, 8 deletions
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index 388e9526ba..8848a70c75 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -38,18 +38,15 @@ class MQTTUtils(object): :param storageLevel: RDD storage level. :return: A DStream object """ - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - try: - helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ - .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper") - helper = helperClass.newInstance() - jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel) - except Py4JJavaError as e: - if 'ClassNotFoundException' in str(e.java_exception): + helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper() + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": MQTTUtils._printErrorMsg(ssc.sparkContext) raise + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel) return DStream(jstream, ssc, UTF8Deserializer()) @staticmethod |