aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 47071d0cc4..27bf2ac962 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}
+import java.util.Random
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -37,13 +38,16 @@ import org.apache.spark.streaming.flume.sink._
class FlumePollingStreamSuite extends TestSuiteBase {
- val testPort = 9999
+ val random = new Random()
+ /** Return a port in the ephemeral range. */
+ def getTestPort = random.nextInt(16382) + 49152
val batchCount = 5
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
test("flume polling test") {
+ val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
@@ -77,6 +81,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
}
test("flume polling test multiple hosts") {
+ val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _))