From 1fca9da95dc9b9aaf9ae75fd7456378861d8b409 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 5 Jul 2016 11:45:54 -0700 Subject: [SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka ## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This is making a roughly equivalent fix for the 0.8 connector ## How was this patch tested? Unit tests, manual tests Author: cody koeninger Closes #14018 from koeninger/kafka-0-8-test-port. --- .../scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'external/kafka-0-8') diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index d9d4240c05..abfd7aad4c 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.commons.lang3.RandomUtils import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port + private var brokerPort = RandomUtils.nextInt(1024, 65536) private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() - (server, port) + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true -- cgit v1.2.3