author     Prashant Sharma <prashant.s@imaginea.com>  2013-02-08 16:56:42 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-02-19 19:32:52 +0530
commit     f7d3e309cb76ef208ab51f23c90c5e891fb333a3 (patch)
tree       fa7ec4379be06e3a95c681a58eafd4bcba493c97 /streaming
parent     8b9c673fce1c733c7fcd8b978e84f943be9e9e35 (diff)
ZeroMQ stream as receiver
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala           22
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala   33
2 files changed, 55 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a9684c5772..8c772aec6e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -2,12 +2,14 @@ package spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
+import spark.streaming.receivers.ZeroMQReceiver
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
@@ -175,6 +177,26 @@ class StreamingContext private (
}
/**
+ * Create an input stream that receives messages from a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
+ *                       and each frame carries a sequence of bytes. This converter
+ *                       (typically a deserializer) turns the frame payloads, given as a
+ *                       sequence of byte sequences, into an iterator of objects.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
+ * @param supervisorStrategy Supervisor strategy for the underlying receiver actor.
+ */
+  def zeroMQStream[T: ClassManifest](publisherUrl: String,
+    subscribe: Subscribe,
+    bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+    supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+
+    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+
+ /**
* Create an input stream that pulls messages from a Kafka broker.
* @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
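
A minimal usage sketch of the zeroMQStream API added above (illustrative only, not part of
this commit): the master string, publisher URL, and topic are assumptions, and the converter
simply decodes each frame payload as a UTF-8 string.

    import akka.zeromq.Subscribe
    import spark.streaming.{Seconds, StreamingContext}

    // Hypothetical context; master and app name are placeholders.
    val ssc = new StreamingContext("local[2]", "ZeroMQExample", Seconds(1))

    // Decode each ZeroMQ frame payload (Seq[Byte]) into a UTF-8 string.
    def bytesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
      frames.map(bytes => new String(bytes.toArray, "UTF-8")).iterator

    val lines = ssc.zeroMQStream("tcp://127.0.0.1:1234", Subscribe("weather"), bytesToStrings)
    lines.print()
    ssc.start()

The first frame of every published message carries the topic and is dropped by the receiver,
so bytesToStrings only ever sees the payload frames.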
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..5533c3cf1e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,33 @@
+package spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import spark.Logging
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ extends Actor with Receiver with Logging {
+
+ override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ Connect(publisherUrl), subscribe)
+
+ def receive: Receive = {
+
+ case Connecting ⇒ logInfo("connecting ...")
+
+ case m: ZMQMessage ⇒
+      logDebug("Received message for: " + m.firstFrameAsString)
+
+      // We ignore the first frame since it carries the topic
+ val bytes = m.frames.tail.map(_.payload)
+ pushBlock(bytesToObjects(bytes))
+
+    case Closed ⇒ logInfo("received closed")
+
+ }
+}
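
For completeness, a rough sketch of a matching publisher (illustrative, not part of this
commit), assuming the same akka-zeromq API the receiver itself uses (newSocket, Bind, Frame,
ZMQMessage): it sends multipart messages whose first frame is the topic, which the receiver
above discards before handing the remaining payload frames to bytesToObjects.

    import akka.actor.ActorSystem
    import akka.zeromq._

    val system = ActorSystem("zmq-publisher")
    // Bind a Pub socket on the URL the receiver connects to (URL is a placeholder).
    val pubSocket = system.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))

    // Topic and records are illustrative sample data.
    val topic = "weather"
    val records = List("delhi 32", "chennai 36")

    // One multipart message: the topic frame first, then one frame per record.
    pubSocket ! ZMQMessage(Frame(topic.getBytes) :: records.map(r => Frame(r.getBytes)))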