about summary refs log tree commit diff
path: root/external
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-02-09 10:09:19 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-09 10:09:19 -0800
commitb69f8b2a01669851c656739b6886efe4cddef31a (patch)
tree9ed0b8a6883f0ae64ce1d3f5c206306731a5f93f /external
parentb6dba10ae59215b5c4e40f7632563f592f138c87 (diff)
downloadspark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.gz
spark-b69f8b2a01669851c656739b6886efe4cddef31a.tar.bz2
spark-b69f8b2a01669851c656739b6886efe4cddef31a.zip
Merge pull request #557 from ScrapCodes/style. Closes #557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Author: Patrick Wendell <pwendell@gmail.com> Author: Prashant Sharma <scrapcodes@gmail.com> == Merge branch commits == commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4 Author: Prashant Sharma <scrapcodes@gmail.com> Date: Sun Feb 9 17:39:07 2014 +0530 scala style fixes commit f91709887a8e0b608c5c2b282db19b8a44d53a43 Author: Patrick Wendell <pwendell@gmail.com> Date: Fri Jan 24 11:22:53 2014 -0800 Adding scalastyle snapshot
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala  14
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  3
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala  4
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala  25
4 files changed, 26 insertions, 20 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index a2cd49c573..c2d9dcbfaa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
@@ -134,12 +135,15 @@ class KafkaReceiver[
}
}
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
- // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+ // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
- // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
- // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
+ // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+ // 'smallest'/'largest':
+ // scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ // scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 15a2daa008..5472d0cd04 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -113,7 +113,8 @@ object KafkaUtils {
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
}
/**
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a389e..6acba25f44 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
+ override def preStart() = ZeroMQExtension(context.system)
+ .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
def receive: Receive = {
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b47d786986..c989ec0f27 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -59,10 +59,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote ZeroMQ publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
def createStream[T](
@@ -84,10 +84,10 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might be
+ * deserializer of bytes) to translate from sequence of sequence of bytes,
+ * where sequence refer to a frame and sub sequence refer to its payload.
* @param storageLevel RDD storage level.
*/
def createStream[T](
@@ -108,10 +108,11 @@ object ZeroMQUtils {
* @param jssc JavaStreamingContext object
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+ * frame has sequence of byte thus it needs the converter(which might
+ * be deserializer of bytes) to translate from sequence of sequence of
+ * bytes, where sequence refer to a frame and sub sequence refer to its
+ * payload.
*/
def createStream[T](
jssc: JavaStreamingContext,