Diffstat (limited to 'external')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 28
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 10
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 6
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 6
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 19
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 14
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 10
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 10
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 41
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 12
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 6
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 6
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 23
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala | 20
-rw-r--r--  external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 20
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 7
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 16
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 8
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 9
19 files changed, 143 insertions(+), 128 deletions(-)
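
This patch moves every external connector from the old NetworkReceiver/BlockGenerator API to the new org.apache.spark.streaming.receiver.Receiver API: data is pushed with store(), failures are surfaced with restart(...), and the receiver no longer manages its own BlockGenerator. As a minimal sketch of the new contract (the class name, host, and port below are illustrative, not part of this patch), a custom receiver now looks roughly like this:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver under the new API: reads lines from a TCP socket,
// pushes each one with store(), and restarts itself on connection errors.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart() {
    // onStart() must return quickly, so the blocking read loop gets its own thread.
    new Thread("Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() { }  // the read loop checks isStopped() and exits on its own

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                          // replaces the old `blockGenerator += ...`
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException =>
        restart("Error receiving data", e)   // replaces the old stopOnError(e)
    }
  }
}

Block generation and pushing data to the BlockManager are now handled by the receiver supervisor, which is why the per-connector BlockGenerator plumbing disappears in the diffs below.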
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e..df7605fe57 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
@@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
- lazy val blockGenerator = new BlockGenerator(storageLevel)
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
+ def onStart() {
server.start()
logInfo("Flume receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ server.close()
logInfo("Flume receiver stopped")
}
- override def getLocationPreference = Some(host)
+ override def preferredLocation = Some(host)
}
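
FlumeReceiver also switches from getLocationPreference to the new preferredLocation hook. A hedged sketch of how a receiver pins itself to a host under the new API (PinnedReceiver is a hypothetical name; the bodies are stubs):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical sketch: a receiver that asks Spark to schedule it on the host it
// binds to, using the preferredLocation hook that replaces getLocationPreference.
class PinnedReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart() { /* open a server on host:port and push data with store(...) */ }
  def onStop()  { /* shut the server down */ }

  override def preferredLocation: Option[String] = Some(host)
}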
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e7..499f3560ef 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}
@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
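
Because FlumeUtils.createStream now returns ReceiverInputDStream[SparkFlumeEvent] (or JavaReceiverInputDStream from the Java API) rather than a plain DStream, callers can see the receiver-backed nature of the stream in the type while ordinary DStream operations keep working. A minimal usage sketch, assuming a Flume sink pointed at localhost:12345 (app name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumeEventCountExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumeEventCountExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The stream is now typed as ReceiverInputDStream[SparkFlumeEvent].
    val events: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Ordinary DStream operations are unaffected by the narrower return type.
    events.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}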
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b98d..e0ad4f1015 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab..78603200d2 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
class FlumeStreamSuite extends TestSuiteBase {
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index c2d9dcbfaa..21443ebbbf 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
- def getReceiver(): NetworkReceiver[(K, V)] = {
+ def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[(K, V)]]
+ .asInstanceOf[Receiver[(K, V)]]
}
}
@@ -70,21 +71,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
+ ) extends Receiver[Any](storageLevel) with Logging {
- // Handles pushing data into the BlockManager
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
var consumerConnector : ConsumerConnector = null
- def onStop() {
- blockGenerator.stop()
- }
+ def onStop() { }
def onStart() {
- blockGenerator.start()
-
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -130,7 +125,7 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
- blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+ store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
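
KafkaReceiver keeps its pool of MessageHandler threads, but each handler now calls store() directly instead of appending to a shared BlockGenerator. A hedged sketch of that multi-threaded pattern in isolation (MultiThreadedReceiver is a hypothetical name and the payloads are dummies):

import java.util.concurrent.{ExecutorService, Executors}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical sketch of the pattern above: several worker threads each push
// records into Spark by calling store() directly, with no shared BlockGenerator.
class MultiThreadedReceiver(numThreads: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  private var pool: ExecutorService = _

  def onStart() {
    pool = Executors.newFixedThreadPool(numThreads)
    (1 to numThreads).foreach { i =>
      pool.submit(new Runnable {
        def run() {
          var n = 0
          while (!isStopped && n < 1000) {
            store("record " + n + " from worker " + i)   // store() from a worker thread
            n += 1
          }
        }
      })
    }
  }

  def onStop() {
    if (pool != null) pool.shutdownNow()
  }
}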
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5472d0cd04..86bb91f362 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[(String, String)] = {
+ ): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ): DStream[(K, V)] = {
+ ): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}
@@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
+ ): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 7b4999447e..9f8046bf00 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,12 +18,13 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
@@ -31,14 +32,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
HashMap<String, Integer> topics = Maps.newHashMap();
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+ JavaPairReceiverInputDStream<String, String> test1 =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+ JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index d9809f6409..e6f2c4a5cf 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class KafkaStreamSuite extends TestSuiteBase {
@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
- val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ val test3: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfba39..0beee8b415 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
*/
private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ ) extends ReceiverInputDStream[String](ssc_) with Logging {
+
+ def getReceiver(): Receiver[String] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming]
+class MQTTReceiver(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends Receiver[String](storageLevel) {
def onStop() {
- blockGenerator.stop()
- }
+ }
+
def onStart() {
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Connect to MqttBroker
client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
client.subscribe(topic)
// Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
+ val callback: MqttCallback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
+ store(new String(arg1.getPayload()))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
+ restart("Connection lost ", arg0)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5dc8..c5ffe51f99 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
/**
@@ -36,8 +36,8 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ): ReceiverInputDStream[String] = {
+ new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}
/**
@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
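
MQTTUtils.createStream is likewise narrowed to ReceiverInputDStream[String] / JavaReceiverInputDStream[String]. A short usage sketch (broker URL and topic are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MQTTExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // createStream is no longer generic: it always yields String payloads.
    val lines: ReceiverInputDStream[String] =
      MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensor/readings")

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}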
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aaecf..ce5aa1e0cd 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
String topic = "def";
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad461..467fd263e2 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class MQTTStreamSuite extends TestSuiteBase {
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2: ReceiverInputDStream[String] =
+ MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
ssc.stop()
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7a9a..7bca140711 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -41,7 +43,7 @@ class TwitterInputDStream(
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](ssc_) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
- override def getReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
}
@@ -59,27 +61,27 @@ class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
+ ) extends Receiver[Status](storageLevel) with Logging {
var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
- protected override def onStart() {
- blockGenerator.start()
+ def onStart() {
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
- blockGenerator += status
+ store(status)
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
})
- val query: FilterQuery = new FilterQuery
+ val query = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
logInfo("Twitter receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
twitterStream.shutdown()
logInfo("Twitter receiver stopped")
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7e9f..c6a9a2b737 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
+ ): ReceiverInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
- def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
}
@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}
@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}
@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}
@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}
@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
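
A short usage sketch for the updated TwitterUtils API; passing None makes TwitterInputDStream build an OAuthAuthorization from Twitter4J's default configuration (credentials are assumed to be supplied that way here):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TwitterExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Passing None falls back to Twitter4J's default OAuth configuration.
    val filters = Seq("spark", "streaming")
    val tweets = TwitterUtils.createStream(ssc, None, filters)

    tweets.map(_.getText).print()

    ssc.start()
    ssc.awaitTermination()
  }
}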
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cdaf3..93741e0375 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
class TwitterStreamSuite extends TestSuiteBase {
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = TwitterUtils.createStream(ssc, None)
- val test2 = TwitterUtils.createStream(ssc, None, filters)
- val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = TwitterUtils.createStream(ssc, Some(authorization))
- val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
- StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+ val test2: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters)
+ val test3: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization))
+ val test5: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+ ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index a538c38dc4..554705878e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -24,7 +24,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.ActorHelper
/**
* A receiver to subscribe to ZeroMQ stream.
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receivers._
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T])
- extends Actor with Receiver with Logging {
+ extends Actor with ActorHelper with Logging {
override def preStart() = ZeroMQExtension(context.system)
.newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
@@ -46,9 +46,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
// We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
- pushBlock(bytesToObjects(bytes))
+ store(bytesToObjects(bytes))
case Closed => logInfo("received closed ")
-
}
}
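
ZeroMQReceiver now mixes in ActorHelper, whose store() accepts either a single item or an iterator. A hedged sketch of a custom actor receiver in the same style (StringActorReceiver is a hypothetical name); such an actor is wired in with ssc.actorStream(...), which is exactly what ZeroMQUtils.createStream does in the next file:

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical actor receiver: mixing in ActorHelper lets any Akka actor push
// records into Spark with store(), one item or a whole iterator at a time.
class StringActorReceiver extends Actor with ActorHelper {
  def receive = {
    case line: String  => store(line)                              // single record
    case lines: Seq[_] => store(lines.iterator.map(_.toString))    // batch of records
  }
}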
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b254e00714..0469d0af88 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -24,10 +24,10 @@ import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream}
+import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
@@ -48,8 +48,8 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
+ supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ ): ReceiverInputDStream[T] = {
ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
@@ -72,7 +72,7 @@ object ZeroMQUtils {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -96,7 +96,7 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -119,7 +119,7 @@ object ZeroMQUtils {
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
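
A minimal Scala usage sketch of the updated ZeroMQUtils.createStream (endpoint and topic are illustrative); the receiver strips the leading topic frame before invoking the conversion function, so bytesToStrings only sees payload frames:

import akka.util.ByteString
import akka.zeromq.Subscribe

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.zeromq.ZeroMQUtils

object ZeroMQExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ZeroMQExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Decode each payload frame as a UTF-8 string.
    val bytesToStrings = (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator

    val lines: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
      ssc, "tcp://127.0.0.1:1234", Subscribe("updates"), bytesToStrings)

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}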
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index d2361e14b8..417b91eecb 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
@@ -24,7 +25,6 @@ import akka.zeromq.Subscribe;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -39,11 +39,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
SupervisorStrategy.defaultStrategy());
}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 92d55a7a7b..cc10ff6ae0 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -23,6 +23,7 @@ import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class ZeroMQStreamSuite extends TestSuiteBase {
@@ -33,10 +34,12 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test2 = ZeroMQUtils.createStream(
+ val test1: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
+ val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving