author     jerryshao <saisai.shao@intel.com>            2015-04-27 23:48:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-04-27 23:48:02 -0700
commit     9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da (patch)
tree       5df0f823975fd6f0ef7132a7346ca993bc30d63b /external
parent     29576e786072bd4218e10036ddfc8d367b1c1446 (diff)
[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream
Currently this only adds the `createDirectStream` API; I'm not sure whether `createRDD` is also needed, since some Java objects need to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kafka direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] Address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add Python API and example for direct Kafka stream
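For context, a minimal sketch of how the new Python API is meant to be driven. The Python-side files are not shown in this diff (the diffstat below is limited to `external/`), so this follows the `pyspark.streaming.kafka` wrappers that the Scala helper backs; the broker address and topic name are placeholder values:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="DirectKafkaExample")
ssc = StreamingContext(sc, 2)  # 2-second batch interval

# Direct (receiver-less) stream: pyspark talks straight to the Kafka brokers
# and tracks offsets itself instead of going through a ZooKeeper-based receiver.
# "localhost:9092" and "test-topic" are placeholders.
stream = KafkaUtils.createDirectStream(
    ssc, ["test-topic"], {"metadata.broker.list": "localhost:9092"})

lines = stream.map(lambda kv: kv[1])  # each element is a (key, value) pair
lines.pprint()

ssc.start()
ssc.awaitTermination()
```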
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  92
1 file changed, 91 insertions(+), 1 deletion(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5a9bd4214c..0721ddaf70 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -21,6 +21,7 @@ import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}
import java.util.{Map => JMap}
import java.util.{Set => JSet}
+import java.util.{List => JList}
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -234,7 +235,6 @@ object KafkaUtils {
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
}
-
/**
* Create an RDD from Kafka using offset ranges for each topic and partition.
*
@@ -558,4 +558,94 @@ private class KafkaUtilsPythonHelper {
topics,
storageLevel)
}
+
+ def createRDD(
+ jsc: JavaSparkContext,
+ kafkaParams: JMap[String, String],
+ offsetRanges: JList[OffsetRange],
+ leaders: JMap[TopicAndPartition, Broker]): JavaPairRDD[Array[Byte], Array[Byte]] = {
+ val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+ (Array[Byte], Array[Byte])] {
+ def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+ (t1.key(), t1.message())
+ }
+
+ val jrdd = KafkaUtils.createRDD[
+ Array[Byte],
+ Array[Byte],
+ DefaultDecoder,
+ DefaultDecoder,
+ (Array[Byte], Array[Byte])](
+ jsc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ classOf[(Array[Byte], Array[Byte])],
+ kafkaParams,
+ offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
+ leaders,
+ messageHandler
+ )
+ new JavaPairRDD(jrdd.rdd)
+ }
+
+ def createDirectStream(
+ jssc: JavaStreamingContext,
+ kafkaParams: JMap[String, String],
+ topics: JSet[String],
+ fromOffsets: JMap[TopicAndPartition, JLong]
+ ): JavaPairInputDStream[Array[Byte], Array[Byte]] = {
+
+ if (!fromOffsets.isEmpty) {
+ import scala.collection.JavaConversions._
+ val topicsFromOffsets = fromOffsets.keySet().map(_.topic)
+ if (topicsFromOffsets != topics.toSet) {
+ throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " +
+ s"do not match the topics from the given offsets: ${topicsFromOffsets.mkString(" ")}")
+ }
+ }
+
+ if (fromOffsets.isEmpty) {
+ KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+ jssc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ kafkaParams,
+ topics)
+ } else {
+ val messageHandler = new JFunction[MessageAndMetadata[Array[Byte], Array[Byte]],
+ (Array[Byte], Array[Byte])] {
+ def call(t1: MessageAndMetadata[Array[Byte], Array[Byte]]): (Array[Byte], Array[Byte]) =
+ (t1.key(), t1.message())
+ }
+
+ val jstream = KafkaUtils.createDirectStream[
+ Array[Byte],
+ Array[Byte],
+ DefaultDecoder,
+ DefaultDecoder,
+ (Array[Byte], Array[Byte])](
+ jssc,
+ classOf[Array[Byte]],
+ classOf[Array[Byte]],
+ classOf[DefaultDecoder],
+ classOf[DefaultDecoder],
+ classOf[(Array[Byte], Array[Byte])],
+ kafkaParams,
+ fromOffsets,
+ messageHandler)
+ new JavaPairInputDStream(jstream.inputDStream)
+ }
+ }
+
+ def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong
+ ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
+
+ def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
+ TopicAndPartition(topic, partition)
+
+ def createBroker(host: String, port: JInt): Broker = Broker(host, port)
}
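For the batch path, the `createRDD`, `createOffsetRange`, and related helpers above back a pyspark call along these lines; a minimal sketch, again assuming the `pyspark.streaming.kafka` wrappers added by this patch (topic, partition, and offsets are placeholder values):

```python
from pyspark import SparkContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

sc = SparkContext(appName="KafkaRDDExample")

# Read a bounded slice of one topic-partition as a normal RDD;
# the range selects messages [fromOffset, untilOffset).
offsetRanges = [OffsetRange("test-topic", 0, fromOffset=0, untilOffset=100)]
rdd = KafkaUtils.createRDD(
    sc, {"metadata.broker.list": "localhost:9092"}, offsetRanges)

print(rdd.count())
```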