diff options
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala | 70 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 15 |
2 files changed, 77 insertions, 8 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala new file mode 100644 index 0000000000..ab7b67ed4b --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala @@ -0,0 +1,70 @@ +package spark.streaming.examples + +import akka.actor.ActorSystem +import akka.actor.actorRef2Scala +import akka.zeromq._ +import spark.streaming.{ Seconds, StreamingContext } +import spark.streaming.StreamingContext._ +import akka.zeromq.Subscribe + +/** + * A simple publisher for demonstration purposes, repeatedly publishes random Messages + * every one second. + */ +object SimpleZeroMQPublisher { + + def main(args: Array[String]) = { + if (args.length < 2) { + System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ") + System.exit(1) + } + + val Seq(url, topic) = args.toSeq + val acs: ActorSystem = ActorSystem() + + val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) + val messages: Array[String] = Array("words ", "may ", "count ") + while (true) { + Thread.sleep(1000) + pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList) + } + acs.awaitTermination() + } +} + +/** + * A sample wordcount with ZeroMQStream stream + * + * Usage: WordCountZeroMQ <master> <zeroMQurl> <topic> + * In local mode, <master> should be 'local[n]' with n > 1 + * <zeroMQurl> and <topic> describe where zeroMq publisher is running. + * + * To run this example locally, you may run publisher as + * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * and then run the example + * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + */ +object ZeroMQWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + val Seq(master, url, topic) = args.toSeq + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2)) + + def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator + + //For this stream, a zeroMQ publisher should be running. + val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + } + +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8c772aec6e..f15e6bd23d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -163,7 +163,7 @@ class StreamingContext private ( * @param props Props object defining creation of the actor * @param name Name of the actor * @param storageLevel RDD storage level. Defaults to memory-only. - * + * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream @@ -181,9 +181,9 @@ class StreamingContext private ( * @param publisherUrl Url of remote zeromq publisher * @param zeroMQ topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ def zeroMQStream[T: ClassManifest](publisherUrl:String, @@ -191,11 +191,11 @@ class StreamingContext private ( bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { - - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } - + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). @@ -500,4 +500,3 @@ object StreamingContext { new Path(sscCheckpointDir, UUID.randomUUID.toString).toString } } - |