diff options
author | Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com> | 2016-08-09 09:44:43 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-08-09 09:44:43 -0700 |
commit | 29081b587f3423bf5a3e0066357884d0c26a04bf (patch) | |
tree | b86e35b297a8b3100a19dc0e85f828434524b96e /external | |
parent | 182e11904bf2093c2faa57894a1c4bb11d872596 (diff) | |
download | spark-29081b587f3423bf5a3e0066357884d0c26a04bf.tar.gz spark-29081b587f3423bf5a3e0066357884d0c26a04bf.tar.bz2 spark-29081b587f3423bf5a3e0066357884d0c26a04bf.zip |
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.createDirectStream for python3
## What changes were proposed in this pull request?
Ability to use KafkaUtils.createDirectStream with starting offsets in python 3 by using java.lang.Number instead of Long during param mapping in scala helper. This allows py4j to pass Integer or Long to the map and resolves ClassCastException problems.
## How was this patch tested?
unit tests
jerryshao - could you please look at this PR?
Author: Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com>
Closes #14540 from szczeles/kafka_pyspark.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index edaafb912c..b17e198077 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kafka import java.io.OutputStream -import java.lang.{Integer => JInt, Long => JLong} +import java.lang.{Integer => JInt, Long => JLong, Number => JNumber} import java.nio.charset.StandardCharsets import java.util.{List => JList, Map => JMap, Set => JSet} @@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper { jssc: JavaStreamingContext, kafkaParams: JMap[String, String], topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = { + fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = { val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message) new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler)) @@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper { jssc: JavaStreamingContext, kafkaParams: JMap[String, String], topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = { + fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = { val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message()) val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler). @@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper { jssc: JavaStreamingContext, kafkaParams: JMap[String, String], topics: JSet[String], - fromOffsets: JMap[TopicAndPartition, JLong], + fromOffsets: JMap[TopicAndPartition, JNumber], messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = { val currentFromOffsets = if (!fromOffsets.isEmpty) { |