aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-21 19:04:49 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-21 19:04:49 -0700
commit04c37b6f749dc2418cc28c89964cdc687dfcbd51 (patch)
treeba434fee57cba6fe201e83ad9c049fded5e09bc0 /external
parent5a5b3346c79abb659260284fed0ace51942f3193 (diff)
downloadspark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.gz
spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.bz2
spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.zip
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51 Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability. Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented. This PR is blocked on the graceful shutdown PR #247 Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #300 from tdas/network-receiver-api and squashes the following commits: ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff. 838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers. a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues. 91bfa72 [Tathagata Das] Fixed bugs. 8533094 [Tathagata Das] Scala style fixes. 028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver. 43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java. 2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread. 9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable. a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala28
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala10
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java6
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala6
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala19
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala14
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java10
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala10
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala41
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala12
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java6
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala6
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala23
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala20
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala20
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala7
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala16
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java8
-rw-r--r--external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala9
19 files changed, 143 insertions, 128 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e..df7605fe57 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
@@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
- lazy val blockGenerator = new BlockGenerator(storageLevel)
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
+ def onStart() {
server.start()
logInfo("Flume receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ server.close()
logInfo("Flume receiver stopped")
}
- override def getLocationPreference = Some(host)
+ override def preferredLocation = Some(host)
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e7..499f3560ef 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}
@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b98d..e0ad4f1015 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab..78603200d2 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
class FlumeStreamSuite extends TestSuiteBase {
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index c2d9dcbfaa..21443ebbbf 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
- def getReceiver(): NetworkReceiver[(K, V)] = {
+ def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[(K, V)]]
+ .asInstanceOf[Receiver[(K, V)]]
}
}
@@ -70,21 +71,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
+ ) extends Receiver[Any](storageLevel) with Logging {
- // Handles pushing data into the BlockManager
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
var consumerConnector : ConsumerConnector = null
- def onStop() {
- blockGenerator.stop()
- }
+ def onStop() { }
def onStart() {
- blockGenerator.start()
-
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -130,7 +125,7 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
- blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+ store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5472d0cd04..86bb91f362 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[(String, String)] = {
+ ): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ): DStream[(K, V)] = {
+ ): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}
@@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
+ ): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 7b4999447e..9f8046bf00 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,12 +18,13 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
@@ -31,14 +32,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
HashMap<String, Integer> topics = Maps.newHashMap();
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+ JavaPairReceiverInputDStream<String, String> test1 =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+ JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index d9809f6409..e6f2c4a5cf 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class KafkaStreamSuite extends TestSuiteBase {
@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
- val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ val test3: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfba39..0beee8b415 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
*/
private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ ) extends ReceiverInputDStream[String](ssc_) with Logging {
+
+ def getReceiver(): Receiver[String] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming]
+class MQTTReceiver(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends Receiver[String](storageLevel) {
def onStop() {
- blockGenerator.stop()
- }
+ }
+
def onStart() {
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Connect to MqttBroker
client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
client.subscribe(topic)
// Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
+ val callback: MqttCallback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
+ store(new String(arg1.getPayload()))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
+ restart("Connection lost ", arg0)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5dc8..c5ffe51f99 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
/**
@@ -36,8 +36,8 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ): ReceiverInputDStream[String] = {
+ new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}
/**
@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aaecf..ce5aa1e0cd 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
String topic = "def";
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad461..467fd263e2 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class MQTTStreamSuite extends TestSuiteBase {
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2: ReceiverInputDStream[String] =
+ MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
ssc.stop()
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7a9a..7bca140711 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -41,7 +43,7 @@ class TwitterInputDStream(
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](ssc_) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
- override def getReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
}
@@ -59,27 +61,27 @@ class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
+ ) extends Receiver[Status](storageLevel) with Logging {
var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
- protected override def onStart() {
- blockGenerator.start()
+ def onStart() {
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
- blockGenerator += status
+ store(status)
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
})
- val query: FilterQuery = new FilterQuery
+ val query = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
logInfo("Twitter receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
twitterStream.shutdown()
logInfo("Twitter receiver stopped")
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7e9f..c6a9a2b737 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
+ ): ReceiverInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
- def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
}
@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}
@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}
@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}
@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}
@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cdaf3..93741e0375 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
class TwitterStreamSuite extends TestSuiteBase {
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = TwitterUtils.createStream(ssc, None)
- val test2 = TwitterUtils.createStream(ssc, None, filters)
- val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = TwitterUtils.createStream(ssc, Some(authorization))
- val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
- StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+ val test2: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters)
+ val test3: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization))
+ val test5: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+ ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index a538c38dc4..554705878e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -24,7 +24,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.ActorHelper
/**
* A receiver to subscribe to ZeroMQ stream.
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receivers._
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T])
- extends Actor with Receiver with Logging {
+ extends Actor with ActorHelper with Logging {
override def preStart() = ZeroMQExtension(context.system)
.newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
@@ -46,9 +46,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
// We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
- pushBlock(bytesToObjects(bytes))
+ store(bytesToObjects(bytes))
case Closed => logInfo("received closed ")
-
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b254e00714..0469d0af88 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -24,10 +24,10 @@ import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream}
+import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
@@ -48,8 +48,8 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
+ supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ ): ReceiverInputDStream[T] = {
ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
@@ -72,7 +72,7 @@ object ZeroMQUtils {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -96,7 +96,7 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -119,7 +119,7 @@ object ZeroMQUtils {
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index d2361e14b8..417b91eecb 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
@@ -24,7 +25,6 @@ import akka.zeromq.Subscribe;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -39,11 +39,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
SupervisorStrategy.defaultStrategy());
}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 92d55a7a7b..cc10ff6ae0 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -23,6 +23,7 @@ import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class ZeroMQStreamSuite extends TestSuiteBase {
@@ -33,10 +34,12 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test2 = ZeroMQUtils.createStream(
+ val test1: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
+ val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving