diff options
author | tmalaska <ted.malaska@cloudera.com> | 2014-07-10 13:15:02 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-07-10 13:15:02 -0700 |
commit | 40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (patch) | |
tree | 14bdcb5f6475634e95c65b766eeec64022b6d09f /external/flume/src/test | |
parent | 369aa84e8fba883165817338ac8bf9460be74521 (diff) | |
download | spark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.tar.gz spark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.tar.bz2 spark-40a8fef4e6619b4ea10a4ec9026260649ce5ae73.zip |
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of this PR https://github.com/apache/spark/pull/1168 done by @tmalaska
Adds MIMA binary check exclusions.
Author: tmalaska <ted.malaska@cloudera.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #1347 from tdas/FLUME-1915 and squashes the following commits:
96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
Diffstat (limited to 'external/flume/src/test')
-rw-r--r-- | external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 2 | ||||
-rw-r--r-- | external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 41 |
2 files changed, 38 insertions, 5 deletions
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java index e0ad4f1015..3b5e0c7746 100644 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -30,5 +30,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345, + StorageLevel.MEMORY_AND_DISK_SER_2(), false); } } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index dd287d0ef9..73dffef953 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite import org.apache.spark.streaming.util.ManualClock import org.apache.spark.streaming.api.java.JavaReceiverInputDStream -class FlumeStreamSuite extends TestSuiteBase { +import org.jboss.netty.channel.ChannelPipeline +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.jboss.netty.channel.socket.SocketChannel +import org.jboss.netty.handler.codec.compression._ - val testPort = 9999 +class FlumeStreamSuite extends TestSuiteBase { test("flume input stream") { + runFlumeStreamTest(false, 9998) + } + + test("flume input compressed stream") { + runFlumeStreamTest(true, 9997) + } + + def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] = - FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK) + FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer) @@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase { val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) + var client: AvroSourceProtocol = null; + + if (enableDecompression) { + client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], + new NettyTransceiver(new InetSocketAddress("localhost", testPort), + new CompressionChannelFactory(6))); + } else { + client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver) + } for (i <- 0 until input.size) { val event = new AvroFlumeEvent @@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase { clock.addToTime(batchDuration.milliseconds) } + Thread.sleep(1000) + val startTime = System.currentTimeMillis() while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) @@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase { assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") } } + + class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory { + override def newChannel(pipeline:ChannelPipeline) : SocketChannel = { + var encoder : ZlibEncoder = new ZlibEncoder(compressionLevel); + pipeline.addFirst("deflater", encoder); + pipeline.addFirst("inflater", new ZlibDecoder()); + super.newChannel(pipeline); + } + } } |