aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq
diff options
context:
space:
mode:
Diffstat (limited to 'external/zeromq')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala4
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala25
2 files changed, 15 insertions, 14 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a389e..6acba25f44 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
+ override def preStart() = ZeroMQExtension(context.system)
+ .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
def receive: Receive = {
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b47d786986..c989ec0f27 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -59,10 +59,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
def createStream[T](
@@ -84,10 +84,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel RDD storage level.
*/
def createStream[T](
@@ -108,10 +108,11 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might
+ * be deserializer of bytes) to translate from sequence of sequence of
+ * bytes, where sequence refer to a frame and sub sequence refer to its
+ * payload.
*/
def createStream[T](
jssc: JavaStreamingContext,