diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-08 14:34:07 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-08 14:34:07 +0530 |
commit | 291dd47c7f702f1229f82b111126f5f64b29d0c6 (patch) | |
tree | 533b5c3e6db146e1c18e0e93d7aa5f285070e508 | |
parent | 4496bf197bb1137cc1224d8e5cb5f55625dd5851 (diff) | |
download | spark-291dd47c7f702f1229f82b111126f5f64b29d0c6.tar.gz spark-291dd47c7f702f1229f82b111126f5f64b29d0c6.tar.bz2 spark-291dd47c7f702f1229f82b111126f5f64b29d0c6.zip |
Taking FeederActor out as seperate program
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala | 66 |
1 files changed, 44 insertions, 22 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala index 0ffa45a4c6..71b4e5bf1a 100644 --- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -49,25 +49,24 @@ class FeederActor extends Actor { case SubscribeReceiver(receiverActor: ActorRef) => println("received subscribe from %s".format(receiverActor.toString)) - receivers = LinkedList(receiverActor) ++ receivers + receivers = LinkedList(receiverActor) ++ receivers case UnsubscribeReceiver(receiverActor: ActorRef) => println("received unsubscribe from %s".format(receiverActor.toString)) - receivers = receivers.dropWhile(x => x eq receiverActor) + receivers = receivers.dropWhile(x => x eq receiverActor) } } /** - * A sample actor as receiver is also simplest. This receiver actor + * A sample actor as receiver, is also simplest. This receiver actor * goes and subscribe to a typical publisher/feeder actor and receives - * data, thus it is important to have feeder running before this example - * can be run. + * data. * * @see [[spark.streaming.examples.FeederActor]] */ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String) - extends Actor with Receiver { +extends Actor with Receiver { lazy private val remotePublisher = context.actorFor(urlOfPublisher) @@ -82,12 +81,41 @@ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String) } /** + * A sample feeder actor + * + * Usage: FeederActor <hostname> <port> + * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on. + */ +object FeederActor { + + def main(args: Array[String]) { + if(args.length < 2){ + System.err.println( + "Usage: FeederActor <hostname> <port>\n" + ) + System.exit(1) + } + val Seq(host, port) = args.toSeq + + + val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1 + val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") + + println("Feeder started as:" + feeder) + + actorSystem.awaitTermination(); + } +} + +/** * A sample word count program demonstrating the use of plugging in * Actor as Receiver * Usage: ActorWordCount <master> <hostname> <port> * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would work on. - * + * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. + * + * To run this example locally, you may run Feeder Actor as + * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999` * and then run the example * `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` */ @@ -96,7 +124,7 @@ object ActorWordCount { if (args.length < 3) { System.err.println( "Usage: ActorWordCount <master> <hostname> <port>" + - "In local mode, <master> should be 'local[n]' with n > 1") + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) } @@ -104,21 +132,16 @@ object ActorWordCount { // Create the context and set the batch size val ssc = new StreamingContext(master, "ActorWordCount", - Seconds(10)) + Seconds(10)) - //Start feeder actor on this actor system. - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1 - - val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") - - /* + /* * Following is the use of actorStream to plug in custom actor as receiver - * + * * An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e type of data received and InputDstream * should be same. - * + * * For example: Both actorStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ @@ -127,10 +150,9 @@ object ActorWordCount { Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format( host, port.toInt))), "SampleReceiver") - //compute wordcount + //compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() ssc.start() - } } |