diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/streaming/kafka.py | 42 |
1 files changed, 27 insertions, 15 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index 19ad71f99d..0002dc10e8 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -16,7 +16,7 @@ # from py4j.java_collections import MapConverter -from py4j.java_gateway import java_import, Py4JError +from py4j.java_gateway import java_import, Py4JError, Py4JJavaError from pyspark.storagelevel import StorageLevel from pyspark.serializers import PairDeserializer, NoOpSerializer @@ -50,8 +50,6 @@ class KafkaUtils(object): :param valueDecoder: A function used to decode value (default is utf8_decoder) :return: A DStream object """ - java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils") - kafkaParams.update({ "zookeeper.connect": zkQuorum, "group.id": groupId, @@ -63,20 +61,34 @@ class KafkaUtils(object): jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client) jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - def getClassByName(name): - return ssc._jvm.org.apache.spark.util.Utils.classForName(name) - try: - array = getClassByName("[B") - decoder = getClassByName("kafka.serializer.DefaultDecoder") - jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder, - jparam, jtopics, jlevel) - except Py4JError, e: + # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027) + helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ + .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper") + helper = helperClass.newInstance() + jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel) + except Py4JJavaError, e: # TODO: use --jar once it also work on driver - if not e.message or 'call a package' in e.message: - print "No kafka package, please put the assembly jar into classpath:" - print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \ - "scala-*/spark-streaming-kafka-assembly-*.jar" + if 'ClassNotFoundException' in str(e.java_exception): + print """ +________________________________________________________________________________________________ + + Spark Streaming's Kafka libraries not found in class path. Try one of the following. + + 1. Include the Kafka library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s. + Then, innclude the jar in the spark-submit command as + + $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ... + +________________________________________________________________________________________________ + +""" % (ssc.sparkContext.version, ssc.sparkContext.version) raise e ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) stream = DStream(jstream, ssc, ser) |