aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2016-07-05 11:45:54 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-07-05 11:45:54 -0700
commit1fca9da95dc9b9aaf9ae75fd7456378861d8b409 (patch)
treee2d51a61157fe6477a96617bf31e5074a14c55fe /external
parent16a2a7d714f945b06978e3bd20a58ea32f0621ac (diff)
downloadspark-1fca9da95dc9b9aaf9ae75fd7456378861d8b409.tar.gz
spark-1fca9da95dc9b9aaf9ae75fd7456378861d8b409.tar.bz2
spark-1fca9da95dc9b9aaf9ae75fd7456378861d8b409.zip
[SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka
## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This is making a roughly equivalent fix for the 0.8 connector ## How was this patch tested? Unit tests, manual tests Author: cody koeninger <cody@koeninger.org> Closes #14018 from koeninger/kafka-0-8-test-port.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index d9d4240c05..abfd7aad4c 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
+import org.apache.commons.lang3.RandomUtils
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.apache.spark.SparkConf
@@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging {
// Kafka broker related configurations
private val brokerHost = "localhost"
- private var brokerPort = 9092
+ // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port
+ private var brokerPort = RandomUtils.nextInt(1024, 65536)
private var brokerConf: KafkaConfig = _
// Kafka broker server
@@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging {
brokerConf = new KafkaConfig(brokerConfiguration)
server = new KafkaServer(brokerConf)
server.startup()
- (server, port)
+ (server, brokerPort)
}, new SparkConf(), "KafkaBroker")
brokerReady = true