aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2014-09-24 17:18:55 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-24 17:18:55 -0700
commit74fb2ecf7afc2d314f6477f8f2e6134614387453 (patch)
treeb2f687d6eae51de4b92c71cf24ab5150a444051b /external
parentbb96012b7360b099a19fecc80f0209b30f118ada (diff)
downloadspark-74fb2ecf7afc2d314f6477f8f2e6134614387453.tar.gz
spark-74fb2ecf7afc2d314f6477f8f2e6134614387453.tar.bz2
spark-74fb2ecf7afc2d314f6477f8f2e6134614387453.zip
[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue
Details can be seen in [SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615). Author: jerryshao <saisai.shao@intel.com> Closes #2483 from jerryshao/SPARK_3615 and squashes the following commits: 8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java2
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala46
2 files changed, 34 insertions, 14 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 0571454c01..efb0099c7c 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -81,7 +81,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
Predef.<Tuple2<String, Object>>conforms()));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
+ kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index c0b55e9340..6943326eb7 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
import scala.collection.mutable
import kafka.admin.CreateTopicCommand
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.utils.ZKStringSerializer
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
class KafkaStreamSuite extends TestSuiteBase {
import KafkaTestUtils._
- val zkConnect = "localhost:2181"
+ val zkHost = "localhost"
+ var zkPort: Int = 0
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
- val brokerPort = 9092
- val brokerProps = getBrokerConfig(brokerPort, zkConnect)
- val brokerConf = new KafkaConfig(brokerProps)
-
+ protected var brokerPort = 9092
+ protected var brokerConf: KafkaConfig = _
protected var zookeeper: EmbeddedZookeeper = _
protected var zkClient: ZkClient = _
protected var server: KafkaServer = _
@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
override def beforeFunction() {
// Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(zkConnect)
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
logInfo("==================== 0 ====================")
- zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+ zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ ZKStringSerializer)
logInfo("==================== 1 ====================")
// Kafka broker startup
- server = new KafkaServer(brokerConf)
- logInfo("==================== 2 ====================")
- server.startup()
- logInfo("==================== 3 ====================")
+ var bindSuccess: Boolean = false
+ while(!bindSuccess) {
+ try {
+ val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+ brokerConf = new KafkaConfig(brokerProps)
+ server = new KafkaServer(brokerConf)
+ logInfo("==================== 2 ====================")
+ server.startup()
+ logInfo("==================== 3 ====================")
+ bindSuccess = true
+ } catch {
+ case e: KafkaException =>
+ if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
+ brokerPort += 1
+ }
+ case e: Exception => throw new Exception("Kafka server create failed", e)
+ }
+ }
+
Thread.sleep(2000)
logInfo("==================== 4 ====================")
super.beforeFunction()
@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
createTopic(topic)
produceAndSendMessage(topic, sent)
- val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+ val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")
@@ -200,6 +218,8 @@ object KafkaTestUtils {
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)
+ val actualPort = factory.getLocalPort
+
def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)