Diffstat (limited to 'external')
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 92
1 file changed, 91 insertions(+), 1 deletion(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5a9bd4214c..0721ddaf70 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -21,6 +21,7 @@ import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import java.util.{Map => JMap}
import java.util.{Set => JSet}
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -234,7 +235,6 @@ object KafkaUtils {
    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
  }
-
  /**
   * Create an RDD from Kafka using offset ranges for each topic and partition.
   *
@@ -558,4 +558,94 @@ private class KafkaUtilsPythonHelper {
      topics,
      storageLevel)
  }
+
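+  /**
+   * Helper for the Python API: builds a JavaPairRDD of raw (key, message)
+   * byte-array pairs from Kafka for the given offset ranges.
+   */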
+  def createRDD(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker]): JavaPairRDD[Array[Byte], Array[Byte]] = {
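+    // Identity message handler: pass key and message bytes through unchanged;
+    // deserialization is left to the Python side.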
+    val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+      (Array[Byte], Array[Byte])] {
+      def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+        (t1.key(), t1.message())
+    }
+
+    val jrdd = KafkaUtils.createRDD[
+      Array[Byte],
+      Array[Byte],
+      DefaultDecoder,
+      DefaultDecoder,
+      (Array[Byte], Array[Byte])](
+        jsc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        classOf[(Array[Byte], Array[Byte])],
+        kafkaParams,
+        offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+        leaders,
+        messageHandler
+      )
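+    // Re-wrap the underlying RDD of tuples as a JavaPairRDD for the Python API.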
+    new JavaPairRDD(jrdd.rdd)
+  }
+
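+  /**
+   * Helper for the Python API: creates a direct (receiver-less) stream of raw
+   * (key, message) byte-array pairs.
+   */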
+  def createDirectStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String],
+      fromOffsets: JMap[TopicAndPartition, JLong]
+    ): JavaPairInputDStream[Array[Byte], Array[Byte]] = {
+
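+    // Sanity check: explicit offsets, when given, must cover exactly the
+    // requested topic set.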
+    if (!fromOffsets.isEmpty) {
+      import scala.collection.JavaConversions._
+      val topicsFromOffsets = fromOffsets.keySet().map(_.topic)
+      if (topicsFromOffsets != topics.toSet) {
+        throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " +
+          s"do not match the topics from the offsets: ${topicsFromOffsets.mkString(" ")}")
+      }
+    }
+
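+    // With no explicit offsets, start according to auto.offset.reset;
+    // otherwise start from the given per-partition offsets.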
+    if (fromOffsets.isEmpty) {
+      KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+        jssc,
+        classOf[Array[Byte]],
+        classOf[Array[Byte]],
+        classOf[DefaultDecoder],
+        classOf[DefaultDecoder],
+        kafkaParams,
+        topics)
+    } else {
+      val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+        (Array[Byte], Array[Byte])] {
+        def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+          (t1.key(), t1.message())
+      }
+
+      val jstream = KafkaUtils.createDirectStream[
+        Array[Byte],
+        Array[Byte],
+        DefaultDecoder,
+        DefaultDecoder,
+        (Array[Byte], Array[Byte])](
+          jssc,
+          classOf[Array[Byte]],
+          classOf[Array[Byte]],
+          classOf[DefaultDecoder],
+          classOf[DefaultDecoder],
+          classOf[(Array[Byte], Array[Byte])],
+          kafkaParams,
+          fromOffsets,
+          messageHandler)
+      new JavaPairInputDStream(jstream.inputDStream)
+    }
+  }
+
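+  // Simple factories so the Python side can construct these objects over Py4J.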
+  def createOffsetRange(
+      topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong): OffsetRange =
+    OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+  def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+    TopicAndPartition(topic, partition)
+
+  def createBroker(host: String, port: JInt): Broker = Broker(host, port)
}
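
For reference, a minimal sketch of how the new helper could be driven (not part
of the change): it assumes code in the org.apache.spark.streaming.kafka package
(the helper class is package-private), an existing JavaStreamingContext jssc,
and a placeholder broker address and topic name.

  val helper = new KafkaUtilsPythonHelper()

  val kafkaParams = new java.util.HashMap[String, String]()
  kafkaParams.put("metadata.broker.list", "localhost:9092") // placeholder broker

  val topics = new java.util.HashSet[String]()
  topics.add("test-topic") // placeholder topic

  // Empty fromOffsets: the stream starts according to auto.offset.reset.
  val stream = helper.createDirectStream(
    jssc, kafkaParams, topics,
    new java.util.HashMap[TopicAndPartition, java.lang.Long]())

  // To start from explicit offsets instead, build them via the factories;
  // the topics in the offset map must match the requested topic set exactly.
  val fromOffsets = new java.util.HashMap[TopicAndPartition, java.lang.Long]()
  fromOffsets.put(helper.createTopicAndPartition("test-topic", 0), 0L)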