about | summary | refs | log | tree | commit | diff
path: root/external/flume/src
diff options
context:
space:
mode:
author: Sean Owen <sowen@cloudera.com>  2014-09-30 15:18:51 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com>  2014-09-30 15:18:51 -0700
commit: 8764fe368bbd72fe76ed318faad0e97a7279e2fe (patch)
tree: bc3a285480aa6d276278015864d5f25d14a43dc7 /external/flume/src
parent: d3a3840e077802647aced1ceace1494605dda1db (diff)
download: spark-8764fe368bbd72fe76ed318faad0e97a7279e2fe.tar.gz
spark-8764fe368bbd72fe76ed318faad0e97a7279e2fe.tar.bz2
spark-8764fe368bbd72fe76ed318faad0e97a7279e2fe.zip
SPARK-3744 [STREAMING] FlumeStreamSuite will fail during port contention
Since it looked quite easy, I took the liberty of making a quick PR that just uses `Utils.startServiceOnPort` to fix this. It works locally for me. Author: Sean Owen <sowen@cloudera.com> Closes #2601 from srowen/SPARK-3744 and squashes the following commits: ddc9319 [Sean Owen] Avoid port contention in tests by retrying several ports for Flume stream
Diffstat (limited to 'external/flume/src')
-rw-r--r-- external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 25
1 files changed, 15 insertions, 10 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 6ee7ac974b..33235d150b 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.util.Utils
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,21 +41,26 @@ import org.jboss.netty.handler.codec.compression._
class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
- runFlumeStreamTest(false, 9998)
+ runFlumeStreamTest(false)
}
test("flume input compressed stream") {
- runFlumeStreamTest(true, 9997)
+ runFlumeStreamTest(true)
}
- def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
+ def runFlumeStreamTest(enableDecompression: Boolean) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+ val (flumeStream, testPort) =
+ Utils.startServiceOnPort(9997, (trialPort: Int) => {
+ val dstream = FlumeUtils.createStream(
+ ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
+ (dstream, trialPort)
+ })
+
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
@@ -63,13 +68,13 @@ class FlumeStreamSuite extends TestSuiteBase {
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- var client: AvroSourceProtocol = null;
-
+ var client: AvroSourceProtocol = null
+
if (enableDecompression) {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol],
new NettyTransceiver(new InetSocketAddress("localhost", testPort),
- new CompressionChannelFactory(6)));
+ new CompressionChannelFactory(6)))
} else {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)