aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-02-19 19:42:14 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-02-19 19:42:14 +0530
commit8d44480d840079cb444b5e19511e5027dedd7f77 (patch)
tree47e7ca55edf5c81e70879d7c8957865fd785317a /streaming
parentf7d3e309cb76ef208ab51f23c90c5e891fb333a3 (diff)
downloadspark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.gz
spark-8d44480d840079cb444b5e19511e5027dedd7f77.tar.bz2
spark-8d44480d840079cb444b5e19511e5027dedd7f77.zip
example for demonstrating ZeroMQ stream
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala15
1 files changed, 7 insertions, 8 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8c772aec6e..f15e6bd23d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -163,7 +163,7 @@ class StreamingContext private (
* @param props Props object defining creation of the actor
* @param name Name of the actor
* @param storageLevel RDD storage level. Defaults to memory-only.
- *
+ *
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e parametrized type of data received and actorStream
@@ -181,9 +181,9 @@ class StreamingContext private (
* @param publisherUrl Url of remote zeromq publisher
* @param zeroMQ topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassManifest](publisherUrl:String,
@@ -191,11 +191,11 @@ class StreamingContext private (
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+
+ actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
-
+
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
@@ -500,4 +500,3 @@ object StreamingContext {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
}
-