From 8d44480d840079cb444b5e19511e5027dedd7f77 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 19 Feb 2013 19:42:14 +0530 Subject: example for demonstrating ZeroMQ stream --- .../src/main/scala/spark/streaming/StreamingContext.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8c772aec6e..f15e6bd23d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -163,7 +163,7 @@ class StreamingContext private ( * @param props Props object defining creation of the actor * @param name Name of the actor * @param storageLevel RDD storage level. Defaults to memory-only. - * + * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream @@ -181,9 +181,9 @@ class StreamingContext private ( * @param publisherUrl Url of remote zeromq publisher * @param zeroMQ topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ def zeroMQStream[T: ClassManifest](publisherUrl:String, @@ -191,11 +191,11 @@ class StreamingContext private ( bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { - - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } - + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). @@ -500,4 +500,3 @@ object StreamingContext { new Path(sscCheckpointDir, UUID.randomUUID.toString).toString } } - -- cgit v1.2.3