diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-01-22 13:28:29 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-01-22 13:28:29 +0530 |
commit | d17065c4b565ec975a46c6d375998ef8ae7a32d5 (patch) | |
tree | 8285bbcfb663ce6d3aaa6a16fbf17447adb917ad | |
parent | 43bfd7bb21e6f8a9d083686a83bcd309a84f937e (diff) | |
download | spark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.tar.gz spark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.tar.bz2 spark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.zip |
actor as receiver
6 files changed, 500 insertions, 0 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md new file mode 100644 index 0000000000..0eb4246158 --- /dev/null +++ b/docs/plugin-custom-receiver.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark streaming, Plugging in a custom receiver. +--- + +A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also take on roles beyond just receiving data, such as filtering or preprocessing, to name a few of the possibilities. The API to plug in any user-defined custom receiver is thus provided to encourage development of receivers which may be well suited to one's specific needs. + +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing [Actor](#References) + +Following is a simple socket text-stream receiver, which is apparently overly simplified using Akka's socket.io api. + +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor with Receiver { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) + } + + } + + +{% endhighlight %} + +All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scala-docs of Receiver for more details. + +### A sample spark application + +* First create a Spark streaming context with master url and batch duration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug-in the actor configuration into the spark streaming context and create a DStream. 
+ +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, the stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking the union of multiple input streams. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +The above stream can be easily processed as described earlier. 
+ +_A more comprehensive example is provided in the spark streaming examples_ + +## References + +1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala new file mode 100644 index 0000000000..c3d3755953 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -0,0 +1,130 @@ +package spark.streaming.examples + +import scala.collection.mutable.LinkedList +import scala.util.Random + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala + +import spark.streaming.Seconds +import spark.streaming.StreamingContext +import spark.streaming.StreamingContext.toPairDStreamFunctions +import spark.streaming.receivers.Receiver +import spark.util.AkkaUtils + +case class SubscribeReceiver(receiverActor: ActorRef) +case class UnsubscribeReceiver(receiverActor: ActorRef) + +/** + * Sends the random content to every receiver subscribed with 1/2 + * second delay. + */ +class FeederActor extends Actor { + + val rand = new Random() + var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() + + val strings: Array[String] = Array("words ", "may ", "count ") + + def makeMessage(): String = { + val x = rand.nextInt(3) + strings(x) + strings(2 - x) + } + + /* + * A thread to generate random messages + */ + new Thread() { + override def run() { + while (true) { + Thread.sleep(500) + receivers.foreach(_ ! 
makeMessage) + } + } + }.start() + + def receive: Receive = { + + case SubscribeReceiver(receiverActor: ActorRef) => + println("received subscribe from %s".format(receiverActor.toString)) + receivers = LinkedList(receiverActor) ++ receivers + + case UnsubscribeReceiver(receiverActor: ActorRef) => + println("received unsubscribe from %s".format(receiverActor.toString)) + receivers = receivers.dropWhile(x => x eq receiverActor) + + } +} + +/** + * A sample actor as receiver is also simplest. This receiver actor + * goes and subscribe to a typical publisher/feeder actor and receives + * data, thus it is important to have feeder running before this example + * can be run. + * + * @see [[spark.streaming.examples.FeederActor]] + */ +class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String) + extends Actor with Receiver { + + lazy private val remotePublisher = context.actorFor(urlOfPublisher) + + override def preStart = remotePublisher ! SubscribeReceiver(context.self) + + def receive = { + case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T]) + } + + override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) + +} + +/** + * A sample word count program demonstrating the use of plugging in + * Actor as Receiver + */ +object ActorWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: ActorWordCount <master> <host> <port>" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + val Seq(master, host, port) = args.toSeq + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "ActorWordCount", + Seconds(10)) + + //Start feeder actor on this actor system. 
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1 + + val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") + + /* + * Following is the use of actorStream to plug in custom actor as receiver + * + * An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e type of data received and InputDstream + * should be same. + * + * For example: Both actorStream and SampleActorReceiver are parameterized + * to same type to ensure type safety. + */ + + val lines = ssc.actorStream[String]( + Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format( + host, port.toInt))), "SampleReceiver") + + //compute wordcount + lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() + + ssc.start() + + } +} diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..9d960e883f 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,8 +1,13 @@ package spark.streaming +import akka.actor.Props +import akka.actor.SupervisorStrategy + import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} +import spark.streaming.receivers.ActorReceiver +import spark.streaming.receivers.ReceiverSupervisorStrategy import spark.storage.StorageLevel import spark.util.MetadataCleaner @@ -135,6 +140,36 @@ class StreamingContext private ( protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() /** + * Create an input stream with any arbitrary user implemented network receiver. 
+ * @param receiver Custom implementation of NetworkReceiver + */ + def pluggableNetworkStream[T: ClassManifest]( + receiver: NetworkReceiver[T]): DStream[T] = { + val inputStream = new PluggableInputDStream[T](this, + receiver) + graph.addInputStream(inputStream) + inputStream + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level. Defaults to memory-only. + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T: ClassManifest]( + props: Props, name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { + pluggableNetworkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) + } + + /** * Create an input stream that pulls messages form a Kafka Broker. * @param hostname Zookeper hostname. * @param port Zookeper port. 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000..3c2a81947b --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,13 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +private[streaming] +class PluggableInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala new file mode 100644 index 0000000000..b3201d0b28 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -0,0 +1,153 @@ +package spark.streaming.receivers + +import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } +import akka.actor.{ actorRef2Scala, ActorRef } +import akka.actor.{ PossiblyHarmful, OneForOneStrategy } + +import spark.storage.StorageLevel +import spark.streaming.dstream.NetworkReceiver + +import java.util.concurrent.atomic.AtomicInteger + +/** A helper with set of defaults for supervisor strategy **/ +object ReceiverSupervisorStrategy { + + import akka.util.duration._ + import akka.actor.SupervisorStrategy._ + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException ⇒ Restart + case _: Exception ⇒ Escalate + } +} + +/** + * A receiver trait to be mixed in with your Actor to gain access to + * pushBlock API. 
+ * + * @example {{{ + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String ⇒ pushBlock(anything) + * } + * } + * //Can be plugged in actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + * + */ +trait Receiver { self: Actor ⇒ + def pushBlock[T: ClassManifest](iter: Iterator[T]) { + context.parent ! Data(iter) + } + + def pushBlock[T: ClassManifest](data: T) { + context.parent ! Data(data) + } + +} + +/** + * Statistics for querying the supervisor about state of workers + */ +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors **/ +private[streaming] case class Data[T: ClassManifest](data: T) + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! 
Props(new Worker,"Worker") + * }}} + * + * + */ +private[streaming] class ActorReceiver[T: ClassManifest]( + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy) + extends NetworkReceiver[T] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(storageLevel) + + protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + private class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]]) + + case Data(msg) ⇒ + blocksGenerator += msg.asInstanceOf[T] + n.incrementAndGet + + case props: Props ⇒ + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) ⇒ + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics ⇒ + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + protected def pushBlock(iter: Iterator[T]) { + pushBlock("block-" + streamId + "-" + System.nanoTime(), + iter, null, storageLevel) + } + + protected def onStart() = { + blocksGenerator.start() + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + protected def onStop() = { + supervisor ! 
PoisonPill + } + +} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..d597501781 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -1,5 +1,11 @@ package spark.streaming +import akka.actor.Actor +import akka.actor.IO +import akka.actor.IOManager +import akka.actor.Props +import akka.util.ByteString + import dstream.SparkFlumeEvent import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} @@ -7,6 +13,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import spark.storage.StorageLevel +import spark.streaming.receivers.Receiver import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils @@ -242,6 +249,55 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(output(i).head.toString === expectedOutput(i)) } } + test("actor input stream") { + // Start the server + val port = testPort + testServer = new TestServer(port) + testServer.start() + + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", + StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = 1 to 
9 + val expectedOutput = input.map(x => x.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + testServer.send(input(i).toString) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping server") + testServer.stop() + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } test("file input stream with checkpoint") { // Create a temporary directory @@ -353,3 +409,15 @@ object TestServer { } } } + +class TestActor(port: Int) extends Actor with Receiver { + + def bytesToString(byteString: ByteString) = byteString.utf8String + + override def preStart = IOManager(context.system).connect(new InetSocketAddress(port)) + + def receive = { + case IO.Read(socket, bytes) => + pushBlock(bytesToString(bytes)) + } +} |