diff options
6 files changed, 396 insertions, 0 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md new file mode 100644 index 0000000000..41e6a17e2c --- /dev/null +++ b/docs/plugin-custom-receiver.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark streaming, Plugging in a custom receiver. +--- + +A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to one's specific needs. + +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing [Actor](#References) + +Following is a simple socket text-stream receiver, which is apparently overly simplified using Akka's socket.io api. + +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes)) + } + } + + +{% endhighlight %} + + +_Please see implementations of NetworkReceiver for more generic NetworkReceivers._ + +### A sample spark application + +* First create a Spark streaming context with master url and batch duration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug-in the actor configuration into the spark streaming context and create a DStream. 
+ +{% highlight scala %} + + val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, the stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking the union of multiple input streams. + +{% highlight scala %} + + val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +The above stream can be easily processed as described earlier. 
+ +_A more comprehensive example is provided in the spark streaming examples_ + +## References + +1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala new file mode 100644 index 0000000000..ff05842c71 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala @@ -0,0 +1,80 @@ +package spark.streaming.examples + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala + +import spark.streaming.Seconds +import spark.streaming.StreamingContext +import spark.streaming.StreamingContext.toPairDStreamFunctions +import spark.streaming.receivers.Data + +case class SubscribeReceiver(receiverActor: ActorRef) +case class UnsubscribeReceiver(receiverActor: ActorRef) + +/** + * A sample actor as receiver is also simplest. This receiver actor + * goes and subscribe to a typical publisher/feeder actor and receives + * data, thus it is important to have feeder running before this example + * can be run. Please see FileTextStreamFeeder(sample) for feeder of this + * receiver. + */ +class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String) + extends Actor { + + lazy private val remotePublisher = context.actorFor(urlOfPublisher) + + override def preStart = remotePublisher ! SubscribeReceiver(context.self) + + def receive = { + case msg => context.parent ! Data(msg.asInstanceOf[T]) + } + + override def postStop() = remotePublisher ! 
UnsubscribeReceiver(context.self) + +} + +/** + * A sample word count program demonstrating the use of plugging in + * AkkaActor as Receiver + */ +object AkkaActorWordCount { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println( + "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" + + " <remoteAkkaHost> <remoteAkkaPort>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq.take(4) + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "AkkaActorWordCount", + Seconds(batchDuration.toLong)) + + /* + * Following is the use of pluggableActorStream to plug in custom actor as receiver + * + * An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e type of data received and PluggableInputDstream + * should be same. + * + * For example: Both pluggableActorStream and SampleActorReceiver are parameterized + * to same type to ensure type safety. 
+ */ + + val lines = ssc.pluggableActorStream[String]( + Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format( + remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver") + + //compute wordcount + lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print() + + ssc.start() + + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala new file mode 100644 index 0000000000..f4c1b87f0e --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala @@ -0,0 +1,63 @@ +package spark.streaming.examples + +import java.util.concurrent.CountDownLatch + +import scala.collection.mutable.LinkedList +import scala.io.Source + +import akka.actor.{ Actor, ActorRef, actorRef2Scala } +import akka.actor.Props + +import spark.util.AkkaUtils + +/** + * A feeder to which multiple message receiver (specified by "noOfReceivers")actors + * subscribe and receive file(s)'s text as stream of messages. This is provided + * as a demonstration application for trying out Actor as receiver feature. Please see + * SampleActorReceiver or AkkaActorWordCount example for details about the + * receiver of this feeder. + */ + +object FileTextStreamFeeder { + + var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() + var countdownLatch: CountDownLatch = _ + def main(args: Array[String]) = args.toList match { + + case host :: port :: noOfReceivers :: fileNames => + val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1 + countdownLatch = new CountDownLatch(noOfReceivers.toInt) + val actor = acs.actorOf(Props(new FeederActor), "FeederActor") + countdownLatch.await() //wait for all the receivers to subscribe + for (fileName <- fileNames;line <- Source.fromFile(fileName).getLines) { + actor ! 
line + } + acs.awaitTermination(); + + case _ => + System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>") + System.exit(1) + } + + /** + * Sends the content to every subscribed receiver; an unsubscribe removes the matching receiver. + */ + class FeederActor extends Actor { + + def receive: Receive = { + + case SubscribeReceiver(receiverActor: ActorRef) => + println("received subscribe from %s".format(receiverActor.toString)) + receivers = LinkedList(receiverActor) ++ receivers + countdownLatch.countDown() + + case UnsubscribeReceiver(receiverActor: ActorRef) => + println("received unsubscribe from %s".format(receiverActor.toString)) + receivers = receivers.filterNot(_ == receiverActor) + + case textMessage: String => + receivers.foreach(_ ! textMessage) + + } + } +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..cd7379da14 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,10 +1,15 @@ package spark.streaming +import akka.actor.Props + import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.storage.StorageLevel import spark.util.MetadataCleaner +import spark.streaming.receivers.ActorReceiver +import spark.streaming.receivers.Settings + import scala.collection.mutable.Queue @@ -135,6 +140,30 @@ class StreamingContext private ( protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() /** + * Create an input stream with any arbitrary user implemented network receiver. + * @param receiver Custom implementation of NetworkReceiver + */ + def pluggableNetworkStream[T: ClassManifest]( + receiver: NetworkReceiver[T]): DStream[T] = { + val inputStream = new PluggableInputDStream[T](this, + receiver) + graph.addInputStream(inputStream) + inputStream + } + + /** + * Create an input stream with any arbitrary user implemented akka actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def pluggableActorStream[T: ClassManifest]( + props: Props, name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = { + pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel))) + } + + /** * Create an input stream that pulls messages form a Kafka Broker. * @param hostname Zookeper hostname. * @param port Zookeper port. 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000..674f1059fe --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,12 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +class PluggableInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala new file mode 100644 index 0000000000..f24c99ad70 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -0,0 +1,111 @@ +package spark.streaming.receivers + +import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } +import akka.actor.{ actorRef2Scala, ActorRef } +import akka.actor.{ PossiblyHarmful, OneForOneStrategy } + +import spark.storage.StorageLevel +import spark.streaming.dstream.NetworkReceiver + +import java.util.concurrent.atomic.AtomicInteger + +/** A helper with set of defaults for supervisor strategy **/ +object ReceiverSupervisorStrategy { + + import akka.util.duration._ + import akka.actor.SupervisorStrategy._ + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException ⇒ Restart + case _: Exception ⇒ Escalate + } +} + +/** + * Settings for configuring the actor creation or defining supervisor strategy + */ +case class Settings(props: Props, + name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy) + +/** + * Statistcs for querying the supervisor about state 
of workers + */ +case class Statistcs(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors **/ +case class Data[T: ClassManifest](data: T) + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + */ +class ActorReceiver[T: ClassManifest](settings: Settings) + extends NetworkReceiver[T] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(settings.storageLevel) + + protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + private class Supervisor extends Actor { + + override val supervisorStrategy = settings.supervisorStrategy + val worker = context.actorOf(settings.props, settings.name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case props: Props => + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistcs => + val workers = context.children + sender ! 
Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]]) + + case Data(msg) => + blocksGenerator += msg.asInstanceOf[T] + n.incrementAndGet + } + } + + protected def push(iter: Iterator[T]) { + pushBlock("block-" + streamId + "-" + System.nanoTime(), + iter, null, settings.storageLevel) + } + + protected def onStart() = { + blocksGenerator.start() + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + protected def onStop() = { + supervisor ! PoisonPill + } + +} |