aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-01-22 13:28:29 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-01-22 13:28:29 +0530
commitd17065c4b565ec975a46c6d375998ef8ae7a32d5 (patch)
tree8285bbcfb663ce6d3aaa6a16fbf17447adb917ad /examples/src
parent43bfd7bb21e6f8a9d083686a83bcd309a84f937e (diff)
downloadspark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.tar.gz
spark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.tar.bz2
spark-d17065c4b565ec975a46c6d375998ef8ae7a32d5.zip
actor as receiver
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala130
1 files changed, 130 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
new file mode 100644
index 0000000000..c3d3755953
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -0,0 +1,130 @@
+package spark.streaming.examples
+
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Receiver
+import spark.util.AkkaUtils
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends the random content to every receiver subscribed with 1/2
+ * second delay.
+ */
+class FeederActor extends Actor {
+
+ val rand = new Random()
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+ val strings: Array[String] = Array("words ", "may ", "count ")
+
+ def makeMessage(): String = {
+ val x = rand.nextInt(3)
+ strings(x) + strings(2 - x)
+ }
+
+ /*
+ * A thread to generate random messages
+ */
+ new Thread() {
+ override def run() {
+ while (true) {
+ Thread.sleep(500)
+ receivers.foreach(_ ! makeMessage)
+ }
+ }
+ }.start()
+
+ def receive: Receive = {
+
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+ receivers = receivers.dropWhile(x => x eq receiverActor)
+
+ }
+}
+
+/**
+ * A sample actor as receiver is also simplest. This receiver actor
+ * goes and subscribe to a typical publisher/feeder actor and receives
+ * data, thus it is important to have feeder running before this example
+ * can be run.
+ *
+ * @see [[spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+ extends Actor with Receiver {
+
+ lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+ def receive = {
+ case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+ }
+
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * Actor as Receiver
+ */
+object ActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ActorWordCount <master> <host> <port>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, host, port) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ActorWordCount",
+ Seconds(10))
+
+ //Start feeder actor on this actor system.
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+ /*
+ * Following is the use of actorStream to plug in custom actor as receiver
+ *
+ * An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e type of data received and InputDstream
+ * should be same.
+ *
+ * For example: Both actorStream and SampleActorReceiver are parameterized
+ * to same type to ensure type safety.
+ */
+
+ val lines = ssc.actorStream[String](
+ Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ host, port.toInt))), "SampleReceiver")
+
+ //compute wordcount
+ lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ ssc.start()
+
+ }
+}