Diffstat (limited to 'external')
4 files changed, 26 insertions, 20 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index a2cd49c573..c2d9dcbfaa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param kafkaParams Map of kafka configuration paramaters.
+ *                    See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
  * @param storageLevel RDD storage level.
@@ -134,12 +135,15 @@ class KafkaReceiver[
     }
   }
 
-  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
-  // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
   //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
-  // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
+  // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+  // 'smallest'/'largest':
+  // scalastyle:off
   // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  // scalastyle:on
   private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 15a2daa008..5472d0cd04 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -113,7 +113,8 @@ object KafkaUtils {
     ): JavaPairDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
   }
 
   /**
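The comment rewrapped above documents a real constraint: Kafka 0.7.2 only honors autooffset.reset when the consumer group is absent from ZooKeeper, so the receiver deletes the group's /consumers/<groupId> subtree itself before connecting. A minimal sketch of that cleanup, assuming the org.I0Itec.zkclient client that Kafka 0.7.2 depends on (the timeout values and the standalone method shape here are illustrative, not taken from the patch):

  import org.I0Itec.zkclient.ZkClient

  // Drop the consumer group's ZooKeeper state so autooffset.reset takes
  // effect on the next connect; Kafka 0.7.2 ignores the setting otherwise.
  def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String): Unit = {
    val dir = "/consumers/" + groupId                   // same path the patch's comment refers to
    val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000)  // timeouts are assumptions
    try {
      zk.deleteRecursive(dir)                           // harmless if the group was never registered
    } finally {
      zk.close()
    }
  }

The deletion mirrors what Kafka's own ConsoleConsumer does when autooffset.reset is set to 'smallest'/'largest', per the link in the comment.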
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a389e..6acba25f44 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
   bytesToObjects: Seq[ByteString] => Iterator[T])
   extends Actor with Receiver with Logging {
 
-  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
-    Connect(publisherUrl), subscribe)
+  override def preStart() = ZeroMQExtension(context.system)
+    .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
 
   def receive: Receive = {
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b47d786986..c989ec0f27 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -59,10 +59,10 @@ object ZeroMQUtils {
    * @param jssc JavaStreamingContext object
    * @param publisherUrl Url of remote ZeroMQ publisher
    * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
    * @param storageLevel Storage level to use for storing the received objects
    */
   def createStream[T](
@@ -84,10 +84,10 @@ object ZeroMQUtils {
    * @param jssc JavaStreamingContext object
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
    * @param storageLevel RDD storage level.
    */
   def createStream[T](
@@ -108,10 +108,11 @@ object ZeroMQUtils {
    * @param jssc JavaStreamingContext object
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
-   *                       and sub sequence refer to its payload.
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might
+   *                       be deserializer of bytes) to translate from sequence of sequence of
+   *                       bytes, where sequence refer to a frame and sub sequence refer to its
+   *                       payload.
    */
   def createStream[T](
     jssc: JavaStreamingContext,
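The Scaladoc rewrapped in these hunks describes the contract of bytesToObjects: ZeroMQ publishes each message as a sequence of frames, each frame carrying a byte payload, so the converter turns a Seq[ByteString] (one entry per frame) into an Iterator[T]. A minimal sketch of such a converter, assuming Akka's akka.util.ByteString and UTF-8 string payloads (the name bytesToStrings is illustrative):

  import akka.util.ByteString

  // One ByteString per ZeroMQ frame; decode each frame's payload as UTF-8 text.
  val bytesToStrings: Seq[ByteString] => Iterator[String] =
    frames => frames.map(_.utf8String).iterator

A function of this shape is what callers pass as the bytesToObjects argument of the ZeroMQUtils.createStream overloads documented above.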