Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 44
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala | 40
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala | 41
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala | 42
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala | 41
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 56
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 362
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 94
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 62
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala) | 95
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 142
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala | 236
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala | 23
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 180
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 180
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala) | 101
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 19
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala | 249
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 42
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 84
33 files changed, 1670 insertions, 585 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index d3339063cc..b4adf0e965 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
final private[streaming] class DStreamGraph extends Serializable with Logging {
@@ -103,9 +103,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def getOutputStreams() = this.synchronized { outputStreams.toArray }
- def getNetworkInputStreams() = this.synchronized {
- inputStreams.filter(_.isInstanceOf[NetworkInputDStream[_]])
- .map(_.asInstanceOf[NetworkInputDStream[_]])
+ def getReceiverInputStreams() = this.synchronized {
+ inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
+ .map(_.asInstanceOf[ReceiverInputDStream[_]])
.toArray
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index e9a4f7ba22..daa5c69bba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -31,12 +31,11 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.ui.StreamingTab
import org.apache.spark.util.MetadataCleaner
@@ -139,7 +138,7 @@ class StreamingContext private[streaming] (
}
}
- private val nextNetworkInputStreamId = new AtomicInteger(0)
+ private val nextReceiverInputStreamId = new AtomicInteger(0)
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
@@ -208,15 +207,26 @@ class StreamingContext private[streaming] (
if (isCheckpointPresent) cp_ else null
}
- private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()
/**
- * Create an input stream with any arbitrary user implemented network receiver.
+ * Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param receiver Custom implementation of NetworkReceiver
+ * @param receiver Custom implementation of Receiver
*/
+ @deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
- receiver: NetworkReceiver[T]): DStream[T] = {
+ receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ receiverStream(receiver)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented receiver.
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * @param receiver Custom implementation of Receiver
+ */
+ def receiverStream[T: ClassTag](
+ receiver: Receiver[T]): ReceiverInputDStream[T] = {
new PluggableInputDStream[T](this, receiver)
}
@@ -236,9 +246,9 @@ class StreamingContext private[streaming] (
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ ): ReceiverInputDStream[T] = {
+ receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
/**
@@ -254,7 +264,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
+ ): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
@@ -273,7 +283,7 @@ class StreamingContext private[streaming] (
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
- ): DStream[T] = {
+ ): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
@@ -292,7 +302,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[T] = {
+ ): ReceiverInputDStream[T] = {
new RawInputDStream[T](this, hostname, port, storageLevel)
}
@@ -310,7 +320,7 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
- ] (directory: String): DStream[(K, V)] = {
+ ] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}
@@ -330,7 +340,7 @@ class StreamingContext private[streaming] (
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
- ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
@@ -356,7 +366,7 @@ class StreamingContext private[streaming] (
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
- ): DStream[T] = {
+ ): InputDStream[T] = {
queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
}
@@ -373,7 +383,7 @@ class StreamingContext private[streaming] (
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
- ): DStream[T] = {
+ ): InputDStream[T] = {
new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
}
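
A minimal usage sketch of the reworked StreamingContext API (not part of this patch; CustomReceiver is a hypothetical subclass of the new Receiver base class). The built-in sources now return the narrower ReceiverInputDStream/InputDStream types, which are still DStreams, so downstream transformations compose unchanged, and networkStream survives only as a deprecated alias of receiverStream:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical receiver, shown only to illustrate the entry points.
    class CustomReceiver(level: StorageLevel) extends Receiver[String](level) {
      def onStart() { /* start a background thread that calls store(record) */ }
      def onStop()  { /* stop that thread */ }
    }

    val ssc = new StreamingContext(new SparkConf().setAppName("receiver-example"), Seconds(1))

    // New name; statically typed as ReceiverInputDStream[String] instead of DStream[String].
    val custom = ssc.receiverStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_SER_2))

    // Deprecated alias kept for source compatibility.
    val legacy = ssc.networkStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_SER_2))

    // Built-in sources behave as before; only the declared return type is narrower.
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))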
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 13e2bacc92..505e4431e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -97,6 +97,10 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
}
object JavaDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.DStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaDStream]].
+ */
implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
new file mode 100644
index 0000000000..91f8d342d2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.InputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]].
+ */
+class JavaInputDStream[T](val inputDStream: InputDStream[T])
+ (implicit override val classTag: ClassTag[T]) extends JavaDStream[T](inputDStream) {
+}
+
+object JavaInputDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaInputDStream]].
+ */
+ implicit def fromInputDStream[T: ClassTag](
+ inputDStream: InputDStream[T]): JavaInputDStream[T] = {
+ new JavaInputDStream[T](inputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
new file mode 100644
index 0000000000..add8585308
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.dstream.InputDStream
+import scala.reflect.ClassTag
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]] of
+ * key-value pairs.
+ */
+class JavaPairInputDStream[K, V](val inputDStream: InputDStream[(K, V)])(
+ implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V]
+ ) extends JavaPairDStream[K, V](inputDStream) {
+}
+
+object JavaPairInputDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] of pairs to a
+ * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairInputDStream]].
+ */
+ implicit def fromInputDStream[K: ClassTag, V: ClassTag](
+ inputDStream: InputDStream[(K, V)]): JavaPairInputDStream[K, V] = {
+ new JavaPairInputDStream[K, V](inputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
new file mode 100644
index 0000000000..974b3e4516
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaPairReceiverInputDStream[K, V](val receiverInputDStream: ReceiverInputDStream[(K, V)])
+ (implicit override val kClassTag: ClassTag[K], override implicit val vClassTag: ClassTag[V])
+ extends JavaPairInputDStream[K, V](receiverInputDStream) {
+}
+
+object JavaPairReceiverInputDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]].
+ */
+ implicit def fromReceiverInputDStream[K: ClassTag, V: ClassTag](
+ receiverInputDStream: ReceiverInputDStream[(K, V)]): JavaPairReceiverInputDStream[K, V] = {
+ new JavaPairReceiverInputDStream[K, V](receiverInputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
new file mode 100644
index 0000000000..340ef97980
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T])
+ (implicit override val classTag: ClassTag[T]) extends JavaInputDStream[T](receiverInputDStream) {
+}
+
+object JavaReceiverInputDStream {
+ /**
+ * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly
+ * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]].
+ */
+ implicit def fromReceiverInputDStream[T: ClassTag](
+ receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = {
+ new JavaReceiverInputDStream[T](receiverInputDStream)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index c800602d09..fbb2e9f85d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -35,7 +35,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.receiver.Receiver
/**
* A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
@@ -155,8 +156,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
*/
- def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
- : JavaDStream[String] = {
+ def socketTextStream(
+ hostname: String, port: Int,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[String] = {
ssc.socketTextStream(hostname, port, storageLevel)
}
@@ -167,7 +170,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
- def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = {
ssc.socketTextStream(hostname, port)
}
@@ -186,7 +189,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
- : JavaDStream[T] = {
+ : JavaReceiverInputDStream[T] = {
def fn = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
@@ -218,10 +221,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def rawSocketStream[T](
hostname: String,
port: Int,
- storageLevel: StorageLevel): JavaDStream[T] = {
+ storageLevel: StorageLevel): JavaReceiverInputDStream[T] = {
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
+ JavaReceiverInputDStream.fromReceiverInputDStream(
+ ssc.rawSocketStream(hostname, port, storageLevel))
}
/**
@@ -233,10 +237,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
- def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ def rawSocketStream[T](hostname: String, port: Int): JavaReceiverInputDStream[T] = {
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
+ JavaReceiverInputDStream.fromReceiverInputDStream(
+ ssc.rawSocketStream(hostname, port))
}
/**
@@ -249,7 +254,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
- def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+ def fileStream[K, V, F <: NewInputFormat[K, V]](
+ directory: String): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
@@ -275,7 +281,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
name: String,
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
@@ -296,7 +302,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
props: Props,
name: String,
storageLevel: StorageLevel
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
@@ -316,14 +322,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def actorStream[T](
props: Props,
name: String
- ): JavaDStream[T] = {
+ ): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -339,7 +345,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -347,7 +353,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
- def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
+ def queueStream[T](
+ queue: java.util.Queue[JavaRDD[T]],
+ oneAtATime: Boolean
+ ): JavaInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -356,7 +365,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates an input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -368,7 +377,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def queueStream[T](
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
- defaultRDD: JavaRDD[T]): JavaDStream[T] = {
+ defaultRDD: JavaRDD[T]): JavaInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -377,6 +386,17 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Create an input stream with any arbitrary user implemented receiver.
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * @param receiver Custom implementation of Receiver
+ */
+ def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ ssc.receiverStream(receiver)
+ }
+
+ /**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 226844c228..aa1993f058 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -30,7 +30,7 @@ import scala.reflect.ClassTag
* FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
* new files and generates RDDs with the new files. For implementing input streams
* that requires running a receiver on the worker nodes, use
- * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
* @param ssc_ Streaming context that will execute this input stream
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
deleted file mode 100644
index 5a249706b4..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import java.nio.ByteBuffer
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.Await
-import scala.reflect.ClassTag
-
-import akka.actor.{Actor, Props}
-import akka.pattern.ask
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.rdd.{BlockRDD, RDD}
-import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
-import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
-import org.apache.spark.util.{AkkaUtils, Utils}
-
-/**
- * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
- * that has to start a receiver on worker nodes to receive external data.
- * Specific implementations of NetworkInputDStream must
- * define the getReceiver() function that gets the receiver object of type
- * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent
- * to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
- * @tparam T Class type of the object of this stream
- */
-abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
- extends InputDStream[T](ssc_) {
-
- /** Keeps all received blocks information */
- private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
-
- /** This is an unique identifier for the network input stream. */
- val id = ssc.getNewNetworkStreamId()
-
- /**
- * Gets the receiver object that will be sent to the worker nodes
- * to receive data. This method needs to defined by any specific implementation
- * of a NetworkInputDStream.
- */
- def getReceiver(): NetworkReceiver[T]
-
- // Nothing to start or stop as both taken care of by the NetworkInputTracker.
- def start() {}
-
- def stop() {}
-
- /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */
- override def compute(validTime: Time): Option[RDD[T]] = {
- // If this is called for any time before the start time of the context,
- // then this returns an empty RDD. This may happen when recovering from a
- // master failure
- if (validTime >= graph.startTime) {
- val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
- receivedBlockInfo(validTime) = blockInfo
- val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
- Some(new BlockRDD[T](ssc.sc, blockIds))
- } else {
- Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
- }
- }
-
- /** Get information on received blocks. */
- private[streaming] def getReceivedBlockInfo(time: Time) = {
- receivedBlockInfo(time)
- }
-
- /**
- * Clear metadata that are older than `rememberDuration` of this DStream.
- * This is an internal method that should not be called directly. This
- * implementation overrides the default implementation to clear received
- * block information.
- */
- private[streaming] override def clearMetadata(time: Time) {
- super.clearMetadata(time)
- val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
- receivedBlockInfo --= oldReceivedBlocks.keys
- logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
- (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
- }
-}
-
-
-private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-
-/**
- * Abstract class of a receiver that can be run on worker nodes to receive external data. See
- * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation.
- */
-abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
-
- /** Local SparkEnv */
- lazy protected val env = SparkEnv.get
-
- /** Remote Akka actor for the NetworkInputTracker */
- lazy protected val trackerActor = {
- val ip = env.conf.get("spark.driver.host", "localhost")
- val port = env.conf.getInt("spark.driver.port", 7077)
- val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
- env.actorSystem.actorSelection(url)
- }
-
- /** Akka actor for receiving messages from the NetworkInputTracker in the driver */
- lazy protected val actor = env.actorSystem.actorOf(
- Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId)
-
- /** Timeout for Akka actor messages */
- lazy protected val askTimeout = AkkaUtils.askTimeout(env.conf)
-
- /** Thread that starts the receiver and stays blocked while data is being received */
- lazy protected val receivingThread = Thread.currentThread()
-
- /** Exceptions that occurs while receiving data */
- protected lazy val exceptions = new ArrayBuffer[Exception]
-
- /** Identifier of the stream this receiver is associated with */
- protected var streamId: Int = -1
-
- /**
- * This method will be called to start receiving data. All your receiver
- * starting code should be implemented by defining this function.
- */
- protected def onStart()
-
- /** This method will be called to stop receiving data. */
- protected def onStop()
-
- /** Conveys a placement preference (hostname) for this receiver. */
- def getLocationPreference() : Option[String] = None
-
- /**
- * Start the receiver. First is accesses all the lazy members to
- * materialize them. Then it calls the user-defined onStart() method to start
- * other threads, etc required to receiver the data.
- */
- def start() {
- try {
- // Access the lazy vals to materialize them
- env
- actor
- receivingThread
-
- // Call user-defined onStart()
- logInfo("Starting receiver")
- onStart()
-
- // Wait until interrupt is called on this thread
- while(true) Thread.sleep(100000)
- } catch {
- case ie: InterruptedException =>
- logInfo("Receiving thread has been interrupted, receiver " + streamId + " stopped")
- case e: Exception =>
- logError("Error receiving data in receiver " + streamId, e)
- exceptions += e
- }
-
- // Call user-defined onStop()
- logInfo("Stopping receiver")
- try {
- onStop()
- } catch {
- case e: Exception =>
- logError("Error stopping receiver " + streamId, e)
- exceptions += e
- }
-
- val message = if (exceptions.isEmpty) {
- null
- } else if (exceptions.size == 1) {
- val e = exceptions.head
- "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString
- } else {
- "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n"
- exceptions.zipWithIndex.map {
- case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString
- }.mkString("\n")
- }
-
- logInfo("Deregistering receiver " + streamId)
- val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout)
- Await.result(future, askTimeout)
- logInfo("Deregistered receiver " + streamId)
- env.actorSystem.stop(actor)
- logInfo("Stopped receiver " + streamId)
- }
-
- /**
- * Stop the receiver. First it interrupts the main receiving thread,
- * that is, the thread that called receiver.start().
- */
- def stop() {
- // Stop receiving by interrupting the receiving thread
- receivingThread.interrupt()
- logInfo("Interrupted receiving thread " + receivingThread + " for stopping")
- }
-
- /**
- * Stop the receiver and reports exception to the tracker.
- * This should be called whenever an exception is to be handled on any thread
- * of the receiver.
- */
- protected def stopOnError(e: Exception) {
- logError("Error receiving data", e)
- exceptions += e
- stop()
- }
-
- /**
- * Push a block (as an ArrayBuffer filled with data) into the block manager.
- */
- def pushBlock(
- blockId: StreamBlockId,
- arrayBuffer: ArrayBuffer[T],
- metadata: Any,
- level: StorageLevel
- ) {
- env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
- trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
- logDebug("Pushed block " + blockId)
- }
-
- /**
- * Push a block (as bytes) into the block manager.
- */
- def pushBlock(
- blockId: StreamBlockId,
- bytes: ByteBuffer,
- metadata: Any,
- level: StorageLevel
- ) {
- env.blockManager.putBytes(blockId, bytes, level)
- trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
- }
-
- /** Set the ID of the DStream that this receiver is associated with */
- protected[streaming] def setStreamId(id: Int) {
- streamId = id
- }
-
- /** A helper actor that communicates with the NetworkInputTracker */
- private class NetworkReceiverActor extends Actor {
-
- override def preStart() {
- val msg = RegisterReceiver(
- streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
- logInfo("Registered receiver " + streamId)
- }
-
- override def receive() = {
- case StopReceiver =>
- logInfo("Received stop signal")
- stop()
- }
- }
-
- /**
- * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
- * them into appropriately named blocks at regular intervals. This class starts two threads,
- * one to periodically start a new batch and prepare the previous batch of as a block,
- * the other to push the blocks into the block manager.
- */
- class BlockGenerator(storageLevel: StorageLevel)
- extends Serializable with Logging {
-
- case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)
-
- val clock = new SystemClock()
- val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
- val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer,
- "BlockGenerator")
- val blockStorageLevel = storageLevel
- val blocksForPushing = new ArrayBlockingQueue[Block](1000)
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- var currentBuffer = new ArrayBuffer[T]
- var stopped = false
-
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Started BlockGenerator")
- }
-
- def stop() {
- blockIntervalTimer.stop(false)
- stopped = true
- blockPushingThread.join()
- logInfo("Stopped BlockGenerator")
- }
-
- def += (obj: T): Unit = synchronized {
- currentBuffer += obj
- }
-
- private def updateCurrentBuffer(time: Long): Unit = synchronized {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[T]
- if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval)
- val newBlock = new Block(blockId, newBlockBuffer)
- blocksForPushing.add(newBlock)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block updating timer thread was interrupted")
- case e: Exception =>
- NetworkReceiver.this.stopOnError(e)
- }
- }
-
- private def keepPushingBlocks() {
- logInfo("Started block pushing thread")
- try {
- while(!stopped) {
- Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
- case Some(block) =>
- NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
- case None =>
- }
- }
- // Push out the blocks that are still left
- logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
- while (!blocksForPushing.isEmpty) {
- val block = blocksForPushing.take()
- NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
- logInfo("Blocks left to push " + blocksForPushing.size())
- }
- logInfo("Stopped blocks pushing thread")
- } catch {
- case ie: InterruptedException =>
- logInfo("Block pushing thread was interrupted")
- case e: Exception =>
- NetworkReceiver.this.stopOnError(e)
- }
- }
- }
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 6f9477020a..186e1bf03a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -19,13 +19,14 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
- receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+ receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
- def getReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): Receiver[T] = {
receiver
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index dea0f26f90..e2925b9e03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.StreamingContext
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
import java.io.EOFException
import java.util.concurrent.ArrayBlockingQueue
+import org.apache.spark.streaming.receiver.Receiver
/**
@@ -42,21 +43,19 @@ class RawInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+ ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
- def getReceiver(): NetworkReceiver[T] = {
- new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ def getReceiver(): Receiver[T] = {
+ new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
}
}
private[streaming]
class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
- extends NetworkReceiver[Any] {
+ extends Receiver[Any](storageLevel) with Logging {
var blockPushingThread: Thread = null
- override def getLocationPreference = None
-
def onStart() {
// Open a socket to the target address and keep reading from it
logInfo("Connecting to " + host + ":" + port)
@@ -73,9 +72,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
var nextBlockNumber = 0
while (true) {
val buffer = queue.take()
- val blockId = StreamBlockId(streamId, nextBlockNumber)
nextBlockNumber += 1
- pushBlock(blockId, buffer, null, storageLevel)
+ store(buffer)
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
new file mode 100644
index 0000000000..75cabdbf8d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{BlockRDD, RDD}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+
+/**
+ * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
+ * that has to start a receiver on worker nodes to receive external data.
+ * Specific implementations of ReceiverInputDStream must
+ * define the `getReceiver()` function that gets the receiver object of type
+ * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
+ * to the workers to receive data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
+abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+ extends InputDStream[T](ssc_) {
+
+ /** Keeps all received blocks information */
+ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+
+ /** This is a unique identifier for the receiver input stream. */
+ val id = ssc.getNewReceiverStreamId()
+
+ /**
+ * Gets the receiver object that will be sent to the worker nodes
+ * to receive data. This method needs to be defined by any specific implementation
+ * of a ReceiverInputDStream.
+ */
+ def getReceiver(): Receiver[T]
+
+ // Nothing to start or stop as both are taken care of by the ReceiverTracker.
+ def start() {}
+
+ def stop() {}
+
+ /** Ask ReceiverTracker for received data blocks and generate RDDs with them. */
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure
+ if (validTime >= graph.startTime) {
+ val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
+ receivedBlockInfo(validTime) = blockInfo
+ val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+ }
+ }
+
+ /** Get information on received blocks. */
+ private[streaming] def getReceivedBlockInfo(time: Time) = {
+ receivedBlockInfo(time)
+ }
+
+ /**
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This
+ * implementation overrides the default implementation to clear received
+ * block information.
+ */
+ private[streaming] override def clearMetadata(time: Time) {
+ super.clearMetadata(time)
+ val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
+ receivedBlockInfo --= oldReceivedBlocks.keys
+ logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 63d94d1cc6..1e32727eac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -24,7 +24,9 @@ import org.apache.spark.util.NextIterator
import scala.reflect.ClassTag
import java.io._
-import java.net.Socket
+import java.net.{UnknownHostException, Socket}
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class SocketInputDStream[T: ClassTag](
@@ -33,9 +35,9 @@ class SocketInputDStream[T: ClassTag](
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_) {
+ ) extends ReceiverInputDStream[T](ssc_) {
- def getReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
@@ -46,26 +48,52 @@ class SocketReceiver[T: ClassTag](
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends NetworkReceiver[T] {
+ ) extends Receiver[T](storageLevel) with Logging {
- lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+ var socket: Socket = null
+ var receivingThread: Thread = null
- override def getLocationPreference = None
+ def onStart() {
+ receivingThread = new Thread("Socket Receiver") {
+ override def run() {
+ connect()
+ receive()
+ }
+ }
+ receivingThread.start()
+ }
- protected def onStart() {
- logInfo("Connecting to " + host + ":" + port)
- val socket = new Socket(host, port)
- logInfo("Connected to " + host + ":" + port)
- blockGenerator.start()
- val iterator = bytesToObjects(socket.getInputStream())
- while(iterator.hasNext) {
- val obj = iterator.next
- blockGenerator += obj
+ def onStop() {
+ if (socket != null) {
+ socket.close()
+ }
+ socket = null
+ if (receivingThread != null) {
+ receivingThread.join()
}
}
- protected def onStop() {
- blockGenerator.stop()
+ def connect() {
+ try {
+ logInfo("Connecting to " + host + ":" + port)
+ socket = new Socket(host, port)
+ } catch {
+ case e: Exception =>
+ restart("Could not connect to " + host + ":" + port, e)
+ }
+ }
+
+ def receive() {
+ try {
+ logInfo("Connected to " + host + ":" + port)
+ val iterator = bytesToObjects(socket.getInputStream())
+ while(!isStopped && iterator.hasNext) {
+ store(iterator.next)
+ }
+ } catch {
+ case e: Exception =>
+ restart("Error receiving data from socket", e)
+ }
}
}
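
The rewritten SocketReceiver above follows the contract of the new Receiver base class: onStart() returns quickly after spawning its own threads, records are handed to Spark with store(), and failures are reported through restart() so the supervisor can relaunch the receiver. A sketch of a user-defined receiver built on the same pattern (hypothetical class, assuming only the Receiver methods exercised in this patch: store, restart and isStopped):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      @volatile private var socket: Socket = null

      def onStart() {
        // Do the blocking work on a separate thread, exactly like SocketReceiver.
        new Thread("Line Receiver") { override def run() { receive() } }.start()
      }

      def onStop() {
        if (socket != null) socket.close()
      }

      private def receive() {
        try {
          socket = new Socket(host, port)
          val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
          var line = reader.readLine()
          while (!isStopped && line != null) {
            store(line)              // single items are batched into blocks by the supervisor
            line = reader.readLine()
          }
          restart("Connection to " + host + ":" + port + " closed")
        } catch {
          case e: Exception => restart("Error receiving data", e)
        }
      }
    }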
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index da0d364ae7..821cf19481 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -15,26 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.receivers
+package org.apache.spark.streaming.receiver
-import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
-import akka.actor.{ actorRef2Scala, ActorRef }
-import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
-import akka.actor.SupervisorStrategy._
+import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.dstream.NetworkReceiver
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable.ArrayBuffer
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.StorageLevel
+import java.nio.ByteBuffer
/** A helper with set of defaults for supervisor strategy */
-object ReceiverSupervisorStrategy {
+object ActorSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
@@ -50,9 +46,9 @@ object ReceiverSupervisorStrategy {
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
- * class MyActor extends Actor with Receiver{
+ * class MyActor extends Actor with ActorHelper{
* def receive {
- * case anything: String => pushBlock(anything)
+ * case anything: String => store(anything)
* }
* }
*
@@ -65,29 +61,40 @@ object ReceiverSupervisorStrategy {
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
*/
-trait Receiver {
+trait ActorHelper {
self: Actor => // to ensure that this can be added to Actor classes only
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store[T](iter: Iterator[T]) {
+ println("Storing iterator")
+ context.parent ! IteratorData(iter)
+ }
+
/**
- * Push an iterator received data into Spark Streaming for processing
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
*/
- def pushBlock[T: ClassTag](iter: Iterator[T]) {
- context.parent ! Data(iter)
+ def store(bytes: ByteBuffer) {
+ context.parent ! ByteBufferData(bytes)
}
/**
- * Push a single item of received data into Spark Streaming for processing
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
*/
- def pushBlock[T: ClassTag](data: T) {
- context.parent ! Data(data)
+ def store[T](item: T) {
+ println("Storing item")
+ context.parent ! SingleItemData(item)
}
}
/**
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receivers.Receiver]].
+ * [[org.apache.spark.streaming.receiver.ActorHelper]].
*/
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
@@ -95,7 +102,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)
/** Case class to receive data sent by child actors */
-private[streaming] case class Data[T: ClassTag](data: T)
+private[streaming] sealed trait ActorReceiverData
+private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
/**
* Provides Actors as receivers for receiving stream.
@@ -117,16 +127,13 @@ private[streaming] case class Data[T: ClassTag](data: T)
* }}}
*/
private[streaming] class ActorReceiver[T: ClassTag](
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- receiverSupervisorStrategy: SupervisorStrategy)
- extends NetworkReceiver[T] {
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy
+ ) extends Receiver[T](storageLevel) with Logging {
- protected lazy val blocksGenerator: BlockGenerator =
- new BlockGenerator(storageLevel)
-
- protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ protected lazy val supervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@@ -140,12 +147,18 @@ private[streaming] class ActorReceiver[T: ClassTag](
def receive = {
- case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
+ case IteratorData(iterator) =>
+ println("received iterator")
+ store(iterator.asInstanceOf[Iterator[T]])
- case Data(msg) =>
- blocksGenerator += msg.asInstanceOf[T]
+ case SingleItemData(msg) =>
+ println("received single")
+ store(msg.asInstanceOf[T])
n.incrementAndGet
+ case ByteBufferData(bytes) =>
+ store(bytes)
+
case props: Props =>
val worker = context.actorOf(props)
logInfo("Started receiver worker at:" + worker.path)
@@ -165,20 +178,14 @@ private[streaming] class ActorReceiver[T: ClassTag](
}
}
- protected def pushBlock(iter: Iterator[T]) {
- val buffer = new ArrayBuffer[T]
- buffer ++= iter
- pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel)
- }
-
- protected def onStart() = {
- blocksGenerator.start()
+ def onStart() = {
supervisor
logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
}
- protected def onStop() = {
+ def onStop() = {
supervisor ! PoisonPill
}
}
+
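
The updated scaladoc above shows the new ActorHelper trait replacing the old Receiver trait for actor-based sources. A small end-to-end sketch of wiring such an actor into a StreamingContext (hypothetical actor name, otherwise following the example in that scaladoc):

    import akka.actor.{Actor, Props}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.ActorHelper

    // Every String message sent to this actor is stored as a single item; the
    // supervising ActorReceiver batches these items into blocks behind the scenes.
    class WordActor extends Actor with ActorHelper {
      def receive = {
        case s: String => store(s)
      }
    }

    val ssc = new StreamingContext(new SparkConf().setAppName("actor-example"), Seconds(1))
    // Returns ReceiverInputDStream[String] (JavaReceiverInputDStream on the Java API side).
    val words = ssc.actorStream[String](Props[WordActor], "WordReceiver")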
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
new file mode 100644
index 0000000000..78cc2daa56
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+
+/** Listener object for BlockGenerator events */
+private[streaming] trait BlockGeneratorListener {
+ /** Called when a new block needs to be pushed */
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+ /** Called when an error has occurred in BlockGenerator */
+ def onError(message: String, throwable: Throwable)
+}
+
+/**
+ * Generates batches of objects received by a
+ * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
+ * named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+private[streaming] class BlockGenerator(
+ listener: BlockGeneratorListener,
+ receiverId: Int,
+ conf: SparkConf
+ ) extends Logging {
+
+ private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
+
+ private val clock = new SystemClock()
+ private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+ private val blockIntervalTimer =
+ new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+ private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
+ private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
+ private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ @volatile private var currentBuffer = new ArrayBuffer[Any]
+ @volatile private var stopped = false
+
+ /** Start block generating and pushing threads. */
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Started BlockGenerator")
+ }
+
+ /** Stop all threads. */
+ def stop() {
+ logInfo("Stopping BlockGenerator")
+ blockIntervalTimer.stop(interruptTimer = false)
+ stopped = true
+ logInfo("Waiting for block pushing thread")
+ blockPushingThread.join()
+ logInfo("Stopped BlockGenerator")
+ }
+
+ /**
+ * Push a single data item into the buffer. All received data items
+ * will be periodically pushed into BlockManager.
+ */
+ def += (data: Any): Unit = synchronized {
+ currentBuffer += data
+ }
+
+ /** Change the buffer to which single records are added. */
+ private def updateCurrentBuffer(time: Long): Unit = synchronized {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[Any]
+ if (newBlockBuffer.size > 0) {
+ val blockId = StreamBlockId(receiverId, time - blockInterval)
+ val newBlock = new Block(blockId, newBlockBuffer)
+ blocksForPushing.put(newBlock) // put is blocking when queue is full
+ logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block updating timer thread was interrupted")
+ case t: Throwable =>
+ reportError("Error in block updating thread", t)
+ }
+ }
+
+ /** Keep pushing blocks to the BlockManager. */
+ private def keepPushingBlocks() {
+ logInfo("Started block pushing thread")
+ try {
+ while(!stopped) {
+ Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
+ case Some(block) => pushBlock(block)
+ case None =>
+ }
+ }
+ // Push out the blocks that are still left
+ logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
+ while (!blocksForPushing.isEmpty) {
+ logDebug("Getting block ")
+ val block = blocksForPushing.take()
+ pushBlock(block)
+ logInfo("Blocks left to push " + blocksForPushing.size())
+ }
+ logInfo("Stopped block pushing thread")
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread was interrupted")
+ case t: Throwable =>
+ reportError("Error in block pushing thread", t)
+ }
+ }
+
+ private def reportError(message: String, t: Throwable) {
+ logError(message, t)
+ listener.onError(message, t)
+ }
+
+ private def pushBlock(block: Block) {
+ listener.onPushBlock(block.id, block.buffer)
+ logInfo("Pushed block " + block.id)
+ }
+}
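Both intervals read above are ordinary Spark configuration keys; a short sketch of tuning them from application code (the values are illustrative and not part of this patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.streaming.blockInterval", "100")   // ms between block boundaries (default 200)
      .set("spark.streaming.blockQueueSize", "20")   // blocks buffered before put() blocks (default 10)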
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
new file mode 100644
index 0000000000..44eecf1dd2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. A
+ * custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
+ * should define the setup steps necessary to start receiving data,
+ * and onStop() should define the cleanup steps necessary to stop receiving data. A custom
+ * receiver would look something like this.
+ *
+ * @example {{{
+ * class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
+ * def onStart() {
+ * // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ * // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ * // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ * // Call stop(...), restart() or reportError(...) on any thread based on how
+ * // different errors should be handled.
+ *
+ * // See corresponding method documentation for more details
+ * }
+ *
+ * def onStop() {
+ * // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ * }
+ * }
+ * }}}
+ */
+abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
+
+ /**
+ * This method is called by the system when the receiver is started. This function
+ * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
+ * This function must be non-blocking, so receiving the data must occur on a different
+ * thread. Received data can be stored with Spark by calling `store(data)`.
+ *
+ * If there are errors in threads started here, then the following options can be used:
+ * (i) `reportError(...)` can be called to report the error to the driver.
+ * The receiving of data will continue uninterrupted.
+ * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
+ * clean up all resources allocated (threads, buffers, etc.) during `onStart()`.
+ * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
+ * immediately, and then `onStart()` after a delay.
+ */
+ def onStart()
+
+ /**
+ * This method is called by the system when the receiver is stopped. All resources
+ * (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
+ */
+ def onStop()
+
+ /** Override this to specify a preferred location (hostname). */
+ def preferredLocation : Option[String] = None
+
+ /**
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ */
+ def store(dataItem: T) {
+ executor.pushSingle(dataItem)
+ }
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def store(dataBuffer: ArrayBuffer[T]) {
+ executor.pushArrayBuffer(dataBuffer, None, None)
+ }
+
+ /**
+ * Store an ArrayBuffer of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
+ executor.pushArrayBuffer(dataBuffer, Some(metadata), None)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store(dataIterator: Iterator[T]) {
+ executor.pushIterator(dataIterator, None, None)
+ }
+
+ /**
+ * Store an iterator of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataIterator: java.util.Iterator[T], metadata: Any) {
+ executor.pushIterator(dataIterator, Some(metadata), None)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store(dataIterator: java.util.Iterator[T]) {
+ executor.pushIterator(dataIterator, None, None)
+ }
+
+ /**
+ * Store an iterator of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(dataIterator: Iterator[T], metadata: Any) {
+ executor.pushIterator(dataIterator, Some(metadata), None)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
+ */
+ def store(bytes: ByteBuffer) {
+ executor.pushBytes(bytes, None, None)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data
+ * for being used in the corresponding InputDStream.
+ */
+ def store(bytes: ByteBuffer, metadata: Any) {
+ executor.pushBytes(bytes, Some(metadata), None)
+ }
+
+ /** Report exceptions in receiving data. */
+ def reportError(message: String, throwable: Throwable) {
+ executor.reportError(message, throwable)
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after a delay, `onStart()` will be called.
+ * The `message` will be reported to the driver.
+ * The delay is defined by the Spark configuration
+ * `spark.streaming.receiverRestartDelay`.
+ */
+ def restart(message: String) {
+ executor.restartReceiver(message)
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after a delay, `onStart()` will be called.
+ * The `message` and `exception` will be reported to the driver.
+ * The delay is defined by the Spark configuration
+ * `spark.streaming.receiverRestartDelay`.
+ */
+ def restart(message: String, error: Throwable) {
+ executor.restartReceiver(message, Some(error))
+ }
+
+ /**
+ * Restart the receiver. This will call `onStop()` immediately and return.
+ * Asynchronously, after the given delay, `onStart()` will be called.
+ */
+ def restart(message: String, error: Throwable, millisecond: Int) {
+ executor.restartReceiver(message, Some(error), millisecond)
+ }
+
+ /** Stop the receiver completely. */
+ def stop(message: String) {
+ executor.stop(message, None)
+ }
+
+ /** Stop the receiver completely due to an exception */
+ def stop(message: String, error: Throwable) {
+ executor.stop(message, Some(error))
+ }
+
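+ /** Check if the receiver has been started. */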
+ def isStarted(): Boolean = {
+ executor.isReceiverStarted()
+ }
+
+ /** Check if receiver has been marked for stopping. */
+ def isStopped(): Boolean = {
+ !executor.isReceiverStarted()
+ }
+
+ /** Get unique identifier of this receiver. */
+ def streamId = id
+
+ /*
+ * =================
+ * Private methods
+ * =================
+ */
+
+ /** Identifier of the stream this receiver is associated with. */
+ private var id: Int = -1
+
+ /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
+ private[streaming] var executor_ : ReceiverSupervisor = null
+
+ /** Set the ID of the DStream that this receiver is associated with. */
+ private[streaming] def setReceiverId(id_ : Int) {
+ id = id_
+ }
+
+ /** Attach the executor (a ReceiverSupervisor) to this receiver. */
+ private[streaming] def attachExecutor(exec: ReceiverSupervisor) {
+ assert(executor_ == null)
+ executor_ = exec
+ }
+
+ /** Get the attached executor. */
+ private def executor = {
+ assert(executor_ != null, "Executor has not been attached to this receiver")
+ executor_
+ }
+}
+
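To make the new API concrete, here is a minimal sketch of a custom receiver in the spirit of the class-level example above; it is not part of this patch, and the socket-reading details, class name, and host/port are hypothetical:

    import java.net.Socket
    import scala.io.Source
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class SocketLineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart() {
        // onStart() must not block, so receive on a dedicated thread
        new Thread("Socket Line Receiver") {
          override def run() { receive() }
        }.start()
      }

      def onStop() {
        // Nothing to do here; the receiving thread checks isStopped() and exits on its own
      }

      private def receive() {
        try {
          val socket = new Socket(host, port)
          val lines = Source.fromInputStream(socket.getInputStream).getLines()
          while (!isStopped() && lines.hasNext) {
            store(lines.next())   // single items are batched into blocks by the BlockGenerator
          }
          socket.close()
          restart("Trying to connect again")
        } catch {
          case t: Throwable => restart("Error receiving data", t)
        }
      }
    }

Such a receiver would be attached with the renamed StreamingContext method, e.g. ssc.receiverStream(new SocketLineReceiver("localhost", 9999)).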
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
new file mode 100644
index 0000000000..6ab3ca6ea5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+/** Messages sent to the Receiver. */
+private[streaming] sealed trait NetworkReceiverMessage
+private[streaming] object StopReceiver extends NetworkReceiverMessage
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
new file mode 100644
index 0000000000..256b3335e4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StreamBlockId
+import java.util.concurrent.CountDownLatch
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+
+/**
+ * Abstract class that is responsible for supervising a Receiver in the worker.
+ * It provides all the necessary interfaces for handling the data received by the receiver.
+ */
+private[streaming] abstract class ReceiverSupervisor(
+ receiver: Receiver[_],
+ conf: SparkConf
+ ) extends Logging {
+
+ /** Enumeration to identify the current state of the receiver */
+ object ReceiverState extends Enumeration {
+ type ReceiverState = Value
+ val Initialized, Started, Stopped = Value
+ }
+ import ReceiverState._
+
+ // Attach the executor to the receiver
+ receiver.attachExecutor(this)
+
+ /** Receiver id */
+ protected val streamId = receiver.streamId
+
+ /** Latch that is released when the receiver has been marked for stopping. */
+ private val stopLatch = new CountDownLatch(1)
+
+ /** Default delay between stopping a receiver and starting it again */
+ private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
+
+ /** Exception associated with the stopping of the receiver */
+ @volatile protected var stoppingError: Throwable = null
+
+ /** State of the receiver */
+ @volatile private[streaming] var receiverState = Initialized
+
+ /** Push a single data item to backend data store. */
+ def pushSingle(data: Any)
+
+ /** Store the bytes of received data as a data block into Spark's memory. */
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ )
+
+ /** Report errors. */
+ def reportError(message: String, throwable: Throwable)
+
+ /** Start the executor */
+ def start() {
+ startReceiver()
+ }
+
+ /** Mark the executor and the receiver for stopping */
+ def stop(message: String, error: Option[Throwable]) {
+ stoppingError = error.orNull
+ stopReceiver(message, error)
+ stopLatch.countDown()
+ }
+
+ /** Start receiver */
+ def startReceiver(): Unit = synchronized {
+ try {
+ logInfo("Starting receiver")
+ onReceiverStart()
+ receiverState = Started
+ } catch {
+ case t: Throwable =>
+ stop("Error starting receiver " + streamId, Some(t))
+ }
+ }
+
+ /** Stop receiver */
+ def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
+ try {
+ receiverState = Stopped
+ onReceiverStop(message, error)
+ } catch {
+ case t: Throwable =>
+ stop("Error stopping receiver " + streamId, Some(t))
+ }
+ }
+
+ /** Restart receiver with delay */
+ def restartReceiver(message: String, error: Option[Throwable] = None) {
+ restartReceiver(message, error, defaultRestartDelay)
+ }
+
+ /** Restart receiver with delay */
+ def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
+ logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+ error.getOrElse(null))
+ stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
+ future {
+ logDebug("Sleeping for " + delay)
+ Thread.sleep(delay)
+ logDebug("Starting receiver again")
+ startReceiver()
+ logInfo("Receiver started again")
+ }
+ }
+
+ /** Called when the receiver needs to be started */
+ protected def onReceiverStart(): Unit = synchronized {
+ // Call user-defined onStart()
+ logInfo("Calling receiver onStart")
+ receiver.onStart()
+ logInfo("Called receiver onStart")
+ }
+
+ /** Called when the receiver needs to be stopped */
+ protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
+ // Call user-defined onStop()
+ logInfo("Calling receiver onStop")
+ receiver.onStop()
+ logInfo("Called receiver onStop")
+ }
+
+ /** Check if receiver has been marked for stopping */
+ def isReceiverStarted() = {
+ logDebug("state = " + receiverState)
+ receiverState == Started
+ }
+
+ /** Block the calling thread until the executor has been stopped */
+ def awaitTermination() {
+ stopLatch.await()
+ logInfo("Waiting for executor stop is over")
+ if (stoppingError != null) {
+ logError("Stopped executor with error: " + stoppingError)
+ } else {
+ logWarning("Stopped executor without error")
+ }
+ if (stoppingError != null) {
+ throw stoppingError
+ }
+ }
+}
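For unit testing, the abstract push/report methods can be stubbed out in memory; a rough sketch along the lines of the FakeReceiverSupervisor used in the new test suite (not part of this patch, and since ReceiverSupervisor is private[streaming] it is assumed to be compiled under the org.apache.spark.streaming package):

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StreamBlockId
    import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisor}

    // Collects pushed items in memory instead of writing them to the BlockManager
    class InMemorySupervisor(receiver: Receiver[_])
      extends ReceiverSupervisor(receiver, new SparkConf()) {

      val singles = new ArrayBuffer[Any]

      def pushSingle(data: Any) { singles += data }
      def pushBytes(bytes: ByteBuffer, meta: Option[Any], id: Option[StreamBlockId]) { }
      def pushIterator(iter: Iterator[_], meta: Option[Any], id: Option[StreamBlockId]) { }
      def pushArrayBuffer(buf: ArrayBuffer[_], meta: Option[Any], id: Option[StreamBlockId]) { }
      def reportError(message: String, throwable: Throwable) { }
    }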
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
new file mode 100644
index 0000000000..2a3521bd46
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.concurrent.Await
+
+import akka.actor.{Actor, Props}
+import akka.pattern.ask
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.util.{AkkaUtils, Utils}
+
+/**
+ * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
+ * which provides all the necessary functionality for handling the data received by
+ * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
+ * object that is used to divide the received data stream into blocks of data.
+ */
+private[streaming] class ReceiverSupervisorImpl(
+ receiver: Receiver[_],
+ env: SparkEnv
+ ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
+
+ private val blockManager = env.blockManager
+
+ private val storageLevel = receiver.storageLevel
+
+ /** Remote Akka actor for the ReceiverTracker */
+ private val trackerActor = {
+ val ip = env.conf.get("spark.driver.host", "localhost")
+ val port = env.conf.getInt("spark.driver.port", 7077)
+ val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+ env.actorSystem.actorSelection(url)
+ }
+
+ /** Timeout for Akka actor messages */
+ private val askTimeout = AkkaUtils.askTimeout(env.conf)
+
+ /** Akka actor for receiving messages from the ReceiverTracker in the driver */
+ private val actor = env.actorSystem.actorOf(
+ Props(new Actor {
+ override def preStart() {
+ logInfo("Registered receiver " + streamId)
+ val msg = RegisterReceiver(
+ streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
+ val future = trackerActor.ask(msg)(askTimeout)
+ Await.result(future, askTimeout)
+ }
+
+ override def receive() = {
+ case StopReceiver =>
+ logInfo("Received stop signal")
+ stop("Stopped by driver", None)
+ }
+ }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
+
+ /** Counter for generating unique block IDs when blocks are pushed without an explicit ID */
+ private val newBlockId = new AtomicLong(System.currentTimeMillis())
+
+ /** Divides received data records into data blocks for pushing in BlockManager. */
+ private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ def onError(message: String, throwable: Throwable) {
+ reportError(message, throwable)
+ }
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ pushArrayBuffer(arrayBuffer, None, Some(blockId))
+ }
+ }, streamId, env.conf)
+
+ /** Push a single record of received data into block generator. */
+ def pushSingle(data: Any) {
+ blockGenerator += (data)
+ }
+
+ /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
+ storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
+ }
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, -1, optionalMetadata)
+ }
+
+ /** Store the bytes of received data as a data block into Spark's memory. */
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ val blockId = optionalBlockId.getOrElse(nextBlockId)
+ val time = System.currentTimeMillis
+ blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
+ logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
+ reportPushedBlock(blockId, -1, optionalMetadata)
+ }
+
+ /** Report pushed block */
+ def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
+ val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
+ trackerActor ! AddBlock(blockInfo)
+ logDebug("Reported block " + blockId)
+ }
+
+ /** Report error to the receiver tracker */
+ def reportError(message: String, error: Throwable) {
+ val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
+ trackerActor ! ReportError(streamId, message, errorString)
+ logWarning("Reported error " + message + " - " + error)
+ }
+
+ override def onReceiverStart() {
+ blockGenerator.start()
+ super.onReceiverStart()
+ }
+
+ override def onReceiverStop(message: String, error: Option[Throwable]) {
+ super.onReceiverStop(message, error)
+ blockGenerator.stop()
+ logInfo("Deregistering receiver " + streamId)
+ val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
+ val future = trackerActor.ask(
+ DeregisterReceiver(streamId, message, errorString))(askTimeout)
+ Await.result(future, askTimeout)
+ logInfo("Stopped receiver " + streamId)
+ }
+
+ override def stop(message: String, error: Option[Throwable]) {
+ super.stop(message, error)
+ env.actorSystem.stop(actor)
+ }
+
+ /** Generate new block ID */
+ private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index e564eccba2..374848358e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -38,6 +38,7 @@ private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private val ssc = jobScheduler.ssc
+ private val conf = ssc.conf
private val graph = ssc.graph
val clock = {
@@ -93,26 +94,31 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
- val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds
+ val stopTimeout = conf.getLong(
+ "spark.streaming.gracefulStopTimeout",
+ 10 * ssc.graph.batchDuration.milliseconds
+ )
val pollTime = 100
// To prevent graceful stop to get stuck permanently
def hasTimedOut = {
val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
- if (timedOut) logWarning("Timed out while stopping the job generator")
+ if (timedOut) {
+ logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+ }
timedOut
}
// Wait until all the received blocks in the network input tracker has
// been consumed by network input DStreams, and jobs have been generated with them
logInfo("Waiting for all received blocks to be consumed for job generation")
- while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) {
+ while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) {
Thread.sleep(pollTime)
}
logInfo("Waited for all received blocks to be consumed for job generation")
// Stop generating jobs
- val stopTime = timer.stop(false)
+ val stopTime = timer.stop(interruptTimer = false)
graph.stop()
logInfo("Stopped generation timer")
@@ -214,7 +220,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
- val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
+ val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d9ada99b47..1b034b9fb1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -46,7 +46,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// These two are created only when scheduler starts.
// eventActor not being null means the scheduler has been started and not stopped
- var networkInputTracker: NetworkInputTracker = null
+ var receiverTracker: ReceiverTracker = null
private var eventActor: ActorRef = null
@@ -61,8 +61,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}), "JobScheduler")
listenerBus.start()
- networkInputTracker = new NetworkInputTracker(ssc)
- networkInputTracker.start()
+ receiverTracker = new ReceiverTracker(ssc)
+ receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
@@ -72,7 +72,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logDebug("Stopping JobScheduler")
// First, stop receiving
- networkInputTracker.stop()
+ receiverTracker.stop()
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 438e72a7ce..3d2537f6f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,12 +21,11 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import scala.language.existentials
import akka.actor._
-
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
+import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
/** Information about receiver */
@@ -34,7 +33,7 @@ case class ReceiverInfo(streamId: Int, typ: String, location: String) {
override def toString = s"$typ-$streamId"
}
-/** Information about blocks received by the network receiver */
+/** Information about blocks received by the receiver */
case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
@@ -43,20 +42,21 @@ case class ReceivedBlockInfo(
)
/**
- * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
+ * Messages used by the Receiver and the ReceiverTracker to communicate
* with each other.
*/
-private[streaming] sealed trait NetworkInputTrackerMessage
+private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
host: String,
receiverActor: ActorRef
- ) extends NetworkInputTrackerMessage
+ ) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
- extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
- extends NetworkInputTrackerMessage
+ extends ReceiverTrackerMessage
+private[streaming] case class ReportError(streamId: Int, message: String, error: String)
+ extends ReceiverTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
+ extends ReceiverTrackerMessage
/**
* This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -64,11 +64,11 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
* has been called because it needs the final set of input streams at the time of instantiation.
*/
private[streaming]
-class NetworkInputTracker(ssc: StreamingContext) extends Logging {
+class ReceiverTracker(ssc: StreamingContext) extends Logging {
- val networkInputStreams = ssc.graph.getNetworkInputStreams()
- val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
- val receiverExecutor = new ReceiverExecutor()
+ val receiverInputStreams = ssc.graph.getReceiverInputStreams()
+ val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
+ val receiverExecutor = new ReceiverLauncher()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
@@ -83,27 +83,27 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
/** Start the actor and receiver execution thread. */
def start() = synchronized {
if (actor != null) {
- throw new SparkException("NetworkInputTracker already started")
+ throw new SparkException("ReceiverTracker already started")
}
- if (!networkInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
- "NetworkInputTracker")
+ if (!receiverInputStreams.isEmpty) {
+ actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
+ "ReceiverTracker")
receiverExecutor.start()
- logInfo("NetworkInputTracker started")
+ logInfo("ReceiverTracker started")
}
}
/** Stop the receiver execution thread. */
def stop() = synchronized {
- if (!networkInputStreams.isEmpty && actor != null) {
+ if (!receiverInputStreams.isEmpty && actor != null) {
// First, stop the receivers
receiverExecutor.stop()
// Finally, stop the actor
ssc.env.actorSystem.stop(actor)
actor = null
- logInfo("NetworkInputTracker stopped")
+ logInfo("ReceiverTracker stopped")
}
}
@@ -126,20 +126,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receiverActor: ActorRef,
sender: ActorRef
) {
- if (!networkInputStreamMap.contains(streamId)) {
+ if (!receiverInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
ReceiverInfo(streamId, typ, host)
))
- logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+ logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
}
/** Deregister a receiver */
- def deregisterReceiver(streamId: Int, message: String) {
+ def deregisterReceiver(streamId: Int, message: String, error: String) {
receiverInfo -= streamId
- logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message)
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
+ val messageWithError = if (error != null && !error.isEmpty) {
+ s"$message - $error"
+ } else {
+ s"$message"
+ }
+ logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}
/** Add new blocks for the given stream */
@@ -149,27 +155,40 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
receivedBlockInfo.blockId)
}
+ /** Report error sent by a receiver */
+ def reportError(streamId: Int, message: String, error: String) {
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error))
+ val messageWithError = if (error != null && !error.isEmpty) {
+ s"$message - $error"
+ } else {
+ s"$message"
+ }
+ logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
+ }
+
/** Check if any blocks are left to be processed */
def hasMoreReceivedBlockIds: Boolean = {
!receivedBlockInfo.values.forall(_.isEmpty)
}
/** Actor to receive messages from the receivers. */
- private class NetworkInputTrackerActor extends Actor {
+ private class ReceiverTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
- case DeregisterReceiver(streamId, message) =>
- deregisterReceiver(streamId, message)
+ case ReportError(streamId, message, error) =>
+ reportError(streamId, message, error)
+ case DeregisterReceiver(streamId, message, error) =>
+ deregisterReceiver(streamId, message, error)
sender ! true
}
}
/** This thread class runs all the receivers on the cluster. */
- class ReceiverExecutor {
+ class ReceiverLauncher {
@transient val env = ssc.env
@transient val thread = new Thread() {
override def run() {
@@ -177,7 +196,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
SparkEnv.set(env)
startReceivers()
} catch {
- case ie: InterruptedException => logInfo("ReceiverExecutor interrupted")
+ case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
}
}
}
@@ -203,37 +222,39 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
/**
- * Get the receivers from the NetworkInputDStreams, distributes them to the
+ * Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def startReceivers() {
- val receivers = networkInputStreams.map(nis => {
+ val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
- rcvr.setStreamId(nis.id)
+ rcvr.setReceiverId(nis.id)
rcvr
})
// Right now, we only honor preferences if all receivers have them
- val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
- .reduce(_ && _)
+ val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
- val receiversWithPreferences =
- receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
- ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
+ val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
+ ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
}
else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// Function to start the receiver on the worker node
- val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
+ val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
- throw new Exception("Could not start receiver as details not found.")
+ throw new SparkException(
+ "Could not start receiver as object not found.")
}
- iterator.next().start()
+ val receiver = iterator.next()
+ val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
+ executor.start()
+ executor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
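Since preferences are honored only when every receiver supplies one, a receiver that should be pinned to a host overrides the new preferredLocation hook; a sketch, not part of this patch, with a hypothetical hostname parameter:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class PinnedReceiver(host: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      override def preferredLocation: Option[String] = Some(host)
      def onStart() { /* start receiving threads here */ }
      def onStop() { }
    }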
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 5db40ebbeb..9d6ec1fa33 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.scheduler
import scala.collection.mutable.Queue
+
import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
@@ -26,8 +27,13 @@ sealed trait StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
+case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
+ extends StreamingListenerEvent
+case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
+ extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -41,14 +47,20 @@ trait StreamingListener {
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
+ /** Called when a receiver has reported an error */
+ def onReceiverError(receiverError: StreamingListenerReceiverError) { }
+
+ /** Called when a receiver has been stopped */
+ def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
+
/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
- /** Called when processing of a batch of jobs has completed. */
- def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
-
/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
}
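A sketch of an application-side listener that uses the two new receiver callbacks (not part of this patch; the println bodies are placeholders):

    import org.apache.spark.streaming.scheduler._

    class ReceiverMonitor extends StreamingListener {
      override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
        println("Receiver started: " + started.receiverInfo)
      }
      override def onReceiverError(error: StreamingListenerReceiverError) {
        println("Receiver " + error.streamId + " error: " + error.message)
      }
      override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
        println("Receiver " + stopped.streamId + " stopped: " + stopped.message)
      }
    }

    // Registered with ssc.addStreamingListener(new ReceiverMonitor())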
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index ea03dfc7bf..398724d9e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -40,6 +40,10 @@ private[spark] class StreamingListenerBus() extends Logging {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
+ case receiverError: StreamingListenerReceiverError =>
+ listeners.foreach(_.onReceiverError(receiverError))
+ case receiverStopped: StreamingListenerReceiverStopped =>
+ listeners.foreach(_.onReceiverStopped(receiverStopped))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 8b025b09ed..bf637c1446 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -62,8 +62,8 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
totalCompletedBatches += 1L
}
- def numNetworkReceivers = synchronized {
- ssc.graph.getNetworkInputStreams().size
+ def numReceivers = synchronized {
+ ssc.graph.getReceiverInputStreams().size
}
def numTotalCompletedBatches: Long = synchronized {
@@ -101,7 +101,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
- (0 until numNetworkReceivers).map { receiverId =>
+ (0 until numReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
batchInfo.get(receiverId).getOrElse(Array.empty)
}
@@ -117,11 +117,11 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
def lastReceivedBatchRecords: Map[Int, Long] = {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
- (0 until numNetworkReceivers).map { receiverId =>
+ (0 until numReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
}.toMap
}.getOrElse {
- (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
+ (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 6607437db5..8fe1219356 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -40,7 +40,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val content =
generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
- generateNetworkStatsTable() ++
+ generateReceiverStats() ++
generateBatchStatsTable()
UIUtils.headerSparkPage(
content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
@@ -57,7 +57,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
<strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
</li>
<li>
- <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+ <strong>Network receivers: </strong>{listener.numReceivers}
</li>
<li>
<strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
@@ -71,8 +71,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
</ul>
}
- /** Generate stats of data received over the network the streaming program */
- private def generateNetworkStatsTable(): Seq[Node] = {
+ /** Generate stats of data received by the receivers in the streaming program */
+ private def generateReceiverStats(): Seq[Node] = {
val receivedRecordDistributions = listener.receivedRecordsDistributions
val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
val table = if (receivedRecordDistributions.size > 0) {
@@ -86,13 +86,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
"75th percentile rate\n[records/sec]",
"Maximum rate\n[records/sec]"
)
- val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+ val dataRows = (0 until listener.numReceivers).map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
- val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+ val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
- d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+ d.getQuantiles().map(r => formatNumber(r.toLong))
}.getOrElse {
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
}
@@ -104,8 +104,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
val content =
- <h5>Network Input Statistics</h5> ++
- <div>{table.getOrElse("No network receivers")}</div>
+ <h5>Receiver Statistics</h5> ++
+ <div>{table.getOrElse("No receivers")}</div>
content
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index e016377c94..1a616a0434 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -77,7 +77,9 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
stopped = true
- if (interruptTimer) thread.interrupt()
+ if (interruptTimer) {
+ thread.interrupt()
+ }
thread.join()
logInfo("Stopped timer for " + name + " after time " + prevTime)
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a0b1bbc34f..f9bfb9b744 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -36,10 +37,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1668,7 +1665,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1701,6 +1698,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952511d411..46b7f63b65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -36,10 +36,9 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkReceiver
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
+import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -207,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// set up the network stream using the test receiver
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.networkStream[Int](testReceiver)
+ val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
@@ -301,7 +300,7 @@ object TestServer {
}
/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with Receiver {
+class TestActor(port: Int) extends Actor with ActorHelper {
def bytesToString(byteString: ByteString) = byteString.utf8String
@@ -309,24 +308,22 @@ class TestActor(port: Int) extends Actor with Receiver {
def receive = {
case IO.Read(socket, bytes) =>
- pushBlock(bytesToString(bytes))
+ store(bytesToString(bytes))
}
}
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
- extends NetworkReceiver[Int] {
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
lazy val executorPool = Executors.newFixedThreadPool(numThreads)
- lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
lazy val finishCount = new AtomicInteger(0)
- protected def onStart() {
- blockGenerator.start()
+ def onStart() {
(1 to numThreads).map(threadId => {
val runnable = new Runnable {
def run() {
(1 to numRecordsPerThread).foreach(i =>
- blockGenerator += (threadId * numRecordsPerThread + i) )
+ store(threadId * numRecordsPerThread + i) )
if (finishCount.incrementAndGet == numThreads) {
MultiThreadTestReceiver.haveAllThreadsFinished = true
}
@@ -337,7 +334,7 @@ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
})
}
- protected def onStop() {
+ def onStop() {
executorPool.shutdown()
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
new file mode 100644
index 0000000000..5c0415ad14
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/** Testsuite for testing the network receiver behavior */
+class NetworkReceiverSuite extends FunSuite with Timeouts {
+
+ test("network receiver life cycle") {
+
+ val receiver = new FakeReceiver
+ val executor = new FakeReceiverSupervisor(receiver)
+
+ assert(executor.isAllEmpty)
+
+ // Thread that runs the executor
+ val executingThread = new Thread() {
+ override def run() {
+ executor.start()
+ executor.awaitTermination()
+ }
+ }
+
+ // Start the receiver
+ executingThread.start()
+
+ // Verify that the executor thread keeps running (the join should time out)
+ intercept[Exception] {
+ failAfter(200 millis) {
+ executingThread.join()
+ }
+ }
+
+ // Verify that the receiver was started
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped())
+ assert(receiver.otherThread.isAlive)
+ eventually(timeout(100 millis), interval(10 millis)) {
+ assert(receiver.receiving)
+ }
+
+ // Verify whether the data stored by the receiver was sent to the executor
+ val byteBuffer = ByteBuffer.allocate(100)
+ val arrayBuffer = new ArrayBuffer[Int]()
+ val iterator = arrayBuffer.iterator
+ receiver.store(1)
+ receiver.store(byteBuffer)
+ receiver.store(arrayBuffer)
+ receiver.store(iterator)
+ assert(executor.singles.size === 1)
+ assert(executor.singles.head === 1)
+ assert(executor.byteBuffers.size === 1)
+ assert(executor.byteBuffers.head.eq(byteBuffer))
+ assert(executor.iterators.size === 1)
+ assert(executor.iterators.head.eq(iterator))
+ assert(executor.arrayBuffers.size === 1)
+ assert(executor.arrayBuffers.head.eq(arrayBuffer))
+
+ // Verify whether the exceptions reported by the receiver were sent to the executor
+ val exception = new Exception
+ receiver.reportError("Error", exception)
+ assert(executor.errors.size === 1)
+ assert(executor.errors.head.eq(exception))
+
+ // Verify restarting actually stops and starts the receiver
+ receiver.restart("restarting", null, 100)
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ eventually(timeout(1000 millis), interval(100 millis)) {
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped)
+ assert(receiver.receiving)
+ }
+
+ // Verify that stopping actually stops the thread
+ failAfter(100 millis) {
+ receiver.stop("test")
+ assert(receiver.isStopped)
+ assert(!receiver.otherThread.isAlive)
+
+ // The thread that started the executor should complete
+ // as stop() stops everything
+ executingThread.join()
+ }
+ }
+
+ test("block generator") {
+ val blockGeneratorListener = new FakeBlockGeneratorListener
+ val blockInterval = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+ val expectedBlocks = 5
+ val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val generatedData = new ArrayBuffer[Int]
+
+ // Generate blocks
+ val startTime = System.currentTimeMillis()
+ blockGenerator.start()
+ var count = 0
+ while(System.currentTimeMillis - startTime < waitTime) {
+ blockGenerator += count
+ generatedData += count
+ count += 1
+ Thread.sleep(10)
+ }
+ blockGenerator.stop()
+
+ val recordedData = blockGeneratorListener.arrayBuffers.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0)
+ assert(recordedData.toSet === generatedData.toSet)
+ }
+
+ /**
+ * An implementation of Receiver that is used for testing a receiver's life cycle.
+ */
+ class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+ var otherThread: Thread = null
+ var receiving = false
+ var onStartCalled = false
+ var onStopCalled = false
+
+ def onStart() {
+ otherThread = new Thread() {
+ override def run() {
+ receiving = true
+ while(!isStopped()) {
+ Thread.sleep(10)
+ }
+ }
+ }
+ onStartCalled = true
+ otherThread.start()
+
+ }
+
+ def onStop() {
+ onStopCalled = true
+ otherThread.join()
+ }
+
+ def reset() {
+ receiving = false
+ onStartCalled = false
+ onStopCalled = false
+ }
+ }
+
+ /**
+ * An implementation of ReceiverSupervisor used for testing a Receiver.
+ * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
+ * that can be used for verifying that the data has been forwarded correctly.
+ */
+ class FakeReceiverSupervisor(receiver: FakeReceiver)
+ extends ReceiverSupervisor(receiver, new SparkConf()) {
+ val singles = new ArrayBuffer[Any]
+ val byteBuffers = new ArrayBuffer[ByteBuffer]
+ val iterators = new ArrayBuffer[Iterator[_]]
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
+ val errors = new ArrayBuffer[Throwable]
+
+ /** Check if all data structures are clean */
+ def isAllEmpty = {
+ singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
+ arrayBuffers.isEmpty && errors.isEmpty
+ }
+
+ def pushSingle(data: Any) {
+ singles += data
+ }
+
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ byteBuffers += bytes
+ }
+
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ iterators += iterator
+ }
+
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ arrayBuffers += arrayBuffer
+ }
+
+ def reportError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+
+ /**
+ * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+ */
+ class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+ // buffer of data received as ArrayBuffers
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+ val errors = new ArrayBuffer[Throwable]
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+ arrayBuffers += bufferOfInts
+ Thread.sleep(pushDelay)
+ }
+
+ def onError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+}
+
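The "block generator" test above exercises BlockGenerator as a standalone component: records added with += are buffered and cut into blocks roughly every spark.streaming.blockInterval milliseconds, and each block is handed to the listener's onPushBlock callback. A rough sketch of that wiring is below; it assumes only the constructor and callbacks that appear in the test, while the object name, the println bodies, and the driving loop are illustrative.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener}

// Hypothetical standalone driver for BlockGenerator (sketch).
object BlockGeneratorSketch {
  def main(args: Array[String]) {
    // Listener that just reports how many records each generated block holds.
    val listener = new BlockGeneratorListener {
      def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
        println("block " + blockId + " holds " + arrayBuffer.size + " records")
      }
      def onError(message: String, throwable: Throwable) {
        println("block generator error: " + message)
      }
    }

    // blockInterval controls how often buffered records are cut into a block (ms)
    val conf = new SparkConf().set("spark.streaming.blockInterval", "200")
    val generator = new BlockGenerator(listener, 1, conf)

    generator.start()
    (1 to 1000).foreach { i => generator += i; Thread.sleep(2) }
    generator.stop()
  }
}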
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ad5367ab94..6d14b1f785 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
@@ -181,15 +182,15 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
conf.set("spark.cleaner.ttl", "3600")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
- logInfo("==================================")
- ssc = new StreamingContext(sc, batchDuration)
+ logInfo("==================================\n\n\n")
+ ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.networkStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
- logInfo("Count = " + count)
runningCount += count.toInt
+ logInfo("Count = " + count + ", Running count = " + runningCount)
})
ssc.start()
ssc.awaitTermination(500)
@@ -216,12 +217,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.start()
}
- // test whether waitForStop() exits after give amount of time
+ // test whether awaitTermination() exits after given amount of time
failAfter(1000 millis) {
ssc.awaitTermination(500)
}
- // test whether waitForStop() does not exit if not time is given
+ // test whether awaitTermination() does not exit if no time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
ssc.awaitTermination()
@@ -276,23 +277,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
class TestException(msg: String) extends Exception(msg)
/** Custom receiver for testing whether all data received by a receiver gets processed or not */
-class TestReceiver extends NetworkReceiver[Int] {
- protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
- protected def onStart() {
- blockGenerator.start()
- logInfo("BlockGenerator started on thread " + receivingThread)
- try {
- while(true) {
- blockGenerator += TestReceiver.counter.getAndIncrement
- Thread.sleep(0)
+class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ while (!isStopped) {
+ store(TestReceiver.counter.getAndIncrement)
+ }
+ logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
- } finally {
- logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
+ receivingThreadOption = Some(thread)
+ thread.start()
}
- protected def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ // no cleanup to be done, the receiving thread should stop on its own
}
}
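The rewritten TestReceiver is plugged into the context through ssc.networkStream and driven by the usual start()/awaitTermination() life cycle, exactly as the "register and deregister" loop above does. A driver-side sketch of that wiring is shown below; the object name, application name, batch interval, and timeout are arbitrary, and only calls that appear in this diff (networkStream, count, foreachRDD, start, awaitTermination, stop) are assumed.

import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical driver program (sketch) wiring the TestReceiver above into a context.
object TestReceiverDriver {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "receiver-demo", Milliseconds(100))
    val input = ssc.networkStream(new TestReceiver)   // input DStream of the records the receiver store()s
    input.count().foreachRDD(rdd => println("received " + rdd.first() + " records in this batch"))

    ssc.start()
    ssc.awaitTermination(5000)   // returns after 5 s even if the streams are still running
    ssc.stop()
  }
}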
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 9e0f2c900e..542c697ae3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,10 +17,19 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.matchers.ShouldMatchers
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler._
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.Logging
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
@@ -32,7 +41,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
- test("basic BatchInfo generation") {
+ test("batch info reporting") {
val ssc = setupStreams(input, operation)
val collector = new BatchInfoCollector
ssc.addStreamingListener(collector)
@@ -54,6 +63,31 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
}
+ test("receiver info reporting") {
+ val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+
+ val collector = new ReceiverInfoCollector
+ ssc.addStreamingListener(collector)
+
+ ssc.start()
+ try {
+ eventually(timeout(1000 millis), interval(20 millis)) {
+ collector.startedReceiverInfo should have size 1
+ collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.stoppedReceiverStreamIds should have size 1
+ collector.stoppedReceiverStreamIds(0) should equal (0)
+ collector.receiverErrors should have size 1
+ collector.receiverErrors(0)._1 should equal (0)
+ collector.receiverErrors(0)._2 should include ("report error")
+ collector.receiverErrors(0)._3 should include ("report exception")
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for(i <- 1 until seq.size) {
@@ -61,12 +95,46 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
}
true
}
+}
+
+/** Listener that collects information on processed batches */
+class BatchInfoCollector extends StreamingListener {
+ val batchInfos = new ArrayBuffer[BatchInfo]
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos += batchCompleted.batchInfo
+ }
+}
+
+/** Listener that collects information on started and stopped receivers and on receiver errors */
+class ReceiverInfoCollector extends StreamingListener {
+ val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
+ val receiverErrors = new ArrayBuffer[(Int, String, String)]()
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ startedReceiverInfo += receiverStarted.receiverInfo
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+ stoppedReceiverStreamIds += receiverStopped.streamId
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+ receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ }
+}
- /** Listener that collects information on processed batches */
- class BatchInfoCollector extends StreamingListener {
- val batchInfos = new ArrayBuffer[BatchInfo]
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- batchInfos += batchCompleted.batchInfo
+class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
+ def onStart() {
+ Future {
+ logInfo("Started receiver and sleeping")
+ Thread.sleep(10)
+ logInfo("Reporting error and sleeping")
+ reportError("test report error", new Exception("test report exception"))
+ Thread.sleep(10)
+ logInfo("Stopping")
+ stop("test stop error")
}
}
+ def onStop() { }
}
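The receiver life-cycle callbacks exercised above (onReceiverStarted, onReceiverStopped, onReceiverError) are delivered to any listener registered through ssc.addStreamingListener, just as the test registers its ReceiverInfoCollector. A minimal application-level sketch is below; the class name and the println bodies are illustrative, while the callback signatures and event fields are the ones used in the test.

import org.apache.spark.streaming.scheduler._

// Hypothetical listener (sketch) that logs receiver life-cycle events to stdout.
class LoggingReceiverListener extends StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
    println("receiver started: " + started.receiverInfo)
  }
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
    println("receiver stopped, stream id = " + stopped.streamId)
  }
  override def onReceiverError(error: StreamingListenerReceiverError) {
    println("receiver error on stream " + error.streamId + ": " + error.message)
  }
}

// Registered on the context before start(), e.g.:
//   ssc.addStreamingListener(new LoggingReceiverListener)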