aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index c989ec0f27..b254e00714 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -75,7 +75,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
@@ -99,7 +99,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
}
@@ -122,7 +122,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
}
}