aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala12
1 files changed, 5 insertions, 7 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2de2a7926b..60e2994431 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
-import org.jboss.netty.channel.ChannelPipelineFactory
-import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._
@@ -187,8 +186,8 @@ class FlumeReceiver(
logInfo("Flume receiver stopped")
}
- override def preferredLocation = Some(host)
-
+ override def preferredLocation: Option[String] = Option(host)
+
/** A Netty Pipeline factory that will decompress incoming data from
* and the Netty client and compress data going back to the client.
*
@@ -198,13 +197,12 @@ class FlumeReceiver(
*/
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-
- def getPipeline() = {
+ def getPipeline(): ChannelPipeline = {
val pipeline = Channels.pipeline()
val encoder = new ZlibEncoder(6)
pipeline.addFirst("deflater", encoder)
pipeline.addFirst("inflater", new ZlibDecoder())
pipeline
+ }
}
}
-}