diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-19 19:42:14 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-02-19 19:42:14 +0530 |
commit | 8d44480d840079cb444b5e19511e5027dedd7f77 (patch) | |
tree | 47e7ca55edf5c81e70879d7c8957865fd785317a /streaming/src | |
parent | f7d3e309cb76ef208ab51f23c90c5e891fb333a3 (diff) | |
download | spark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.gz spark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.bz2 spark-8d44480d840079cb444b5e19511e5027dedd7f77.zip |
example for demonstrating ZeroMQ stream
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8c772aec6e..f15e6bd23d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -163,7 +163,7 @@ class StreamingContext private ( * @param props Props object defining creation of the actor * @param name Name of the actor * @param storageLevel RDD storage level. Defaults to memory-only. - * + * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e parametrized type of data received and actorStream @@ -181,9 +181,9 @@ class StreamingContext private ( * @param publisherUrl Url of remote zeromq publisher * @param zeroMQ topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ def zeroMQStream[T: ClassManifest](publisherUrl:String, @@ -191,11 +191,11 @@ class StreamingContext private ( bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { - - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } - + /** * Create an input stream that pulls messages form a Kafka Broker. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). @@ -500,4 +500,3 @@ object StreamingContext { new Path(sscCheckpointDir, UUID.randomUUID.toString).toString } } - |