aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq
diff options
context:
space:
mode:
Diffstat (limited to 'external/zeromq')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala8
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala2
2 files changed, 5 insertions, 5 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 769761e3b8..960c6a389e 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -31,7 +31,7 @@ import org.apache.spark.streaming.receivers._
*/
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
@@ -39,16 +39,16 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
def receive: Receive = {
- case Connecting ⇒ logInfo("connecting ...")
+ case Connecting => logInfo("connecting ...")
- case m: ZMQMessage ⇒
+ case m: ZMQMessage =>
logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
- case Closed ⇒ logInfo("received closed ")
+ case Closed => logInfo("received closed ")
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 7a14b3d2bf..b47d786986 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -46,7 +46,7 @@ object ZeroMQUtils {
ssc: StreamingContext,
publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {