Diffstat (limited to 'python/pyspark/streaming/kafka.py')
-rw-r--r--  python/pyspark/streaming/kafka.py | 42
1 file changed, 27 insertions(+), 15 deletions(-)
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 19ad71f99d..0002dc10e8 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
#
from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
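Why the import change matters: Py4JJavaError, unlike the broader Py4JError, wraps an exception thrown on the JVM side and exposes the underlying Java Throwable as its java_exception attribute, which the new handler below inspects. A minimal sketch of the pattern, with a hypothetical helper call standing in for the real one:

    from py4j.java_gateway import Py4JJavaError

    try:
        jstream = helper.createStream(jssc, jparam, jtopics, jlevel)  # hypothetical Py4J call
    except Py4JJavaError as e:
        # e.java_exception is the Java Throwable; its string form names the
        # missing class, so an absent Kafka assembly surfaces here as a
        # java.lang.ClassNotFoundException.
        if 'ClassNotFoundException' in str(e.java_exception):
            print("Spark Streaming's Kafka libraries not found in class path")
        raise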
@@ -50,8 +50,6 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
- java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -63,20 +61,34 @@ class KafkaUtils(object):
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- def getClassByName(name):
- return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
try:
- array = getClassByName("[B")
- decoder = getClassByName("kafka.serializer.DefaultDecoder")
- jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
- jparam, jtopics, jlevel)
- except Py4JError, e:
+ # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+ except Py4JJavaError, e:
        # TODO: use --jar once it also works on driver
- if not e.message or 'call a package' in e.message:
- print "No kafka package, please put the assembly jar into classpath:"
- print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
- "scala-*/spark-streaming-kafka-assembly-*.jar"
+ if 'ClassNotFoundException' in str(e.java_exception):
+ print """
+________________________________________________________________________________________________
+
+ Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies in the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
raise e
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
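End to end, the patch swaps the reflection-based lookup of org.apache.spark.streaming.kafka.KafkaUtils for a KafkaUtilsPythonHelper instance loaded through the thread's context classloader, and upgrades the bare ClassNotFoundException hint into an actionable message. The Python-facing API is unchanged; a minimal usage sketch, assuming ZooKeeper at localhost:2181 and a topic named "test" (host, group, and topic names are illustrative):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaStreamExample")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # topics maps each topic name to the number of consumer threads reading it
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-group", {"test": 1})
    stream.pprint()  # print a few records from each batch

    ssc.start()
    ssc.awaitTermination()

Run it with the Kafka assembly on the classpath, e.g. via the --jars form printed in the error message above.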