aboutsummaryrefslogtreecommitdiff
path: root/external/flume/src/main
diff options
context:
space:
mode:
authortmalaska <ted.malaska@cloudera.com>2014-07-10 13:15:02 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-10 13:15:02 -0700
commit40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (patch)
tree14bdcb5f6475634e95c65b766eeec64022b6d09f /external/flume/src/main
parent369aa84e8fba883165817338ac8bf9460be74521 (diff)
downloadspark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.tar.gz
spark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.tar.bz2
spark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.zip
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of this PR https://github.com/apache/spark/pull/1168 done by @tmalaska Adds MIMA binary check exclusions. Author: tmalaska <ted.malaska@cloudera.com> Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #1347 from tdas/FLUME-1915 and squashes the following commits: 96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver. 41d5338 [tmalaska] Address line 57 that was too long 12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
Diffstat (limited to 'external/flume/src/main')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala76
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala41
2 files changed, 106 insertions, 11 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34ad4..07ae88febf 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+import java.util.concurrent.Executors
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
}
}
@@ -134,22 +143,71 @@ private[streaming]
class FlumeReceiver(
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
- lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+ var server: NettyServer = null
+
+ private def initServer() = {
+ if (enableDecompression) {
+ val channelFactory = new NioServerSocketChannelFactory
+ (Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ val channelPipelieFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+ channelPipelieFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
def onStart() {
- server.start()
+ synchronized {
+ if (server == null) {
+ server = initServer()
+ server.start()
+ } else {
+ logWarning("Flume receiver being asked to start more then once with out close")
+ }
+ }
logInfo("Flume receiver started")
}
def onStop() {
- server.close()
+ synchronized {
+ if (server != null) {
+ server.close()
+ server = null
+ }
+ }
logInfo("Flume receiver stopped")
}
override def preferredLocation = Some(host)
+
+ /** A Netty Pipeline factory that will decompress incoming data from
+ * and the Netty client and compress data going back to the client.
+ *
+ * The compression on the return is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
+ */
+ private[streaming]
+ class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ val encoder = new ZlibEncoder(6)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ pipeline
+ }
+}
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef..716db9fa76 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,27 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ createStream(ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+ ssc, hostname, port, storageLevel, enableDecompression)
+
inputStream
}
@@ -66,6 +86,23 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel)
+ createStream(jssc.ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression should netty server decompress input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
}