author     Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com>  2016-08-09 09:44:43 -0700
committer  Davies Liu <davies.liu@gmail.com>  2016-08-09 09:44:43 -0700
commit     29081b587f3423bf5a3e0066357884d0c26a04bf (patch)
tree       b86e35b297a8b3100a19dc0e85f828434524b96e /external
parent     182e11904bf2093c2faa57894a1c4bb11d872596 (diff)
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.createDirectStream for python3
## What changes were proposed in this pull request?

Allow KafkaUtils.createDirectStream to be used with starting offsets in Python 3, by using java.lang.Number instead of Long during parameter mapping in the Scala helper. This lets py4j pass either Integer or Long into the map and resolves the ClassCastException problems.

## How was this patch tested?

Unit tests.

jerryshao - could you please look at this PR?

Author: Mariusz Strzelecki <mariusz.strzelecki@allegrogroup.com>

Closes #14540 from szczeles/kafka_pyspark.
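For context, the sketch below (not part of this commit) shows the Python 3 call pattern this change enables; the broker address, topic name, and offset values are placeholder examples.

```python
# Illustrative sketch: starting a direct Kafka stream from explicit offsets in
# Python 3. Broker address, topic name, and offsets are placeholders.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="kafka-from-offsets")
ssc = StreamingContext(sc, 10)

# Python 3 has a single int type; depending on the value, py4j may hand the
# Scala helper a java.lang.Integer or a java.lang.Long. Accepting
# java.lang.Number on the Scala side covers both cases.
from_offsets = {TopicAndPartition("my_topic", 0): 0}

stream = KafkaUtils.createDirectStream(
    ssc,
    ["my_topic"],
    {"metadata.broker.list": "localhost:9092"},
    fromOffsets=from_offsets)

stream.pprint()
ssc.start()
ssc.awaitTermination()
```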
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index edaafb912c..b17e198077 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka
import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap, Set => JSet}
@@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+ fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
val messageHandler =
(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
@@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+ fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
@@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong],
+ fromOffsets: JMap[TopicAndPartition, JNumber],
messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
val currentFromOffsets = if (!fromOffsets.isEmpty) {