about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala25
1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 6ee7ac974b..33235d150b 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.util.Utils
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._
class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
- runFlumeStreamTest(false, 9998)
+ runFlumeStreamTest(false)
}
test("flume input compressed stream") {
- runFlumeStreamTest(true, 9997)
+ runFlumeStreamTest(true)
}
- def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
+ def runFlumeStreamTest(enableDecompression: Boolean) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+ val (flumeStream, testPort) =
+ Utils.startServiceOnPort(9997, (trialPort: Int) => {
+ val dstream = FlumeUtils.createStream(
+ ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+ (dstream, trialPort)
+ })
+
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
@@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase {
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- var client: AvroSourceProtocol = null;
-
+ var client: AvroSourceProtocol = null
+
if (enableDecompression) {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol],
new NettyTransceiver(new InetSocketAddress("localhost", testPort),
- new CompressionChannelFactory(6)));
+ new CompressionChannelFactory(6)))
} else {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)