author     Tathagata Das <tathagata.das1565@gmail.com>  2015-02-26 13:46:07 -0800
committer  Andrew Or <andrew@databricks.com>  2015-02-26 13:47:07 -0800
commit     aa63f633d39efa8c29095295f161eaad5495071d (patch)
tree       e3ecca97e0d71a64b94ef8ad62b8777d0c4dc3f2 /python/pyspark/streaming
parent     8942b522d8a3269a2a357e3a274ed4b3e66ebdde (diff)
download   spark-aa63f633d39efa8c29095295f161eaad5495071d.tar.gz
           spark-aa63f633d39efa8c29095295f161eaad5495071d.tar.bz2
           spark-aa63f633d39efa8c29095295f161eaad5495071d.zip
[SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message
The problem with SPARK-6027, in short, is that JARs like the kafka-assembly.jar do not work in Python because the added JAR is not visible to the classloader used by Py4J. Py4J uses Class.forName(), which does not use the system classloader, but the JARs are only visible in the thread's context classloader. So this change uses the context class loader to create the KafkaUtils DStream object. This works for both cases: when the Kafka libraries are added with --jars spark-streaming-kafka-assembly.jar and when they are added with --packages spark-streaming-kafka.

Also improves the error message.

davies

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4779 from tdas/kafka-python-fix and squashes the following commits:

fb16b04 [Tathagata Das] Removed import
c1fdf35 [Tathagata Das] Fixed long line and improved documentation
7b88be8 [Tathagata Das] Fixed --jar not working for KafkaUtils and improved error message
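For context, here is the classloader workaround from this patch as a minimal standalone sketch (illustrative only; assumes an active StreamingContext bound to ssc, mirroring the lines added in the diff below):

    # Py4J resolves classes with Class.forName(), which consults the system
    # classloader; JARs added via --jars/--packages are visible only in the
    # JVM thread's context classloader. Loading the helper class through the
    # context classloader makes the Kafka assembly reachable from Python.
    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helper_class = loader.loadClass(
        "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helper_class.newInstance()  # used to create the Kafka DStream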
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--  python/pyspark/streaming/kafka.py | 42
1 file changed, 27 insertions(+), 15 deletions(-)
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 19ad71f99d..0002dc10e8 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
#
from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -50,8 +50,6 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
- java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -63,20 +61,34 @@ class KafkaUtils(object):
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- def getClassByName(name):
- return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
try:
- array = getClassByName("[B")
- decoder = getClassByName("kafka.serializer.DefaultDecoder")
- jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
- jparam, jtopics, jlevel)
- except Py4JError, e:
+ # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+ except Py4JJavaError, e:
# TODO: use --jar once it also work on driver
- if not e.message or 'call a package' in e.message:
- print "No kafka package, please put the assembly jar into classpath:"
- print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
- "scala-*/spark-streaming-kafka-assembly-*.jar"
+ if 'ClassNotFoundException' in str(e.java_exception):
+ print """
+________________________________________________________________________________________________
+
+ Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+ 1. Include the Kafka library and its dependencies with the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
raise e
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
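For reference, a usage sketch of the patched API from the caller's side (the ZooKeeper address, group id, and topic name below are placeholders; assumes the Kafka assembly was supplied via --jars or --packages as the new error message describes):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaExample")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # createStream(ssc, zkQuorum, groupId, topics) matches kafka.py's signature
    stream = KafkaUtils.createStream(
        ssc, "localhost:2181", "example-group", {"example-topic": 1})
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

Submitted, for example, as:

    $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:<spark-version> example.py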