diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 25 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala | 33 |
2 files changed, 56 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index d76ccfca4f..d0430b3f3e 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -2,12 +2,14 @@ package spark.streaming import akka.actor.Props import akka.actor.SupervisorStrategy +import akka.zeromq.Subscribe import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.streaming.receivers.ActorReceiver import spark.streaming.receivers.ReceiverSupervisorStrategy +import spark.streaming.receivers.ZeroMQReceiver import spark.storage.StorageLevel import spark.util.MetadataCleaner import spark.streaming.receivers.ActorReceiver @@ -161,7 +163,7 @@ class StreamingContext private ( * @param props Props object defining creation of the actor * @param name Name of the actor * @param storageLevel RDD storage level. Defaults to memory-only. - * + * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream @@ -175,6 +177,26 @@ class StreamingContext private ( } /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param zeroMQ topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T: ClassManifest](publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { + + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } + + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. @@ -478,4 +500,3 @@ object StreamingContext { new Path(sscCheckpointDir, UUID.randomUUID.toString).toString } } - diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala new file mode 100644 index 0000000000..5533c3cf1e --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala @@ -0,0 +1,33 @@ +package spark.streaming.receivers + +import akka.actor.Actor +import akka.zeromq._ + +import spark.Logging + +/** + * A receiver to subscribe to ZeroMQ stream. + */ +private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + extends Actor with Receiver with Logging { + + override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + Connect(publisherUrl), subscribe) + + def receive: Receive = { + + case Connecting ⇒ logInfo("connecting ...") + + case m: ZMQMessage ⇒ + logDebug("Received message for:" + m.firstFrameAsString) + + //We ignore first frame for processing as it is the topic + val bytes = m.frames.tail.map(_.payload) + pushBlock(bytesToObjects(bytes)) + + case Closed ⇒ logInfo("received closed ") + + } +} |