aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index edaafb912c..b17e198077 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka
import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap, Set => JSet}
@@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+ fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
val messageHandler =
(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
@@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+ fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
@@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong],
+ fromOffsets: JMap[TopicAndPartition, JNumber],
messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
val currentFromOffsets = if (!fromOffsets.isEmpty) {