author	tmalaska <ted.malaska@cloudera.com>	2014-07-10 13:15:02 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2014-07-10 13:15:02 -0700
commit	40a8fef4e6619b4ea10a4ec9026260649ce5ae73 (patch)
tree	14bdcb5f6475634e95c65b766eeec64022b6d09f /external/flume/src/test
parent	369aa84e8fba883165817338ac8bf9460be74521 (diff)
[SPARK-1478].3: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915
This is a modified version of this PR https://github.com/apache/spark/pull/1168 done by @tmalaska. Adds MIMA binary check exclusions.

Author: tmalaska <ted.malaska@cloudera.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #1347 from tdas/FLUME-1915 and squashes the following commits:

96065df [Tathagata Das] Added Mima exclusion for FlumeReceiver.
41d5338 [tmalaska] Address line 57 that was too long
12617e5 [tmalaska] SPARK-1478: Upgrade FlumeInputDStream's Flume...
Diffstat (limited to 'external/flume/src/test')
-rw-r--r--	external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java	2
-rw-r--r--	external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala	41
2 files changed, 38 insertions(+), 5 deletions(-)
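For context, the change these tests exercise is a new enableDecompression flag on FlumeUtils.createStream: when set, the receiver inflates events arriving from a Flume Avro sink that compresses with deflate (the FLUME-1915 feature). Below is a minimal, self-contained sketch of how an application might opt in; the app name, master, host, and port are illustrative placeholders, not taken from this patch:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object CompressedFlumeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CompressedFlumeExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The trailing Boolean is the flag added by this patch: when true, the
    // receiver decompresses events sent by a deflate-compressing Avro sink.
    val stream = FlumeUtils.createStream(ssc, "localhost", 9997, StorageLevel.MEMORY_AND_DISK, true)
    stream.map(event => new String(event.event.getBody.array(), "UTF-8")).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

With the flag false, the stream behaves exactly as the existing overloads do, which is what the updated Java API-compile test below checks.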
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index e0ad4f1015..3b5e0c7746 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
     JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
     JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index dd287d0ef9..73dffef953 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
 
-  val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
 
   test("flume input stream") {
+    runFlumeStreamTest(false, 9998)
+  }
+
+  test("flume input compressed stream") {
+    runFlumeStreamTest(true, 9997)
+  }
+
+  def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
     val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
     val input = Seq(1, 2, 3, 4, 5)
     Thread.sleep(1000)
     val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
-    val client = SpecificRequestor.getClient(
-      classOf[AvroSourceProtocol], transceiver)
+    var client: AvroSourceProtocol = null;
+
+    if (enableDecompression) {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol],
+        new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+          new CompressionChannelFactory(6)));
+    } else {
+      client = SpecificRequestor.getClient(
+        classOf[AvroSourceProtocol], transceiver)
+    }
 
     for (i <- 0 until input.size) {
       val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
       clock.addToTime(batchDuration.milliseconds)
     }
 
+    Thread.sleep(1000)
+
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
       assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
     }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline:ChannelPipeline) : SocketChannel = {
+      var encoder : ZlibEncoder = new ZlibEncoder(compressionLevel);
+      pipeline.addFirst("deflater", encoder);
+      pipeline.addFirst("inflater", new ZlibDecoder());
+      super.newChannel(pipeline);
+    }
+  }
 }
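For reference, the client half of the compressed test shows the general pattern for sending to a decompressing receiver: zlib codecs are installed at the head of the Netty pipeline, so outbound bytes are deflated and inbound responses inflated. Here is a standalone sketch of that wiring, using the same Netty 3 and Avro IPC classes as the test; the factory and object names, host, and port are illustrative:

import java.net.InetSocketAddress
import java.nio.ByteBuffer

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

// Same idea as the test's CompressionChannelFactory: compress what goes out,
// inflate what comes back.
class DeflateChannelFactory(level: Int) extends NioClientSocketChannelFactory {
  override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
    pipeline.addFirst("deflater", new ZlibEncoder(level))
    pipeline.addFirst("inflater", new ZlibDecoder())
    super.newChannel(pipeline)
  }
}

object CompressedFlumeClient {
  def main(args: Array[String]): Unit = {
    val transceiver = new NettyTransceiver(
      new InetSocketAddress("localhost", 9997), new DeflateChannelFactory(6))
    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    val event = new AvroFlumeEvent
    event.setBody(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    event.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
    client.append(event)  // deflated on the wire; the receiver inflates it
    transceiver.close()
  }
}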