aboutsummaryrefslogtreecommitdiff
path: root/external/flume
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-08-02 01:11:03 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-02 01:16:13 -0700
commit44460ba594fbfe5a6ee66e5121ead914bf16f9f6 (patch)
tree95645a3b9187e34e444507832b7c16bd96adfa81 /external/flume
parent25cad6adf6479fb00265df06d5f77599f8defd26 (diff)
downloadspark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.tar.gz
spark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.tar.bz2
spark-44460ba594fbfe5a6ee66e5121ead914bf16f9f6.zip
HOTFIX: Fix concurrency issue in FlumePollingStreamSuite.
This has been failing on master. One possible cause is that the port gets contended if multiple test runs happen concurrently and they hit this test at the same time. Since this test takes a long time (60 seconds) that's very plausible. This patch randomizes the port used in this test to avoid contention.
Diffstat (limited to 'external/flume')
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 47071d0cc4..27bf2ac962 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.Random
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
class FlumePollingStreamSuite extends TestSuiteBase {
- val testPort = 9999
+ val random = new Random()
+ /** Return a port in the ephemeral range. */
+ def getTestPort = random.nextInt(16382) + 49152
val batchCount = 5
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
test("flume polling test") {
+ val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
}
test("flume polling test multiple hosts") {
+ val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))