aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala5
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala4
2 files changed, 6 insertions, 3 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 322de7bf2f..51d273af8d 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -28,6 +28,7 @@ import scala.language.postfixOps
import com.google.common.base.Charsets
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
import org.apache.flume.source.avro
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
@@ -40,7 +41,6 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}
import org.apache.spark.util.Utils
class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
@@ -76,7 +76,8 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
/** Find a free port */
private def findFreePort(): Int = {
- Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 0f3298af62..24d78ecb3a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -25,6 +25,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.activemq.broker.{TransportConnector, BrokerService}
+import org.apache.commons.lang3.RandomUtils
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -113,7 +114,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
}
private def findFreePort(): Int = {
- Utils.startServiceOnPort(23456, (trialPort: Int) => {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)