diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-21 19:04:49 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-21 19:04:49 -0700 |
commit | 04c37b6f749dc2418cc28c89964cdc687dfcbd51 (patch) | |
tree | ba434fee57cba6fe201e83ad9c049fded5e09bc0 /external/kafka/src/main | |
parent | 5a5b3346c79abb659260284fed0ace51942f3193 (diff) | |
download | spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.gz spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.bz2 spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.zip |
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51
Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability.
Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented.
This PR is blocked on the graceful shutdown PR #247
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #300 from tdas/network-receiver-api and squashes the following commits:
ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 19 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 14 |
2 files changed, 14 insertions, 19 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index c2d9dcbfaa..21443ebbbf 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -33,6 +33,7 @@ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver /** * Input stream that pulls messages from a Kafka Broker. @@ -53,11 +54,11 @@ class KafkaInputDStream[ kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { + ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { - def getReceiver(): NetworkReceiver[(K, V)] = { + def getReceiver(): Receiver[(K, V)] = { new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[(K, V)]] + .asInstanceOf[Receiver[(K, V)]] } } @@ -70,21 +71,15 @@ class KafkaReceiver[ kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { + ) extends Receiver[Any](storageLevel) with Logging { - // Handles pushing data into the BlockManager - lazy protected val blockGenerator = new BlockGenerator(storageLevel) // Connection to Kafka var consumerConnector : ConsumerConnector = null - def onStop() { - blockGenerator.stop() - } + def onStop() { } def onStart() { - blockGenerator.start() - // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) @@ -130,7 +125,7 @@ class KafkaReceiver[ def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { - blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) + store((msgAndMetadata.key, msgAndMetadata.message)) } } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 5472d0cd04..86bb91f362 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object KafkaUtils { @@ -48,7 +48,7 @@ object KafkaUtils { groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[(String, String)] = { + ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") @@ -70,7 +70,7 @@ object KafkaUtils { kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): DStream[(K, V)] = { + ): ReceiverInputDStream[(K, V)] = { new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) } @@ -88,7 +88,7 @@ object KafkaUtils { zkQuorum: String, groupId: String, topics: JMap[String, JInt] - ): JavaPairDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) @@ -110,7 +110,7 @@ object KafkaUtils { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -139,7 +139,7 @@ object KafkaUtils { kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairDStream[K, V] = { + ): JavaPairReceiverInputDStream[K, V] = { implicit val keyCmt: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] implicit val valueCmt: ClassTag[V] = |