aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/kafka.py')
-rw-r--r--python/pyspark/streaming/kafka.py10
1 files changed, 3 insertions, 7 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index a70b99249d..02a88699a2 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ class KafkaUtils(object):
@staticmethod
def _get_helper(sc):
try:
- # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
- helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
- return helperClass.newInstance()
- except Py4JJavaError as e:
- # TODO: use --jar once it also work on driver
- if 'ClassNotFoundException' in str(e.java_exception):
+ return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
KafkaUtils._printErrorMsg(sc)
raise