diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-02-09 10:09:19 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-02-09 10:09:19 -0800 |
commit | b69f8b2a01669851c656739b6886efe4cddef31a (patch) | |
tree | 9ed0b8a6883f0ae64ce1d3f5c206306731a5f93f /external/kafka | |
parent | b6dba10ae59215b5c4e40f7632563f592f138c87 (diff) | |
download | spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.gz spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.bz2 spark-b69f8b2a01669851c656739b6886efe4cddef31a.zip |
Merge pull request #557 from ScrapCodes/style. Closes #557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.
Author: Patrick Wendell <pwendell@gmail.com>
Author: Prashant Sharma <scrapcodes@gmail.com>
== Merge branch commits ==
commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma <scrapcodes@gmail.com>
Date: Sun Feb 9 17:39:07 2014 +0530
scala style fixes
commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell <pwendell@gmail.com>
Date: Fri Jan 24 11:22:53 2014 -0800
Adding scalastyle snapshot
Diffstat (limited to 'external/kafka')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 14 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 3 |
2 files changed, 11 insertions, 6 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index a2cd49c573..c2d9dcbfaa 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._ /** * Input stream that pulls messages from a Kafka Broker. * - * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. @@ -134,12 +135,15 @@ class KafkaReceiver[ } } - // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because - // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This + // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper. // - // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' - // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': + // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied + // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to + // 'smallest'/'largest': + // scalastyle:off // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala + // scalastyle:on private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 15a2daa008..5472d0cd04 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -113,7 +113,8 @@ object KafkaUtils { ): JavaPairDStream[String, String] = { implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), + storageLevel) } /** |