author    Kousuke Saruta <sarutak@oss.nttdata.co.jp>    2016-01-11 21:06:22 -0800
committer Reynold Xin <rxin@databricks.com>             2016-01-11 21:06:22 -0800
commit    39ae04e6b714e085a1341aa84d8fc5fc827d5f35 (patch)
tree      98f9bf78a4309c4c4cd061d4ee0a9f4621a5813d /external
parent    aaa2c3b628319178ca1f3f68966ff253c2de49cb (diff)
[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")
Fix the style violation (space before "," and ":"). This PR is a followup for #10643.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #10685 from sarutak/SPARK-12692-followup-streaming.
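For illustration, a minimal sketch of what the Scalastyle rule flags and why the fields are renamed; the identifiers below (greet, Example, _cache) are hypothetical and not taken from this patch. A trailing-underscore name such as ssc_ must keep a space before ":" to parse (ssc_: would be read as one identifier), so the patch moves the underscore to the front (_ssc), after which the space can be dropped.

// Flagged by the rule: space before ":" and before ",".
//   def greet(name : String , times: Int): Unit = ...
// Compliant form:
def greet(name: String, times: Int): Unit =
  (1 to times).foreach(_ => println(s"Hello, $name"))

// Trailing-underscore members force a space before ":" (cache_: would not parse
// as intended), so they are renamed to use a leading underscore instead:
class Example(_name: String) {                   // was: name_ : String
  @transient private var _cache: AnyRef = null   // was: cache_ : AnyRef
}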
Diffstat (limited to 'external')
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala        |  8
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala        | 18
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala  |  4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala        |  4
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala |  2
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala           |  4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala  |  4
7 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
index d87b86932d..aa530a7121 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -26,20 +26,20 @@ import org.slf4j.{Logger, LoggerFactory}
private[sink] trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
- @transient private var log_ : Logger = null
+ @transient private var _log: Logger = null
// Method to get or create the logger for this object
protected def log: Logger = {
- if (log_ == null) {
+ if (_log == null) {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
}
- log_ = LoggerFactory.getLogger(className)
+ _log = LoggerFactory.getLogger(className)
}
- log_
+ _log
}
// Log methods that take only a String
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1bfa35a8b3..74bd0165c6 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -41,12 +41,12 @@ import org.apache.spark.util.Utils
private[streaming]
class FlumeInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel, enableDecompression)
@@ -60,7 +60,7 @@ class FlumeInputDStream[T: ClassTag](
* which are not serializable.
*/
class SparkFlumeEvent() extends Externalizable {
- var event : AvroFlumeEvent = new AvroFlumeEvent()
+ var event: AvroFlumeEvent = new AvroFlumeEvent()
/* De-serialize from bytes. */
def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -75,12 +75,12 @@ class SparkFlumeEvent() extends Externalizable {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
in.readFully(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
in.readFully(valBuff)
- val value : String = Utils.deserialize(valBuff)
+ val value: String = Utils.deserialize(valBuff)
headers.put(key, value)
}
@@ -109,7 +109,7 @@ class SparkFlumeEvent() extends Externalizable {
}
private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
event
@@ -118,13 +118,13 @@ private[streaming] object SparkFlumeEvent {
/** A simple server that implements Flume's Avro protocol. */
private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
- override def append(event : AvroFlumeEvent) : Status = {
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event: AvroFlumeEvent): Status = {
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
- override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a087474d3..54d8c8b03f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,11 +58,11 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
- ) extends InputDStream[R](ssc_) with Logging {
+ ) extends InputDStream[R](_ssc) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 67f2360896..89d1811c99 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,12 +48,12 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 80e2df62de..7b9aee39ff 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -50,7 +50,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
private var ssc: StreamingContext = _
private var tempDirectory: File = null
- override def beforeAll() : Unit = {
+ override def beforeAll(): Unit = {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 116c170489..079bd8a9a8 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class MQTTInputDStream(
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](ssc_) {
+ ) extends ReceiverInputDStream[String](_ssc) {
private[streaming] override def name: String = s"MQTT stream [$id]"
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index a48eec70b9..bdd57fdde3 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,11 +39,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class TwitterInputDStream(
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](_ssc) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())