author     tmalaska <ted.malaska@cloudera.com>           2014-07-10 13:15:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-07-10 13:15:02 -0700
commit     40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (patch)
tree       14bdcb5f6475634e95c65b766eeec64022b6d09f /external/flume/src/main
parent     369aa84e8fba883165817338ac8bf9460be74521 (diff)
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of PR https://github.com/apache/spark/pull/1168, originally authored by @tmalaska.
Adds MIMA binary check exclusions.
Author: tmalaska <ted.malaska@cloudera.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #1347 from tdas/FLUME-1915 and squashes the following commits:
96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
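For context, an application opts into the new behavior through the added FlumeUtils.createStream overload shown in the diff below. A minimal sketch of a driver program that enables decompression (the application name, host, port, and batch interval are placeholder values, not part of this commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeDecompressionSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("FlumeDecompressionSketch")
        val ssc = new StreamingContext(conf, Seconds(10))

        // enableDecompression = true: the receiver's Netty server inflates
        // events sent by a Flume Avro sink configured with deflate
        // compression (FLUME-1915)
        val stream = FlumeUtils.createStream(
          ssc, "localhost", 41414, StorageLevel.MEMORY_AND_DISK_SER_2, true)

        stream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }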
Diffstat (limited to 'external/flume/src/main')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala  76
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala          41
2 files changed, 106 insertions, 11 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34ad4..07ae88febf 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
+import java.util.concurrent.Executors
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.flume.source.avro.Status
 import org.apache.avro.ipc.specific.SpecificResponder
 import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver
 
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
   @transient ssc_ : StreamingContext,
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel, enableDecompression)
   }
 }
@@ -134,22 +143,71 @@ private[streaming]
 class FlumeReceiver(
   host: String,
   port: Int,
-  storageLevel: StorageLevel
+  storageLevel: StorageLevel,
+  enableDecompression: Boolean
 ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
     classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+    if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+      new NettyServer(
+        responder,
+        new InetSocketAddress(host, port),
+        channelFactory,
+        channelPipelineFactory,
+        null)
+    } else {
+      new NettyServer(responder, new InetSocketAddress(host, port))
+    }
+  }
 
   def onStart() {
-    server.start()
+    synchronized {
+      if (server == null) {
+        server = initServer()
+        server.start()
+      } else {
+        logWarning("Flume receiver asked to start more than once without being closed")
+      }
+    }
     logInfo("Flume receiver started")
   }
 
   def onStop() {
-    server.close()
+    synchronized {
+      if (server != null) {
+        server.close()
+        server = null
+      }
+    }
     logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that decompresses incoming data from
+    * the Netty client and compresses data going back to the client.
+    *
+    * The compression on the return path is required because Flume needs
+    * a successful response to know it can remove the event/batch
+    * from the configured channel.
+    */
+  private[streaming]
+  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+    def getPipeline() = {
+      val pipeline = Channels.pipeline()
+      val encoder = new ZlibEncoder(6)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      pipeline
+    }
+  }
 }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef..716db9fa76 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,27 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+    createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create an input stream from a Flume source.
+   * @param ssc StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      ssc: StreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
+    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+      ssc, hostname, port, storageLevel, enableDecompression)
+
     inputStream
   }
@@ -66,6 +86,23 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel)
+    createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data will be sent
+   * @param port Port of the slave machine to which the flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression Whether the Netty server should decompress the input stream
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      hostname: String,
+      port: Int,
+      storageLevel: StorageLevel,
+      enableDecompression: Boolean
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 }
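Receiver-side decompression only matters when the upstream Flume agent actually compresses its Avro sink output, which is what FLUME-1915 added. A sketch of the matching agent configuration (the agent, sink, and channel names a1/k1/c1 and the host/port are placeholders; compression-type and compression-level are the Avro sink properties introduced by FLUME-1915):

    # Avro sink that deflate-compresses each batch before sending it
    # to the Spark FlumeReceiver listening on localhost:41414
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 41414
    a1.sinks.k1.compression-type = deflate
    a1.sinks.k1.compression-level = 6

With this pairing, the ZlibDecoder in CompressionChannelPipelineFactory inflates each incoming batch, and the ZlibEncoder compresses the response going back, which the sink needs in order to acknowledge delivery and remove the batch from its channel.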