diff options
author | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-19 21:20:49 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-19 22:04:07 +0530 |
commit | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (patch) | |
tree | bf62af73fa0272db1ae6b7c01a6d0d0307a85be8 /examples | |
parent | bb6ab92e31b7aad464cf8262bc3567fdeb4c14c4 (diff) | |
download | spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.gz spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.bz2 spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.zip |
Plug in actor as stream receiver API
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala | 80 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala | 63 |
2 files changed, 143 insertions, 0 deletions
// ---------------------------------------------------------------------------
// AkkaActorWordCount.scala
// ---------------------------------------------------------------------------

package spark.streaming.examples

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala

import spark.streaming.Seconds
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext.toPairDStreamFunctions
import spark.streaming.receivers.Data

/** Sent by a receiver actor to the feeder to start receiving messages. */
case class SubscribeReceiver(receiverActor: ActorRef)

/** Sent by a receiver actor to the feeder to stop receiving messages. */
case class UnsubscribeReceiver(receiverActor: ActorRef)

/**
 * A simple sample receiver actor: it subscribes to a publisher/feeder actor
 * and forwards every message it receives into the Spark streaming framework.
 * The feeder must already be running before this example is started; see
 * FileTextStreamFeeder for a matching feeder implementation.
 *
 * @param urlOfPublisher Akka URL of the feeder actor to subscribe to.
 * @tparam T type of the messages received. The user must ensure this matches
 *           what the feeder actually sends -- see the type-safety note in
 *           AkkaActorWordCount.main().
 */
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
  extends Actor {

  // Resolved lazily so the actor system is fully initialized before lookup.
  lazy private val remotePublisher = context.actorFor(urlOfPublisher)

  override def preStart = remotePublisher ! SubscribeReceiver(context.self)

  def receive = {
    // Forward each incoming message to the parent (the plugged-in input
    // stream), wrapped in Data. The cast is unchecked: type safety is the
    // caller's responsibility.
    case msg => context.parent ! Data(msg.asInstanceOf[T])
  }

  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
}

/**
 * A sample word count program demonstrating the use of plugging in an
 * Akka actor as a receiver.
 */
object AkkaActorWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      // FIX: the original message had no separator before "In local mode",
      // so the hint ran together with "<remoteAkkaPort>" on one line.
      System.err.println(
        "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
          " <remoteAkkaHost> <remoteAkkaPort>" +
          "\nIn local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq

    // Create the context and set the batch size.
    val ssc = new StreamingContext(master, "AkkaActorWordCount",
      Seconds(batchDuration.toLong))

    /*
     * Following is the use of pluggableActorStream to plug in a custom actor
     * as a receiver.
     *
     * An important point to note:
     * Since the actor may exist outside the Spark framework, it is the user's
     * responsibility to ensure type safety, i.e. the type of data received
     * and of the PluggableInputDStream must be the same.
     *
     * For example: both pluggableActorStream and SampleActorReceiver are
     * parameterized to the same type to ensure type safety.
     */
    val lines = ssc.pluggableActorStream[String](
      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
        remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")

    // Compute the word counts and print them each batch.
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
  }
}

// ---------------------------------------------------------------------------
// FileTextStreamFeeder.scala
// ---------------------------------------------------------------------------

package spark.streaming.examples

import java.util.concurrent.CountDownLatch

import scala.collection.mutable.LinkedList
import scala.io.Source

import akka.actor.{ Actor, ActorRef, actorRef2Scala }
import akka.actor.Props

import spark.util.AkkaUtils

/**
 * A feeder to which multiple message-receiver actors (their number given by
 * <no_of_receivers>) subscribe, and which then streams the text of the given
 * file(s) to them line by line. Provided as a demonstration application for
 * the actor-as-receiver feature; see SampleActorReceiver / AkkaActorWordCount
 * for the matching receiver of this feeder.
 */
object FileTextStreamFeeder {

  // Receivers currently subscribed; mutated only from within FeederActor.
  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()

  // Counted down once per subscription so main() can wait for all receivers
  // before it starts feeding lines.
  var countdownLatch: CountDownLatch = _

  def main(args: Array[String]) = args.toList match {

    case host :: port :: noOfReceivers :: fileNames =>
      val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
      countdownLatch = new CountDownLatch(noOfReceivers.toInt)
      val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
      countdownLatch.await() // wait for all the receivers to subscribe
      for (fileName <- fileNames; line <- Source.fromFile(fileName).getLines) {
        actor ! line
      }
      acs.awaitTermination()

    case _ =>
      System.err.println(
        "Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
      System.exit(1)
  }

  /** Sends every incoming text line to all subscribed receivers. */
  class FeederActor extends Actor {

    def receive: Receive = {

      case SubscribeReceiver(receiverActor: ActorRef) =>
        println("received subscribe from %s".format(receiverActor.toString))
        receivers = LinkedList(receiverActor) ++ receivers
        countdownLatch.countDown()

      case UnsubscribeReceiver(receiverActor: ActorRef) =>
        println("received unsubscribe from %s".format(receiverActor.toString))
        // FIX: dropWhile only removes a matching *prefix* of the list, so a
        // receiver that was not at the head was never actually unsubscribed
        // (and could keep receiving messages, or crash the feeder once its
        // actor terminated). filterNot removes every occurrence regardless
        // of position.
        receivers = receivers.filterNot(x => x eq receiverActor)

      case textMessage: String =>
        receivers.foreach(_ ! textMessage)
    }
  }
}