diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-08-02 01:11:03 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-08-02 01:16:13 -0700 |
commit | 44460ba594fbfe5a6ee66e5121ead914bf16f9f6 (patch) | |
tree | 95645a3b9187e34e444507832b7c16bd96adfa81 /external/flume/src | |
parent | 25cad6adf6479fb00265df06d5f77599f8defd26 (diff) | |
download | spark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.tar.gz spark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.tar.bz2 spark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.zip |
HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.
This has been failing on master. One possible cause is that the port
gets contended if multiple test runs happen concurrently and they
hit this test at the same time. Since this test takes a long time
(60 seconds) that's very plausible. This patch randomizes the port
used in this test to avoid contention.
Diffstat (limited to 'external/flume/src')
-rw-r--r-- | external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 47071d0cc4..27bf2ac962 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} +import java.util.Random import scala.collection.JavaConversions._ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} @@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._ class FlumePollingStreamSuite extends TestSuiteBase { - val testPort = 9999 + val random = new Random() + /** Return a port in the ephemeral range. */ + def getTestPort = random.nextInt(16382) + 49152 val batchCount = 5 val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 test("flume polling test") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = @@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { } test("flume polling test multiple hosts") { + val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _)) |