From 04c37b6f749dc2418cc28c89964cdc687dfcbd51 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 21 Apr 2014 19:04:49 -0700
Subject: [SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]

The current Network Receiver API makes it unnecessarily complicated to write a new receiver, as one needs to create an instance of BlockGenerator, as shown in SocketReceiver:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51

Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyway) needs to be changed if we are to ensure future stability.

Additionally, functions like streamingContext.socketStream that create input streams return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream instead. This is not yet implemented.

This PR is blocked on the graceful shutdown PR #247.

Author: Tathagata Das

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
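For context, a minimal sketch of what a user-defined receiver looks like under the new API introduced by this patch. Only the Receiver base class, store(), restart(), isStopped and receiverStream() come from the diff below; the CustomLineReceiver class name and the host/port details are hypothetical and not part of the patch.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical custom receiver built only on the API added in this patch.
class CustomLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  // Start a thread that receives data; onStart() must return promptly.
  def onStart() {
    new Thread("Custom Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  // The receiving thread checks isStopped and exits on its own, so nothing to clean up here.
  def onStop() { }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)               // single items are batched into blocks by the framework
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Connection closed, trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data from " + host + ":" + port, e)
    }
  }
}

// Usage: receiverStream() now returns a ReceiverInputDStream[String] rather than a plain DStream.
// val lines = ssc.receiverStream(new CustomLineReceiver("localhost", 9999))

With the old API the same receiver would also have had to create and drive its own BlockGenerator; here store() and the ReceiverSupervisor handle batching and block pushing.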
--- .../org/apache/spark/streaming/DStreamGraph.scala | 8 +- .../apache/spark/streaming/StreamingContext.scala | 44 ++- .../spark/streaming/api/java/JavaDStream.scala | 4 + .../streaming/api/java/JavaInputDStream.scala | 40 +++ .../streaming/api/java/JavaPairInputDStream.scala | 41 +++ .../api/java/JavaPairReceiverInputDStream.scala | 42 +++ .../api/java/JavaReceiverInputDStream.scala | 41 +++ .../streaming/api/java/JavaStreamingContext.scala | 56 +++- .../spark/streaming/dstream/InputDStream.scala | 2 +- .../streaming/dstream/NetworkInputDStream.scala | 362 --------------------- .../streaming/dstream/PluggableInputDStream.scala | 5 +- .../spark/streaming/dstream/RawInputDStream.scala | 16 +- .../streaming/dstream/ReceiverInputDStream.scala | 94 ++++++ .../streaming/dstream/SocketInputDStream.scala | 62 +++- .../spark/streaming/receiver/ActorReceiver.scala | 191 +++++++++++ .../spark/streaming/receiver/BlockGenerator.scala | 142 ++++++++ .../apache/spark/streaming/receiver/Receiver.scala | 236 ++++++++++++++ .../spark/streaming/receiver/ReceiverMessage.scala | 23 ++ .../streaming/receiver/ReceiverSupervisor.scala | 180 ++++++++++ .../receiver/ReceiverSupervisorImpl.scala | 180 ++++++++++ .../spark/streaming/receivers/ActorReceiver.scala | 184 ----------- .../spark/streaming/scheduler/JobGenerator.scala | 16 +- .../spark/streaming/scheduler/JobScheduler.scala | 8 +- .../streaming/scheduler/NetworkInputTracker.scala | 257 --------------- .../streaming/scheduler/ReceiverTracker.scala | 278 ++++++++++++++++ .../streaming/scheduler/StreamingListener.scala | 18 +- .../streaming/scheduler/StreamingListenerBus.scala | 4 + .../ui/StreamingJobProgressListener.scala | 10 +- .../apache/spark/streaming/ui/StreamingPage.scala | 18 +- .../spark/streaming/util/RecurringTimer.scala | 4 +- .../org/apache/spark/streaming/JavaAPISuite.java | 9 +- .../apache/spark/streaming/InputStreamsSuite.scala | 19 +- .../spark/streaming/NetworkReceiverSuite.scala | 249 ++++++++++++++ .../spark/streaming/StreamingContextSuite.scala | 42 +-- .../spark/streaming/StreamingListenerSuite.scala | 84 ++++- 35 files changed, 2027 insertions(+), 942 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala delete mode 100644 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index d3339063cc..b4adf0e965 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream} +import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream} final private[streaming] class DStreamGraph extends Serializable with Logging { @@ -103,9 +103,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def getOutputStreams() = this.synchronized { outputStreams.toArray } - def getNetworkInputStreams() = this.synchronized { - inputStreams.filter(_.isInstanceOf[NetworkInputDStream[_]]) - .map(_.asInstanceOf[NetworkInputDStream[_]]) + def getReceiverInputStreams() = this.synchronized { + inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]]) + .map(_.asInstanceOf[ReceiverInputDStream[_]]) .toArray } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index e9a4f7ba22..daa5c69bba 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -31,12 +31,11 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat - import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver} import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.ui.StreamingTab import org.apache.spark.util.MetadataCleaner @@ -139,7 +138,7 @@ class StreamingContext private[streaming] ( } } - private val nextNetworkInputStreamId = new AtomicInteger(0) + private val nextReceiverInputStreamId = new AtomicInteger(0) private[streaming] var checkpointDir: String = { if (isCheckpointPresent) { @@ -208,15 +207,26 @@ class StreamingContext private[streaming] ( if (isCheckpointPresent) cp_ else null } - private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() + private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement() /** - * Create an input stream with any arbitrary user implemented network receiver. + * Create an input stream with any arbitrary user implemented receiver. 
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * @param receiver Custom implementation of NetworkReceiver + * @param receiver Custom implementation of Receiver */ + @deprecated("Use receiverStream", "1.0.0") def networkStream[T: ClassTag]( - receiver: NetworkReceiver[T]): DStream[T] = { + receiver: Receiver[T]): ReceiverInputDStream[T] = { + receiverStream(receiver) + } + + /** + * Create an input stream with any arbitrary user implemented receiver. + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * @param receiver Custom implementation of Receiver + */ + def receiverStream[T: ClassTag]( + receiver: Receiver[T]): ReceiverInputDStream[T] = { new PluggableInputDStream[T](this, receiver) } @@ -236,9 +246,9 @@ class StreamingContext private[streaming] ( props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) + supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + ): ReceiverInputDStream[T] = { + receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) } /** @@ -254,7 +264,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[String] = { + ): ReceiverInputDStream[String] = { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } @@ -273,7 +283,7 @@ class StreamingContext private[streaming] ( port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel - ): DStream[T] = { + ): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) } @@ -292,7 +302,7 @@ class StreamingContext private[streaming] ( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[T] = { + ): ReceiverInputDStream[T] = { new RawInputDStream[T](this, hostname, port, storageLevel) } @@ -310,7 +320,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String): DStream[(K, V)] = { + ] (directory: String): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory) } @@ -330,7 +340,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { + ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } @@ -356,7 +366,7 @@ class StreamingContext private[streaming] ( def queueStream[T: ClassTag]( queue: Queue[RDD[T]], oneAtATime: Boolean = true - ): DStream[T] = { + ): InputDStream[T] = { queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) } @@ -373,7 +383,7 @@ class StreamingContext private[streaming] ( queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T] - ): DStream[T] = { + ): InputDStream[T] = { new QueueInputDStream(this, queue, oneAtATime, defaultRDD) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index 13e2bacc92..505e4431e4 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -97,6 +97,10 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T] } object JavaDStream { + /** + * Convert a scala [[org.apache.spark.streaming.dstream.DStream]] to a Java-friendly + * [[org.apache.spark.streaming.api.java.JavaDStream]]. + */ implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] = new JavaDStream[T](dstream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala new file mode 100644 index 0000000000..91f8d342d2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import scala.reflect.ClassTag + +import org.apache.spark.streaming.dstream.InputDStream + +/** + * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]]. + */ +class JavaInputDStream[T](val inputDStream: InputDStream[T]) + (implicit override val classTag: ClassTag[T]) extends JavaDStream[T](inputDStream) { +} + +object JavaInputDStream { + /** + * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] to a Java-friendly + * [[org.apache.spark.streaming.api.java.JavaInputDStream]]. + */ + implicit def fromInputDStream[T: ClassTag]( + inputDStream: InputDStream[T]): JavaInputDStream[T] = { + new JavaInputDStream[T](inputDStream) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala new file mode 100644 index 0000000000..add8585308 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag + +/** + * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]] of + * key-value pairs. + */ +class JavaPairInputDStream[K, V](val inputDStream: InputDStream[(K, V)])( + implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V] + ) extends JavaPairDStream[K, V](inputDStream) { +} + +object JavaPairInputDStream { + /** + * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] of pairs to a + * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairInputDStream]]. + */ + implicit def fromInputDStream[K: ClassTag, V: ClassTag]( + inputDStream: InputDStream[(K, V)]): JavaPairInputDStream[K, V] = { + new JavaPairInputDStream[K, V](inputDStream) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala new file mode 100644 index 0000000000..974b3e4516 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import scala.reflect.ClassTag + +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +/** + * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the + * abstract class for defining any input stream that receives data over the network. + */ +class JavaPairReceiverInputDStream[K, V](val receiverInputDStream: ReceiverInputDStream[(K, V)]) + (implicit override val kClassTag: ClassTag[K], override implicit val vClassTag: ClassTag[V]) + extends JavaPairInputDStream[K, V](receiverInputDStream) { +} + +object JavaPairReceiverInputDStream { + /** + * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly + * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]]. 
+ */ + implicit def fromReceiverInputDStream[K: ClassTag, V: ClassTag]( + receiverInputDStream: ReceiverInputDStream[(K, V)]): JavaPairReceiverInputDStream[K, V] = { + new JavaPairReceiverInputDStream[K, V](receiverInputDStream) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala new file mode 100644 index 0000000000..340ef97980 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import scala.reflect.ClassTag + +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +/** + * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the + * abstract class for defining any input stream that receives data over the network. + */ +class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T]) + (implicit override val classTag: ClassTag[T]) extends JavaInputDStream[T](receiverInputDStream) { +} + +object JavaReceiverInputDStream { + /** + * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly + * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]]. 
+ */ + implicit def fromReceiverInputDStream[T: ClassTag]( + receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = { + new JavaReceiverInputDStream[T](receiverInputDStream) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index c800602d09..fbb2e9f85d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -35,7 +35,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.hadoop.conf.Configuration -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream} +import org.apache.spark.streaming.receiver.Receiver /** * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main @@ -155,8 +156,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects */ - def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel) - : JavaDStream[String] = { + def socketTextStream( + hostname: String, port: Int, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[String] = { ssc.socketTextStream(hostname, port, storageLevel) } @@ -167,7 +170,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data */ - def socketTextStream(hostname: String, port: Int): JavaDStream[String] = { + def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = { ssc.socketTextStream(hostname, port) } @@ -186,7 +189,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { port: Int, converter: JFunction[InputStream, java.lang.Iterable[T]], storageLevel: StorageLevel) - : JavaDStream[T] = { + : JavaReceiverInputDStream[T] = { def fn = (x: InputStream) => converter.call(x).toIterator implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] @@ -218,10 +221,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { def rawSocketStream[T]( hostname: String, port: Int, - storageLevel: StorageLevel): JavaDStream[T] = { + storageLevel: StorageLevel): JavaReceiverInputDStream[T] = { implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel)) + JavaReceiverInputDStream.fromReceiverInputDStream( + ssc.rawSocketStream(hostname, port, storageLevel)) } /** @@ -233,10 +237,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param port Port to connect to for receiving data * @tparam T Type of the objects in the received blocks */ - def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = { + def rawSocketStream[T](hostname: String, port: Int): JavaReceiverInputDStream[T] = { implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port)) + JavaReceiverInputDStream.fromReceiverInputDStream( + ssc.rawSocketStream(hostname, port)) } /** @@ -249,7 +254,8 @@ class 
JavaStreamingContext(val ssc: StreamingContext) { * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file */ - def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String): JavaPairInputDStream[K, V] = { implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] implicit val cmv: ClassTag[V] = @@ -275,7 +281,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { name: String, storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) @@ -296,7 +302,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { props: Props, name: String, storageLevel: StorageLevel - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name, storageLevel) @@ -316,14 +322,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { def actorStream[T]( props: Props, name: String - ): JavaDStream[T] = { + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.actorStream[T](props, name) } /** - * Creates an input stream from an queue of RDDs. In each batch, + * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized. @@ -339,7 +345,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates an input stream from an queue of RDDs. In each batch, + * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized. @@ -347,7 +353,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval * @tparam T Type of objects in the RDD */ - def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { + def queueStream[T]( + queue: java.util.Queue[JavaRDD[T]], + oneAtATime: Boolean + ): JavaInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] @@ -356,7 +365,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates an input stream from an queue of RDDs. In each batch, + * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * * NOTE: changes to the queue after the stream is created will not be recognized. 
@@ -368,7 +377,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def queueStream[T]( queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean, - defaultRDD: JavaRDD[T]): JavaDStream[T] = { + defaultRDD: JavaRDD[T]): JavaInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val sQueue = new scala.collection.mutable.Queue[RDD[T]] @@ -376,6 +385,17 @@ class JavaStreamingContext(val ssc: StreamingContext) { ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) } + /** + * Create an input stream with any arbitrary user implemented receiver. + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * @param receiver Custom implementation of Receiver + */ + def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + ssc.receiverStream(receiver) + } + /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 226844c228..aa1993f058 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -30,7 +30,7 @@ import scala.reflect.ClassTag * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for * new files and generates RDDs with the new files. For implementing input streams * that requires running a receiver on the worker nodes, use - * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class. + * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class. * * @param ssc_ Streaming context that will execute this input stream */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala deleted file mode 100644 index 5a249706b4..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.dstream - -import java.nio.ByteBuffer -import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} - -import scala.collection.mutable.{ArrayBuffer, HashMap} -import scala.concurrent.Await -import scala.reflect.ClassTag - -import akka.actor.{Actor, Props} -import akka.pattern.ask - -import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.rdd.{BlockRDD, RDD} -import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming._ -import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver} -import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} -import org.apache.spark.util.{AkkaUtils, Utils} - -/** - * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] - * that has to start a receiver on worker nodes to receive external data. - * Specific implementations of NetworkInputDStream must - * define the getReceiver() function that gets the receiver object of type - * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent - * to the workers to receive data. - * @param ssc_ Streaming context that will execute this input stream - * @tparam T Class type of the object of this stream - */ -abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) - extends InputDStream[T](ssc_) { - - /** Keeps all received blocks information */ - private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] - - /** This is an unique identifier for the network input stream. */ - val id = ssc.getNewNetworkStreamId() - - /** - * Gets the receiver object that will be sent to the worker nodes - * to receive data. This method needs to defined by any specific implementation - * of a NetworkInputDStream. - */ - def getReceiver(): NetworkReceiver[T] - - // Nothing to start or stop as both taken care of by the NetworkInputTracker. - def start() {} - - def stop() {} - - /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */ - override def compute(validTime: Time): Option[RDD[T]] = { - // If this is called for any time before the start time of the context, - // then this returns an empty RDD. This may happen when recovering from a - // master failure - if (validTime >= graph.startTime) { - val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id) - receivedBlockInfo(validTime) = blockInfo - val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) - Some(new BlockRDD[T](ssc.sc, blockIds)) - } else { - Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) - } - } - - /** Get information on received blocks. */ - private[streaming] def getReceivedBlockInfo(time: Time) = { - receivedBlockInfo(time) - } - - /** - * Clear metadata that are older than `rememberDuration` of this DStream. - * This is an internal method that should not be called directly. This - * implementation overrides the default implementation to clear received - * block information. 
- */ - private[streaming] override def clearMetadata(time: Time) { - super.clearMetadata(time) - val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) - receivedBlockInfo --= oldReceivedBlocks.keys - logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + - (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) - } -} - - -private[streaming] sealed trait NetworkReceiverMessage -private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage - -/** - * Abstract class of a receiver that can be run on worker nodes to receive external data. See - * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation. - */ -abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { - - /** Local SparkEnv */ - lazy protected val env = SparkEnv.get - - /** Remote Akka actor for the NetworkInputTracker */ - lazy protected val trackerActor = { - val ip = env.conf.get("spark.driver.host", "localhost") - val port = env.conf.getInt("spark.driver.port", 7077) - val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) - env.actorSystem.actorSelection(url) - } - - /** Akka actor for receiving messages from the NetworkInputTracker in the driver */ - lazy protected val actor = env.actorSystem.actorOf( - Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) - - /** Timeout for Akka actor messages */ - lazy protected val askTimeout = AkkaUtils.askTimeout(env.conf) - - /** Thread that starts the receiver and stays blocked while data is being received */ - lazy protected val receivingThread = Thread.currentThread() - - /** Exceptions that occurs while receiving data */ - protected lazy val exceptions = new ArrayBuffer[Exception] - - /** Identifier of the stream this receiver is associated with */ - protected var streamId: Int = -1 - - /** - * This method will be called to start receiving data. All your receiver - * starting code should be implemented by defining this function. - */ - protected def onStart() - - /** This method will be called to stop receiving data. */ - protected def onStop() - - /** Conveys a placement preference (hostname) for this receiver. */ - def getLocationPreference() : Option[String] = None - - /** - * Start the receiver. First is accesses all the lazy members to - * materialize them. Then it calls the user-defined onStart() method to start - * other threads, etc required to receiver the data. 
- */ - def start() { - try { - // Access the lazy vals to materialize them - env - actor - receivingThread - - // Call user-defined onStart() - logInfo("Starting receiver") - onStart() - - // Wait until interrupt is called on this thread - while(true) Thread.sleep(100000) - } catch { - case ie: InterruptedException => - logInfo("Receiving thread has been interrupted, receiver " + streamId + " stopped") - case e: Exception => - logError("Error receiving data in receiver " + streamId, e) - exceptions += e - } - - // Call user-defined onStop() - logInfo("Stopping receiver") - try { - onStop() - } catch { - case e: Exception => - logError("Error stopping receiver " + streamId, e) - exceptions += e - } - - val message = if (exceptions.isEmpty) { - null - } else if (exceptions.size == 1) { - val e = exceptions.head - "Exception in receiver " + streamId + ": " + e.getMessage + "\n" + e.getStackTraceString - } else { - "Multiple exceptions in receiver " + streamId + "(" + exceptions.size + "):\n" - exceptions.zipWithIndex.map { - case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString - }.mkString("\n") - } - - logInfo("Deregistering receiver " + streamId) - val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout) - Await.result(future, askTimeout) - logInfo("Deregistered receiver " + streamId) - env.actorSystem.stop(actor) - logInfo("Stopped receiver " + streamId) - } - - /** - * Stop the receiver. First it interrupts the main receiving thread, - * that is, the thread that called receiver.start(). - */ - def stop() { - // Stop receiving by interrupting the receiving thread - receivingThread.interrupt() - logInfo("Interrupted receiving thread " + receivingThread + " for stopping") - } - - /** - * Stop the receiver and reports exception to the tracker. - * This should be called whenever an exception is to be handled on any thread - * of the receiver. - */ - protected def stopOnError(e: Exception) { - logError("Error receiving data", e) - exceptions += e - stop() - } - - /** - * Push a block (as an ArrayBuffer filled with data) into the block manager. - */ - def pushBlock( - blockId: StreamBlockId, - arrayBuffer: ArrayBuffer[T], - metadata: Any, - level: StorageLevel - ) { - env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) - trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata)) - logDebug("Pushed block " + blockId) - } - - /** - * Push a block (as bytes) into the block manager. - */ - def pushBlock( - blockId: StreamBlockId, - bytes: ByteBuffer, - metadata: Any, - level: StorageLevel - ) { - env.blockManager.putBytes(blockId, bytes, level) - trackerActor ! 
AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata)) - } - - /** Set the ID of the DStream that this receiver is associated with */ - protected[streaming] def setStreamId(id: Int) { - streamId = id - } - - /** A helper actor that communicates with the NetworkInputTracker */ - private class NetworkReceiverActor extends Actor { - - override def preStart() { - val msg = RegisterReceiver( - streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self) - val future = trackerActor.ask(msg)(askTimeout) - Await.result(future, askTimeout) - logInfo("Registered receiver " + streamId) - } - - override def receive() = { - case StopReceiver => - logInfo("Received stop signal") - stop() - } - } - - /** - * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts - * them into appropriately named blocks at regular intervals. This class starts two threads, - * one to periodically start a new batch and prepare the previous batch of as a block, - * the other to push the blocks into the block manager. - */ - class BlockGenerator(storageLevel: StorageLevel) - extends Serializable with Logging { - - case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null) - - val clock = new SystemClock() - val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) - val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer, - "BlockGenerator") - val blockStorageLevel = storageLevel - val blocksForPushing = new ArrayBlockingQueue[Block](1000) - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - var currentBuffer = new ArrayBuffer[T] - var stopped = false - - def start() { - blockIntervalTimer.start() - blockPushingThread.start() - logInfo("Started BlockGenerator") - } - - def stop() { - blockIntervalTimer.stop(false) - stopped = true - blockPushingThread.join() - logInfo("Stopped BlockGenerator") - } - - def += (obj: T): Unit = synchronized { - currentBuffer += obj - } - - private def updateCurrentBuffer(time: Long): Unit = synchronized { - try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[T] - if (newBlockBuffer.size > 0) { - val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval) - val newBlock = new Block(blockId, newBlockBuffer) - blocksForPushing.add(newBlock) - } - } catch { - case ie: InterruptedException => - logInfo("Block updating timer thread was interrupted") - case e: Exception => - NetworkReceiver.this.stopOnError(e) - } - } - - private def keepPushingBlocks() { - logInfo("Started block pushing thread") - try { - while(!stopped) { - Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match { - case Some(block) => - NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel) - case None => - } - } - // Push out the blocks that are still left - logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") - while (!blocksForPushing.isEmpty) { - val block = blocksForPushing.take() - NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel) - logInfo("Blocks left to push " + blocksForPushing.size()) - } - logInfo("Stopped blocks pushing thread") - } catch { - case ie: InterruptedException => - logInfo("Block pushing thread was interrupted") - case e: Exception => - NetworkReceiver.this.stopOnError(e) - } - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala index 6f9477020a..186e1bf03a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -19,13 +19,14 @@ package org.apache.spark.streaming.dstream import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag +import org.apache.spark.streaming.receiver.Receiver private[streaming] class PluggableInputDStream[T: ClassTag]( @transient ssc_ : StreamingContext, - receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) { - def getReceiver(): NetworkReceiver[T] = { + def getReceiver(): Receiver[T] = { receiver } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index dea0f26f90..e2925b9e03 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext @@ -28,6 +28,7 @@ import java.nio.ByteBuffer import java.nio.channels.{ReadableByteChannel, SocketChannel} import java.io.EOFException import java.util.concurrent.ArrayBlockingQueue +import org.apache.spark.streaming.receiver.Receiver /** @@ -42,21 +43,19 @@ class RawInputDStream[T: ClassTag]( host: String, port: Int, storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { + ) extends ReceiverInputDStream[T](ssc_ ) with Logging { - def getReceiver(): NetworkReceiver[T] = { - new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + def getReceiver(): Receiver[T] = { + new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]] } } private[streaming] class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) - extends NetworkReceiver[Any] { + extends Receiver[Any](storageLevel) with Logging { var blockPushingThread: Thread = null - override def getLocationPreference = None - def onStart() { // Open a socket to the target address and keep reading from it logInfo("Connecting to " + host + ":" + port) @@ -73,9 +72,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) var nextBlockNumber = 0 while (true) { val buffer = queue.take() - val blockId = StreamBlockId(streamId, nextBlockNumber) nextBlockNumber += 1 - pushBlock(blockId, buffer, null, storageLevel) + store(buffer) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala new file mode 100644 index 0000000000..75cabdbf8d --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag + +import org.apache.spark.rdd.{BlockRDD, RDD} +import org.apache.spark.storage.BlockId +import org.apache.spark.streaming._ +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo + +/** + * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] + * that has to start a receiver on worker nodes to receive external data. + * Specific implementations of NetworkInputDStream must + * define `the getReceiver()` function that gets the receiver object of type + * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent + * to the workers to receive data. + * @param ssc_ Streaming context that will execute this input stream + * @tparam T Class type of the object of this stream + */ +abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) + extends InputDStream[T](ssc_) { + + /** Keeps all received blocks information */ + private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] + + /** This is an unique identifier for the network input stream. */ + val id = ssc.getNewReceiverStreamId() + + /** + * Gets the receiver object that will be sent to the worker nodes + * to receive data. This method needs to defined by any specific implementation + * of a NetworkInputDStream. + */ + def getReceiver(): Receiver[T] + + // Nothing to start or stop as both taken care of by the ReceiverInputTracker. + def start() {} + + def stop() {} + + /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */ + override def compute(validTime: Time): Option[RDD[T]] = { + // If this is called for any time before the start time of the context, + // then this returns an empty RDD. This may happen when recovering from a + // master failure + if (validTime >= graph.startTime) { + val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) + receivedBlockInfo(validTime) = blockInfo + val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } else { + Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) + } + } + + /** Get information on received blocks. */ + private[streaming] def getReceivedBlockInfo(time: Time) = { + receivedBlockInfo(time) + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This + * implementation overrides the default implementation to clear received + * block information. 
+ */ + private[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) + val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) + receivedBlockInfo --= oldReceivedBlocks.keys + logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 63d94d1cc6..1e32727eac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -24,7 +24,9 @@ import org.apache.spark.util.NextIterator import scala.reflect.ClassTag import java.io._ -import java.net.Socket +import java.net.{UnknownHostException, Socket} +import org.apache.spark.Logging +import org.apache.spark.streaming.receiver.Receiver private[streaming] class SocketInputDStream[T: ClassTag]( @@ -33,9 +35,9 @@ class SocketInputDStream[T: ClassTag]( port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) { + ) extends ReceiverInputDStream[T](ssc_) { - def getReceiver(): NetworkReceiver[T] = { + def getReceiver(): Receiver[T] = { new SocketReceiver(host, port, bytesToObjects, storageLevel) } } @@ -46,26 +48,52 @@ class SocketReceiver[T: ClassTag]( port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel - ) extends NetworkReceiver[T] { + ) extends Receiver[T](storageLevel) with Logging { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) + var socket: Socket = null + var receivingThread: Thread = null - override def getLocationPreference = None + def onStart() { + receivingThread = new Thread("Socket Receiver") { + override def run() { + connect() + receive() + } + } + receivingThread.start() + } - protected def onStart() { - logInfo("Connecting to " + host + ":" + port) - val socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) - blockGenerator.start() - val iterator = bytesToObjects(socket.getInputStream()) - while(iterator.hasNext) { - val obj = iterator.next - blockGenerator += obj + def onStop() { + if (socket != null) { + socket.close() + } + socket = null + if (receivingThread != null) { + receivingThread.join() } } - protected def onStop() { - blockGenerator.stop() + def connect() { + try { + logInfo("Connecting to " + host + ":" + port) + socket = new Socket(host, port) + } catch { + case e: Exception => + restart("Could not connect to " + host + ":" + port, e) + } + } + + def receive() { + try { + logInfo("Connected to " + host + ":" + port) + val iterator = bytesToObjects(socket.getInputStream()) + while(!isStopped && iterator.hasNext) { + store(iterator.next) + } + } catch { + case e: Exception => + restart("Error receiving data from socket", e) + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala new file mode 100644 index 0000000000..821cf19481 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import akka.actor._ +import akka.actor.SupervisorStrategy.{Escalate, Restart} +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.storage.StorageLevel +import java.nio.ByteBuffer + +/** A helper with set of defaults for supervisor strategy */ +object ActorSupervisorStrategy { + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException => Restart + case _: Exception => Escalate + } +} + +/** + * A receiver trait to be mixed in with your Actor to gain access to + * the API for pushing received data into Spark Streaming for being processed. + * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends Actor with ActorHelper{ + * def receive { + * case anything: String => store(anything) + * } + * } + * + * // Can be used with an actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + */ +trait ActorHelper { + + self: Actor => // to ensure that this can be added to Actor classes only + + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: Iterator[T]) { + println("Storing iterator") + context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + println("Storing item") + context.parent ! SingleItemData(item) + } +} + +/** + * Statistics for querying the supervisor about state of workers. Used in + * conjunction with `StreamingContext.actorStream` and + * [[org.apache.spark.streaming.receiver.ActorHelper]]. 
+ */ +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors */ +private[streaming] sealed trait ActorReceiverData +private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData +private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData +private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! Props(new Worker, "Worker") + * }}} + */ +private[streaming] class ActorReceiver[T: ClassTag]( + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy + ) extends Receiver[T](storageLevel) with Logging { + + protected lazy val supervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case IteratorData(iterator) => + println("received iterator") + store(iterator.asInstanceOf[Iterator[T]]) + + case SingleItemData(msg) => + println("received single") + store(msg.asInstanceOf[T]) + n.incrementAndGet + + case ByteBufferData(bytes) => + store(bytes) + + case props: Props => + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics => + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + def onStart() = { + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + + } + + def onStop() = { + supervisor ! PoisonPill + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala new file mode 100644 index 0000000000..78cc2daa56 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
+
+/** Listener object for BlockGenerator events */
+private[streaming] trait BlockGeneratorListener {
+  /** Called when a new block needs to be pushed */
+  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+  /** Called when an error has occurred in BlockGenerator */
+  def onError(message: String, throwable: Throwable)
+}
+
+/**
+ * Generates batches of objects received by a
+ * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
+ * named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+private[streaming] class BlockGenerator(
+    listener: BlockGeneratorListener,
+    receiverId: Int,
+    conf: SparkConf
+  ) extends Logging {
+
+  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
+
+  private val clock = new SystemClock()
+  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+  private val blockIntervalTimer =
+    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
+  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
+  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+  @volatile private var currentBuffer = new ArrayBuffer[Any]
+  @volatile private var stopped = false
+
+  /** Start block generating and pushing threads. */
+  def start() {
+    blockIntervalTimer.start()
+    blockPushingThread.start()
+    logInfo("Started BlockGenerator")
+  }
+
+  /** Stop all threads. */
+  def stop() {
+    logInfo("Stopping BlockGenerator")
+    blockIntervalTimer.stop(interruptTimer = false)
+    stopped = true
+    logInfo("Waiting for block pushing thread")
+    blockPushingThread.join()
+    logInfo("Stopped BlockGenerator")
+  }
+
+  /**
+   * Push a single data item into the buffer. All received data items
+   * will be periodically pushed into BlockManager.
+   */
+  def += (data: Any): Unit = synchronized {
+    currentBuffer += data
+  }
+
+  /** Change the buffer to which single records are added.
*/ + private def updateCurrentBuffer(time: Long): Unit = synchronized { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[Any] + if (newBlockBuffer.size > 0) { + val blockId = StreamBlockId(receiverId, time - blockInterval) + val newBlock = new Block(blockId, newBlockBuffer) + blocksForPushing.put(newBlock) // put is blocking when queue is full + logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) + } + } catch { + case ie: InterruptedException => + logInfo("Block updating timer thread was interrupted") + case t: Throwable => + reportError("Error in block updating thread", t) + } + } + + /** Keep pushing blocks to the BlockManager. */ + private def keepPushingBlocks() { + logInfo("Started block pushing thread") + try { + while(!stopped) { + Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match { + case Some(block) => pushBlock(block) + case None => + } + } + // Push out the blocks that are still left + logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") + while (!blocksForPushing.isEmpty) { + logDebug("Getting block ") + val block = blocksForPushing.take() + pushBlock(block) + logInfo("Blocks left to push " + blocksForPushing.size()) + } + logInfo("Stopped block pushing thread") + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread was interrupted") + case t: Throwable => + reportError("Error in block pushing thread", t) + } + } + + private def reportError(message: String, t: Throwable) { + logError(message, t) + listener.onError(message, t) + } + + private def pushBlock(block: Block) { + listener.onPushBlock(block.id, block.buffer) + logInfo("Pushed block " + block.id) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala new file mode 100644 index 0000000000..44eecf1dd2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ + +import org.apache.spark.storage.StorageLevel + +/** + * Abstract class of a receiver that can be run on worker nodes to receive external data. A + * custom receiver can be defined by defining the functions onStart() and onStop(). onStart() + * should define the setup steps necessary to start receiving data, + * and onStop() should define the cleanup steps necessary to stop receiving data. A custom + * receiver would look something like this. 
+ *
+ * @example {{{
+ * class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
+ *   def onStart() {
+ *     // Setup stuff (start threads, open sockets, etc.) to start receiving data.
+ *     // Must start new thread to receive data, as onStart() must be non-blocking.
+ *
+ *     // Call store(...) in those threads to store received data into Spark's memory.
+ *
+ *     // Call stop(...), restart() or reportError(...) on any thread based on how
+ *     // different errors should be handled.
+ *
+ *     // See corresponding method documentation for more details
+ *   }
+ *
+ *   def onStop() {
+ *     // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
+ *   }
+ * }
+ * }}}
+ */
+abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
+
+  /**
+   * This method is called by the system when the receiver is started. This function
+   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
+   * This function must be non-blocking, so receiving the data must occur on a different
+   * thread. Received data can be stored with Spark by calling `store(data)`.
+   *
+   * If there are errors in threads started here, then the following options can be taken:
+   * (i) `reportError(...)` can be called to report the error to the driver.
+   * The receiving of data will continue uninterrupted.
+   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
+   * clean up all resources allocated (threads, buffers, etc.) during `onStart()`.
+   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
+   * immediately, and then `onStart()` after a delay.
+   */
+  def onStart()
+
+  /**
+   * This method is called by the system when the receiver is stopped. All resources
+   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
+   */
+  def onStop()
+
+  /** Override this to specify a preferred location (hostname). */
+  def preferredLocation : Option[String] = None
+
+  /**
+   * Store a single item of received data to Spark's memory.
+   * These single items will be aggregated together into data blocks before
+   * being pushed into Spark's memory.
+   */
+  def store(dataItem: T) {
+    executor.pushSingle(dataItem)
+  }
+
+  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+  def store(dataBuffer: ArrayBuffer[T]) {
+    executor.pushArrayBuffer(dataBuffer, None, None)
+  }
+
+  /**
+   * Store an ArrayBuffer of received data as a data block into Spark's memory.
+   * The metadata will be associated with this block of data
+   * for being used in the corresponding InputDStream.
+   */
+  def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
+    executor.pushArrayBuffer(dataBuffer, Some(metadata), None)
+  }
+
+  /** Store an iterator of received data as a data block into Spark's memory. */
+  def store(dataIterator: Iterator[T]) {
+    executor.pushIterator(dataIterator, None, None)
+  }
+
+  /**
+   * Store an iterator of received data as a data block into Spark's memory.
+   * The metadata will be associated with this block of data
+   * for being used in the corresponding InputDStream.
+   */
+  def store(dataIterator: java.util.Iterator[T], metadata: Any) {
+    executor.pushIterator(dataIterator, Some(metadata), None)
+  }
+
+  /** Store an iterator of received data as a data block into Spark's memory. */
+  def store(dataIterator: java.util.Iterator[T]) {
+    executor.pushIterator(dataIterator, None, None)
+  }
+
+  /**
+   * Store an iterator of received data as a data block into Spark's memory.
+ * The metadata will be associated with this block of data + * for being used in the corresponding InputDStream. + */ + def store(dataIterator: Iterator[T], metadata: Any) { + executor.pushIterator(dataIterator, Some(metadata), None) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + executor.pushBytes(bytes, None, None) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. + * The metadata will be associated with this block of data + * for being used in the corresponding InputDStream. + */ + def store(bytes: ByteBuffer, metadata: Any) { + executor.pushBytes(bytes, Some(metadata), None) + } + + /** Report exceptions in receiving data. */ + def reportError(message: String, throwable: Throwable) { + executor.reportError(message, throwable) + } + + /** + * Restart the receiver. This will call `onStop()` immediately and return. + * Asynchronously, after a delay, `onStart()` will be called. + * The `message` will be reported to the driver. + * The delay is defined by the Spark configuration + * `spark.streaming.receiverRestartDelay`. + */ + def restart(message: String) { + executor.restartReceiver(message) + } + + /** + * Restart the receiver. This will call `onStop()` immediately and return. + * Asynchronously, after a delay, `onStart()` will be called. + * The `message` and `exception` will be reported to the driver. + * The delay is defined by the Spark configuration + * `spark.streaming.receiverRestartDelay`. + */ + def restart(message: String, error: Throwable) { + executor.restartReceiver(message, Some(error)) + } + + /** + * Restart the receiver. This will call `onStop()` immediately and return. + * Asynchronously, after the given delay, `onStart()` will be called. + */ + def restart(message: String, error: Throwable, millisecond: Int) { + executor.restartReceiver(message, Some(error), millisecond) + } + + /** Stop the receiver completely. */ + def stop(message: String) { + executor.stop(message, None) + } + + /** Stop the receiver completely due to an exception */ + def stop(message: String, error: Throwable) { + executor.stop(message, Some(error)) + } + + def isStarted(): Boolean = { + executor.isReceiverStarted() + } + + /** Check if receiver has been marked for stopping. */ + def isStopped(): Boolean = { + !executor.isReceiverStarted() + } + + /** Get unique identifier of this receiver. */ + def streamId = id + + /* + * ================= + * Private methods + * ================= + */ + + /** Identifier of the stream this receiver is associated with. */ + private var id: Int = -1 + + /** Handler object that runs the receiver. This is instantiated lazily in the worker. */ + private[streaming] var executor_ : ReceiverSupervisor = null + + /** Set the ID of the DStream that this receiver is associated with. */ + private[streaming] def setReceiverId(id_ : Int) { + id = id_ + } + + /** Attach Network Receiver executor to this receiver. */ + private[streaming] def attachExecutor(exec: ReceiverSupervisor) { + assert(executor_ == null) + executor_ = exec + } + + /** Get the attached executor. 
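Putting the user-facing API above together, a minimal sketch of a custom receiver; the class, host and port are illustrative, and the real analogue in this patch is the reworked SocketReceiver.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // onStart() must return quickly, so the blocking read loop runs on its own thread.
  def onStart() {
    new Thread("Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  // The socket is created and closed inside receive(), so there is nothing extra
  // to tear down here; the reading thread watches isStopped().
  def onStop() { }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)               // single items are batched into blocks by the BlockGenerator
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask the supervisor to run onStop()/onStart() again after the configured delay.
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case e: java.net.ConnectException =>
        restart("Could not connect to " + host + ":" + port, e)
      case t: Throwable =>
        reportError("Error receiving data", t)
    }
  }
}

// The receiver is plugged in through the StreamingContext's custom-receiver method
// (the exact method name, e.g. receiverStream, depends on the final API in this change):
//   val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))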
*/ + private def executor = { + assert(executor_ != null, "Executor has not been attached to this receiver") + executor_ + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala new file mode 100644 index 0000000000..6ab3ca6ea5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +/** Messages sent to the NetworkReceiver. */ +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] object StopReceiver extends NetworkReceiverMessage + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala new file mode 100644 index 0000000000..256b3335e4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.storage.StreamBlockId +import java.util.concurrent.CountDownLatch +import scala.concurrent._ +import ExecutionContext.Implicits.global + +/** + * Abstract class that is responsible for supervising a Receiver in the worker. + * It provides all the necessary interfaces for handling the data received by the receiver. 
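The restart(...) variants documented above hand off to the supervisor defined below, which re-runs onStart() after a configurable delay; a small sketch of tuning that delay (the value is illustrative):

// Default restart delay is 2000 ms, read from spark.streaming.receiverRestartDelay.
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.receiverRestartDelay", "5000")  // wait 5s between onStop() and onStart()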
+ */
+private[streaming] abstract class ReceiverSupervisor(
+    receiver: Receiver[_],
+    conf: SparkConf
+  ) extends Logging {
+
+  /** Enumeration to identify the current state of the receiver */
+  object ReceiverState extends Enumeration {
+    type ReceiverState = Value
+    val Initialized, Started, Stopped = Value
+  }
+  import ReceiverState._
+
+  // Attach the executor to the receiver
+  receiver.attachExecutor(this)
+
+  /** Receiver id */
+  protected val streamId = receiver.streamId
+
+  /** Latch that is released when the receiver has been marked for stopping. */
+  private val stopLatch = new CountDownLatch(1)
+
+  /** Default time between a receiver being stopped and started again */
+  private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)
+
+  /** Exception associated with the stopping of the receiver */
+  @volatile protected var stoppingError: Throwable = null
+
+  /** State of the receiver */
+  @volatile private[streaming] var receiverState = Initialized
+
+  /** Push a single data item to backend data store. */
+  def pushSingle(data: Any)
+
+  /** Store the bytes of received data as a data block into Spark's memory. */
+  def pushBytes(
+      bytes: ByteBuffer,
+      optionalMetadata: Option[Any],
+      optionalBlockId: Option[StreamBlockId]
+    )
+
+  /** Store an iterator of received data as a data block into Spark's memory. */
+  def pushIterator(
+      iterator: Iterator[_],
+      optionalMetadata: Option[Any],
+      optionalBlockId: Option[StreamBlockId]
+    )
+
+  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
+  def pushArrayBuffer(
+      arrayBuffer: ArrayBuffer[_],
+      optionalMetadata: Option[Any],
+      optionalBlockId: Option[StreamBlockId]
+    )
+
+  /** Report errors. */
+  def reportError(message: String, throwable: Throwable)
+
+  /** Start the executor */
+  def start() {
+    startReceiver()
+  }
+
+  /** Mark the executor and the receiver for stopping */
+  def stop(message: String, error: Option[Throwable]) {
+    stoppingError = error.orNull
+    stopReceiver(message, error)
+    stopLatch.countDown()
+  }
+
+  /** Start receiver */
+  def startReceiver(): Unit = synchronized {
+    try {
+      logInfo("Starting receiver")
+      onReceiverStart()
+      receiverState = Started
+    } catch {
+      case t: Throwable =>
+        stop("Error starting receiver " + streamId, Some(t))
+    }
+  }
+
+  /** Stop receiver */
+  def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
+    try {
+      receiverState = Stopped
+      onReceiverStop(message, error)
+    } catch {
+      case t: Throwable =>
+        stop("Error stopping receiver " + streamId, Some(t))
+    }
+  }
+
+  /** Restart receiver with delay */
+  def restartReceiver(message: String, error: Option[Throwable] = None) {
+    restartReceiver(message, error, defaultRestartDelay)
+  }
+
+  /** Restart receiver with delay */
+  def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
+    logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+      error.getOrElse(null))
+    stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
+    future {
+      logDebug("Sleeping for " + delay)
+      Thread.sleep(delay)
+      logDebug("Starting receiver again")
+      startReceiver()
+      logInfo("Receiver started again")
+    }
+  }
+
+  /** Called when the receiver needs to be started */
+  protected def onReceiverStart(): Unit = synchronized {
+    // Call user-defined onStart()
+    logInfo("Calling receiver onStart")
+    receiver.onStart()
+    logInfo("Called receiver onStart")
+  }
+
+  /** Called when the receiver needs to be stopped */
+  protected def onReceiverStop(message: String,
error: Option[Throwable]): Unit = synchronized { + // Call user-defined onStop() + logInfo("Calling receiver onStop") + receiver.onStop() + logInfo("Called receiver onStop") + } + + /** Check if receiver has been marked for stopping */ + def isReceiverStarted() = { + logDebug("state = " + receiverState) + receiverState == Started + } + + /** Wait the thread until the executor is stopped */ + def awaitTermination() { + stopLatch.await() + logInfo("Waiting for executor stop is over") + if (stoppingError != null) { + logError("Stopped executor with error: " + stoppingError) + } else { + logWarning("Stopped executor without error") + } + if (stoppingError != null) { + throw stoppingError + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala new file mode 100644 index 0000000000..2a3521bd46 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.concurrent.Await + +import akka.actor.{Actor, Props} +import akka.pattern.ask + +import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.scheduler._ +import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.scheduler.DeregisterReceiver +import org.apache.spark.streaming.scheduler.AddBlock +import scala.Some +import org.apache.spark.streaming.scheduler.RegisterReceiver +import com.google.common.base.Throwables + +/** + * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] + * which provides all the necessary functionality for handling the data received by + * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]] + * object that is used to divide the received data stream into blocks of data. 
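To illustrate the BlockGenerator/listener hand-off that ReceiverSupervisorImpl sets up below, a minimal sketch; these classes are private[streaming], so such code would need to live under the org.apache.spark.streaming package, and the counting listener is a stand-in for the real BlockManager path, not part of this change.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener}

object BlockGeneratorSketch extends App {

  // Stand-in listener that only counts blocks instead of storing them.
  object CountingListener extends BlockGeneratorListener {
    @volatile var blocks = 0
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { blocks += 1 }
    def onError(message: String, throwable: Throwable) { throwable.printStackTrace() }
  }

  // spark.streaming.blockInterval (default 200 ms) controls how often a block is cut;
  // spark.streaming.blockQueueSize (default 10) bounds the queue feeding the pushing thread.
  val generator = new BlockGenerator(CountingListener, 0, new SparkConf())
  generator.start()
  (1 to 1000).foreach { i => generator += i }
  Thread.sleep(500)   // let a couple of block intervals elapse
  generator.stop()
  println("Blocks cut: " + CountingListener.blocks)
}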
+ */ +private[streaming] class ReceiverSupervisorImpl( + receiver: Receiver[_], + env: SparkEnv + ) extends ReceiverSupervisor(receiver, env.conf) with Logging { + + private val blockManager = env.blockManager + + private val storageLevel = receiver.storageLevel + + /** Remote Akka actor for the ReceiverTracker */ + private val trackerActor = { + val ip = env.conf.get("spark.driver.host", "localhost") + val port = env.conf.getInt("spark.driver.port", 7077) + val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port) + env.actorSystem.actorSelection(url) + } + + /** Timeout for Akka actor messages */ + private val askTimeout = AkkaUtils.askTimeout(env.conf) + + /** Akka actor for receiving messages from the ReceiverTracker in the driver */ + private val actor = env.actorSystem.actorOf( + Props(new Actor { + override def preStart() { + logInfo("Registered receiver " + streamId) + val msg = RegisterReceiver( + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self) + val future = trackerActor.ask(msg)(askTimeout) + Await.result(future, askTimeout) + } + + override def receive() = { + case StopReceiver => + logInfo("Received stop signal") + stop("Stopped by driver", None) + } + }), "Receiver-" + streamId + "-" + System.currentTimeMillis()) + + /** Unique block ids if one wants to add blocks directly */ + private val newBlockId = new AtomicLong(System.currentTimeMillis()) + + /** Divides received data records into data blocks for pushing in BlockManager. */ + private val blockGenerator = new BlockGenerator(new BlockGeneratorListener { + def onError(message: String, throwable: Throwable) { + reportError(message, throwable) + } + + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + pushArrayBuffer(arrayBuffer, None, Some(blockId)) + } + }, streamId, env.conf) + + /** Push a single record of received data into block generator. */ + def pushSingle(data: Any) { + blockGenerator += (data) + } + + /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ + def pushArrayBuffer( + arrayBuffer: ArrayBuffer[_], + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + val blockId = optionalBlockId.getOrElse(nextBlockId) + val time = System.currentTimeMillis + blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], + storageLevel, tellMaster = true) + logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") + reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata) + } + + /** Store a iterator of received data as a data block into Spark's memory. */ + def pushIterator( + iterator: Iterator[_], + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + val blockId = optionalBlockId.getOrElse(nextBlockId) + val time = System.currentTimeMillis + blockManager.put(blockId, iterator, storageLevel, tellMaster = true) + logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") + reportPushedBlock(blockId, -1, optionalMetadata) + } + + /** Store the bytes of received data as a data block into Spark's memory. 
*/ + def pushBytes( + bytes: ByteBuffer, + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + val blockId = optionalBlockId.getOrElse(nextBlockId) + val time = System.currentTimeMillis + blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true) + logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") + reportPushedBlock(blockId, -1, optionalMetadata) + } + + /** Report pushed block */ + def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) { + val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull) + trackerActor ! AddBlock(blockInfo) + logDebug("Reported block " + blockId) + } + + /** Report error to the receiver tracker */ + def reportError(message: String, error: Throwable) { + val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("") + trackerActor ! ReportError(streamId, message, errorString) + logWarning("Reported error " + message + " - " + error) + } + + override def onReceiverStart() { + blockGenerator.start() + super.onReceiverStart() + } + + override def onReceiverStop(message: String, error: Option[Throwable]) { + super.onReceiverStop(message, error) + blockGenerator.stop() + logInfo("Deregistering receiver " + streamId) + val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") + val future = trackerActor.ask( + DeregisterReceiver(streamId, message, errorString))(askTimeout) + Await.result(future, askTimeout) + logInfo("Stopped receiver " + streamId) + } + + override def stop(message: String, error: Option[Throwable]) { + super.stop(message, error) + env.actorSystem.stop(actor) + } + + /** Generate new block ID */ + private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala deleted file mode 100644 index da0d364ae7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
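The old actor-based API in org.apache.spark.streaming.receivers is removed below in favour of the receiver.ActorHelper trait shown earlier; migrating user code is mostly a rename, sketched here with an illustrative actor name.

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Old API (deleted below):
//   class MigratedActor extends Actor with Receiver {
//     def receive = { case s: String => pushBlock(s) }
//   }
// New API: the same actor mixes in ActorHelper and calls store() instead of pushBlock().
class MigratedActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s)
  }
}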
- */ - -package org.apache.spark.streaming.receivers - -import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } -import akka.actor.{ actorRef2Scala, ActorRef } -import akka.actor.{ PossiblyHarmful, OneForOneStrategy } -import akka.actor.SupervisorStrategy._ - -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.reflect.ClassTag - -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming.dstream.NetworkReceiver - -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable.ArrayBuffer - -/** A helper with set of defaults for supervisor strategy */ -object ReceiverSupervisorStrategy { - - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = - 15 millis) { - case _: RuntimeException => Restart - case _: Exception => Escalate - } -} - -/** - * A receiver trait to be mixed in with your Actor to gain access to - * the API for pushing received data into Spark Streaming for being processed. - * - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * - * @example {{{ - * class MyActor extends Actor with Receiver{ - * def receive { - * case anything: String => pushBlock(anything) - * } - * } - * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") - * - * }}} - * - * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream - * should be same. - */ -trait Receiver { - - self: Actor => // to ensure that this can be added to Actor classes only - - /** - * Push an iterator received data into Spark Streaming for processing - */ - def pushBlock[T: ClassTag](iter: Iterator[T]) { - context.parent ! Data(iter) - } - - /** - * Push a single item of received data into Spark Streaming for processing - */ - def pushBlock[T: ClassTag](data: T) { - context.parent ! Data(data) - } -} - -/** - * Statistics for querying the supervisor about state of workers. Used in - * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.receivers.Receiver]]. - */ -case class Statistics(numberOfMsgs: Int, - numberOfWorkers: Int, - numberOfHiccups: Int, - otherInfo: String) - -/** Case class to receive data sent by child actors */ -private[streaming] case class Data[T: ClassTag](data: T) - -/** - * Provides Actors as receivers for receiving stream. - * - * As Actors can also be used to receive data from almost any stream source. - * A nice set of abstraction(s) for actors as receivers is already provided for - * a few general cases. It is thus exposed as an API where user may come with - * his own Actor to run as receiver for Spark Streaming input source. - * - * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. - * - * Here's a way to start more supervisor/workers as its children. - * - * @example {{{ - * context.parent ! Props(new Supervisor) - * }}} OR {{{ - * context.parent ! 
Props(new Worker, "Worker") - * }}} - */ -private[streaming] class ActorReceiver[T: ClassTag]( - props: Props, - name: String, - storageLevel: StorageLevel, - receiverSupervisorStrategy: SupervisorStrategy) - extends NetworkReceiver[T] { - - protected lazy val blocksGenerator: BlockGenerator = - new BlockGenerator(storageLevel) - - protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), - "Supervisor" + streamId) - - class Supervisor extends Actor { - - override val supervisorStrategy = receiverSupervisorStrategy - val worker = context.actorOf(props, name) - logInfo("Started receiver worker at:" + worker.path) - - val n: AtomicInteger = new AtomicInteger(0) - val hiccups: AtomicInteger = new AtomicInteger(0) - - def receive = { - - case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]]) - - case Data(msg) => - blocksGenerator += msg.asInstanceOf[T] - n.incrementAndGet - - case props: Props => - val worker = context.actorOf(props) - logInfo("Started receiver worker at:" + worker.path) - sender ! worker - - case (props: Props, name: String) => - val worker = context.actorOf(props, name) - logInfo("Started receiver worker at:" + worker.path) - sender ! worker - - case _: PossiblyHarmful => hiccups.incrementAndGet() - - case _: Statistics => - val workers = context.children - sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) - - } - } - - protected def pushBlock(iter: Iterator[T]) { - val buffer = new ArrayBuffer[T] - buffer ++= iter - pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel) - } - - protected def onStart() = { - blocksGenerator.start() - supervisor - logInfo("Supervision tree for receivers initialized at:" + supervisor.path) - - } - - protected def onStop() = { - supervisor ! 
PoisonPill - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index e564eccba2..374848358e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -38,6 +38,7 @@ private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { private val ssc = jobScheduler.ssc + private val conf = ssc.conf private val graph = ssc.graph val clock = { @@ -93,26 +94,31 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { if (processReceivedData) { logInfo("Stopping JobGenerator gracefully") val timeWhenStopStarted = System.currentTimeMillis() - val stopTimeout = 10 * ssc.graph.batchDuration.milliseconds + val stopTimeout = conf.getLong( + "spark.streaming.gracefulStopTimeout", + 10 * ssc.graph.batchDuration.milliseconds + ) val pollTime = 100 // To prevent graceful stop to get stuck permanently def hasTimedOut = { val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout - if (timedOut) logWarning("Timed out while stopping the job generator") + if (timedOut) { + logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")") + } timedOut } // Wait until all the received blocks in the network input tracker has // been consumed by network input DStreams, and jobs have been generated with them logInfo("Waiting for all received blocks to be consumed for job generation") - while(!hasTimedOut && jobScheduler.networkInputTracker.hasMoreReceivedBlockIds) { + while(!hasTimedOut && jobScheduler.receiverTracker.hasMoreReceivedBlockIds) { Thread.sleep(pollTime) } logInfo("Waited for all received blocks to be consumed for job generation") // Stop generating jobs - val stopTime = timer.stop(false) + val stopTime = timer.stop(interruptTimer = false) graph.stop() logInfo("Stopped generation timer") @@ -214,7 +220,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => - val receivedBlockInfo = graph.getNetworkInputStreams.map { stream => + val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => val streamId = stream.id val receivedBlockInfo = stream.getReceivedBlockInfo(time) (streamId, receivedBlockInfo) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index d9ada99b47..1b034b9fb1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -46,7 +46,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // These two are created only when scheduler starts. 
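The stop timeout made configurable in JobGenerator above can be tuned alongside a graceful shutdown; a sketch with illustrative values, assuming the two-flag StreamingContext.stop overload from the graceful shutdown work this change builds on.

// Allow up to 60s for received blocks to be consumed before the generator gives up.
// The default is 10 x the batch duration (in milliseconds).
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.gracefulStopTimeout", "60000")
// ...
// ssc.stop(stopSparkContext = true, stopGracefully = true)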
// eventActor not being null means the scheduler has been started and not stopped - var networkInputTracker: NetworkInputTracker = null + var receiverTracker: ReceiverTracker = null private var eventActor: ActorRef = null @@ -61,8 +61,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { }), "JobScheduler") listenerBus.start() - networkInputTracker = new NetworkInputTracker(ssc) - networkInputTracker.start() + receiverTracker = new ReceiverTracker(ssc) + receiverTracker.start() jobGenerator.start() logInfo("Started JobScheduler") } @@ -72,7 +72,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { logDebug("Stopping JobScheduler") // First, stop receiving - networkInputTracker.stop() + receiverTracker.stop() // Second, stop generating jobs. If it has to process all received data, // then this will wait for all the processing through JobScheduler to be over. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala deleted file mode 100644 index 438e72a7ce..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.scheduler - -import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} -import scala.language.existentials - -import akka.actor._ - -import org.apache.spark.{Logging, SparkEnv, SparkException} -import org.apache.spark.SparkContext._ -import org.apache.spark.storage.StreamBlockId -import org.apache.spark.streaming.{StreamingContext, Time} -import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} -import org.apache.spark.util.AkkaUtils - -/** Information about receiver */ -case class ReceiverInfo(streamId: Int, typ: String, location: String) { - override def toString = s"$typ-$streamId" -} - -/** Information about blocks received by the network receiver */ -case class ReceivedBlockInfo( - streamId: Int, - blockId: StreamBlockId, - numRecords: Long, - metadata: Any - ) - -/** - * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate - * with each other. 
- */ -private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver( - streamId: Int, - typ: String, - host: String, - receiverActor: ActorRef - ) extends NetworkInputTrackerMessage -private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) - extends NetworkInputTrackerMessage -private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) - extends NetworkInputTrackerMessage - -/** - * This class manages the execution of the receivers of NetworkInputDStreams. Instance of - * this class must be created after all input streams have been added and StreamingContext.start() - * has been called because it needs the final set of input streams at the time of instantiation. - */ -private[streaming] -class NetworkInputTracker(ssc: StreamingContext) extends Logging { - - val networkInputStreams = ssc.graph.getNetworkInputStreams() - val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) - val receiverExecutor = new ReceiverExecutor() - val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] - val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] - with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] - val timeout = AkkaUtils.askTimeout(ssc.conf) - val listenerBus = ssc.scheduler.listenerBus - - // actor is created when generator starts. - // This not being null means the tracker has been started and not stopped - var actor: ActorRef = null - var currentTime: Time = null - - /** Start the actor and receiver execution thread. */ - def start() = synchronized { - if (actor != null) { - throw new SparkException("NetworkInputTracker already started") - } - - if (!networkInputStreams.isEmpty) { - actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), - "NetworkInputTracker") - receiverExecutor.start() - logInfo("NetworkInputTracker started") - } - } - - /** Stop the receiver execution thread. */ - def stop() = synchronized { - if (!networkInputStreams.isEmpty && actor != null) { - // First, stop the receivers - receiverExecutor.stop() - - // Finally, stop the actor - ssc.env.actorSystem.stop(actor) - actor = null - logInfo("NetworkInputTracker stopped") - } - } - - /** Return all the blocks received from a receiver. 
*/ - def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { - val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) - logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") - receivedBlockInfo.toArray - } - - private def getReceivedBlockInfoQueue(streamId: Int) = { - receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) - } - - /** Register a receiver */ - def registerReceiver( - streamId: Int, - typ: String, - host: String, - receiverActor: ActorRef, - sender: ActorRef - ) { - if (!networkInputStreamMap.contains(streamId)) { - throw new Exception("Register received for unexpected id " + streamId) - } - receiverInfo += ((streamId, receiverActor)) - ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( - ReceiverInfo(streamId, typ, host) - )) - logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) - } - - /** Deregister a receiver */ - def deregisterReceiver(streamId: Int, message: String) { - receiverInfo -= streamId - logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message) - } - - /** Add new blocks for the given stream */ - def addBlocks(receivedBlockInfo: ReceivedBlockInfo) { - getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo - logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " + - receivedBlockInfo.blockId) - } - - /** Check if any blocks are left to be processed */ - def hasMoreReceivedBlockIds: Boolean = { - !receivedBlockInfo.values.forall(_.isEmpty) - } - - /** Actor to receive messages from the receivers. */ - private class NetworkInputTrackerActor extends Actor { - def receive = { - case RegisterReceiver(streamId, typ, host, receiverActor) => - registerReceiver(streamId, typ, host, receiverActor, sender) - sender ! true - case AddBlock(receivedBlockInfo) => - addBlocks(receivedBlockInfo) - case DeregisterReceiver(streamId, message) => - deregisterReceiver(streamId, message) - sender ! true - } - } - - /** This thread class runs all the receivers on the cluster. */ - class ReceiverExecutor { - @transient val env = ssc.env - @transient val thread = new Thread() { - override def run() { - try { - SparkEnv.set(env) - startReceivers() - } catch { - case ie: InterruptedException => logInfo("ReceiverExecutor interrupted") - } - } - } - - def start() { - thread.start() - } - - def stop() { - // Send the stop signal to all the receivers - stopReceivers() - - // Wait for the Spark job that runs the receivers to be over - // That is, for the receivers to quit gracefully. - thread.join(10000) - - // Check if all the receivers have been deregistered or not - if (!receiverInfo.isEmpty) { - logWarning("All of the receivers have not deregistered, " + receiverInfo) - } else { - logInfo("All of the receivers have deregistered successfully") - } - } - - /** - * Get the receivers from the NetworkInputDStreams, distributes them to the - * worker nodes as a parallel collection, and runs them. 
- */ - private def startReceivers() { - val receivers = networkInputStreams.map(nis => { - val rcvr = nis.getReceiver() - rcvr.setStreamId(nis.id) - rcvr - }) - - // Right now, we only honor preferences if all receivers have them - val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined) - .reduce(_ && _) - - // Create the parallel collection of receivers to distributed them on the worker nodes - val tempRDD = - if (hasLocationPreferences) { - val receiversWithPreferences = - receivers.map(r => (r, Seq(r.getLocationPreference().toString))) - ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) - } - else { - ssc.sc.makeRDD(receivers, receivers.size) - } - - // Function to start the receiver on the worker node - val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => { - if (!iterator.hasNext) { - throw new Exception("Could not start receiver as details not found.") - } - iterator.next().start() - } - // Run the dummy Spark job to ensure that all slaves have registered. - // This avoids all the receivers to be scheduled on the same node. - if (!ssc.sparkContext.isLocal) { - ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() - } - - // Distribute the receivers and start them - logInfo("Starting " + receivers.length + " receivers") - ssc.sparkContext.runJob(tempRDD, startReceiver) - logInfo("All of the receivers have been terminated") - } - - /** Stops the receivers. */ - private def stopReceivers() { - // Signal the receivers to stop - receiverInfo.values.foreach(_ ! StopReceiver) - logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala new file mode 100644 index 0000000000..3d2537f6f2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.scheduler + +import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} +import scala.language.existentials + +import akka.actor._ +import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver} +import org.apache.spark.util.AkkaUtils + +/** Information about receiver */ +case class ReceiverInfo(streamId: Int, typ: String, location: String) { + override def toString = s"$typ-$streamId" +} + +/** Information about blocks received by the receiver */ +case class ReceivedBlockInfo( + streamId: Int, + blockId: StreamBlockId, + numRecords: Long, + metadata: Any + ) + +/** + * Messages used by the NetworkReceiver and the ReceiverTracker to communicate + * with each other. + */ +private[streaming] sealed trait ReceiverTrackerMessage +private[streaming] case class RegisterReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef + ) extends ReceiverTrackerMessage +private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) + extends ReceiverTrackerMessage +private[streaming] case class ReportError(streamId: Int, message: String, error: String) +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String) + extends ReceiverTrackerMessage + +/** + * This class manages the execution of the receivers of NetworkInputDStreams. Instance of + * this class must be created after all input streams have been added and StreamingContext.start() + * has been called because it needs the final set of input streams at the time of instantiation. + */ +private[streaming] +class ReceiverTracker(ssc: StreamingContext) extends Logging { + + val receiverInputStreams = ssc.graph.getReceiverInputStreams() + val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*) + val receiverExecutor = new ReceiverLauncher() + val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] + val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + val timeout = AkkaUtils.askTimeout(ssc.conf) + val listenerBus = ssc.scheduler.listenerBus + + // actor is created when generator starts. + // This not being null means the tracker has been started and not stopped + var actor: ActorRef = null + var currentTime: Time = null + + /** Start the actor and receiver execution thread. */ + def start() = synchronized { + if (actor != null) { + throw new SparkException("ReceiverTracker already started") + } + + if (!receiverInputStreams.isEmpty) { + actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), + "ReceiverTracker") + receiverExecutor.start() + logInfo("ReceiverTracker started") + } + } + + /** Stop the receiver execution thread. */ + def stop() = synchronized { + if (!receiverInputStreams.isEmpty && actor != null) { + // First, stop the receivers + receiverExecutor.stop() + + // Finally, stop the actor + ssc.env.actorSystem.stop(actor) + actor = null + logInfo("ReceiverTracker stopped") + } + } + + /** Return all the blocks received from a receiver. 
*/ + def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { + val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) + logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") + receivedBlockInfo.toArray + } + + private def getReceivedBlockInfoQueue(streamId: Int) = { + receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) + } + + /** Register a receiver */ + def registerReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef, + sender: ActorRef + ) { + if (!receiverInputStreamMap.contains(streamId)) { + throw new Exception("Register received for unexpected id " + streamId) + } + receiverInfo += ((streamId, receiverActor)) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( + ReceiverInfo(streamId, typ, host) + )) + logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) + } + + /** Deregister a receiver */ + def deregisterReceiver(streamId: Int, message: String, error: String) { + receiverInfo -= streamId + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error)) + val messageWithError = if (error != null && !error.isEmpty) { + s"$message - $error" + } else { + s"$message" + } + logError(s"Deregistered receiver for stream $streamId: $messageWithError") + } + + /** Add new blocks for the given stream */ + def addBlocks(receivedBlockInfo: ReceivedBlockInfo) { + getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo + logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " + + receivedBlockInfo.blockId) + } + + /** Report error sent by a receiver */ + def reportError(streamId: Int, message: String, error: String) { + ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error)) + val messageWithError = if (error != null && !error.isEmpty) { + s"$message - $error" + } else { + s"$message" + } + logWarning(s"Error reported by receiver for stream $streamId: $messageWithError") + } + + /** Check if any blocks are left to be processed */ + def hasMoreReceivedBlockIds: Boolean = { + !receivedBlockInfo.values.forall(_.isEmpty) + } + + /** Actor to receive messages from the receivers. */ + private class ReceiverTrackerActor extends Actor { + def receive = { + case RegisterReceiver(streamId, typ, host, receiverActor) => + registerReceiver(streamId, typ, host, receiverActor, sender) + sender ! true + case AddBlock(receivedBlockInfo) => + addBlocks(receivedBlockInfo) + case ReportError(streamId, message, error) => + reportError(streamId, message, error) + case DeregisterReceiver(streamId, message, error) => + deregisterReceiver(streamId, message, error) + sender ! true + } + } + + /** This thread class runs all the receivers on the cluster. */ + class ReceiverLauncher { + @transient val env = ssc.env + @transient val thread = new Thread() { + override def run() { + try { + SparkEnv.set(env) + startReceivers() + } catch { + case ie: InterruptedException => logInfo("ReceiverLauncher interrupted") + } + } + } + + def start() { + thread.start() + } + + def stop() { + // Send the stop signal to all the receivers + stopReceivers() + + // Wait for the Spark job that runs the receivers to be over + // That is, for the receivers to quit gracefully. 
+ thread.join(10000) + + // Check if all the receivers have been deregistered or not + if (!receiverInfo.isEmpty) { + logWarning("All of the receivers have not deregistered, " + receiverInfo) + } else { + logInfo("All of the receivers have deregistered successfully") + } + } + + /** + * Get the receivers from the ReceiverInputDStreams, distributes them to the + * worker nodes as a parallel collection, and runs them. + */ + private def startReceivers() { + val receivers = receiverInputStreams.map(nis => { + val rcvr = nis.getReceiver() + rcvr.setReceiverId(nis.id) + rcvr + }) + + // Right now, we only honor preferences if all receivers have them + val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _) + + // Create the parallel collection of receivers to distributed them on the worker nodes + val tempRDD = + if (hasLocationPreferences) { + val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get))) + ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences) + } + else { + ssc.sc.makeRDD(receivers, receivers.size) + } + + // Function to start the receiver on the worker node + val startReceiver = (iterator: Iterator[Receiver[_]]) => { + if (!iterator.hasNext) { + throw new SparkException( + "Could not start receiver as object not found.") + } + val receiver = iterator.next() + val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get) + executor.start() + executor.awaitTermination() + } + // Run the dummy Spark job to ensure that all slaves have registered. + // This avoids all the receivers to be scheduled on the same node. + if (!ssc.sparkContext.isLocal) { + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + } + + // Distribute the receivers and start them + logInfo("Starting " + receivers.length + " receivers") + ssc.sparkContext.runJob(tempRDD, startReceiver) + logInfo("All of the receivers have been terminated") + } + + /** Stops the receivers. */ + private def stopReceivers() { + // Signal the receivers to stop + receiverInfo.values.foreach(_ ! 
StopReceiver) + logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 5db40ebbeb..9d6ec1fa33 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable.Queue + import org.apache.spark.util.Distribution /** Base trait for events related to StreamingListener */ @@ -26,8 +27,13 @@ sealed trait StreamingListenerEvent case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent + case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent +case class StreamingListenerReceiverError(streamId: Int, message: String, error: String) + extends StreamingListenerEvent +case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String) + extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent @@ -41,14 +47,20 @@ trait StreamingListener { /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } + /** Called when a receiver has reported an error */ + def onReceiverError(receiverError: StreamingListenerReceiverError) { } + + /** Called when a receiver has been stopped */ + def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } + /** Called when a batch of jobs has been submitted for processing. */ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } - /** Called when processing of a batch of jobs has completed. */ - def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } - /** Called when processing of a batch of jobs has started. */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } + + /** Called when processing of a batch of jobs has completed. 
*/ + def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index ea03dfc7bf..398724d9e8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -40,6 +40,10 @@ private[spark] class StreamingListenerBus() extends Logging { event match { case receiverStarted: StreamingListenerReceiverStarted => listeners.foreach(_.onReceiverStarted(receiverStarted)) + case receiverError: StreamingListenerReceiverError => + listeners.foreach(_.onReceiverError(receiverError)) + case receiverStopped: StreamingListenerReceiverStopped => + listeners.foreach(_.onReceiverStopped(receiverStopped)) case batchSubmitted: StreamingListenerBatchSubmitted => listeners.foreach(_.onBatchSubmitted(batchSubmitted)) case batchStarted: StreamingListenerBatchStarted => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 8b025b09ed..bf637c1446 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -62,8 +62,8 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St totalCompletedBatches += 1L } - def numNetworkReceivers = synchronized { - ssc.graph.getNetworkInputStreams().size + def numReceivers = synchronized { + ssc.graph.getReceiverInputStreams().size } def numTotalCompletedBatches: Long = synchronized { @@ -101,7 +101,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit) val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) - (0 until numNetworkReceivers).map { receiverId => + (0 until numReceivers).map { receiverId => val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo => batchInfo.get(receiverId).getOrElse(Array.empty) } @@ -117,11 +117,11 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St def lastReceivedBatchRecords: Map[Int, Long] = { val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => - (0 until numNetworkReceivers).map { receiverId => + (0 until numReceivers).map { receiverId => (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum) }.toMap }.getOrElse { - (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap + (0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 6607437db5..8fe1219356 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -40,7 +40,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val content = generateBasicStats() ++

       <br></br> ++
       <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
-      generateNetworkStatsTable() ++
+      generateReceiverStats() ++
       generateBatchStatsTable()
     UIUtils.headerSparkPage(
       content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000))
@@ -57,7 +57,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
         <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
       </li>
       <li>
-        <strong>Network receivers: </strong>{listener.numNetworkReceivers}
+        <strong>Network receivers: </strong>{listener.numReceivers}
       </li>
       <li>
         <strong>Batch interval: </strong>{formatDurationVerbose(listener.batchDuration)}
@@ -71,8 +71,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
     }
   }
 
-  /** Generate stats of data received over the network the streaming program */
-  private def generateNetworkStatsTable(): Seq[Node] = {
+  /** Generate stats of data received by the receivers in the streaming program */
+  private def generateReceiverStats(): Seq[Node] = {
     val receivedRecordDistributions = listener.receivedRecordsDistributions
     val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
     val table = if (receivedRecordDistributions.size > 0) {
@@ -86,13 +86,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
       "75th percentile rate\n[records/sec]",
       "Maximum rate\n[records/sec]"
     )
-    val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
+    val dataRows = (0 until listener.numReceivers).map { receiverId =>
       val receiverInfo = listener.receiverInfo(receiverId)
       val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
       val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
-      val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
+      val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
       val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
-        d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
+        d.getQuantiles().map(r => formatNumber(r.toLong))
       }.getOrElse {
         Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
       }
@@ -104,8 +104,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
     }
 
     val content =
-      <h5>Network Input Statistics</h5> ++
-      <div>{table.getOrElse("No network receivers")}</div>
+      <h5>Receiver Statistics</h5> ++
+      <div>{table.getOrElse("No receivers")}</div>
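+      // ("No receivers" above is rendered when table is None, i.e. when no receiver
+      // statistics are available for this application.)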
    content } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index e016377c94..1a616a0434 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -77,7 +77,9 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: def stop(interruptTimer: Boolean): Long = synchronized { if (!stopped) { stopped = true - if (interruptTimer) thread.interrupt() + if (interruptTimer) { + thread.interrupt() + } thread.join() logInfo("Stopped timer for " + name + " after time " + prevTime) } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index a0b1bbc34f..f9bfb9b744 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.streaming.api.java.*; import scala.Tuple2; import org.junit.Assert; @@ -36,10 +37,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; @@ -1668,7 +1665,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // InputStream functionality is deferred to the existing Scala tests. 
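  // The two tests below only assign the return values; they serve as compile-time checks that
  // socketTextStream() and rawSocketStream() now expose the more specific
  // JavaReceiverInputDStream type introduced by this patch.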
@Test public void testSocketTextStream() { - JavaDStream test = ssc.socketTextStream("localhost", 12345); + JavaReceiverInputDStream test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1701,6 +1698,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testRawSocketStream() { - JavaDStream test = ssc.rawSocketStream("localhost", 12345); + JavaReceiverInputDStream test = ssc.rawSocketStream("localhost", 12345); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 952511d411..46b7f63b65 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -36,10 +36,9 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.dstream.NetworkReceiver -import org.apache.spark.streaming.receivers.Receiver import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils +import org.apache.spark.streaming.receiver.{ActorHelper, Receiver} class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -207,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // set up the network stream using the test receiver val ssc = new StreamingContext(conf, batchDuration) - val networkStream = ssc.networkStream[Int](testReceiver) + val networkStream = ssc.receiverStream[Int](testReceiver) val countStream = networkStream.count val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] val outputStream = new TestOutputStream(countStream, outputBuffer) @@ -301,7 +300,7 @@ object TestServer { } /** This is an actor for testing actor input stream */ -class TestActor(port: Int) extends Actor with Receiver { +class TestActor(port: Int) extends Actor with ActorHelper { def bytesToString(byteString: ByteString) = byteString.utf8String @@ -309,24 +308,22 @@ class TestActor(port: Int) extends Actor with Receiver { def receive = { case IO.Read(socket, bytes) => - pushBlock(bytesToString(bytes)) + store(bytesToString(bytes)) } } /** This is a receiver to test multiple threads inserting data using block generator */ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int) - extends NetworkReceiver[Int] { + extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging { lazy val executorPool = Executors.newFixedThreadPool(numThreads) - lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY) lazy val finishCount = new AtomicInteger(0) - protected def onStart() { - blockGenerator.start() + def onStart() { (1 to numThreads).map(threadId => { val runnable = new Runnable { def run() { (1 to numRecordsPerThread).foreach(i => - blockGenerator += (threadId * numRecordsPerThread + i) ) + store(threadId * numRecordsPerThread + i) ) if (finishCount.incrementAndGet == numThreads) { MultiThreadTestReceiver.haveAllThreadsFinished = true } @@ -337,7 +334,7 @@ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int) }) } - protected def onStop() { + def onStop() { executorPool.shutdown() } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala new file mode 100644 index 0000000000..5c0415ad14 --- /dev/null +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.SparkConf +import org.apache.spark.storage.{StorageLevel, StreamBlockId} +import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor} +import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +/** Testsuite for testing the network receiver behavior */ +class NetworkReceiverSuite extends FunSuite with Timeouts { + + test("network receiver life cycle") { + + val receiver = new FakeReceiver + val executor = new FakeReceiverSupervisor(receiver) + + assert(executor.isAllEmpty) + + // Thread that runs the executor + val executingThread = new Thread() { + override def run() { + executor.start() + executor.awaitTermination() + } + } + + // Start the receiver + executingThread.start() + + // Verify that the receiver + intercept[Exception] { + failAfter(200 millis) { + executingThread.join() + } + } + + // Verify that receiver was started + assert(receiver.onStartCalled) + assert(executor.isReceiverStarted) + assert(receiver.isStarted) + assert(!receiver.isStopped()) + assert(receiver.otherThread.isAlive) + eventually(timeout(100 millis), interval(10 millis)) { + assert(receiver.receiving) + } + + // Verify whether the data stored by the receiver was sent to the executor + val byteBuffer = ByteBuffer.allocate(100) + val arrayBuffer = new ArrayBuffer[Int]() + val iterator = arrayBuffer.iterator + receiver.store(1) + receiver.store(byteBuffer) + receiver.store(arrayBuffer) + receiver.store(iterator) + assert(executor.singles.size === 1) + assert(executor.singles.head === 1) + assert(executor.byteBuffers.size === 1) + assert(executor.byteBuffers.head.eq(byteBuffer)) + assert(executor.iterators.size === 1) + assert(executor.iterators.head.eq(iterator)) + assert(executor.arrayBuffers.size === 1) + assert(executor.arrayBuffers.head.eq(arrayBuffer)) + + // Verify whether the exceptions reported by the receiver was sent to the executor + val exception = new Exception + receiver.reportError("Error", exception) + assert(executor.errors.size === 1) + assert(executor.errors.head.eq(exception)) + + // Verify restarting actually stops and starts the receiver + receiver.restart("restarting", null, 100) + assert(receiver.isStopped) + assert(receiver.onStopCalled) + eventually(timeout(1000 millis), interval(100 millis)) { + assert(receiver.onStartCalled) + assert(executor.isReceiverStarted) + assert(receiver.isStarted) + 
assert(!receiver.isStopped) + assert(receiver.receiving) + } + + // Verify that stopping actually stops the thread + failAfter(100 millis) { + receiver.stop("test") + assert(receiver.isStopped) + assert(!receiver.otherThread.isAlive) + + // The thread that started the executor should complete + // as stop() stops everything + executingThread.join() + } + } + + test("block generator") { + val blockGeneratorListener = new FakeBlockGeneratorListener + val blockInterval = 200 + val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString) + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + val expectedBlocks = 5 + val waitTime = expectedBlocks * blockInterval + (blockInterval / 2) + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + blockGenerator.start() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator += count + generatedData += count + count += 1 + Thread.sleep(10) + } + blockGenerator.stop() + + val recordedData = blockGeneratorListener.arrayBuffers.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0) + assert(recordedData.toSet === generatedData.toSet) + } + + /** + * An implementation of NetworkReceiver that is used for testing a receiver's life cycle. + */ + class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + var otherThread: Thread = null + var receiving = false + var onStartCalled = false + var onStopCalled = false + + def onStart() { + otherThread = new Thread() { + override def run() { + receiving = true + while(!isStopped()) { + Thread.sleep(10) + } + } + } + onStartCalled = true + otherThread.start() + + } + + def onStop() { + onStopCalled = true + otherThread.join() + } + + def reset() { + receiving = false + onStartCalled = false + onStopCalled = false + } + } + + /** + * An implementation of NetworkReceiverExecutor used for testing a NetworkReceiver. + * Instead of storing the data in the BlockManager, it stores all the data in a local buffer + * that can used for verifying that the data has been forwarded correctly. + */ + class FakeReceiverSupervisor(receiver: FakeReceiver) + extends ReceiverSupervisor(receiver, new SparkConf()) { + val singles = new ArrayBuffer[Any] + val byteBuffers = new ArrayBuffer[ByteBuffer] + val iterators = new ArrayBuffer[Iterator[_]] + val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]] + val errors = new ArrayBuffer[Throwable] + + /** Check if all data structures are clean */ + def isAllEmpty = { + singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty && + arrayBuffers.isEmpty && errors.isEmpty + } + + def pushSingle(data: Any) { + singles += data + } + + def pushBytes( + bytes: ByteBuffer, + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + byteBuffers += bytes + } + + def pushIterator( + iterator: Iterator[_], + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + iterators += iterator + } + + def pushArrayBuffer( + arrayBuffer: ArrayBuffer[_], + optionalMetadata: Option[Any], + optionalBlockId: Option[StreamBlockId] + ) { + arrayBuffers += arrayBuffer + } + + def reportError(message: String, throwable: Throwable) { + errors += throwable + } + } + + /** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. 
+   */
+  class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+    // buffer of data received as ArrayBuffers
+    val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+    val errors = new ArrayBuffer[Throwable]
+
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+      val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+      arrayBuffers += bufferOfInts
+      Thread.sleep(0)
+    }
+
+    def onError(message: String, throwable: Throwable) {
+      errors += throwable
+    }
+  }
+}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ad5367ab94..6d14b1f785 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.{MetadataCleaner, Utils}
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Timeouts
@@ -181,15 +182,15 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     conf.set("spark.cleaner.ttl", "3600")
     sc = new SparkContext(conf)
     for (i <- 1 to 4) {
-      logInfo("==================================")
-      ssc = new StreamingContext(sc, batchDuration)
+      logInfo("==================================\n\n\n")
+      ssc = new StreamingContext(sc, Milliseconds(100))
       var runningCount = 0
       TestReceiver.counter.set(1)
       val input = ssc.networkStream(new TestReceiver)
       input.count.foreachRDD(rdd => {
         val count = rdd.first()
-        logInfo("Count = " + count)
         runningCount += count.toInt
+        logInfo("Count = " + count + ", Running count = " + runningCount)
       })
       ssc.start()
       ssc.awaitTermination(500)
@@ -216,12 +217,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       ssc.start()
     }
 
-    // test whether waitForStop() exits after give amount of time
+    // test whether awaitTermination() exits after a given amount of time
    failAfter(1000 millis) {
      ssc.awaitTermination(500)
    }
 
-    // test whether waitForStop() does not exit if not time is given
+    // test whether awaitTermination() does not exit if no time is given
    val exception = intercept[Exception] {
      failAfter(1000 millis) {
        ssc.awaitTermination()
@@ -276,23 +277,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
 class TestException(msg: String) extends Exception(msg)
 
 /** Custom receiver for testing whether all data received by a receiver gets processed or not */
-class TestReceiver extends NetworkReceiver[Int] {
-  protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
-  protected def onStart() {
-    blockGenerator.start()
-    logInfo("BlockGenerator started on thread " + receivingThread)
-    try {
-      while(true) {
-        blockGenerator += TestReceiver.counter.getAndIncrement
-        Thread.sleep(0)
+class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        while (!isStopped) {
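+          // store() hands each record to the ReceiverSupervisor, which batches records into
+          // blocks (via the BlockGenerator) before writing them to the BlockManager.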
+          store(TestReceiver.counter.getAndIncrement)
+        }
+        logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
       }
-    } finally {
-      logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
     }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
   }
 
-  protected def onStop() {
-    blockGenerator.stop()
+  def onStop() {
+    // no cleanup to be done, the receiving thread should stop on its own
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 9e0f2c900e..542c697ae3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,10 +17,19 @@ package org.apache.spark.streaming
 
-import org.apache.spark.streaming.scheduler._
 import scala.collection.mutable.ArrayBuffer
-import org.scalatest.matchers.ShouldMatchers
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler._
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.Logging
 
 class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
 
@@ -32,7 +41,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
   override def batchDuration = Milliseconds(100)
   override def actuallyWait = true
 
-  test("basic BatchInfo generation") {
+  test("batch info reporting") {
     val ssc = setupStreams(input, operation)
     val collector = new BatchInfoCollector
     ssc.addStreamingListener(collector)
@@ -54,6 +63,31 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
     isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
   }
 
+  test("receiver info reporting") {
+    val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count)
+
+    val collector = new ReceiverInfoCollector
+    ssc.addStreamingListener(collector)
+
+    ssc.start()
+    try {
+      eventually(timeout(1000 millis), interval(20 millis)) {
+        collector.startedReceiverInfo should have size 1
+        collector.startedReceiverInfo(0).streamId should equal (0)
+        collector.stoppedReceiverStreamIds should have size 1
+        collector.stoppedReceiverStreamIds(0) should equal (0)
+        collector.receiverErrors should have size 1
+        collector.receiverErrors(0)._1 should equal (0)
+        collector.receiverErrors(0)._2 should include ("report error")
+        collector.receiverErrors(0)._3 should include ("report exception")
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
     for(i <- 1 until seq.size) {
@@ -61,12 +95,46 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
     }
     true
   }
+}
+
+/** Listener that collects information on processed batches */
+class BatchInfoCollector extends StreamingListener {
+  val batchInfos = new ArrayBuffer[BatchInfo]
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+    batchInfos += batchCompleted.batchInfo
+  }
+}
+
+/** Listener that collects receiver start, stop, and error information */
+class ReceiverInfoCollector extends StreamingListener { + val startedReceiverInfo = new ArrayBuffer[ReceiverInfo] + val stoppedReceiverStreamIds = new ArrayBuffer[Int]() + val receiverErrors = new ArrayBuffer[(Int, String, String)]() + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { + startedReceiverInfo += receiverStarted.receiverInfo + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + stoppedReceiverStreamIds += receiverStopped.streamId + } + + override def onReceiverError(receiverError: StreamingListenerReceiverError) { + receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error)) + } +} - /** Listener that collects information on processed batches */ - class BatchInfoCollector extends StreamingListener { - val batchInfos = new ArrayBuffer[BatchInfo] - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { - batchInfos += batchCompleted.batchInfo +class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging { + def onStart() { + Future { + logInfo("Started receiver and sleeping") + Thread.sleep(10) + logInfo("Reporting error and sleeping") + reportError("test report error", new Exception("test report exception")) + Thread.sleep(10) + logInfo("Stopping") + stop("test stop error") } } + def onStop() { } } -- cgit v1.2.3
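For reference, a minimal usage sketch of the refactored API (not part of the patch). The LineReceiver and ReceiverExample names, the host/port values, and the socket-reading details are illustrative assumptions; only Receiver, StreamingContext.receiverStream, and the new StreamingListener receiver events come from this change.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError}

// Illustrative custom receiver: reads lines from a socket and hands them to Spark via store().
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Receiving happens on a separate thread; onStart() must return promptly.
    new Thread("Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do; isStopped() becomes true and the receiving thread exits on its own.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)  // single records are batched into blocks by the framework
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask the framework to stop and then start the receiver again (e.g. to reconnect).
      restart("Connection closed, restarting")
    } catch {
      case e: java.net.ConnectException =>
        restart("Could not connect to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}

object ReceiverExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ReceiverExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // receiverStream() plugs the custom receiver in and returns a ReceiverInputDStream.
    val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))
    lines.count().print()

    // The new listener events make receiver failures visible to the application.
    ssc.addStreamingListener(new StreamingListener {
      override def onReceiverError(e: StreamingListenerReceiverError) {
        println(s"Receiver ${e.streamId} reported error: ${e.message} - ${e.error}")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the receiver never touches the BlockGenerator or BlockManager directly: store(), restart(), and stop() delegate to the ReceiverSupervisor introduced by this refactoring, which is what keeps the public Receiver API small and stable.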