diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-19 19:42:14 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-19 19:42:14 +0530 |
commit | 8d44480d840079cb444b5e19511e5027dedd7f77 (patch) | |
tree | 47e7ca55edf5c81e70879d7c8957865fd785317a /examples | |
parent | f7d3e309cb76ef208ab51f23c90c5e891fb333a3 (diff) | |
download | spark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.gz spark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.bz2 spark-8d44480d840079cb444b5e19511e5027dedd7f77.zip |
example for demonstrating ZeroMQ stream
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala new file mode 100644 index 0000000000..ab7b67ed4b --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala @@ -0,0 +1,70 @@ +package spark.streaming.examples + +import akka.actor.ActorSystem +import akka.actor.actorRef2Scala +import akka.zeromq._ +import spark.streaming.{ Seconds, StreamingContext } +import spark.streaming.StreamingContext._ +import akka.zeromq.Subscribe + +/** + * A simple publisher for demonstration purposes, repeatedly publishes random Messages + * every one second. + */ +object SimpleZeroMQPublisher { + + def main(args: Array[String]) = { + if (args.length < 2) { + System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ") + System.exit(1) + } + + val Seq(url, topic) = args.toSeq + val acs: ActorSystem = ActorSystem() + + val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) + val messages: Array[String] = Array("words ", "may ", "count ") + while (true) { + Thread.sleep(1000) + pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList) + } + acs.awaitTermination() + } +} + +/** + * A sample wordcount with ZeroMQStream stream + * + * Usage: WordCountZeroMQ <master> <zeroMQurl> <topic> + * In local mode, <master> should be 'local[n]' with n > 1 + * <zeroMQurl> and <topic> describe where zeroMq publisher is running. + * + * To run this example locally, you may run publisher as + * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * and then run the example + * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + */ +object ZeroMQWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + val Seq(master, url, topic) = args.toSeq + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2)) + + def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator + + //For this stream, a zeroMQ publisher should be running. + val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + } + +}
\ No newline at end of file |