diff options
Diffstat (limited to 'external')
19 files changed, 143 insertions, 128 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 34012b846e..df7605fe57 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -34,6 +34,8 @@ import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ +import org.apache.spark.Logging +import org.apache.spark.streaming.receiver.Receiver private[streaming] class FlumeInputDStream[T: ClassTag]( @@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag]( host: String, port: Int, storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { +) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) { - override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + override def getReceiver(): Receiver[SparkFlumeEvent] = { new FlumeReceiver(host, port, storageLevel) } } @@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent { private[streaming] class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { override def append(event : AvroFlumeEvent) : Status = { - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)) Status.OK } override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { events.foreach (event => - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))) Status.OK } } @@ -133,23 +135,21 @@ class FlumeReceiver( host: String, port: Int, storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent] { + ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging { - lazy val blockGenerator = new BlockGenerator(storageLevel) + lazy val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + lazy val server = new NettyServer(responder, new InetSocketAddress(host, port)) - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - val server = new NettyServer(responder, new InetSocketAddress(host, port)) - blockGenerator.start() + def onStart() { server.start() logInfo("Flume receiver started") } - protected override def onStop() { - blockGenerator.stop() + def onStop() { + server.close() logInfo("Flume receiver stopped") } - override def getLocationPreference = Some(host) + override def preferredLocation = Some(host) } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 654ba451e7..499f3560ef 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object FlumeUtils { /** @@ -35,7 +35,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) inputStream } @@ -50,7 +50,7 @@ object FlumeUtils { jssc: JavaStreamingContext, hostname: String, port: Int - ): JavaDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port) } @@ -65,7 +65,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel - ): JavaDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port, storageLevel) } } diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java index 733389b98d..e0ad4f1015 100644 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.junit.Test; public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { @Test public void testFlumeStream() { // tests the API, does not actually test data receiving - JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); - JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, + JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); + JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 8bc43972ab..78603200d2 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase} import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream class FlumeStreamSuite extends TestSuiteBase { @@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase { test("flume input stream") { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] = + FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) + val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer) outputStream.register() ssc.start() diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index c2d9dcbfaa..21443ebbbf 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -33,6 +33,7 @@ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver /** * Input stream that pulls messages from a Kafka Broker. @@ -53,11 +54,11 @@ class KafkaInputDStream[ kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { + ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { - def getReceiver(): NetworkReceiver[(K, V)] = { + def getReceiver(): Receiver[(K, V)] = { new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[(K, V)]] + .asInstanceOf[Receiver[(K, V)]] } } @@ -70,21 +71,15 @@ class KafkaReceiver[ kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { + ) extends Receiver[Any](storageLevel) with Logging { - // Handles pushing data into the BlockManager - lazy protected val blockGenerator = new BlockGenerator(storageLevel) // Connection to Kafka var consumerConnector : ConsumerConnector = null - def onStop() { - blockGenerator.stop() - } + def onStop() { } def onStart() { - blockGenerator.start() - // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) @@ -130,7 +125,7 @@ class KafkaReceiver[ def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { - blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) + store((msgAndMetadata.key, msgAndMetadata.message)) } } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 5472d0cd04..86bb91f362 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object KafkaUtils { @@ -48,7 +48,7 @@ object KafkaUtils { groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[(String, String)] = { + ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") @@ -70,7 +70,7 @@ object KafkaUtils { kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): DStream[(K, V)] = { + ): ReceiverInputDStream[(K, V)] = { new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) } @@ -88,7 +88,7 @@ object KafkaUtils { zkQuorum: String, groupId: String, topics: JMap[String, JInt] - ): JavaPairDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) @@ -110,7 +110,7 @@ object KafkaUtils { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairDStream[String, String] = { + ): JavaPairReceiverInputDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -139,7 +139,7 @@ object KafkaUtils { kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel - ): JavaPairDStream[K, V] = { + ): JavaPairReceiverInputDStream[K, V] = { implicit val keyCmt: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] implicit val valueCmt: ClassTag[V] = diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 7b4999447e..9f8046bf00 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,12 +18,13 @@ package org.apache.spark.streaming.kafka; import java.util.HashMap; + +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.junit.Test; import com.google.common.collect.Maps; import kafka.serializer.StringDecoder; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaPairDStream; public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { @Test @@ -31,14 +32,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { HashMap<String, Integer> topics = Maps.newHashMap(); // tests the API, does not actually test data receiving - JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, + JavaPairReceiverInputDStream<String, String> test1 = + KafkaUtils.createStream(ssc, "localhost:12345", "group", topics); + JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2()); HashMap<String, String> kafkaParams = Maps.newHashMap(); kafkaParams.put("zookeeper.connect", "localhost:12345"); kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc, + JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index d9809f6409..e6f2c4a5cf 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka import kafka.serializer.StringDecoder import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream class KafkaStreamSuite extends TestSuiteBase { @@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase { val topics = Map("my-topic" -> 1) // tests the API, does not actually test data receiving - val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics) - val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1: ReceiverInputDStream[(String, String)] = + KafkaUtils.createStream(ssc, "localhost:1234", "group", topics) + val test2: ReceiverInputDStream[(String, String)] = + KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( + val test3: ReceiverInputDStream[(String, String)] = + KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) // TODO: Actually test receiving data diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 1204cfba39..0beee8b415 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -39,6 +39,7 @@ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receiver.Receiver /** * Input stream that subscribe messages from a Mqtt Broker. @@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._ */ private[streaming] -class MQTTInputDStream[T: ClassTag]( +class MQTTInputDStream( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]] + ) extends ReceiverInputDStream[String](ssc_) with Logging { + + def getReceiver(): Receiver[String] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) } } -private[streaming] -class MQTTReceiver(brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) +private[streaming] +class MQTTReceiver( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends Receiver[String](storageLevel) { def onStop() { - blockGenerator.stop() - } + } + def onStart() { - blockGenerator.start() - - // Set up persistence for messages - var peristance: MqttClientPersistence = new MemoryPersistence() + // Set up persistence for messages + val persistence = new MemoryPersistence() // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) + val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence) // Connect to MqttBroker client.connect() @@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String, client.subscribe(topic) // Callback automatically triggers as and when new message arrives on specified topic - var callback: MqttCallback = new MqttCallback() { + val callback: MqttCallback = new MqttCallback() { // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { - blockGenerator += new String(arg1.getPayload()) + store(new String(arg1.getPayload())) } override def deliveryComplete(arg0: IMqttDeliveryToken) { } override def connectionLost(arg0: Throwable) { - logInfo("Connection lost " + arg0) + restart("Connection lost ", arg0) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 1b09ee5dc8..c5ffe51f99 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} import scala.reflect.ClassTag -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object MQTTUtils { /** @@ -36,8 +36,8 @@ object MQTTUtils { brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[String] = { - new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ): ReceiverInputDStream[String] = { + new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel) } /** @@ -51,7 +51,7 @@ object MQTTUtils { jssc: JavaStreamingContext, brokerUrl: String, topic: String - ): JavaDStream[String] = { + ): JavaReceiverInputDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, brokerUrl, topic) } @@ -68,7 +68,7 @@ object MQTTUtils { brokerUrl: String, topic: String, storageLevel: StorageLevel - ): JavaDStream[String] = { + ): JavaReceiverInputDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] createStream(jssc.ssc, brokerUrl, topic, storageLevel) } diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index 44743aaecf..ce5aa1e0cd 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -18,7 +18,7 @@ package org.apache.spark.streaming.mqtt; import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.junit.Test; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { String topic = "def"; // tests the API, does not actually test data receiving - JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); - JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, + JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); + JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 89c40ad461..467fd263e2 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.ReceiverInputDStream class MQTTStreamSuite extends TestSuiteBase { @@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase { val topic = "def" // tests the API, does not actually test data receiving - val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic) - val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic) + val test2: ReceiverInputDStream[String] = + MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) // TODO: Actually test receiving data ssc.stop() diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 843a4a7a9a..7bca140711 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.Logging +import org.apache.spark.streaming.receiver.Receiver /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -41,7 +43,7 @@ class TwitterInputDStream( twitterAuth: Option[Authorization], filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { + ) extends ReceiverInputDStream[Status](ssc_) { private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) @@ -49,7 +51,7 @@ class TwitterInputDStream( private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - override def getReceiver(): NetworkReceiver[Status] = { + override def getReceiver(): Receiver[Status] = { new TwitterReceiver(authorization, filters, storageLevel) } } @@ -59,27 +61,27 @@ class TwitterReceiver( twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { + ) extends Receiver[Status](storageLevel) with Logging { var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - protected override def onStart() { - blockGenerator.start() + def onStart() { twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) twitterStream.addListener(new StatusListener { def onStatus(status: Status) = { - blockGenerator += status + store(status) } // Unimplemented def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} def onTrackLimitationNotice(i: Int) {} def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { stopOnError(e) } + def onException(e: Exception) { + restart("Error receiving tweets", e) + } }) - val query: FilterQuery = new FilterQuery + val query = new FilterQuery if (filters.size > 0) { query.track(filters.toArray) twitterStream.filter(query) @@ -89,8 +91,7 @@ class TwitterReceiver( logInfo("Twitter receiver started") } - protected override def onStop() { - blockGenerator.stop() + def onStop() { twitterStream.shutdown() logInfo("Twitter receiver stopped") } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index e8433b7e9f..c6a9a2b737 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -21,8 +21,8 @@ import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object TwitterUtils { /** @@ -40,7 +40,7 @@ object TwitterUtils { twitterAuth: Option[Authorization], filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { + ): ReceiverInputDStream[Status] = { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } @@ -52,7 +52,7 @@ object TwitterUtils { * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object */ - def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None) } @@ -65,7 +65,8 @@ object TwitterUtils { * @param jssc JavaStreamingContext object * @param filters Set of filter strings to get only those tweets that match them */ - def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext, filters: Array[String] + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters) } @@ -82,7 +83,7 @@ object TwitterUtils { jssc: JavaStreamingContext, filters: Array[String], storageLevel: StorageLevel - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters, storageLevel) } @@ -92,7 +93,8 @@ object TwitterUtils { * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization */ - def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth)) } @@ -107,7 +109,7 @@ object TwitterUtils { jssc: JavaStreamingContext, twitterAuth: Authorization, filters: Array[String] - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters) } @@ -123,7 +125,7 @@ object TwitterUtils { twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } } diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index 06ab0cdaf3..93741e0375 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel import twitter4j.auth.{NullAuthorization, Authorization} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import twitter4j.Status class TwitterStreamSuite extends TestSuiteBase { @@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase { val authorization: Authorization = NullAuthorization.getInstance() // tests the API, does not actually test data receiving - val test1 = TwitterUtils.createStream(ssc, None) - val test2 = TwitterUtils.createStream(ssc, None, filters) - val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) - val test4 = TwitterUtils.createStream(ssc, Some(authorization)) - val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) - val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, - StorageLevel.MEMORY_AND_DISK_SER_2) + val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None) + val test2: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters) + val test3: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization)) + val test5: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream( + ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) // Note that actually testing the data receiving is hard as authentication keys are // necessary for accessing Twitter live stream diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index a538c38dc4..554705878e 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -24,7 +24,7 @@ import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging -import org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.receiver.ActorHelper /** * A receiver to subscribe to ZeroMQ stream. @@ -32,7 +32,7 @@ import org.apache.spark.streaming.receivers._ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T]) - extends Actor with Receiver with Logging { + extends Actor with ActorHelper with Logging { override def preStart() = ZeroMQExtension(context.system) .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe) @@ -46,9 +46,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, // We ignore first frame for processing as it is the topic val bytes = m.frames.tail - pushBlock(bytesToObjects(bytes)) + store(bytesToObjects(bytes)) case Closed => logInfo("received closed ") - } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index b254e00714..0469d0af88 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -24,10 +24,10 @@ import akka.util.ByteString import akka.zeromq.Subscribe import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream} +import org.apache.spark.streaming.receiver.ActorSupervisorStrategy object ZeroMQUtils { /** @@ -48,8 +48,8 @@ object ZeroMQUtils { subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { + supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + ): ReceiverInputDStream[T] = { ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } @@ -72,7 +72,7 @@ object ZeroMQUtils { bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator @@ -96,7 +96,7 @@ object ZeroMQUtils { subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], storageLevel: StorageLevel - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator @@ -119,7 +119,7 @@ object ZeroMQUtils { publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index d2361e14b8..417b91eecb 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming.zeromq; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.junit.Test; import akka.actor.SupervisorStrategy; import akka.util.ByteString; @@ -24,7 +25,6 @@ import akka.zeromq.Subscribe; import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @@ -39,11 +39,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { } }; - JavaDStream<String> test1 = ZeroMQUtils.<String>createStream( + JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream( ssc, publishUrl, subscribe, bytesToObjects); - JavaDStream<String> test2 = ZeroMQUtils.<String>createStream( + JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream( ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<String> test3 = ZeroMQUtils.<String>createStream( + JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream( ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); } diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 92d55a7a7b..cc10ff6ae0 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -23,6 +23,7 @@ import akka.zeromq.Subscribe import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.dstream.ReceiverInputDStream class ZeroMQStreamSuite extends TestSuiteBase { @@ -33,10 +34,12 @@ class ZeroMQStreamSuite extends TestSuiteBase { val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] // tests the API, does not actually test data receiving - val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) - val test2 = ZeroMQUtils.createStream( + val test1: ReceiverInputDStream[String] = + ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) - val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects, + val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) // TODO: Actually test data receiving |