author     tmalaska <ted.malaska@cloudera.com>          2014-07-10 13:15:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-07-10 13:15:02 -0700
commit     40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (patch)
tree       14bdcb5f6475634e95c65b766eeec64022b6d09f /external
parent     369aa84e8fba883165817338ac8bf9460be74521 (diff)
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of this PR https://github.com/apache/spark/pull/1168 done by @tmalaska. Adds MIMA binary check exclusions.

Author: tmalaska <ted.malaska@cloudera.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #1347 from tdas/FLUME-1915 and squashes the following commits:

96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
Diffstat (limited to 'external')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 76
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 41
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 2
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 41
4 files changed, 144 insertions(+), 16 deletions(-)
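For orientation, here is a minimal Scala sketch of how the new overload added by this patch would be consumed from a streaming application. The application name, host "localhost", and port 12345 are placeholders for illustration, not part of the change:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    // Set up a streaming context; the app name and batch interval are arbitrary.
    val conf = new SparkConf().setAppName("FlumeDecompressionExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The new fifth argument enables zlib decompression of events arriving
    // from a compression-enabled Flume Avro sink (FLUME-1915).
    val stream = FlumeUtils.createStream(
      ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2, true)

    stream.map(event => new String(event.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()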
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34ad4..07ae88febf 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+import java.util.concurrent.Executors
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
}
}
@@ -134,22 +143,71 @@ private[streaming]
class FlumeReceiver(
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
- lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+ var server: NettyServer = null
+
+ private def initServer() = {
+ if (enableDecompression) {
+ val channelFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+ val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+ channelPipelineFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
def onStart() {
- server.start()
+ synchronized {
+ if (server == null) {
+ server = initServer()
+ server.start()
+ } else {
+ logWarning("Flume receiver being asked to start more than once without close")
+ }
+ }
logInfo("Flume receiver started")
}
def onStop() {
- server.close()
+ synchronized {
+ if (server != null) {
+ server.close()
+ server = null
+ }
+ }
logInfo("Flume receiver stopped")
}
override def preferredLocation = Some(host)
+
+ /** A Netty pipeline factory that will decompress incoming data from
+ * the Netty client and compress data going back to the client.
+ *
+ * Compression on the return path is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel.
+ */
+ private[streaming]
+ class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ val encoder = new ZlibEncoder(6)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ pipeline
+ }
+}
}
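As the pipeline factory's comment notes, the receiver compresses its responses as well, so a sender has to apply zlib in both directions of the connection. A minimal client-side sketch under that assumption (mirroring the CompressionChannelFactory used in the test suite further below; host and port are placeholders):

    import java.net.InetSocketAddress
    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor
    import org.apache.flume.source.avro.AvroSourceProtocol
    import org.jboss.netty.channel.ChannelPipeline
    import org.jboss.netty.channel.socket.SocketChannel
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
    import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

    // Client-side channel factory: compresses outbound events and decompresses
    // the (also compressed) acknowledgements coming back from FlumeReceiver.
    class ZlibClientChannelFactory(level: Int) extends NioClientSocketChannelFactory {
      override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
        pipeline.addFirst("deflater", new ZlibEncoder(level))
        pipeline.addFirst("inflater", new ZlibDecoder())
        super.newChannel(pipeline)
      }
    }

    val transceiver = new NettyTransceiver(
      new InetSocketAddress("localhost", 12345), new ZlibClientChannelFactory(6))
    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)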
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef..716db9fa76 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,27 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ createStream(ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Create an input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression Should the Netty server decompress the input stream
+ */
+ def createStream(
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+ ssc, hostname, port, storageLevel, enableDecompression)
+
inputStream
}
@@ -66,6 +86,23 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel)
+ createStream(jssc.ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param enableDecompression Should the Netty server decompress the input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index e0ad4f1015..3b5e0c7746 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), false);
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index dd287d0ef9..73dffef953 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
- val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
+ runFlumeStreamTest(false, 9998)
+ }
+
+ test("flume input compressed stream") {
+ runFlumeStreamTest(true, 9997)
+ }
+
+ def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
+ var client: AvroSourceProtocol = null;
+
+ if (enableDecompression) {
+ client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol],
+ new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+ new CompressionChannelFactory(6)));
+ } else {
+ client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver)
+ }
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
clock.addToTime(batchDuration.milliseconds)
}
+ Thread.sleep(1000)
+
val startTime = System.currentTimeMillis()
while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
}
}
+
+ class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
}