-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 6
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 28
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 10
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 6
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 6
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 19
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 14
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 10
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 10
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 41
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 12
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 6
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 6
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 23
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala | 20
-rw-r--r--  external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 20
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 7
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 16
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 8
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 9
-rw-r--r--  project/MimaBuild.scala | 29
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 44
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala | 40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala | 41
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala | 42
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala | 41
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 56
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 362
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 94
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 62
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala) | 95
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 142
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala | 236
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala | 23
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 180
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 180
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala) | 101
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 19
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala | 249
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 42
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 84
55 files changed, 1836 insertions(+), 731 deletions(-)
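
The per-file diffs below migrate every input source from the old NetworkReceiver/BlockGenerator pattern to the new org.apache.spark.streaming.receiver.Receiver API: receivers now extend Receiver[T](storageLevel), push data with store(...) instead of blockGenerator += ..., signal recoverable failures with restart(...) instead of stopOnError(...), and declare a preferred host via preferredLocation. As a minimal sketch of what a user-defined receiver looks like against the new API, using only the calls visible in this diff; the class name and the socket-reading logic are illustrative, not part of this patch:

// Sketch only: a hypothetical custom receiver built on the new Receiver API.
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  // onStart must not block: start a thread that receives data and calls store().
  def onStart() {
    new Thread("Custom Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  // The receiving thread checks isStopped, so there is nothing to clean up here.
  def onStop() { }

  // Replaces the old getLocationPreference.
  override def preferredLocation: Option[String] = Some(host)

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                       // replaces the old `blockGenerator += ...`
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")  // replaces the old `stopOnError(...)`
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    }
  }
}

Such a receiver would be attached to a context with the receiverStream method added in this patch, e.g. ssc.receiverStream(new CustomLineReceiver("localhost", 9999)).
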
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 99770f2854..cf987a1ab0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
(records, "")
}
}
- "%.1f%s".formatLocal(Locale.US, value, unit)
+ if (unit.isEmpty) {
+ "%d".formatLocal(Locale.US, value)
+ } else {
+ "%.1f%s".formatLocal(Locale.US, value, unit)
+ }
}
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index a22e64ca3c..eb44768b9c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends Actor with Receiver {
+extends Actor with ActorHelper {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
- case msg => pushBlock(msg.asInstanceOf[T])
+ case msg => store(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e..df7605fe57 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
@@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
- lazy val blockGenerator = new BlockGenerator(storageLevel)
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
+ def onStart() {
server.start()
logInfo("Flume receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ server.close()
logInfo("Flume receiver stopped")
}
- override def getLocationPreference = Some(host)
+ override def preferredLocation = Some(host)
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e7..499f3560ef 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}
@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b98d..e0ad4f1015 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
// tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab..78603200d2 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
class FlumeStreamSuite extends TestSuiteBase {
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+ FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
outputStream.register()
ssc.start()
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index c2d9dcbfaa..21443ebbbf 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
- def getReceiver(): NetworkReceiver[(K, V)] = {
+ def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[(K, V)]]
+ .asInstanceOf[Receiver[(K, V)]]
}
}
@@ -70,21 +71,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
+ ) extends Receiver[Any](storageLevel) with Logging {
- // Handles pushing data into the BlockManager
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
var consumerConnector : ConsumerConnector = null
- def onStop() {
- blockGenerator.stop()
- }
+ def onStop() { }
def onStart() {
- blockGenerator.start()
-
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -130,7 +125,7 @@ class KafkaReceiver[
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
- blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+ store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5472d0cd04..86bb91f362 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[(String, String)] = {
+ ): ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ): DStream[(K, V)] = {
+ ): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}
@@ -88,7 +88,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
+ ): JavaPairReceiverInputDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
+ ): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val valueCmt: ClassTag[V] =
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 7b4999447e..9f8046bf00 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,12 +18,13 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
@@ -31,14 +32,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
HashMap<String, Integer> topics = Maps.newHashMap();
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+ JavaPairReceiverInputDStream<String, String> test1 =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+ JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index d9809f6409..e6f2c4a5cf 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class KafkaStreamSuite extends TestSuiteBase {
@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
- val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ val test3: ReceiverInputDStream[(String, String)] =
+ KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfba39..0beee8b415 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
*/
private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ ) extends ReceiverInputDStream[String](ssc_) with Logging {
+
+ def getReceiver(): Receiver[String] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Any] {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming]
+class MQTTReceiver(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends Receiver[String](storageLevel) {
def onStop() {
- blockGenerator.stop()
- }
+ }
+
def onStart() {
- blockGenerator.start()
-
- // Set up persistence for messages
- var peristance: MqttClientPersistence = new MemoryPersistence()
+ // Set up persistence for messages
+ val persistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
- var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Connect to MqttBroker
client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
client.subscribe(topic)
// Callback automatically triggers as and when new message arrives on specified topic
- var callback: MqttCallback = new MqttCallback() {
+ val callback: MqttCallback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
- blockGenerator += new String(arg1.getPayload())
+ store(new String(arg1.getPayload()))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
- logInfo("Connection lost " + arg0)
+ restart("Connection lost ", arg0)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5dc8..c5ffe51f99 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
/**
@@ -36,8 +36,8 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ): ReceiverInputDStream[String] = {
+ new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}
/**
@@ -51,7 +51,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -68,7 +68,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ): JavaDStream[String] = {
+ ): JavaReceiverInputDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aaecf..ce5aa1e0cd 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
String topic = "def";
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad461..467fd263e2 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class MQTTStreamSuite extends TestSuiteBase {
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
- val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2: ReceiverInputDStream[String] =
+ MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
ssc.stop()
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7a9a..7bca140711 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -41,7 +43,7 @@ class TwitterInputDStream(
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](ssc_) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
- override def getReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
}
@@ -59,27 +61,27 @@ class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
+ ) extends Receiver[Status](storageLevel) with Logging {
var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
- protected override def onStart() {
- blockGenerator.start()
+ def onStart() {
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
- blockGenerator += status
+ store(status)
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
})
- val query: FilterQuery = new FilterQuery
+ val query = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
logInfo("Twitter receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
twitterStream.shutdown()
logInfo("Twitter receiver stopped")
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7e9f..c6a9a2b737 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
+ ): ReceiverInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
- def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
}
@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}
@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}
@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}
@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}
@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cdaf3..93741e0375 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
class TwitterStreamSuite extends TestSuiteBase {
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = TwitterUtils.createStream(ssc, None)
- val test2 = TwitterUtils.createStream(ssc, None, filters)
- val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = TwitterUtils.createStream(ssc, Some(authorization))
- val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
- StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+ val test2: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters)
+ val test3: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization))
+ val test5: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+ ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index a538c38dc4..554705878e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -24,7 +24,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.ActorHelper
/**
* A receiver to subscribe to ZeroMQ stream.
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receivers._
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T])
- extends Actor with Receiver with Logging {
+ extends Actor with ActorHelper with Logging {
override def preStart() = ZeroMQExtension(context.system)
.newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
@@ -46,9 +46,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
// We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
- pushBlock(bytesToObjects(bytes))
+ store(bytesToObjects(bytes))
case Closed => logInfo("received closed ")
-
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b254e00714..0469d0af88 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -24,10 +24,10 @@ import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream}
+import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
@@ -48,8 +48,8 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
+ supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ ): ReceiverInputDStream[T] = {
ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
@@ -72,7 +72,7 @@ object ZeroMQUtils {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -96,7 +96,7 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -119,7 +119,7 @@ object ZeroMQUtils {
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index d2361e14b8..417b91eecb 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
@@ -24,7 +25,6 @@ import akka.zeromq.Subscribe;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -39,11 +39,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
SupervisorStrategy.defaultStrategy());
}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 92d55a7a7b..cc10ff6ae0 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -23,6 +23,7 @@ import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
class ZeroMQStreamSuite extends TestSuiteBase {
@@ -33,10 +34,12 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test2 = ZeroMQUtils.createStream(
+ val test1: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
+ val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 9cb31d7044..d540dc0a98 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -38,6 +38,7 @@ object MimaBuild {
IO.read(excludeFile).split("\n")
}
+ // Exclude a single class and its corresponding object
def excludeClass(className: String) = {
Seq(
excludePackage(className),
@@ -48,7 +49,16 @@ object MimaBuild {
ProblemFilters.exclude[MissingTypesProblem](className + "$")
)
}
- def excludeSparkClass(className: String) = excludeClass("org.apache.spark." + className)
+
+ // Exclude a Spark class, that is in the package org.apache.spark
+ def excludeSparkClass(className: String) = {
+ excludeClass("org.apache.spark." + className)
+ }
+
+ // Exclude a Spark package, that is in the package org.apache.spark
+ def excludeSparkPackage(packageName: String) = {
+ excludePackage("org.apache.spark." + packageName)
+ }
val packagePrivateExcludes = packagePrivateList.flatMap(excludeClass)
@@ -58,10 +68,9 @@ object MimaBuild {
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.0") =>
Seq(
- excludePackage("org.apache.spark.api.java"),
- excludePackage("org.apache.spark.streaming.api.java"),
- excludePackage("org.apache.spark.streaming.scheduler"),
- excludePackage("org.apache.spark.mllib")
+ excludeSparkPackage("api.java"),
+ excludeSparkPackage("mllib"),
+ excludeSparkPackage("streaming")
) ++
excludeSparkClass("rdd.ClassTags") ++
excludeSparkClass("util.XORShiftRandom") ++
@@ -69,14 +78,7 @@ object MimaBuild {
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
excludeSparkClass("mllib.regression.LassoWithSGD") ++
- excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
- excludeSparkClass("streaming.dstream.NetworkReceiver") ++
- excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++
- excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
- excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
- excludeSparkClass("streaming.dstream.ReportError") ++
- excludeSparkClass("streaming.dstream.ReportBlock") ++
- excludeSparkClass("streaming.dstream.DStream")
+ excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
case _ => Seq()
}
@@ -87,5 +89,4 @@ object MimaBuild {
previousArtifact := None,
binaryIssueFilters ++= ignoredABIProblems(sparkHome)
)
-
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index d3339063cc..b4adf0e965 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
final private[streaming] class DStreamGraph extends Serializable with Logging {
@@ -103,9 +103,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def getOutputStreams() = this.synchronized { outputStreams.toArray }
- def getNetworkInputStreams() = this.synchronized {
- inputStreams.filter(_.isInstanceOf[NetworkInputDStream[_]])
- .map(_.asInstanceOf[NetworkInputDStream[_]])
+ def getReceiverInputStreams() = this.synchronized {
+ inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
+ .map(_.asInstanceOf[ReceiverInputDStream[_]])
.toArray
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index e9a4f7ba22..daa5c69bba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -31,12 +31,11 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.ui.StreamingTab
import org.apache.spark.util.MetadataCleaner
@@ -139,7 +138,7 @@ class StreamingContext private[streaming] (
}
}
- private val nextNetworkInputStreamId = new AtomicInteger(0)
+ private val nextReceiverInputStreamId = new AtomicInteger(0)
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
@@ -208,15 +207,26 @@ class StreamingContext private[streaming] (
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()
/**
- * Create an input stream with any arbitrary user implemented network receiver.
+ * Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param receiver Custom implementation of NetworkReceiver
+ * @param receiver Custom implementation of Receiver
*/
+ @deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
- receiver: NetworkReceiver[T]): DStream[T] = {
+ receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ receiverStream(receiver)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented receiver.
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * @param receiver Custom implementation of Receiver
+ */
+ def receiverStream[T: ClassTag](
+ receiver: Receiver[T]): ReceiverInputDStream[T] = {
new PluggableInputDStream[T](this, receiver)
}
@@ -236,9 +246,9 @@ class StreamingContext private[streaming] (
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ ): ReceiverInputDStream[T] = {
+ receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
/**
@@ -254,7 +264,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
+ ): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
@@ -273,7 +283,7 @@ class StreamingContext private[streaming] (
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
- ): DStream[T] = {
+ ): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
@@ -292,7 +302,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[T] = {
+ ): ReceiverInputDStream[T] = {
new RawInputDStream[T](this, hostname, port, storageLevel)
}
@@ -310,7 +320,7 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
- ] (directory: String): DStream[(K, V)] = {
+ ] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}
@@ -330,7 +340,7 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
- ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
@@ -356,7 +366,7 @@ class StreamingContext private[streaming] (
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
- ): DStream[T] = {
+ ): InputDStream[T] = {
queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
}
@@ -373,7 +383,7 @@ class StreamingContext private[streaming] (
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
- ): DStream[T] = {
+ ): InputDStream[T] = {
new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 13e2bacc92..505e4431e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -97,6 +97,10 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
}
object JavaDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.DStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaDStream]].
+ */
implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
new file mode 100644
index 0000000000..91f8d342d2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.InputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]].
+ */
+class JavaInputDStream[T](val inputDStream: InputDStream[T])
+ (implicit override val classTag: ClassTag[T]) extends JavaDStream[T](inputDStream) {
+}
+
+object JavaInputDStream {
+ /**
+ * Convert a Scala [[org.apache.spark.streaming.dstream.InputDStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaInputDStream]].
+ */
+ implicit def fromInputDStream[T: ClassTag](
+ inputDStream: InputDStream[T]): JavaInputDStream[T] = {
+ new JavaInputDStream[T](inputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
new file mode 100644
index 0000000000..add8585308
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.InputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]] of
+ * key-value pairs.
+ */
+class JavaPairInputDStream[K, V](val inputDStream: InputDStream[(K, V)])(
+ implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V]
+ ) extends JavaPairDStream[K, V](inputDStream) {
+}
+
+object JavaPairInputDStream {
+ /**
+ * Convert a Scala [[org.apache.spark.streaming.dstream.InputDStream]] of pairs to a
+ * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairInputDStream]].
+ */
+ implicit def fromInputDStream[K: ClassTag, V: ClassTag](
+ inputDStream: InputDStream[(K, V)]): JavaPairInputDStream[K, V] = {
+ new JavaPairInputDStream[K, V](inputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
new file mode 100644
index 0000000000..974b3e4516
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaPairReceiverInputDStream[K, V](val receiverInputDStream: ReceiverInputDStream[(K, V)])
+ (implicit override val kClassTag: ClassTag[K], override implicit val vClassTag: ClassTag[V])
+ extends JavaPairInputDStream[K, V](receiverInputDStream) {
+}
+
+object JavaPairReceiverInputDStream {
+ /**
+ * Convert a Scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] of pairs to a
+ * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream]].
+ */
+ implicit def fromReceiverInputDStream[K: ClassTag, V: ClassTag](
+ receiverInputDStream: ReceiverInputDStream[(K, V)]): JavaPairReceiverInputDStream[K, V] = {
+ new JavaPairReceiverInputDStream[K, V](receiverInputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
new file mode 100644
index 0000000000..340ef97980
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T])
+ (implicit override val classTag: ClassTag[T]) extends JavaInputDStream[T](receiverInputDStream) {
+}
+
+object JavaReceiverInputDStream {
+ /**
+ * Convert a Scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]].
+ */
+ implicit def fromReceiverInputDStream[T: ClassTag](
+ receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = {
+ new JavaReceiverInputDStream[T](receiverInputDStream)
+ }
+}
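Together, the four new wrappers mirror the Scala hierarchy (plain, pair, and receiver-backed input streams), and the companion-object implicits let Scala code that builds Java-facing APIs return the most specific wrapper. A hedged sketch (the helper name is made up for illustration):

    import scala.reflect.ClassTag

    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
    import org.apache.spark.streaming.dstream.ReceiverInputDStream

    // Wrap a Scala receiver stream for a Java-facing API; the implicit conversion
    // in JavaReceiverInputDStream's companion object performs the same step.
    def wrapForJava[T: ClassTag](stream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] =
      JavaReceiverInputDStream.fromReceiverInputDStream(stream)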
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index c800602d09..fbb2e9f85d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -35,7 +35,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.receiver.Receiver
/**
* A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
@@ -155,8 +156,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
*/
- def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
- : JavaDStream[String] = {
+ def socketTextStream(
+ hostname: String, port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[String] = {
ssc.socketTextStream(hostname, port, storageLevel)
}
@@ -167,7 +170,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
- def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = {
ssc.socketTextStream(hostname, port)
}
@@ -186,7 +189,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
- : JavaDStream[T] = {
+ : JavaReceiverInputDStream[T] = {
def fn = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
@@ -218,10 +221,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def rawSocketStream[T](
hostname: String,
port: Int,
- storageLevel: StorageLevel): JavaDStream[T] = {
+ storageLevel: StorageLevel): JavaReceiverInputDStream[T] = {
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
+ JavaReceiverInputDStream.fromReceiverInputDStream(
+ ssc.rawSocketStream(hostname, port, storageLevel))
}
/**
@@ -233,10 +237,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
- def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ def rawSocketStream[T](hostname: String, port: Int): JavaReceiverInputDStream[T] = {
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
+ JavaReceiverInputDStream.fromReceiverInputDStream(
+ ssc.rawSocketStream(hostname, port))
}
/**
@@ -249,7 +254,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
- def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+ def fileStream[K, V, F <: NewInputFormat[K, V]](
+ directory: String): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
@@ -275,7 +281,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
name: String,
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
@@ -296,7 +302,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
props: Props,
name: String,
storageLevel: StorageLevel
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
@@ -316,14 +322,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def actorStream[T](
props: Props,
name: String
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -339,7 +345,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -347,7 +353,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
- def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
+ def queueStream[T](
+ queue: java.util.Queue[JavaRDD[T]],
+ oneAtATime: Boolean
+ ): JavaInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -356,7 +365,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -368,7 +377,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def queueStream[T](
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
- defaultRDD: JavaRDD[T]): JavaDStream[T] = {
+ defaultRDD: JavaRDD[T]): JavaInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -377,6 +386,17 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Create an input stream with any arbitrary user implemented receiver.
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * @param receiver Custom implementation of Receiver
+ */
+ def receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ ssc.receiverStream(receiver)
+ }
+
+ /**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
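The Java-facing context follows the same pattern: its factory methods now return the wrapper that matches the underlying stream. A short sketch (written in Scala for brevity, not part of the patch), assuming an existing StreamingContext `ssc`:

    import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}

    val jssc = new JavaStreamingContext(ssc)
    // socketTextStream now yields JavaReceiverInputDStream rather than a plain JavaDStream.
    val lines: JavaReceiverInputDStream[String] = jssc.socketTextStream("localhost", 9999)
    lines.print()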
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 226844c228..aa1993f058 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -30,7 +30,7 @@ import scala.reflect.ClassTag
* FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
* new files and generates RDDs with the new files. For implementing input streams
* that requires running a receiver on the worker nodes, use
- * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
* @param ssc_ Streaming context that will execute this input stream
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
deleted file mode 100644
index 5a249706b4..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import java.nio.ByteBuffer
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.Await
-import scala.reflect.ClassTag
-
-import akka.actor.{Actor, Props}
-import akka.pattern.ask
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.util.{AkkaUtils, Utils}
-
-/**
- * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
- * that has to start a receiver on worker nodes to receive external data.
- * Specific implementations of NetworkInputDStream must
- * define the getReceiver() function that gets the receiver object of type
- * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
- * to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
- * @tparam T Class type of the object of this stream
- */
-abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
- extends InputDStream[T](ssc_) {
-
- /** Keeps all received blocks information */
- private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
-
- /** This is an unique identifier for the network input stream. */
- val id = ssc.getNewNetworkStreamId()
-
- /**
- * Gets the receiver object that will be sent to the worker nodes
- * to receive data. This method needs to defined by any specific implementation
- * of a NetworkInputDStream.
- */
- def getReceiver(): NetworkReceiver[T]
-
- // Nothing to start or stop as both taken care of by the NetworkInputTracker.
- def start() {}
-
- def stop() {}
-
- /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
- override def compute(validTime: Time): Option[RDD[T]] = {
- // If this is called for any time before the start time of the context,
- // then this returns an empty RDD. This may happen when recovering from a
- // master failure
- if (validTime >= graph.startTime) {
- val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
- receivedBlockInfo(validTime) = blockInfo
- val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
- Some(new BlockRDD[T](ssc.sc, blockIds))
- } else {
- Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
- }
- }
-
- /** Get information on received blocks. */
- private[streaming] def getReceivedBlockInfo(time: Time) = {
- receivedBlockInfo(time)
- }
-
- /**
- * Clear metadata that are older than `rememberDuration` of this DStream.
- * This is an internal method that should not be called directly. This
- * implementation overrides the default implementation to clear received
- * block information.
- */
- private[streaming] override def clearMetadata(time: Time) {
- super.clearMetadata(time)
- val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
- receivedBlockInfo --= oldReceivedBlocks.keys
- logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
- (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
- }
-}
-
-
-private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-
-/**
- * Abstract class of a receiver that can be run on worker nodes to receive external data. See
- * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
- */
-abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
-
- /** Local SparkEnv */
- lazy protected val env = SparkEnv.get
-
- /** Remote Akka actor for the NetworkInputTracker */
- lazy protected val trackerActor = {
- val ip = env.conf.get("spark.driver.host", "localhost")
- val port = env.conf.getInt("spark.driver.port", 7077)
- val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
- env.actorSystem.actorSelection(url)
- }
-
- /** Akka actor for receiving messages from the NetworkInputTracker in the driver */
- lazy protected val actor = env.actorSystem.actorOf(
- Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId)
-
- /** Timeout for Akka actor messages */
- lazy protected val askTimeout = AkkaUtils.askTimeout(env.conf)
-
- /** Thread that starts the receiver and stays blocked while data is being received */
- lazy protected val receivingThread = Thread.currentThread()
-
- /** Exceptions that occurs while receiving data */
- protected lazy val exceptions = new ArrayBuffer[Exception]
-
- /** Identifier of the stream this receiver is associated with */
- protected var streamId: Int = -1
-
- /**
- * This method will be called to start receiving data. All your receiver
- * starting code should be implemented by defining this function.
- */
- protected def onStart()
-
- /** This method will be called to stop receiving data. */
- protected def onStop()
-
- /** Conveys a placement preference (hostname) for this receiver. */
- def getLocationPreference() : Option[String] = None
-
- /**
- * Start the receiver. First is accesses all the lazy members to
- * materialize them. Then it calls the user-defined onStart() method to start
- * other threads, etc required to receiver the data.
- */
- def start() {
- try {
- // Access the lazy vals to materialize them
- env
- actor
- receivingThread
-
- // Call user-defined onStart()
- logInfo("Starting receiver")
- onStart()
-
- // Wait until interrupt is called on this thread
- while(true) Thread.sleep(100000)
- } catch {
- case ie: InterruptedException =>
- logInfo("Receiving thread has been interrupted, receiver " + streamId + " stopped")
- case e: Exception =>
- logError("Error receiving data in receiver " + streamId, e)
- exceptions += e
- }
-
- // Call user-defined onStop()
- logInfo("Stopping receiver")
- try {
- onStop()
- } catch {
- case e: Exception =>
- logError("Error stopping receiver " + streamId, e)
- exceptions += e
- }
-
- val message = if (exceptions.isEmpty) {
- null
- } else if (exceptions.size == 1) {
- val e = exceptions.head
- "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
- } else {
- "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
- exceptions.zipWithIndex.map {
- case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
- }.mkString("\n")
- }
-
- logInfo("Deregistering receiver " + streamId)
- val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
- Await.result(future, askTimeout)
- logInfo("Deregistered receiver " + streamId)
- env.actorSystem.stop(actor)
- logInfo("Stopped receiver " + streamId)
- }
-
- /**
- * Stop the receiver. First it interrupts the main receiving thread,
- * that is, the thread that called receiver.start().
- */
- def stop() {
- // Stop receiving by interrupting the receiving thread
- receivingThread.interrupt()
- logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
- }
-
- /**
- * Stop the receiver and reports exception to the tracker.
- * This should be called whenever an exception is to be handled on any thread
- * of the receiver.
- */
- protected def stopOnError(e: Exception) {
- logError("Error receiving data", e)
- exceptions += e
- stop()
- }
-
- /**
- * Push a block (as an ArrayBuffer filled with data) into the block manager.
- */
- def pushBlock(
- blockId: StreamBlockId,
- arrayBuffer: ArrayBuffer[T],
- metadata: Any,
- level: StorageLevel
- ) {
- env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
- logDebug("Pushed block " + blockId)
- }
-
- /**
- * Push a block (as bytes) into the block manager.
- */
- def pushBlock(
- blockId: StreamBlockId,
- bytes: ByteBuffer,
- metadata: Any,
- level: StorageLevel
- ) {
- env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
- }
-
- /** Set the ID of the DStream that this receiver is associated with */
- protected[streaming] def setStreamId(id: Int) {
- streamId = id
- }
-
- /** A helper actor that communicates with the NetworkInputTracker */
- private class NetworkReceiverActor extends Actor {
-
- override def preStart() {
- val msg = RegisterReceiver(
- streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
- logInfo("Registered receiver " + streamId)
- }
-
- override def receive() = {
- case StopReceiver =>
- logInfo("Received stop signal")
- stop()
- }
- }
-
- /**
- * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
- * them into appropriately named blocks at regular intervals. This class starts two threads,
- * one to periodically start a new batch and prepare the previous batch of as a block,
- * the other to push the blocks into the block manager.
- */
- class BlockGenerator(storageLevel: StorageLevel)
- extends Serializable with Logging {
-
- case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
-
- val clock = new SystemClock()
- val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
- val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer,
- "BlockGenerator")
- val blockStorageLevel = storageLevel
- val blocksForPushing = new ArrayBlockingQueue[Block](1000)
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- var currentBuffer = new ArrayBuffer[T]
- var stopped = false
-
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Started BlockGenerator")
- }
-
- def stop() {
- blockIntervalTimer.stop(false)
- stopped = true
- blockPushingThread.join()
- logInfo("Stopped BlockGenerator")
- }
-
- def += (obj: T): Unit = synchronized {
- currentBuffer += obj
- }
-
- private def updateCurrentBuffer(time: Long): Unit = synchronized {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[T]
- if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval)
- val newBlock = new Block(blockId, newBlockBuffer)
- blocksForPushing.add(newBlock)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block updating timer thread was interrupted")
- case e: Exception =>
- NetworkReceiver.this.stopOnError(e)
- }
- }
-
- private def keepPushingBlocks() {
- logInfo("Started block pushing thread")
- try {
- while(!stopped) {
- Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
- case Some(block) =>
- NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
- case None =>
- }
- }
- // Push out the blocks that are still left
- logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
- while (!blocksForPushing.isEmpty) {
- val block = blocksForPushing.take()
- NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
- logInfo("Blocks left to push " + blocksForPushing.size())
- }
- logInfo("Stopped blocks pushing thread")
- } catch {
- case ie: InterruptedException =>
- logInfo("Block pushing thread was interrupted")
- case e: Exception =>
- NetworkReceiver.this.stopOnError(e)
- }
- }
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 6f9477020a..186e1bf03a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -19,13 +19,14 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
- receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+ receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
- def getReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): Receiver[T] = {
receiver
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index dea0f26f90..e2925b9e03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.StreamingContext
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
+import org.apache.spark.streaming.receiver.Receiver
/**
@@ -42,21 +43,19 @@ class RawInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+ ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
- def getReceiver(): NetworkReceiver[T] = {
- new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ def getReceiver(): Receiver[T] = {
+ new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
}
}
private[streaming]
class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
- extends NetworkReceiver[Any] {
+ extends Receiver[Any](storageLevel) with Logging {
var blockPushingThread: Thread = null
- override def getLocationPreference = None
-
def onStart() {
// Open a socket to the target address and keep reading from it
logInfo("Connecting to " + host + ":" + port)
@@ -73,9 +72,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
var nextBlockNumber = 0
while (true) {
val buffer = queue.take()
- val blockId = StreamBlockId(streamId, nextBlockNumber)
nextBlockNumber += 1
- pushBlock(blockId, buffer, null, storageLevel)
+ store(buffer)
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
new file mode 100644
index 0000000000..75cabdbf8d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+
+/**
+ * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
+ * that has to start a receiver on worker nodes to receive external data.
+ * Specific implementations of ReceiverInputDStream must
+ * define the `getReceiver()` function that gets the receiver object of type
+ * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
+ * to the workers to receive data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
+abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+ extends InputDStream[T](ssc_) {
+
+ /** Keeps all received blocks information */
+ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+ /** This is a unique identifier for the receiver input stream. */
+ val id = ssc.getNewReceiverStreamId()
+
+ /**
+ * Gets the receiver object that will be sent to the worker nodes
+ * to receive data. This method needs to be defined by any specific implementation
+ * of a ReceiverInputDStream.
+ */
+ def getReceiver(): Receiver[T]
+
+ // Nothing to start or stop as both are taken care of by the ReceiverTracker.
+ def start() {}
+
+ def stop() {}
+
+ /** Ask ReceiverTracker for received data blocks and generate RDDs with them. */
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure
+ if (validTime >= graph.startTime) {
+ val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
+ receivedBlockInfo(validTime) = blockInfo
+ val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+ }
+ }
+
+ /** Get information on received blocks. */
+ private[streaming] def getReceivedBlockInfo(time: Time) = {
+ receivedBlockInfo(time)
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This
+ * implementation overrides the default implementation to clear received
+ * block information.
+ */
+ private[streaming] override def clearMetadata(time: Time) {
+ super.clearMetadata(time)
+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+ receivedBlockInfo --= oldReceivedBlocks.keys
+ logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+ }
+}
+
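A concrete input stream now only has to supply getReceiver(); block bookkeeping and RDD generation in compute() are inherited. A hypothetical subclass sketch (MyReceiver stands for any user-defined Receiver[String], such as the one sketched after the new Receiver class below):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    class MyInputDStream(@transient ssc_ : StreamingContext, host: String, port: Int)
      extends ReceiverInputDStream[String](ssc_) {

      // The only abstract member: hand back the receiver to be run on a worker.
      // MyReceiver is a hypothetical user-defined Receiver[String].
      def getReceiver(): Receiver[String] = new MyReceiver(host, port)
    }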
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 63d94d1cc6..1e32727eac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -24,7 +24,9 @@ import org.apache.spark.util.NextIterator
import scala.reflect.ClassTag
import java.io._
-import java.net.Socket
+import java.net.{UnknownHostException, Socket}
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class SocketInputDStream[T: ClassTag](
@@ -33,9 +35,9 @@ class SocketInputDStream[T: ClassTag](
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) {
+ ) extends ReceiverInputDStream[T](ssc_) {
- def getReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
@@ -46,26 +48,52 @@ class SocketReceiver[T: ClassTag](
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends NetworkReceiver[T] {
+ ) extends Receiver[T](storageLevel) with Logging {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+ var socket: Socket = null
+ var receivingThread: Thread = null
- override def getLocationPreference = None
+ def onStart() {
+ receivingThread = new Thread("Socket Receiver") {
+ override def run() {
+ connect()
+ receive()
+ }
+ }
+ receivingThread.start()
+ }
- protected def onStart() {
- logInfo("Connecting to " + host + ":" + port)
- val socket = new Socket(host, port)
- logInfo("Connected to " + host + ":" + port)
- blockGenerator.start()
- val iterator = bytesToObjects(socket.getInputStream())
- while(iterator.hasNext) {
- val obj = iterator.next
- blockGenerator += obj
+ def onStop() {
+ if (socket != null) {
+ socket.close()
+ }
+ socket = null
+ if (receivingThread != null) {
+ receivingThread.join()
}
}
- protected def onStop() {
- blockGenerator.stop()
+ def connect() {
+ try {
+ logInfo("Connecting to " + host + ":" + port)
+ socket = new Socket(host, port)
+ } catch {
+ case e: Exception =>
+ restart("Could not connect to " + host + ":" + port, e)
+ }
+ }
+
+ def receive() {
+ try {
+ logInfo("Connected to " + host + ":" + port)
+ val iterator = bytesToObjects(socket.getInputStream())
+ while(!isStopped && iterator.hasNext) {
+ store(iterator.next)
+ }
+ } catch {
+ case e: Exception =>
+ restart("Error receiving data from socket", e)
+ }
}
}
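The converter hook is still exposed through StreamingContext.socketStream, so a custom deserializer can be plugged in without writing a receiver at all. A small sketch (not part of the patch), assuming a StreamingContext `ssc`:

    import java.io.InputStream

    import org.apache.spark.storage.StorageLevel

    // A simple line-oriented converter handed to socketStream.
    def linesFrom(in: InputStream): Iterator[String] =
      scala.io.Source.fromInputStream(in).getLines()

    val custom = ssc.socketStream("localhost", 9999, linesFrom, StorageLevel.MEMORY_AND_DISK_SER_2)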
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index da0d364ae7..821cf19481 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -15,26 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.receivers
+package org.apache.spark.streaming.receiver
-import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
-import akka.actor.{ actorRef2Scala, ActorRef }
-import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
-import akka.actor.SupervisorStrategy._
+import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.dstream.NetworkReceiver
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable.ArrayBuffer
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.StorageLevel
+import java.nio.ByteBuffer
/** A helper with set of defaults for supervisor strategy */
-object ReceiverSupervisorStrategy {
+object ActorSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
@@ -50,9 +46,9 @@ object ReceiverSupervisorStrategy {
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
- * class MyActor extends Actor with Receiver{
+ * class MyActor extends Actor with ActorHelper {
* def receive {
- * case anything: String => pushBlock(anything)
+ * case anything: String => store(anything)
* }
* }
*
@@ -65,29 +61,40 @@ object ReceiverSupervisorStrategy {
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
*/
-trait Receiver {
+trait ActorHelper {
self: Actor => // to ensure that this can be added to Actor classes only
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store[T](iter: Iterator[T]) {
+ context.parent ! IteratorData(iter)
+ }
+
/**
- * Push an iterator received data into Spark Streaming for processing
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
*/
- def pushBlock[T: ClassTag](iter: Iterator[T]) {
- context.parent ! Data(iter)
+ def store(bytes: ByteBuffer) {
+ context.parent ! ByteBufferData(bytes)
}
/**
- * Push a single item of received data into Spark Streaming for processing
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
*/
- def pushBlock[T: ClassTag](data: T) {
- context.parent ! Data(data)
+ def store[T](item: T) {
+ context.parent ! SingleItemData(item)
+ }
}
/**
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receivers.Receiver]].
+ * [[org.apache.spark.streaming.receiver.ActorHelper]].
*/
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
@@ -95,7 +102,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)
/** Case class to receive data sent by child actors */
-private[streaming] case class Data[T: ClassTag](data: T)
+private[streaming] sealed trait ActorReceiverData
+private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
/**
* Provides Actors as receivers for receiving stream.
@@ -117,16 +127,13 @@ private[streaming] case class Data[T: ClassTag](data: T)
* }}}
*/
private[streaming] class ActorReceiver[T: ClassTag](
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- receiverSupervisorStrategy: SupervisorStrategy)
- extends NetworkReceiver[T] {
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy
+ ) extends Receiver[T](storageLevel) with Logging {
- protected lazy val blocksGenerator: BlockGenerator =
- new BlockGenerator(storageLevel)
-
- protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ protected lazy val supervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@@ -140,12 +147,18 @@ private[streaming] class ActorReceiver[T: ClassTag](
def receive = {
- case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
+ case IteratorData(iterator) =>
+ store(iterator.asInstanceOf[Iterator[T]])
- case Data(msg) =>
- blocksGenerator += msg.asInstanceOf[T]
+ case SingleItemData(msg) =>
+ store(msg.asInstanceOf[T])
n.incrementAndGet
+ case ByteBufferData(bytes) =>
+ store(bytes)
+
case props: Props =>
val worker = context.actorOf(props)
logInfo("Started receiver worker at:" + worker.path)
@@ -165,20 +178,14 @@ private[streaming] class ActorReceiver[T: ClassTag](
}
}
- protected def pushBlock(iter: Iterator[T]) {
- val buffer = new ArrayBuffer[T]
- buffer ++= iter
- pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel)
- }
-
- protected def onStart() = {
- blocksGenerator.start()
+ def onStart() = {
supervisor
logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
}
- protected def onStop() = {
+ def onStop() = {
supervisor ! PoisonPill
}
}
+
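With the trait renamed to ActorHelper and pushBlock replaced by store, a user-defined actor reduces to the sketch below (actor and stream names are assumptions); it is attached through StreamingContext.actorStream, assuming a StreamingContext `ssc`:

    import akka.actor.{Actor, Props}

    import org.apache.spark.streaming.receiver.ActorHelper

    class WordActor extends Actor with ActorHelper {
      def receive = {
        case line: String => store(line)   // forwarded to the supervising ActorReceiver
      }
    }

    val words = ssc.actorStream[String](Props[WordActor], "WordReceiver")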
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
new file mode 100644
index 0000000000..78cc2daa56
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+
+/** Listener object for BlockGenerator events */
+private[streaming] trait BlockGeneratorListener {
+ /** Called when a new block needs to be pushed */
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+ /** Called when an error has occurred in BlockGenerator */
+ def onError(message: String, throwable: Throwable)
+}
+
+/**
+ * Generates batches of objects received by a
+ * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
+ * named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+private[streaming] class BlockGenerator(
+ listener: BlockGeneratorListener,
+ receiverId: Int,
+ conf: SparkConf
+ ) extends Logging {
+
+ private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
+
+ private val clock = new SystemClock()
+ private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+ private val blockIntervalTimer =
+ new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+ private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
+ private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
+ private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ @volatile private var currentBuffer = new ArrayBuffer[Any]
+ @volatile private var stopped = false
+
+ /** Start block generating and pushing threads. */
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Started BlockGenerator")
+ }
+
+ /** Stop all threads. */
+ def stop() {
+ logInfo("Stopping BlockGenerator")
+ blockIntervalTimer.stop(interruptTimer = false)
+ stopped = true
+ logInfo("Waiting for block pushing thread")
+ blockPushingThread.join()
+ logInfo("Stopped BlockGenerator")
+ }
+
+ /**
+ * Push a single data item into the buffer. All received data items
+ * will be periodically pushed into BlockManager.
+ */
+ def += (data: Any): Unit = synchronized {
+ currentBuffer += data
+ }
+
+ /** Change the buffer to which single records are added. */
+ private def updateCurrentBuffer(time: Long): Unit = synchronized {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[Any]
+ if (newBlockBuffer.size > 0) {
+ val blockId = StreamBlockId(receiverId, time - blockInterval)
+ val newBlock = new Block(blockId, newBlockBuffer)
+ blocksForPushing.put(newBlock) // put is blocking when queue is full
+ logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block updating timer thread was interrupted")
+ case t: Throwable =>
+ reportError("Error in block updating thread", t)
+ }
+ }
+
+ /** Keep pushing blocks to the BlockManager. */
+ private def keepPushingBlocks() {
+ logInfo("Started block pushing thread")
+ try {
+ while(!stopped) {
+ Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
+ case Some(block) => pushBlock(block)
+ case None =>
+ }
+ }
+ // Push out the blocks that are still left
+ logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
+ while (!blocksForPushing.isEmpty) {
+ logDebug("Getting block ")
+ val block = blocksForPushing.take()
+ pushBlock(block)
+ logInfo("Blocks left to push " + blocksForPushing.size())
+ }
+ logInfo("Stopped block pushing thread")
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread was interrupted")
+ case t: Throwable =>
+ reportError("Error in block pushing thread", t)
+ }
+ }
+
+ private def reportError(message: String, t: Throwable) {
+ logError(message, t)
+ listener.onError(message, t)
+ }
+
+ private def pushBlock(block: Block) {
+ listener.onPushBlock(block.id, block.buffer)
+ logInfo("Pushed block " + block.id)
+ }
+}
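BlockGenerator stays package-private; the receiver-side supervisor is expected to drive it through a BlockGeneratorListener. A hedged sketch of that wiring, with placeholder listener bodies:

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StreamBlockId

    val listener = new BlockGeneratorListener {
      def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
        // e.g. hand the completed block to the block manager under blockId
      }
      def onError(message: String, throwable: Throwable) {
        // e.g. report the failure back to the driver
      }
    }

    val generator = new BlockGenerator(listener, receiverId = 0, new SparkConf())
    generator.start()
    generator += "a single record"   // buffered and emitted as a block every blockInterval ms
    generator.stop()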
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
new file mode 100644
index 0000000000..44eecf1dd2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. A
+ * custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
+ * should define the setup steps necessary to start receiving data,
+ * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
+ * receiver would look something like this.
+ *
+ * @example {{{
+ * class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
+ * def onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ * // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ * // Call stop(...), restart() or reportError(...) on any thread based on how
+ * // different errors should be handled.
+ *
+ * // See corresponding method documentation for more details
+ * }
+ *
+ * def onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
+ * }
+ * }}}
+ */
+abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
+
+ /**
+ * This method is called by the system when the receiver is started. This function
+ * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
+ * This function must be non-blocking, so receiving the data must occur on a different
+ * thread. Received data can be stored with Spark by calling `store(data)`.
+ *
+ * If there are errors in the threads started here, then the following options can be used:
+ * (i) `reportError(...)` can be called to report the error to the driver.
+ * The receiving of data will continue uninterrupted.
+ * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
+ * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
+ * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
+ * immediately, and then `onStart()` after a delay.
+ */
+ def onStart()
+
+ /**
+ * This method is called by the system when the receiver is stopped. All resources
+ * (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
+ */
+ def onStop()
+
+ /** Override this to specify a preferred location (hostname). */
+ def preferredLocation: Option[String] = None
+
+ /**
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ */
+ def store(dataItem: T) {
+ executor.pushSingle(dataItem)
+ }
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def store(dataBuffer: ArrayBuffer[T]) {
+ executor.pushArrayBuffer(dataBuffer, None, None)
+ }
+
+ /**
+ * Store an ArrayBuffer of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
+ executor.pushArrayBuffer(dataBuffer, Some(metadata), None)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store(dataIterator: Iterator[T]) {
+ executor.pushIterator(dataIterator, None, None)
+ }
+
+ /**
+ * Store an iterator of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataIterator: java.util.Iterator[T], metadata: Any) {
+ executor.pushIterator(dataIterator, Some(metadata), None)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store(dataIterator: java.util.Iterator[T]) {
+ executor.pushIterator(dataIterator, None, None)
+ }
+
+ /**
+ * Store an iterator of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataIterator: Iterator[T], metadata: Any) {
+ executor.pushIterator(dataIterator, Some(metadata), None)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
+ */
+ def store(bytes: ByteBuffer) {
+ executor.pushBytes(bytes, None, None)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(bytes: ByteBuffer, metadata: Any) {
+ executor.pushBytes(bytes, Some(metadata), None)
+ }
+
+ /** Report exceptions in receiving data. */
+ def reportError(message: String, throwable: Throwable) {
+ executor.reportError(message, throwable)
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after a delay, `onStart()` will be called.
+ * The `message` will be reported to the driver.
+ * The delay is defined by the Spark configuration
+ * `spark.streaming.receiverRestartDelay`.
+ */
+ def restart(message: String) {
+ executor.restartReceiver(message)
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after a delay, `onStart()` will be called.
+ * The `message` and `exception` will be reported to the driver.
+ * The delay is defined by the Spark configuration
+ * `spark.streaming.receiverRestartDelay`.
+ */
+ def restart(message: String, error: Throwable) {
+ executor.restartReceiver(message, Some(error))
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after the given delay, `onStart()` will be called.
+ */
+ def restart(message: String, error: Throwable, millisecond: Int) {
+ executor.restartReceiver(message, Some(error), millisecond)
+ }
+
+ /** Stop the receiver completely. */
+ def stop(message: String) {
+ executor.stop(message, None)
+ }
+
+ /** Stop the receiver completely due to an exception. */
+ def stop(message: String, error: Throwable) {
+ executor.stop(message, Some(error))
+ }
+
+ /** Check if the receiver has started or not. */
+ def isStarted(): Boolean = {
+ executor.isReceiverStarted()
+ }
+
+ /** Check if receiver has been marked for stopping. */
+ def isStopped(): Boolean = {
+ !executor.isReceiverStarted()
+ }
+
+ /** Get unique identifier of this receiver. */
+ def streamId = id
+
+ /*
+ * =================
+ * Private methods
+ * =================
+ */
+
+ /** Identifier of the stream this receiver is associated with. */
+ private var id: Int = -1
+
+ /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
+ private[streaming] var executor_ : ReceiverSupervisor = null
+
+ /** Set the ID of the DStream that this receiver is associated with. */
+ private[streaming] def setReceiverId(id_ : Int) {
+ id = id_
+ }
+
+ /** Attach the ReceiverSupervisor (executor) to this receiver. */
+ private[streaming] def attachExecutor(exec: ReceiverSupervisor) {
+ assert(executor_ == null)
+ executor_ = exec
+ }
+
+ /** Get the attached executor. */
+ private def executor = {
+ assert(executor_ != null, "Executor has not been attached to this receiver")
+ executor_
+ }
+}
+
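
To make the user-facing API documented above concrete, a custom receiver built against it could look like the following sketch. This is illustrative only and not part of the patch: the class name `CounterReceiver` and its rate parameter are invented, while `onStart()`, `onStop()`, `store()` and `isStopped()` are the calls defined above.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Illustrative receiver: pushes an increasing counter until it is stopped.
    class CounterReceiver(recordsPerSecond: Int)
      extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

      def onStart() {
        // Receive on a separate thread so that onStart() can return promptly.
        new Thread("counter-receiver") {
          override def run() {
            var count = 0
            while (!isStopped()) {
              store(count)  // single items are aggregated into blocks by the supervisor
              count += 1
              Thread.sleep(1000 / recordsPerSecond)
            }
          }
        }.start()
      }

      def onStop() {
        // Nothing to clean up: the receiving thread checks isStopped() and exits on its own.
      }
    }

Such a receiver would be attached to a StreamingContext with `ssc.receiverStream(new CounterReceiver(10))`, the same entry point the updated `InputStreamsSuite` below uses for its test receiver.
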
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
new file mode 100644
index 0000000000..6ab3ca6ea5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+/** Messages sent to the NetworkReceiver. */
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] object StopReceiver extends NetworkReceiverMessage
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
new file mode 100644
index 0000000000..256b3335e4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StreamBlockId
+
+/**
+ * Abstract class that is responsible for supervising a Receiver in the worker.
+ * It provides all the necessary interfaces for handling the data received by the receiver.
+ */
+private[streaming] abstract class ReceiverSupervisor(
+ receiver: Receiver[_],
+ conf: SparkConf
+ ) extends Logging {
+
+ /** Enumeration to identify current state of the receiver */
+ object ReceiverState extends Enumeration {
+ type ReceiverState = Value
+ val Initialized, Started, Stopped = Value
+ }
+ import ReceiverState._
+
+ // Attach the executor to the receiver
+ receiver.attachExecutor(this)
+
+ /** Receiver id */
+ protected val streamId = receiver.streamId
+
+ /** Latch that is released once the receiver has been marked for stopping. */
+ private val stopLatch = new CountDownLatch(1)
+
+ /** Default delay between a receiver being stopped and being started again */
+ private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
+
+ /** Exception associated with the stopping of the receiver */
+ @volatile protected var stoppingError: Throwable = null
+
+ /** State of the receiver */
+ @volatile private[streaming] var receiverState = Initialized
+
+ /** Push a single data item to backend data store. */
+ def pushSingle(data: Any)
+
+ /** Store the bytes of received data as a data block into Spark's memory. */
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Report errors. */
+ def reportError(message: String, throwable: Throwable)
+
+ /** Start the executor */
+ def start() {
+ startReceiver()
+ }
+
+ /** Mark the executor and the receiver for stopping */
+ def stop(message: String, error: Option[Throwable]) {
+ stoppingError = error.orNull
+ stopReceiver(message, error)
+ stopLatch.countDown()
+ }
+
+ /** Start receiver */
+ def startReceiver(): Unit = synchronized {
+ try {
+ logInfo("Starting receiver")
+ onReceiverStart()
+ receiverState = Started
+ } catch {
+ case t: Throwable =>
+ stop("Error starting receiver " + streamId, Some(t))
+ }
+ }
+
+ /** Stop receiver */
+ def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
+ try {
+ receiverState = Stopped
+ onReceiverStop(message, error)
+ } catch {
+ case t: Throwable =>
+ stop("Error stopping receiver " + streamId, Some(t))
+ }
+ }
+
+ /** Restart receiver with delay */
+ def restartReceiver(message: String, error: Option[Throwable] = None) {
+ restartReceiver(message, error, defaultRestartDelay)
+ }
+
+ /** Restart receiver with delay */
+ def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
+ logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+ error.orNull)
+ stopReceiver("Restarting receiver with delay " + delay + " ms: " + message, error)
+ future {
+ logDebug("Sleeping for " + delay)
+ Thread.sleep(delay)
+ logDebug("Starting receiver again")
+ startReceiver()
+ logInfo("Receiver started again")
+ }
+ }
+
+ /** Called when the receiver needs to be started */
+ protected def onReceiverStart(): Unit = synchronized {
+ // Call user-defined onStart()
+ logInfo("Calling receiver onStart")
+ receiver.onStart()
+ logInfo("Called receiver onStart")
+ }
+
+ /** Called when the receiver needs to be stopped */
+ protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
+ // Call user-defined onStop()
+ logInfo("Calling receiver onStop")
+ receiver.onStop()
+ logInfo("Called receiver onStop")
+ }
+
+ /** Check if the receiver has been started */
+ def isReceiverStarted() = {
+ logDebug("state = " + receiverState)
+ receiverState == Started
+ }
+
+ /** Block the calling thread until the executor has been stopped */
+ def awaitTermination() {
+ stopLatch.await()
+ logInfo("Waiting for executor stop is over")
+ if (stoppingError != null) {
+ logError("Stopped executor with error: " + stoppingError)
+ } else {
+ logWarning("Stopped executor without error")
+ }
+ if (stoppingError != null) {
+ throw stoppingError
+ }
+ }
+}
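
As the supervisor above shows, a restart is an `onStop()` call, a sleep, and a fresh `onStart()`, with the sleep taken from `spark.streaming.receiverRestartDelay` (2000 ms unless overridden). A sketch of overriding it, e.g. in spark-shell or application setup code; the master, application name and batch interval here are arbitrary:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    // Illustrative: wait 5 seconds instead of the default 2000 ms before a receiver
    // that called restart() or failed in onStart() is started again.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReceiverRestartDelayExample")
      .set("spark.streaming.receiverRestartDelay", "5000")
    val ssc = new StreamingContext(conf, Milliseconds(500))
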
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
new file mode 100644
index 0000000000..2a3521bd46
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+
+import akka.actor.{Actor, Props}
+import akka.pattern.ask
+import com.google.common.base.Throwables
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
+ * which provides all the necessary functionality for handling the data received by
+ * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
+ * object that is used to divide the received data stream into blocks of data.
+ */
+private[streaming] class ReceiverSupervisorImpl(
+ receiver: Receiver[_],
+ env: SparkEnv
+ ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
+
+ private val blockManager = env.blockManager
+
+ private val storageLevel = receiver.storageLevel
+
+ /** Remote Akka actor for the ReceiverTracker */
+ private val trackerActor = {
+ val ip = env.conf.get("spark.driver.host", "localhost")
+ val port = env.conf.getInt("spark.driver.port", 7077)
+ val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+ env.actorSystem.actorSelection(url)
+ }
+
+ /** Timeout for Akka actor messages */
+ private val askTimeout = AkkaUtils.askTimeout(env.conf)
+
+ /** Akka actor for receiving messages from the ReceiverTracker in the driver */
+ private val actor = env.actorSystem.actorOf(
+ Props(new Actor {
+ override def preStart() {
+ logInfo("Registered receiver " + streamId)
+ val msg = RegisterReceiver(
+ streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
+ val future = trackerActor.ask(msg)(askTimeout)
+ Await.result(future, askTimeout)
+ }
+
+ override def receive() = {
+ case StopReceiver =>
+ logInfo("Received stop signal")
+ stop("Stopped by driver", None)
+ }
+ }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
+
+ /** Counter for generating unique block ids when blocks are pushed directly */
+ private val newBlockId = new AtomicLong(System.currentTimeMillis())
+
+ /** Divides received data records into data blocks for pushing in BlockManager. */
+ private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ def onError(message: String, throwable: Throwable) {
+ reportError(message, throwable)
+ }
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ pushArrayBuffer(arrayBuffer, None, Some(blockId))
+ }
+ }, streamId, env.conf)
+
+ /** Push a single record of received data into block generator. */
+ def pushSingle(data: Any) {
+ blockGenerator += (data)
+ }
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
+ storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, -1, optionalMetadata)
+ }
+
+ /** Store the bytes of received data as a data block into Spark's memory. */
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, -1, optionalMetadata)
+ }
+
+ /** Report pushed block */
+ def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
+ val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
+ trackerActor ! AddBlock(blockInfo)
+ logDebug("Reported block " + blockId)
+ }
+
+ /** Report error to the receiver tracker */
+ def reportError(message: String, error: Throwable) {
+ val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
+ trackerActor ! ReportError(streamId, message, errorString)
+ logWarning("Reported error " + message + " - " + error)
+ }
+
+ override def onReceiverStart() {
+ blockGenerator.start()
+ super.onReceiverStart()
+ }
+
+ override def onReceiverStop(message: String, error: Option[Throwable]) {
+ super.onReceiverStop(message, error)
+ blockGenerator.stop()
+ logInfo("Deregistering receiver " + streamId)
+ val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
+ val future = trackerActor.ask(
+ DeregisterReceiver(streamId, message, errorString))(askTimeout)
+ Await.result(future, askTimeout)
+ logInfo("Stopped receiver " + streamId)
+ }
+
+ override def stop(message: String, error: Option[Throwable]) {
+ super.stop(message, error)
+ env.actorSystem.stop(actor)
+ }
+
+ /** Generate new block ID */
+ private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
+}
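
The `BlockGenerator` created here groups the records handed to `store()` into blocks on a timer driven by `spark.streaming.blockInterval` (in milliseconds), the same key the block generator test later in this patch sets. A smaller interval yields more, smaller blocks per batch; purely as an illustration:

    import org.apache.spark.SparkConf

    // Illustrative: cut a block every 100 ms, producing more (and smaller)
    // blocks -- and hence more partitions -- per batch.
    val conf = new SparkConf().set("spark.streaming.blockInterval", "100")
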
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index e564eccba2..374848358e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -38,6 +38,7 @@ private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private val ssc = jobScheduler.ssc
+ private val conf = ssc.conf
private val graph = ssc.graph
val clock = {
@@ -93,26 +94,31 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
- val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
+ val stopTimeout = conf.getLong(
+ "spark.streaming.gracefulStopTimeout",
+ 10 * ssc.graph.batchDuration.milliseconds
+ )
val pollTime = 100
// To prevent graceful stop to get stuck permanently
def hasTimedOut = {
val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
- if (timedOut) logWarning("Timed out while stopping the job generator")
+ if (timedOut) {
+ logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+ }
timedOut
}
// Wait until all the received blocks in the network input tracker has
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
- while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
+ while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) {
Thread.sleep(pollTime)
}
logInfo("Waited for all received blocks to be consumed for job generation")
// Stop generating jobs
- val stopTime = timer.stop(false)
+ val stopTime = timer.stop(interruptTimer = false)
graph.stop()
logInfo("Stopped generation timer")
@@ -214,7 +220,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
- val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+ val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
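
The new `spark.streaming.gracefulStopTimeout` key above bounds, in milliseconds, how long a graceful shutdown waits for already-received blocks to be turned into jobs before giving up; when unset, it falls back to ten batch intervals. A sketch of raising it:

    import org.apache.spark.SparkConf

    // Illustrative: allow up to one minute for received blocks to be drained
    // during a graceful stop (the fallback is 10x the batch duration).
    val conf = new SparkConf().set("spark.streaming.gracefulStopTimeout", "60000")
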
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d9ada99b47..1b034b9fb1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -46,7 +46,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// These two are created only when scheduler starts.
// eventActor not being null means the scheduler has been started and not stopped
- var networkInputTracker: NetworkInputTracker = null
+ var receiverTracker: ReceiverTracker = null
private var eventActor: ActorRef = null
@@ -61,8 +61,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}), "JobScheduler")
listenerBus.start()
- networkInputTracker = new NetworkInputTracker(ssc)
- networkInputTracker.start()
+ receiverTracker = new ReceiverTracker(ssc)
+ receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
@@ -72,7 +72,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logDebug("Stopping JobScheduler")
// First, stop receiving
- networkInputTracker.stop()
+ receiverTracker.stop()
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 438e72a7ce..3d2537f6f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,12 +21,11 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import scala.language.existentials
import akka.actor._
-
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
+import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
/** Information about receiver */
@@ -34,7 +33,7 @@ case class ReceiverInfo(streamId: Int, typ: String, location: String) {
override def toString = s"$typ-$streamId"
}
-/** Information about blocks received by the network receiver */
+/** Information about blocks received by the receiver */
case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
@@ -43,20 +42,21 @@ case class ReceivedBlockInfo(
)
/**
- * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * Messages used by the Receiver and the ReceiverTracker to communicate
* with each other.
*/
-private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
host: String,
receiverActor: ActorRef
- ) extends NetworkInputTrackerMessage
+ ) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
- extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
- extends NetworkInputTrackerMessage
+ extends ReceiverTrackerMessage
+private[streaming] case class ReportError(streamId: Int, message: String, error: String)
+ extends ReceiverTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
+ extends ReceiverTrackerMessage
/**
* This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -64,11 +64,11 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
* has been called because it needs the final set of input streams at the time of instantiation.
*/
private[streaming]
-class NetworkInputTracker(ssc: StreamingContext) extends Logging {
+class ReceiverTracker(ssc: StreamingContext) extends Logging {
- val networkInputStreams = ssc.graph.getNetworkInputStreams()
- val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
- val receiverExecutor = new ReceiverExecutor()
+ val receiverInputStreams = ssc.graph.getReceiverInputStreams()
+ val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
+ val receiverExecutor = new ReceiverLauncher()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
@@ -83,27 +83,27 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
/** Start the actor and receiver execution thread. */
def start() = synchronized {
if (actor != null) {
- throw new SparkException("NetworkInputTracker already started")
+ throw new SparkException("ReceiverTracker already started")
}
- if (!networkInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
- "NetworkInputTracker")
+ if (!receiverInputStreams.isEmpty) {
+ actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
+ "ReceiverTracker")
receiverExecutor.start()
- logInfo("NetworkInputTracker started")
+ logInfo("ReceiverTracker started")
}
}
/** Stop the receiver execution thread. */
def stop() = synchronized {
- if (!networkInputStreams.isEmpty && actor != null) {
+ if (!receiverInputStreams.isEmpty && actor != null) {
// First, stop the receivers
receiverExecutor.stop()
// Finally, stop the actor
ssc.env.actorSystem.stop(actor)
actor = null
- logInfo("NetworkInputTracker stopped")
+ logInfo("ReceiverTracker stopped")
}
}
@@ -126,20 +126,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receiverActor: ActorRef,
sender: ActorRef
) {
- if (!networkInputStreamMap.contains(streamId)) {
+ if (!receiverInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
ReceiverInfo(streamId, typ, host)
))
- logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+ logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
}
/** Deregister a receiver */
- def deregisterReceiver(streamId: Int, message: String) {
+ def deregisterReceiver(streamId: Int, message: String, error: String) {
receiverInfo -= streamId
- logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
+ val messageWithError = if (error != null && !error.isEmpty) {
+ s"$message - $error"
+ } else {
+ s"$message"
+ }
+ logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}
/** Add new blocks for the given stream */
@@ -149,27 +155,40 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receivedBlockInfo.blockId)
}
+ /** Report error sent by a receiver */
+ def reportError(streamId: Int, message: String, error: String) {
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error))
+ val messageWithError = if (error != null && !error.isEmpty) {
+ s"$message - $error"
+ } else {
+ s"$message"
+ }
+ logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
+ }
+
/** Check if any blocks are left to be processed */
def hasMoreReceivedBlockIds: Boolean = {
!receivedBlockInfo.values.forall(_.isEmpty)
}
/** Actor to receive messages from the receivers. */
- private class NetworkInputTrackerActor extends Actor {
+ private class ReceiverTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
- case DeregisterReceiver(streamId, message) =>
- deregisterReceiver(streamId, message)
+ case ReportError(streamId, message, error) =>
+ reportError(streamId, message, error)
+ case DeregisterReceiver(streamId, message, error) =>
+ deregisterReceiver(streamId, message, error)
sender ! true
}
}
/** This thread class runs all the receivers on the cluster. */
- class ReceiverExecutor {
+ class ReceiverLauncher {
@transient val env = ssc.env
@transient val thread = new Thread() {
override def run() {
@@ -177,7 +196,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
SparkEnv.set(env)
startReceivers()
} catch {
- case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
+ case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
}
}
}
@@ -203,37 +222,39 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
/**
- * Get the receivers from the NetworkInputDStreams, distributes them to the
+ * Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def startReceivers() {
- val receivers = networkInputStreams.map(nis => {
+ val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
- rcvr.setStreamId(nis.id)
+ rcvr.setReceiverId(nis.id)
rcvr
})
// Right now, we only honor preferences if all receivers have them
- val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
- .reduce(_ && _)
+ val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
- val receiversWithPreferences =
- receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
- ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
+ val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
+ ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
}
else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// Function to start the receiver on the worker node
- val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
+ val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
- throw new Exception("Could not start receiver as details not found.")
+ throw new SparkException(
+ "Could not start receiver as object not found.")
}
- iterator.next().start()
+ val receiver = iterator.next()
+ val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
+ executor.start()
+ executor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
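
As the launcher above notes, location preferences are honored only when every receiver in the application defines one. For illustration (the hostname parameter is hypothetical), a receiver that asks to run on a particular host overrides `preferredLocation`:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Illustrative receiver that requests scheduling on a specific host.
    class PinnedReceiver(host: String) extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {
      override def preferredLocation: Option[String] = Some(host)

      def onStart() { /* start a thread that calls store(...) */ }
      def onStop() { }
    }
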
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 5db40ebbeb..9d6ec1fa33 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.scheduler
import scala.collection.mutable.Queue
+
import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
@@ -26,8 +27,13 @@ sealed trait StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
+case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
+ extends StreamingListenerEvent
+case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
+ extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -41,14 +47,20 @@ trait StreamingListener {
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+ /** Called when a receiver has reported an error */
+ def onReceiverError(receiverError: StreamingListenerReceiverError) { }
+
+ /** Called when a receiver has been stopped */
+ def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
+
/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
- /** Called when processing of a batch of jobs has completed. */
- def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
-
/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
}
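
The two new callbacks make receiver errors and shutdowns observable from user code. A minimal listener using them might look like the sketch below (the class name is invented); it would be registered with `ssc.addStreamingListener(...)`, as the updated `StreamingListenerSuite` later in this patch does.

    import org.apache.spark.streaming.scheduler._

    // Illustrative listener that logs the new receiver life-cycle events.
    class ReceiverEventLogger extends StreamingListener {
      override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
        println("Receiver started: " + started.receiverInfo)
      }
      override def onReceiverError(error: StreamingListenerReceiverError) {
        println("Receiver " + error.streamId + " error: " + error.message + " - " + error.error)
      }
      override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
        println("Receiver " + stopped.streamId + " stopped: " + stopped.message)
      }
    }
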
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index ea03dfc7bf..398724d9e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -40,6 +40,10 @@ private[spark] class StreamingListenerBus() extends Logging {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
+ case receiverError: StreamingListenerReceiverError =>
+ listeners.foreach(_.onReceiverError(receiverError))
+ case receiverStopped: StreamingListenerReceiverStopped =>
+ listeners.foreach(_.onReceiverStopped(receiverStopped))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 8b025b09ed..bf637c1446 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -62,8 +62,8 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
totalCompletedBatches += 1L
}
- def numNetworkReceivers = synchronized {
- ssc.graph.getNetworkInputStreams().size
+ def numReceivers = synchronized {
+ ssc.graph.getReceiverInputStreams().size
}
def numTotalCompletedBatches: Long = synchronized {
@@ -101,7 +101,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
- (0 until numNetworkReceivers).map { receiverId =>
+ (0 until numReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
batchInfo.get(receiverId).getOrElse(Array.empty)
}
@@ -117,11 +117,11 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
def lastReceivedBatchRecords: Map[Int, Long] = {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
- (0 until numNetworkReceivers).map { receiverId =>
+ (0 until numReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
}.toMap
}.getOrElse {
- (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 6607437db5..8fe1219356 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -40,7 +40,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val content =
generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
- generateNetworkStatsTable() ++
+ generateReceiverStats() ++
generateBatchStatsTable()
UIUtils.headerSparkPage(
content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
@@ -57,7 +57,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
<strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
</li>
<li>
- <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+ <strong>Network receivers: </strong>{listener.numReceivers}
</li>
<li>
<strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
@@ -71,8 +71,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
</ul>
}
- /** Generate stats of data received over the network the streaming program */
- private def generateNetworkStatsTable(): Seq[Node] = {
+ /** Generate stats of data received by the receivers in the streaming program */
+ private def generateReceiverStats(): Seq[Node] = {
val receivedRecordDistributions = listener.receivedRecordsDistributions
val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
val table = if (receivedRecordDistributions.size > 0) {
@@ -86,13 +86,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
"75th percentile rate\n[records/sec]",
"Maximum rate\n[records/sec]"
)
- val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+ val dataRows = (0 until listener.numReceivers).map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
- val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+ val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+ d.getQuantiles().map(r => formatNumber(r.toLong))
}.getOrElse {
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
}
@@ -104,8 +104,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
val content =
- <h5>Network Input Statistics</h5> ++
- <div>{table.getOrElse("No network receivers")}</div>
+ <h5>Receiver Statistics</h5> ++
+ <div>{table.getOrElse("No receivers")}</div>
content
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index e016377c94..1a616a0434 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -77,7 +77,9 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
stopped = true
- if (interruptTimer) thread.interrupt()
+ if (interruptTimer) {
+ thread.interrupt()
+ }
thread.join()
logInfo("Stopped timer for " + name + " after time " + prevTime)
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a0b1bbc34f..f9bfb9b744 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -36,10 +37,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1668,7 +1665,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1701,6 +1698,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952511d411..46b7f63b65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -36,10 +36,9 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkReceiver
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
+import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -207,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// set up the network stream using the test receiver
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.networkStream[Int](testReceiver)
+ val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
@@ -301,7 +300,7 @@ object TestServer {
}
/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with Receiver {
+class TestActor(port: Int) extends Actor with ActorHelper {
def bytesToString(byteString: ByteString) = byteString.utf8String
@@ -309,24 +308,22 @@ class TestActor(port: Int) extends Actor with Receiver {
def receive = {
case IO.Read(socket, bytes) =>
- pushBlock(bytesToString(bytes))
+ store(bytesToString(bytes))
}
}
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
- extends NetworkReceiver[Int] {
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
lazy val executorPool = Executors.newFixedThreadPool(numThreads)
- lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
lazy val finishCount = new AtomicInteger(0)
- protected def onStart() {
- blockGenerator.start()
+ def onStart() {
(1 to numThreads).map(threadId => {
val runnable = new Runnable {
def run() {
(1 to numRecordsPerThread).foreach(i =>
- blockGenerator += (threadId * numRecordsPerThread + i) )
+ store(threadId * numRecordsPerThread + i) )
if (finishCount.incrementAndGet == numThreads) {
MultiThreadTestReceiver.haveAllThreadsFinished = true
}
@@ -337,7 +334,7 @@ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
})
}
- protected def onStop() {
+ def onStop() {
executorPool.shutdown()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
new file mode 100644
index 0000000000..5c0415ad14
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/** Testsuite for testing the network receiver behavior */
+class NetworkReceiverSuite extends FunSuite with Timeouts {
+
+ test("network receiver life cycle") {
+
+ val receiver = new FakeReceiver
+ val executor = new FakeReceiverSupervisor(receiver)
+
+ assert(executor.isAllEmpty)
+
+ // Thread that runs the executor
+ val executingThread = new Thread() {
+ override def run() {
+ executor.start()
+ executor.awaitTermination()
+ }
+ }
+
+ // Start the receiver
+ executingThread.start()
+
+ // Verify that the executor thread keeps running, i.e. join() times out
+ intercept[Exception] {
+ failAfter(200 millis) {
+ executingThread.join()
+ }
+ }
+
+ // Verify that receiver was started
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped())
+ assert(receiver.otherThread.isAlive)
+ eventually(timeout(100 millis), interval(10 millis)) {
+ assert(receiver.receiving)
+ }
+
+ // Verify whether the data stored by the receiver was sent to the executor
+ val byteBuffer = ByteBuffer.allocate(100)
+ val arrayBuffer = new ArrayBuffer[Int]()
+ val iterator = arrayBuffer.iterator
+ receiver.store(1)
+ receiver.store(byteBuffer)
+ receiver.store(arrayBuffer)
+ receiver.store(iterator)
+ assert(executor.singles.size === 1)
+ assert(executor.singles.head === 1)
+ assert(executor.byteBuffers.size === 1)
+ assert(executor.byteBuffers.head.eq(byteBuffer))
+ assert(executor.iterators.size === 1)
+ assert(executor.iterators.head.eq(iterator))
+ assert(executor.arrayBuffers.size === 1)
+ assert(executor.arrayBuffers.head.eq(arrayBuffer))
+
+ // Verify whether the exceptions reported by the receiver were sent to the executor
+ val exception = new Exception
+ receiver.reportError("Error", exception)
+ assert(executor.errors.size === 1)
+ assert(executor.errors.head.eq(exception))
+
+ // Verify restarting actually stops and starts the receiver
+ receiver.restart("restarting", null, 100)
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ eventually(timeout(1000 millis), interval(100 millis)) {
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped)
+ assert(receiver.receiving)
+ }
+
+ // Verify that stopping actually stops the thread
+ failAfter(100 millis) {
+ receiver.stop("test")
+ assert(receiver.isStopped)
+ assert(!receiver.otherThread.isAlive)
+
+ // The thread that started the executor should complete
+ // as stop() stops everything
+ executingThread.join()
+ }
+ }
+
+ test("block generator") {
+ val blockGeneratorListener = new FakeBlockGeneratorListener
+ val blockInterval = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+ val expectedBlocks = 5
+ val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val generatedData = new ArrayBuffer[Int]
+
+ // Generate blocks
+ val startTime = System.currentTimeMillis()
+ blockGenerator.start()
+ var count = 0
+ while(System.currentTimeMillis - startTime < waitTime) {
+ blockGenerator += count
+ generatedData += count
+ count += 1
+ Thread.sleep(10)
+ }
+ blockGenerator.stop()
+
+ val recordedData = blockGeneratorListener.arrayBuffers.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0)
+ assert(recordedData.toSet === generatedData.toSet)
+ }
+
+ /**
+ * An implementation of Receiver that is used for testing a receiver's life cycle.
+ */
+ class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+ var otherThread: Thread = null
+ var receiving = false
+ var onStartCalled = false
+ var onStopCalled = false
+
+ def onStart() {
+ otherThread = new Thread() {
+ override def run() {
+ receiving = true
+ while(!isStopped()) {
+ Thread.sleep(10)
+ }
+ }
+ }
+ onStartCalled = true
+ otherThread.start()
+
+ }
+
+ def onStop() {
+ onStopCalled = true
+ otherThread.join()
+ }
+
+ def reset() {
+ receiving = false
+ onStartCalled = false
+ onStopCalled = false
+ }
+ }
+
+ /**
+ * An implementation of ReceiverSupervisor used for testing a Receiver.
+ * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
+ * that can be used for verifying that the data has been forwarded correctly.
+ */
+ class FakeReceiverSupervisor(receiver: FakeReceiver)
+ extends ReceiverSupervisor(receiver, new SparkConf()) {
+ val singles = new ArrayBuffer[Any]
+ val byteBuffers = new ArrayBuffer[ByteBuffer]
+ val iterators = new ArrayBuffer[Iterator[_]]
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
+ val errors = new ArrayBuffer[Throwable]
+
+ /** Check if all data structures are clean */
+ def isAllEmpty = {
+ singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
+ arrayBuffers.isEmpty && errors.isEmpty
+ }
+
+ def pushSingle(data: Any) {
+ singles += data
+ }
+
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ byteBuffers += bytes
+ }
+
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ iterators += iterator
+ }
+
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ arrayBuffers += arrayBuffer
+ }
+
+ def reportError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+
+ /**
+ * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+ */
+ class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+ // buffer of data received as ArrayBuffers
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+ val errors = new ArrayBuffer[Throwable]
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+ arrayBuffers += bufferOfInts
+ Thread.sleep(pushDelay)
+ }
+
+ def onError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ad5367ab94..6d14b1f785 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
@@ -181,15 +182,15 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
conf.set("spark.cleaner.ttl", "3600")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
- logInfo("==================================")
- ssc = new StreamingContext(sc, batchDuration)
+ logInfo("==================================\n\n\n")
+ ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.networkStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
- logInfo("Count = " + count)
runningCount += count.toInt
+ logInfo("Count = " + count + ", Running count = " + runningCount)
})
ssc.start()
ssc.awaitTermination(500)
@@ -216,12 +217,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.start()
}
- // test whether waitForStop() exits after give amount of time
+ // test whether awaitTermination() exits after given amount of time
failAfter(1000 millis) {
ssc.awaitTermination(500)
}
- // test whether waitForStop() does not exit if not time is given
+ // test whether awaitTermination() does not exit if no time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
ssc.awaitTermination()
@@ -276,23 +277,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
class TestException(msg: String) extends Exception(msg)
/** Custom receiver for testing whether all data received by a receiver gets processed or not */
-class TestReceiver extends NetworkReceiver[Int] {
- protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
- protected def onStart() {
- blockGenerator.start()
- logInfo("BlockGenerator started on thread " + receivingThread)
- try {
- while(true) {
- blockGenerator += TestReceiver.counter.getAndIncrement
- Thread.sleep(0)
+class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ while (!isStopped) {
+ store(TestReceiver.counter.getAndIncrement)
+ }
+ logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
- } finally {
- logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
+ receivingThreadOption = Some(thread)
+ thread.start()
}
- protected def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ // no cleanup to be done, the receiving thread should stop on its own
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 9e0f2c900e..542c697ae3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,10 +17,19 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.matchers.ShouldMatchers
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler._
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.Logging
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
@@ -32,7 +41,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
- test("basic BatchInfo generation") {
+ test("batch info reporting") {
val ssc = setupStreams(input, operation)
val collector = new BatchInfoCollector
ssc.addStreamingListener(collector)
@@ -54,6 +63,31 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
}
+ test("receiver info reporting") {
+ val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+
+ val collector = new ReceiverInfoCollector
+ ssc.addStreamingListener(collector)
+
+ ssc.start()
+ try {
+ eventually(timeout(1000 millis), interval(20 millis)) {
+ collector.startedReceiverInfo should have size 1
+ collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.stoppedReceiverStreamIds should have size 1
+ collector.stoppedReceiverStreamIds(0) should equal (0)
+ collector.receiverErrors should have size 1
+ collector.receiverErrors(0)._1 should equal (0)
+ collector.receiverErrors(0)._2 should include ("report error")
+ collector.receiverErrors(0)._3 should include ("report exception")
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for(i <- 1 until seq.size) {
@@ -61,12 +95,46 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
}
true
}
+}
+
+/** Listener that collects information on processed batches */
+class BatchInfoCollector extends StreamingListener {
+ val batchInfos = new ArrayBuffer[BatchInfo]
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos += batchCompleted.batchInfo
+ }
+}
+
+/** Listener that collects information on processed batches */
+class ReceiverInfoCollector extends StreamingListener {
+ val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
+ val receiverErrors = new ArrayBuffer[(Int, String, String)]()
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ startedReceiverInfo += receiverStarted.receiverInfo
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+ stoppedReceiverStreamIds += receiverStopped.streamId
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+ receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ }
+}
- /** Listener that collects information on processed batches */
- class BatchInfoCollector extends StreamingListener {
- val batchInfos = new ArrayBuffer[BatchInfo]
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- batchInfos += batchCompleted.batchInfo
+class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
+ def onStart() {
+ Future {
+ logInfo("Started receiver and sleeping")
+ Thread.sleep(10)
+ logInfo("Reporting error and sleeping")
+ reportError("test report error", new Exception("test report exception"))
+ Thread.sleep(10)
+ logInfo("Stopping")
+ stop("test stop error")
}
}
+ def onStop() { }
}