From 04c37b6f749dc2418cc28c89964cdc687dfcbd51 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 21 Apr 2014 19:04:49 -0700
Subject: [SPARK-1332] Improve Spark Streaming's Network Receiver and
 InputDStream API [WIP]

The current Network Receiver API makes it slightly complicated to write a new
receiver, as one needs to create an instance of BlockGenerator as shown in
SocketReceiver:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51

Exposing the BlockGenerator interface has made it harder to improve the
receiving process. The API of NetworkReceiver (which was not a very stable API
anyway) needs to be changed if we are to ensure future stability.

Additionally, functions like streamingContext.socketStream that create input
streams return DStream objects. That makes it hard to expose functionality
(say, rate limits) unique to input dstreams. They should return InputDStream
or NetworkInputDStream. This is not yet implemented.

This PR is blocked on the graceful shutdown PR #247.

Author: Tathagata Das

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
---
 .../spark/streaming/mqtt/MQTTInputDStream.scala   | 41 +++++++++++-----------
 .../apache/spark/streaming/mqtt/MQTTUtils.scala   | 12 +++---
 .../spark/streaming/mqtt/JavaMQTTStreamSuite.java |  6 ++--
 .../spark/streaming/mqtt/MQTTStreamSuite.scala    |  6 ++--
 4 files changed, 33 insertions(+), 32 deletions(-)

(limited to 'external/mqtt')
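As background for the refactoring described above: under the new API, a custom receiver extends org.apache.spark.streaming.receiver.Receiver directly and hands data to Spark via store(), instead of creating and driving a BlockGenerator. Below is a minimal, illustrative sketch of such a receiver; the LineReceiver class and its host/port parameters are hypothetical and not part of this patch.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: reads newline-delimited text from a socket.
// No BlockGenerator is needed; data is handed to Spark via store(),
// and restart() reports an error and asks the framework to restart
// the receiver -- the same calls the MQTTReceiver below migrates to.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart() {
    // Receive on a separate thread so that onStart() returns quickly.
    new Thread("LineReceiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: the receiving thread polls isStopped() and exits.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // replaces the old `blockGenerator += line`
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Stream ended, trying to reconnect")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
```

The diff below applies exactly this pattern to MQTTReceiver: store() replaces `blockGenerator +=`, and restart() replaces a bare logInfo() on connection loss.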
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfba39..0beee8b415 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
  */
 private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
     @transient ssc_ : StreamingContext,
     brokerUrl: String,
     topic: String,
     storageLevel: StorageLevel
-  ) extends NetworkInputDStream[T](ssc_) with Logging {
-
-  def getReceiver(): NetworkReceiver[T] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+  ) extends ReceiverInputDStream[String](ssc_) with Logging {
+
+  def getReceiver(): Receiver[String] = {
+    new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
 }
 
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
-  topic: String,
-  storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming]
+class MQTTReceiver(
+    brokerUrl: String,
+    topic: String,
+    storageLevel: StorageLevel
+  ) extends Receiver[String](storageLevel) {
 
   def onStop() {
-    blockGenerator.stop()
-  }
+  }
+
   def onStart() {
-    blockGenerator.start()
-
-    // Set up persistence for messages
-    var peristance: MqttClientPersistence = new MemoryPersistence()
+    // Set up persistence for messages
+    val persistence = new MemoryPersistence()
 
     // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
-    var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
 
     // Connect to MqttBroker
     client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
     client.subscribe(topic)
 
     // Callback automatically triggers as and when new message arrives on specified topic
-    var callback: MqttCallback = new MqttCallback() {
+    val callback: MqttCallback = new MqttCallback() {
 
       // Handles Mqtt message
       override def messageArrived(arg0: String, arg1: MqttMessage) {
-        blockGenerator += new String(arg1.getPayload())
+        store(new String(arg1.getPayload()))
       }
 
       override def deliveryComplete(arg0: IMqttDeliveryToken) {
       }
 
       override def connectionLost(arg0: Throwable) {
-        logInfo("Connection lost " + arg0)
+        restart("Connection lost ", arg0)
       }
     }
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5dc8..c5ffe51f99 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object MQTTUtils {
   /**
@@ -36,8 +36,8 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[String] = {
-    new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+    ): ReceiverInputDStream[String] = {
+    new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
 
   /**
@@ -51,7 +51,7 @@ object MQTTUtils {
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topic: String
-    ): JavaDStream[String] = {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic)
   }
@@ -68,7 +68,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaDStream[String] = {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aaecf..ce5aa1e0cd 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.mqtt;
 
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
     String topic = "def";
 
     // tests the API, does not actually test data receiving
-    JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
-    JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+    JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+    JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad461..467fd263e2 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class MQTTStreamSuite extends TestSuiteBase {
 
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
     val topic = "def"
 
     // tests the API, does not actually test data receiving
-    val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val test2: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
     ssc.stop()
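For completeness, a sketch of how the narrower return type surfaces to callers of MQTTUtils.createStream after this patch; the application name, broker URL, and topic below are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // implicits for reduceByKey on DStreams
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MQTTWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // After this patch, createStream returns ReceiverInputDStream[String]
    // rather than plain DStream[String], leaving room to expose
    // receiver-specific features (say, rate limits) on input streams later.
    val lines: ReceiverInputDStream[String] =
      MQTTUtils.createStream(ssc, "tcp://localhost:1883", "some-topic",
        StorageLevel.MEMORY_AND_DISK_SER_2)

    // A ReceiverInputDStream is still a DStream, so the usual
    // transformations work unchanged.
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```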