 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 29 ++++++++-
 python/pyspark/streaming/kafka.py                                               | 42 ++++++-----
 2 files changed, 55 insertions(+), 16 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index af04bc6576..62a6595189 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,30 @@ object KafkaUtils {
)
}
}
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and method so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor allows instantiation via
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
+ * supplies the known type parameters so they need not be passed from Python.
+ */
+private class KafkaUtilsPythonHelper {
+ def createStream(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+ KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+ jssc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ kafkaParams,
+ topics,
+ storageLevel)
+ }
+}
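
For readers following the call path: the Python side loads this helper through the current thread's context class loader rather than importing it, which is why the zero-arg constructor matters. A minimal sketch of that Py4J pattern, assuming an active StreamingContext `ssc` and the already-converted Java objects `jparam`, `jtopics`, and `jlevel` from the kafka.py diff below:

    # Load the helper via the context class loader, then use its zero-arg
    # constructor; no Scala type parameters leak into Python.
    helperClass = ssc._jvm.java.lang.Thread.currentThread() \
        .getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helperClass.newInstance()
    jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
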
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 19ad71f99d..0002dc10e8 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
#
from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -50,8 +50,6 @@ class KafkaUtils(object):
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object
"""
- java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
kafkaParams.update({
"zookeeper.connect": zkQuorum,
"group.id": groupId,
@@ -63,20 +61,34 @@ class KafkaUtils(object):
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- def getClassByName(name):
- return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
try:
- array = getClassByName("[B")
- decoder = getClassByName("kafka.serializer.DefaultDecoder")
- jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
- jparam, jtopics, jlevel)
- except Py4JError, e:
+ # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+ except Py4JJavaError, e:
# TODO: use --jar once it also work on driver
- if not e.message or 'call a package' in e.message:
- print "No kafka package, please put the assembly jar into classpath:"
- print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
- "scala-*/spark-streaming-kafka-assembly-*.jar"
+ if 'ClassNotFoundException' in str(e.java_exception):
+ print """
+________________________________________________________________________________________________
+
+ Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies in the
+     spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
raise e
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
stream = DStream(jstream, ssc, ser)
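
For context, an end-to-end usage sketch of the Python API this patch wires up. The ZooKeeper address, group id, and topic name are placeholder assumptions, not part of the patch; run it with the spark-streaming-kafka assembly jar on the classpath as the error message above describes:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # A minimal sketch, assuming a local ZooKeeper at localhost:2181 and a
    # Kafka topic "my-topic" consumed by one receiver thread.
    sc = SparkContext(appName="PythonKafkaExample")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # createStream returns a DStream of (key, value) pairs; both sides are
    # utf8-decoded by default (see keyDecoder/valueDecoder above).
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group",
                                     {"my-topic": 1})
    stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    ssc.awaitTermination()
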