author     Prashant Sharma <prashant.s@imaginea.com>  2013-02-19 19:42:14 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-02-19 19:42:14 +0530
commit     8d44480d840079cb444b5e19511e5027dedd7f77
tree       47e7ca55edf5c81e70879d7c8957865fd785317a
parent     f7d3e309cb76ef208ab51f23c90c5e891fb333a3
example for demonstrating ZeroMQ stream
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala | 70
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala        | 15
2 files changed, 77 insertions(+), 8 deletions(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
new file mode 100644
index 0000000000..ab7b67ed4b
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -0,0 +1,70 @@
+package spark.streaming.examples
+
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import spark.streaming.{ Seconds, StreamingContext }
+import spark.streaming.StreamingContext._
+import akka.zeromq.Subscribe
+
+/**
+ * A simple publisher for demonstration purposes; it repeatedly publishes a fixed
+ * set of messages every second.
+ */
+object SimpleZeroMQPublisher {
+
+ def main(args: Array[String]) = {
+ if (args.length < 2) {
+ System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+ System.exit(1)
+ }
+
+ val Seq(url, topic) = args.toSeq
+ val acs: ActorSystem = ActorSystem()
+
+ val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+ val messages: Array[String] = Array("words ", "may ", "count ")
+ while (true) {
+ Thread.sleep(1000)
+ pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+ }
+ acs.awaitTermination()
+ }
+}
+
+/**
+ * A sample word count application that reads from a ZeroMQ stream.
+ *
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <zeroMQurl> and <topic> describe where the ZeroMQ publisher is running.
+ *
+ * To run this example locally, you may run the publisher as
+ * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ */
+object ZeroMQWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
+ " In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ val Seq(master, url, topic) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+
+ def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+
+ // For this stream, a ZeroMQ publisher should be running.
+ val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+
+}
\ No newline at end of file
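The example above hinges on bytesToStringIterator, which turns the Seq[Seq[Byte]] handed over for each ZeroMQ message back into strings. The standalone snippet below is only an illustrative sketch of that conversion and is not part of the commit; the object name FrameConversionCheck and the sample payloads are assumptions chosen to mirror the publisher's messages.

object FrameConversionCheck {
  def main(args: Array[String]) {
    // Each inner Seq[Byte] stands in for the payload of one ZeroMQ frame,
    // mirroring the Array("words ", "may ", "count ") published above.
    val frames: Seq[Seq[Byte]] =
      Seq("words ".getBytes.toSeq, "may ".getBytes.toSeq, "count ".getBytes.toSeq)
    // Same shape as bytesToStringIterator in ZeroMQWordCount: decode each frame
    // payload back into a String and expose the results as an Iterator.
    val strings: Iterator[String] = frames.map(bytes => new String(bytes.toArray)).iterator
    strings.foreach(println)
  }
}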
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8c772aec6e..f15e6bd23d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -163,7 +163,7 @@ class StreamingContext private (
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
- *
+ *
* @note An important point to note:
* Since the Actor may exist outside the Spark framework, it is the user's responsibility
* to ensure type safety, i.e. the parametrized type of data received and actorStream
@@ -181,9 +181,9 @@ class StreamingContext private (
* @param publisherUrl Url of remote zeromq publisher
* @param zeroMQ topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * of bytes; it therefore needs a converter (which might be a deserializer of bytes)
+ * to translate the sequence of sequences of bytes, where each element of the outer sequence is a frame
+ * and each inner sequence of bytes is that frame's payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassManifest](publisherUrl:String,
@@ -191,11 +191,11 @@ class StreamingContext private (
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+
+ actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
-
+
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
@@ -500,4 +500,3 @@ object StreamingContext {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
-
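The ScalaDoc above describes bytesToObjects as a converter from the sequence of frame payloads to an iterator of records. As a hedged sketch of how a caller other than ZeroMQWordCount might use the new zeroMQStream API, the snippet below supplies its own converter; the object name CustomZeroMQConsumer and the newline-delimited UTF-8 payload format are assumptions for illustration, not part of this commit.

package spark.streaming.examples

import akka.zeromq.Subscribe
import spark.streaming.{Seconds, StreamingContext}

object CustomZeroMQConsumer {
  def main(args: Array[String]) {
    val Seq(master, url, topic) = args.toSeq
    val ssc = new StreamingContext(master, "CustomZeroMQConsumer", Seconds(2))

    // Converter matching the Seq[Seq[Byte]] => Iterator[T] shape expected by
    // zeroMQStream: decode each frame payload as UTF-8 and split it into lines.
    def linesFromFrames(frames: Seq[Seq[Byte]]): Iterator[String] =
      frames.iterator.flatMap(frame => new String(frame.toArray, "UTF-8").split("\n").iterator)

    // storageLevel and supervisorStrategy fall back to the defaults declared
    // in the new zeroMQStream signature.
    val records = ssc.zeroMQStream(url, Subscribe(topic), linesFromFrames)
    records.print()
    ssc.start()
  }
}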