about summary refs log tree commit diff
path: root/external/flume/src
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2016-01-11 21:06:22 -0800
committerReynold Xin <rxin@databricks.com>2016-01-11 21:06:22 -0800
commit39ae04e6b714e085a1341aa84d8fc5fc827d5f35 (patch)
tree98f9bf78a4309c4c4cd061d4ee0a9f4621a5813d /external/flume/src
parentaaa2c3b628319178ca1f3f68966ff253c2de49cb (diff)
downloadspark-39ae04e6b714e085a1341aa84d8fc5fc827d5f35.tar.gz
spark-39ae04e6b714e085a1341aa84d8fc5fc827d5f35.tar.bz2
spark-39ae04e6b714e085a1341aa84d8fc5fc827d5f35.zip
[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")
Fix the style violation (space before , and :). This PR is a followup for #10643. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #10685 from sarutak/SPARK-12692-followup-streaming.
Diffstat (limited to 'external/flume/src')
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 18
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1bfa35a8b3..74bd0165c6 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -41,12 +41,12 @@ import org.apache.spark.util.Utils
private[streaming]
class FlumeInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel, enableDecompression)
@@ -60,7 +60,7 @@ class FlumeInputDStream[T: ClassTag](
* which are not serializable.
*/
class SparkFlumeEvent() extends Externalizable {
- var event : AvroFlumeEvent = new AvroFlumeEvent()
+ var event: AvroFlumeEvent = new AvroFlumeEvent()
/* De-serialize from bytes. */
def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -75,12 +75,12 @@ class SparkFlumeEvent() extends Externalizable {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
in.readFully(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
in.readFully(valBuff)
- val value : String = Utils.deserialize(valBuff)
+ val value: String = Utils.deserialize(valBuff)
headers.put(key, value)
}
@@ -109,7 +109,7 @@ class SparkFlumeEvent() extends Externalizable {
}
private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
event
@@ -118,13 +118,13 @@ private[streaming] object SparkFlumeEvent {
/** A simple server that implements Flume's Avro protocol. */
private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
- override def append(event : AvroFlumeEvent) : Status = {
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event: AvroFlumeEvent): Status = {
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
- override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}