aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-02-08 14:34:07 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-02-08 14:34:07 +0530
commit291dd47c7f702f1229f82b111126f5f64b29d0c6 (patch)
tree533b5c3e6db146e1c18e0e93d7aa5f285070e508 /examples/src
parent4496bf197bb1137cc1224d8e5cb5f55625dd5851 (diff)
downloadspark-291dd47c7f702f1229f82b111126f5f64b29d0c6.tar.gz
spark-291dd47c7f702f1229f82b111126f5f64b29d0c6.tar.bz2
spark-291dd47c7f702f1229f82b111126f5f64b29d0c6.zip
Taking FeederActor out as separate program
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala66
1 file changed, 44 insertions, 22 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 0ffa45a4c6..71b4e5bf1a 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -49,25 +49,24 @@ class FeederActor extends Actor {
case SubscribeReceiver(receiverActor: ActorRef) =>
println("received subscribe from %s".format(receiverActor.toString))
- receivers = LinkedList(receiverActor) ++ receivers
+ receivers = LinkedList(receiverActor) ++ receivers
case UnsubscribeReceiver(receiverActor: ActorRef) =>
println("received unsubscribe from %s".format(receiverActor.toString))
- receivers = receivers.dropWhile(x => x eq receiverActor)
+ receivers = receivers.dropWhile(x => x eq receiverActor)
}
}
/**
- * A sample actor as receiver is also simplest. This receiver actor
+ * A sample actor as receiver, is also simplest. This receiver actor
* goes and subscribe to a typical publisher/feeder actor and receives
- * data, thus it is important to have feeder running before this example
- * can be run.
+ * data.
*
* @see [[spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
- extends Actor with Receiver {
+extends Actor with Receiver {
lazy private val remotePublisher = context.actorFor(urlOfPublisher)
@@ -82,12 +81,41 @@ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
}
/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+
+ def main(args: Array[String]) {
+ if(args.length < 2){
+ System.err.println(
+ "Usage: FeederActor <hostname> <port>\n"
+ )
+ System.exit(1)
+ }
+ val Seq(host, port) = args.toSeq
+
+
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+ println("Feeder started as:" + feeder)
+
+ actorSystem.awaitTermination();
+ }
+}
+
+/**
* A sample word count program demonstrating the use of plugging in
* Actor as Receiver
* Usage: ActorWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would work on.
- *
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
+ *
+ * To run this example locally, you may run Feeder Actor as
+ * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
* `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
@@ -96,7 +124,7 @@ object ActorWordCount {
if (args.length < 3) {
System.err.println(
"Usage: ActorWordCount <master> <hostname> <port>" +
- "In local mode, <master> should be 'local[n]' with n > 1")
+ "In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
@@ -104,21 +132,16 @@ object ActorWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ActorWordCount",
- Seconds(10))
+ Seconds(10))
- //Start feeder actor on this actor system.
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
-
- val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
-
- /*
+ /*
* Following is the use of actorStream to plug in custom actor as receiver
- *
+ *
* An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDstream
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e type of data received and InputDstream
* should be same.
- *
+ *
* For example: Both actorStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
@@ -127,10 +150,9 @@ object ActorWordCount {
Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
- //compute wordcount
+ //compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
-
}
}