From 0a2e33334125cb3ae5e54f8333ea5608779399fc Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 13 Jan 2013 16:18:39 -0800
Subject: Removed stream id from the constructor of NetworkReceiver to make it
 easier for PluggableNetworkInputDStream.

---
 .../spark/streaming/NetworkInputTracker.scala      | 34 +++++++++++++-----
 .../streaming/dstream/FlumeInputDStream.scala      |  5 ++-
 .../streaming/dstream/KafkaInputDStream.scala      |  6 ++--
 .../streaming/dstream/NetworkInputDStream.scala    | 42 ++++++++++++++++------
 .../spark/streaming/dstream/RawInputDStream.scala  |  6 ++--
 .../streaming/dstream/SocketInputDStream.scala     |  5 ++-
 6 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index a6ab44271f..e4152f3a61 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -18,7 +18,10 @@ private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: Act
 private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 
-
+/**
+ * This class manages the execution of the receivers of NetworkInputDStreams.
+ */
+private[streaming]
 class NetworkInputTracker(
     @transient ssc: StreamingContext,
     @transient networkInputStreams: Array[NetworkInputDStream[_]])
@@ -32,16 +35,20 @@ class NetworkInputTracker(
 
   var currentTime: Time = null
 
+  /** Start the actor and receiver execution thread. */
   def start() {
     ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
     receiverExecutor.start()
   }
 
+  /** Stop the receiver execution thread. */
   def stop() {
+    // TODO: stop the actor as well
     receiverExecutor.interrupt()
     receiverExecutor.stopReceivers()
   }
 
+  /** Return all the blocks received from a receiver. */
   def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
     val queue = receivedBlockIds.synchronized {
       receivedBlockIds.getOrElse(receiverId, new Queue[String]())
@@ -53,6 +60,7 @@ class NetworkInputTracker(
     result.toArray
   }
 
+  /** Actor to receive messages from the receivers. */
   private class NetworkInputTrackerActor extends Actor {
     def receive = {
       case RegisterReceiver(streamId, receiverActor) => {
@@ -83,7 +91,8 @@ class NetworkInputTracker(
       }
     }
   }
-
+
+  /** This thread class runs all the receivers on the cluster. */
  class ReceiverExecutor extends Thread {
    val env = ssc.env

@@ -97,13 +106,22 @@ class NetworkInputTracker(
        stopReceivers()
      }
    }
-
+
+    /**
+     * Gets the receivers from the NetworkInputDStreams, distributes them to the
+     * worker nodes as a parallel collection, and runs them.
+     */
    def startReceivers() {
-      val receivers = networkInputStreams.map(_.createReceiver())
+      val receivers = networkInputStreams.map(nis => {
+        val rcvr = nis.createReceiver()
+        rcvr.setStreamId(nis.id)
+        rcvr
+      })

      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)

+      // Create the parallel collection of receivers to distribute them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
@@ -113,21 +131,21 @@
          ssc.sc.makeRDD(receivers, receivers.size)
        }

+      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
        if (!iterator.hasNext) {
          throw new Exception("Could not start receiver as details not found.")
        }
        iterator.next().start()
      }
+      // Distribute the receivers and start them
      ssc.sc.runJob(tempRDD, startReceiver)
    }

+    /** Stops the receivers. */
    def stopReceivers() {
-      //implicit val ec = env.actorSystem.dispatcher
+      // Signal the receivers to stop
      receiverInfo.values.foreach(_ ! StopReceiver)
-      //val listOfFutures = receiverInfo.values.map(_.ask(StopReceiver)(timeout)).toList
-      //val futureOfList = Future.sequence(listOfFutures)
-      //Await.result(futureOfList, timeout)
    }
  }
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index ca70e72e56..efc7058480 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -26,7 +26,7 @@ class FlumeInputDStream[T: ClassManifest](
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {

  override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
-    new FlumeReceiver(id, host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel)
  }
}

@@ -112,11 +112,10 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
 * Flume Avro interface.*/
private[streaming]
class FlumeReceiver(
-  streamId: Int,
  host: String,
  port: Int,
  storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+  ) extends NetworkReceiver[SparkFlumeEvent] {

  lazy val blockGenerator = new BlockGenerator(storageLevel)

diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 25988a2ce7..2b4740bdf7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -96,15 +96,15 @@ class KafkaInputDStream[T: ClassManifest](
  } */

  def createReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
        .asInstanceOf[NetworkReceiver[T]]
  }
}

private[streaming]
-class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
+class KafkaReceiver(host: String, port: Int, groupId: String,
  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
-  storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
+  storageLevel: StorageLevel) extends NetworkReceiver[Any] {

  // Timeout for establishing a connection to Zookeeper in ms.
  val ZK_TIMEOUT = 10000

diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 18e62a0e33..aa6be95f30 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,6 +17,15 @@ import akka.util.duration._
import spark.streaming.util.{RecurringTimer, SystemClock}
import java.util.concurrent.ArrayBlockingQueue

+/**
+ * Abstract class for defining any InputDStream that has to start a receiver on worker
+ * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * define the createReceiver() function that creates the receiver object of type
+ * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
+ * data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

@@ -25,7 +34,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
  val id = ssc.getNewNetworkStreamId()

  /**
-   * This method creates the receiver object that will be sent to the workers
+   * Creates the receiver object that will be sent to the worker nodes
   * to receive data. This method needs to be defined by any specific implementation
   * of a NetworkInputDStream.
   */
@@ -48,7 +57,11 @@ private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverM
private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage

-abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. See
+ * [[spark.streaming.dstream.NetworkInputDStream]] for an explanation.
+ */
+abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {

  initLogging()

@@ -59,17 +72,22 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri

  lazy protected val receivingThread = Thread.currentThread()

-  /** This method will be called to start receiving data. */
+  protected var streamId: Int = -1
+
+  /**
+   * This method will be called to start receiving data. All your receiver
+   * starting code should be implemented by defining this function.
+   */
  protected def onStart()

  /** This method will be called to stop receiving data. */
  protected def onStop()

-  /** This method conveys a placement preference (hostname) for this receiver. */
+  /** Conveys a placement preference (hostname) for this receiver. */
  def getLocationPreference() : Option[String] = None

  /**
-   * This method starts the receiver. First is accesses all the lazy members to
+   * Starts the receiver. First it accesses all the lazy members to
   * materialize them. Then it calls the user-defined onStart() method to start
   * other threads, etc. required to receive the data.
   */
@@ -92,7 +110,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
  }

  /**
-   * This method stops the receiver. First it interrupts the main receiving thread,
+   * Stops the receiver. First it interrupts the main receiving thread,
   * that is, the thread that called receiver.start(). Then it calls the user-defined
   * onStop() method to stop other threads and/or do cleanup.
   */
@@ -103,7 +121,7 @@
  }

  /**
-   * This method stops the receiver and reports to exception to the tracker.
+   * Stops the receiver and reports the exception to the tracker.
   * This should be called whenever an exception has happened on any thread
   * of the receiver.
   */
@@ -115,7 +133,7 @@


  /**
-   * This method pushes a block (as iterator of values) into the block manager.
+   * Pushes a block (as iterator of values) into the block manager.
   */
  def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
    val buffer = new ArrayBuffer[T] ++ iterator
@@ -125,7 +143,7 @@
  }

  /**
-   * This method pushes a block (as bytes) into the block manager.
+   * Pushes a block (as bytes) into the block manager.
   */
  def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
    env.blockManager.putBytes(blockId, bytes, level)
@@ -157,6 +175,10 @@
    }
  }

+  protected[streaming] def setStreamId(id: Int) {
+    streamId = id
+  }
+
  /**
   * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
   * appropriately named blocks at regular intervals. This class starts two threads,
@@ -202,7 +224,7 @@
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[T]
        if (newBlockBuffer.size > 0) {
-          val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval)
+          val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
          blocksForPushing.add(newBlock)
        }
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index aa2f31cea8..290fab1ce0 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -26,13 +26,13 @@ class RawInputDStream[T: ClassManifest](
  ) extends NetworkInputDStream[T](ssc_ ) with Logging {

  def createReceiver(): NetworkReceiver[T] = {
-    new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+    new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
  }
}

private[streaming]
-class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
-  extends NetworkReceiver[Any](streamId) {
+class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
+  extends NetworkReceiver[Any] {

  var blockPushingThread: Thread = null

diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 8e4b20ea4c..d42027092b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -16,18 +16,17 @@ class SocketInputDStream[T: ClassManifest](
  ) extends NetworkInputDStream[T](ssc_) {

  def createReceiver(): NetworkReceiver[T] = {
-    new SocketReceiver(id, host, port, bytesToObjects, storageLevel)
+    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

private[streaming]
class SocketReceiver[T: ClassManifest](
-    streamId: Int,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
-  ) extends NetworkReceiver[T](streamId) {
+  ) extends NetworkReceiver[T] {

  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-- 
cgit v1.2.3
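
For illustration, here is a minimal sketch of the kind of pluggable input stream this refactoring enables, written against the API shapes visible in the diff above. CustomInputDStream, CustomReceiver, and the storageLevel parameter are hypothetical (modeled on the receivers in the patch); only NetworkInputDStream, NetworkReceiver, createReceiver(), setStreamId(), and the inner BlockGenerator class come from the patch itself.

import spark.storage.StorageLevel
import spark.streaming.StreamingContext
import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

// Hypothetical pluggable input stream. Note that createReceiver() no longer
// has to thread a stream id into the receiver's constructor: the tracker's
// startReceivers() calls setStreamId(nis.id) on the receiver before shipping
// it to a worker node.
class CustomInputDStream(@transient ssc_ : StreamingContext, storageLevel: StorageLevel)
  extends NetworkInputDStream[String](ssc_) {

  def createReceiver(): NetworkReceiver[String] = {
    new CustomReceiver(storageLevel)
  }
}

// Hypothetical receiver. With the no-arg NetworkReceiver constructor it only
// takes the arguments its own data source needs.
class CustomReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {

  // BlockGenerator (an inner class of NetworkReceiver, see the diff above)
  // batches received records into appropriately named blocks at regular intervals.
  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

  protected def onStart() {
    blockGenerator.start()
    // ... connect to the external data source here and push each record
    // into the block generator, e.g. blockGenerator += record
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}

Keeping the stream id out of the public constructor and injecting it through the protected setStreamId() setter keeps the id an implementation detail between NetworkInputTracker and NetworkReceiver, so external receiver implementations like the sketch above never have to know about it.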