diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-02-26 13:46:07 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-26 13:47:07 -0800 |
commit | aa63f633d39efa8c29095295f161eaad5495071d (patch) | |
tree | e3ecca97e0d71a64b94ef8ad62b8777d0c4dc3f2 /external | |
parent | 8942b522d8a3269a2a357e3a274ed4b3e66ebdde (diff) | |
download | spark-aa63f633d39efa8c29095295f161eaad5495071d.tar.gz spark-aa63f633d39efa8c29095295f161eaad5495071d.tar.bz2 spark-aa63f633d39efa8c29095295f161eaad5495071d.zip |
[SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message
The problem with SPARK-6027 in short is that JARs like the kafka-assembly.jar does not work in python as the added JAR is not visible in the classloader used by Py4J. Py4J uses Class.forName(), which does not uses the systemclassloader, but the JARs are only visible in the Thread's contextclassloader. So this back uses the context class loader to create the KafkaUtils dstream object. This works for both cases where the Kafka libraries are added with --jars spark-streaming-kafka-assembly.jar or with --packages spark-streaming-kafka
Also improves the error message.
davies
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #4779 from tdas/kafka-python-fix and squashes the following commits:
fb16b04 [Tathagata Das] Removed import
c1fdf35 [Tathagata Das] Fixed long line and improved documentation
7b88be8 [Tathagata Das] Fixed --jar not working for KafkaUtils and improved error message
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 29 |
1 files changed, 28 insertions, 1 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index af04bc6576..62a6595189 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import kafka.serializer.{Decoder, StringDecoder} +import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder} import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.{SparkContext, SparkException} @@ -532,3 +532,30 @@ object KafkaUtils { ) } } + +/** + * This is a helper class that wraps the KafkaUtils.createStream() into more + * Python-friendly class and function so that it can be easily + * instantiated and called from Python's KafkaUtils (see SPARK-6027). + * + * The zero-arg constructor helps instantiate this class from the Class object + * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream() + * takes care of known parameters instead of passing them from Python + */ +private class KafkaUtilsPythonHelper { + def createStream( + jssc: JavaStreamingContext, + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = { + KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder]( + jssc, + classOf[Array[Byte]], + classOf[Array[Byte]], + classOf[DefaultDecoder], + classOf[DefaultDecoder], + kafkaParams, + topics, + storageLevel) + } +} |