aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.iiith@gmail.com>2013-01-19 21:20:49 +0530
committerPrashant Sharma <prashant.iiith@gmail.com>2013-01-19 22:04:07 +0530
commit56b9bd197c522f33e354c2e9ad7e76440cf817e9 (patch)
treebf62af73fa0272db1ae6b7c01a6d0d0307a85be8 /examples/src
parentbb6ab92e31b7aad464cf8262bc3567fdeb4c14c4 (diff)
downloadspark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.gz
spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.bz2
spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.zip
Plug in actor as stream receiver API
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala80
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala63
2 files changed, 143 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
new file mode 100644
index 0000000000..ff05842c71
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
@@ -0,0 +1,80 @@
+package spark.streaming.examples
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Data
+
+// Subscription-protocol messages exchanged between a feeder (publisher)
+// actor and a receiver actor.
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * A simple sample receiver actor. On start it subscribes to the
+ * publisher/feeder actor found at `urlOfPublisher` and forwards every
+ * message it receives, wrapped in `Data`, to its parent (the Spark
+ * streaming receiver). The feeder must therefore be running before this
+ * example is started. See FileTextStreamFeeder for a sample feeder that
+ * matches this receiver.
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+  extends Actor {
+
+  // Resolved lazily so the lookup happens after the actor is fully started.
+  lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+  // Register with the feeder as soon as this actor starts.
+  override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+  // Forward every incoming message to the parent Spark receiver.
+  // NOTE(review): the cast to T is unchecked — a feeder that sends a
+  // different type will surface as a ClassCastException downstream.
+  def receive = {
+    case msg => context.parent ! Data(msg.asInstanceOf[T])
+  }
+
+  // Deregister from the feeder when this actor is stopped.
+  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample word count program demonstrating how to plug a custom Akka
+ * actor in as a Spark Streaming receiver.
+ *
+ * Expects four arguments: master URL, batch duration (seconds), and the
+ * host/port of the remote actor system running FileTextStreamFeeder.
+ */
+object AkkaActorWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      // NOTE(review): usage string is missing a separator before
+      // "In local mode" — the two sentences run together when printed.
+      System.err.println(
+        "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
+        " <remoteAkkaHost> <remoteAkkaPort>" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    // Pattern-bind the four positional arguments.
+    // NOTE(review): extra arguments beyond four make this Seq extractor
+    // throw a MatchError despite the `length < 4` check above — confirm
+    // whether trailing args should be tolerated.
+    val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "AkkaActorWordCount",
+      Seconds(batchDuration.toLong))
+
+    /*
+     * The following shows how pluggableActorStream is used to plug a
+     * custom actor in as a receiver.
+     *
+     * An important point to note:
+     * since the actor may exist outside the Spark framework, it is the
+     * user's responsibility to ensure type safety, i.e. the type of the
+     * data received must match the PluggableInputDStream's type parameter.
+     *
+     * For example: both pluggableActorStream and SampleActorReceiver are
+     * parameterized with the same type to ensure type safety.
+     */
+
+    // The receiver subscribes to the FeederActor at the given remote
+    // actor-system address (see FileTextStreamFeeder).
+    val lines = ssc.pluggableActorStream[String](
+      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+        remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
+
+    //compute wordcount
+    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+
+  }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
new file mode 100644
index 0000000000..f4c1b87f0e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
@@ -0,0 +1,63 @@
+package spark.streaming.examples
+
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.mutable.LinkedList
+import scala.io.Source
+
+import akka.actor.{ Actor, ActorRef, actorRef2Scala }
+import akka.actor.Props
+
+import spark.util.AkkaUtils
+
+/**
+ * A feeder to which multiple receiver actors (as many as "noOfReceivers")
+ * subscribe, and which then streams the text of the given file(s) to them
+ * line by line. Provided as a demonstration application for trying out the
+ * actor-as-receiver feature. See SampleActorReceiver / AkkaActorWordCount
+ * for the matching receiver side.
+ */
+
+object FileTextStreamFeeder {
+
+  // NOTE(review): mutable state shared between main and FeederActor.
+  // This is only safe here because all subscriptions arrive (latch) before
+  // feeding starts; it is not generally thread-safe.
+  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+  var countdownLatch: CountDownLatch = _
+  def main(args: Array[String]) = args.toList match {
+
+    case host :: port :: noOfReceivers :: fileNames =>
+      // Stand up an actor system and a FeederActor that receivers can
+      // reach at akka://spark@host:port/user/FeederActor.
+      val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
+      countdownLatch = new CountDownLatch(noOfReceivers.toInt)
+      val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
+      countdownLatch.await() //wait for all the receivers to subscribe
+      // Stream every line of every file through the feeder, which fans
+      // each line out to all subscribed receivers.
+      for (fileName <- fileNames;line <- Source.fromFile(fileName).getLines) {
+        actor ! line
+      }
+      acs.awaitTermination();
+
+    case _ =>
+      System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
+      System.exit(1)
+  }
+
+  /**
+   * Sends the content to every receiver subscribed
+   */
+  class FeederActor extends Actor {
+
+    def receive: Receive = {
+
+      case SubscribeReceiver(receiverActor: ActorRef) =>
+        println("received subscribe from %s".format(receiverActor.toString))
+        // Prepend the new receiver and count it against the startup latch.
+        receivers = LinkedList(receiverActor) ++ receivers
+        countdownLatch.countDown()
+
+      case UnsubscribeReceiver(receiverActor: ActorRef) =>
+        println("received unsubscribe from %s".format(receiverActor.toString))
+        // NOTE(review): dropWhile only removes a *leading* run of matching
+        // elements — a receiver that is not at the head of the list is
+        // never removed. filterNot(_ eq receiverActor) looks intended;
+        // confirm before changing.
+        receivers = receivers.dropWhile(x => x eq receiverActor)
+
+      case textMessage: String =>
+        // Fan the line out to every currently-subscribed receiver.
+        receivers.foreach(_ ! textMessage)
+
+    }
+  }
+} \ No newline at end of file