aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-22 12:17:17 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-22 12:17:17 -0800
commit688e62718fd547ed46a90f7ac57c39df35866e6b (patch)
treed5ba6a0a211c4182d2bad966869181de6b79d966
parent2921fa7d81be201e5d694ab58ade6233f397eef9 (diff)
parentee885611ff14dbf1bcc218e72da2728fe653e213 (diff)
downloadspark-688e62718fd547ed46a90f7ac57c39df35866e6b.tar.gz
spark-688e62718fd547ed46a90f7ac57c39df35866e6b.tar.bz2
spark-688e62718fd547ed46a90f7ac57c39df35866e6b.zip
Merge pull request #479 from ScrapCodes/zeromq-streaming
Zeromq streaming
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala73
-rw-r--r--project/SparkBuild.scala3
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala25
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala33
4 files changed, 131 insertions, 3 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
new file mode 100644
index 0000000000..5ed9b7cb76
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -0,0 +1,73 @@
+package spark.streaming.examples
+
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import spark.streaming.{ Seconds, StreamingContext }
+import spark.streaming.StreamingContext._
+import akka.zeromq.Subscribe
+
+/**
+ * A simple publisher for demonstration purposes, repeatedly publishes random Messages
+ * every one second.
+ */
+object SimpleZeroMQPublisher {
+
+ def main(args: Array[String]) = {
+ if (args.length < 2) {
+ System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
+ System.exit(1)
+ }
+
+ val Seq(url, topic) = args.toSeq
+ val acs: ActorSystem = ActorSystem()
+
+ val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+ val messages: Array[String] = Array("words ", "may ", "count ")
+ while (true) {
+ Thread.sleep(1000)
+ pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+ }
+ acs.awaitTermination()
+ }
+}
+
+/**
+ * A sample wordcount with ZeroMQStream stream
+ *
+ * To work with zeroMQ, some native libraries have to be installed.
+ * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ *
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <zeroMQurl> and <topic> describe where zeroMq publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * and run the example as
+ * `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ */
+object ZeroMQWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+ val Seq(master, url, topic) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+
+ def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+
+ //For this stream, a zeroMQ publisher should be running.
+ val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+
+} \ No newline at end of file
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c6d3cc8b15..7f432b60db 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -164,7 +164,8 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
"com.github.sgroschupf" % "zkclient" % "0.1",
- "org.twitter4j" % "twitter4j-stream" % "3.0.3"
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+ "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
)
) ++ assemblySettings ++ extraAssemblySettings
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d76ccfca4f..d0430b3f3e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -2,12 +2,14 @@ package spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
+import spark.streaming.receivers.ZeroMQReceiver
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
@@ -161,7 +163,7 @@ class StreamingContext private (
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
- *
+ *
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
@@ -175,6 +177,26 @@ class StreamingContext private (
}
/**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param zeroMQ topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def zeroMQStream[T: ClassManifest](publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+
+ actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+
+ /**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
@@ -478,4 +500,3 @@ object StreamingContext {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
-
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..5533c3cf1e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,33 @@
+package spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import spark.Logging
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ extends Actor with Receiver with Logging {
+
+ override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ Connect(publisherUrl), subscribe)
+
+ def receive: Receive = {
+
+ case Connecting ⇒ logInfo("connecting ...")
+
+ case m: ZMQMessage ⇒
+ logDebug("Received message for:" + m.firstFrameAsString)
+
+ //We ignore first frame for processing as it is the topic
+ val bytes = m.frames.tail.map(_.payload)
+ pushBlock(bytesToObjects(bytes))
+
+ case Closed ⇒ logInfo("received closed ")
+
+ }
+}