diff options
-rw-r--r-- | docs/zeroMQ-intro.md | 59 | ||||
-rw-r--r-- | project/SparkBuild.scala | 1 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 22 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala | 33 |
4 files changed, 115 insertions, 0 deletions
diff --git a/docs/zeroMQ-intro.md b/docs/zeroMQ-intro.md new file mode 100644 index 0000000000..0365bc08fd --- /dev/null +++ b/docs/zeroMQ-intro.md @@ -0,0 +1,59 @@ +--- +layout: global +title: ZeroMQ Stream setup guide +--- + +## Install ZeroMQ (using JNA) + +To work with zeroMQ, some native libraries have to be installed. + +* Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software) + + Typically, if you are using Ubuntu 12.04, you can do: + + `$ sudo apt-get install libzmq1` + + __To work with akka-zeromq, zmq 2.1 version is supported via [JNA](https://github.com/twall/jna). In case you want to switch to zeromq 3.0, please install [JZMQ](http://www.zeromq.org/bindings:java) which uses [JNI](http://docs.oracle.com/javase/6/docs/technotes/guides/jni/) and drop in the jzmq jar__ + +## Sample scala code + +A publisher is an entity assumed to be outside the spark ecosystem. A sample zeroMQ publisher is provided to try out the sample spark ZeroMQ application. + +1. Start the sample publisher. + +{% highlight scala %} + + + val acs: ActorSystem = ActorSystem() + + val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) + + pubSocket ! ZMQMessage(Seq(Frame("topic"), Frame("My message".getBytes))) + + + +{% endhighlight %} + +A typical zeromq url looks like `tcp://<ip>:<port>` + +It does nothing more than publish the message on the specified topic and url. + +2. Start the spark application by plugging in the zeroMQ stream receiver. + +{% highlight scala %} + + val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToObjectsIterator) + +{% endhighlight %} + +bytesToObjectsIterator is going to be a function for decoding the Frame data. 
+ +_For example, to decode into strings using the default charset:_ + + +{% highlight scala %} + + + def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator + +{% endhighlight %} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c6d3cc8b15..5fe85a28c3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -134,6 +134,7 @@ object SparkBuild extends Build { "com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3", + "com.typesafe.akka" % "akka-zeromq" % "2.0.3", "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "cc.spray" % "spray-can" % "1.0-M2.1", diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index a9684c5772..8c772aec6e 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -2,12 +2,14 @@ package spark.streaming import akka.actor.Props import akka.actor.SupervisorStrategy +import akka.zeromq.Subscribe import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.streaming.receivers.ActorReceiver import spark.streaming.receivers.ReceiverSupervisorStrategy +import spark.streaming.receivers.ZeroMQReceiver import spark.storage.StorageLevel import spark.util.MetadataCleaner import spark.streaming.receivers.ActorReceiver @@ -175,6 +177,26 @@ class StreamingContext private ( } /** + * ZeroMQ stream receiver + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and each frame has a sequence + * of bytes; thus it needs a converter (which might be a deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refers to a frame + * and 
sub sequence refers to its payload. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2. + */ + def zeroMQStream[T: ClassManifest](publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { + + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } + + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala new file mode 100644 index 0000000000..5533c3cf1e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala @@ -0,0 +1,33 @@ +package spark.streaming.receivers + +import akka.actor.Actor +import akka.zeromq._ + +import spark.Logging + +/** + * A receiver actor that subscribes to a ZeroMQ stream at publisherUrl, decodes every frame after the first with bytesToObjects, and hands the result to pushBlock. + */ +private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + extends Actor with Receiver with Logging { + + override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + Connect(publisherUrl), subscribe) + + def receive: Receive = { + + case Connecting ⇒ logInfo("connecting ...") + + case m: ZMQMessage ⇒ + logDebug("Received message for:" + m.firstFrameAsString) + + // The first frame is the topic, so it is skipped; only the remaining frames are decoded + val bytes = m.frames.tail.map(_.payload) + pushBlock(bytesToObjects(bytes)) + + case Closed ⇒ logInfo("received closed ") + + } +} |