path: root/external/kafka/src/test
author     jerryshao <saisai.shao@intel.com>                2015-04-09 23:14:24 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>      2015-04-09 23:14:24 -0700
commit     3290d2d13bb4bd875aec14425c8e3766f9cc644b (patch)
tree       7e862fb443bd1bca2b15c3fd1a1ec9b9d27912d5 /external/kafka/src/test
parent     e2360810f50de77f79d372cc9b46db117d451cfc (diff)
[SPARK-6211][Streaming] Add Python Kafka API unit test
Refactor the Kafka unit test and add Python API support. CC tdas davies please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4961 from jerryshao/SPARK-6211 and squashes the following commits:

ee4b919 [jerryshao] Fixed newly merged issue
82c756e [jerryshao] Address the comments
92912d1 [jerryshao] Address the commits
0708bb1 [jerryshao] Fix rebase issue
40b47a3 [Saisai Shao] Style fix
f889657 [Saisai Shao] Update the code according
8a2f3e2 [jerryshao] Address the issues
0f1b7ce [jerryshao] Still fix the bug
61a04f0 [jerryshao] Fix bugs and address the issues
64d9877 [jerryshao] Fix rebase bugs
8ad442f [jerryshao] Add kafka-assembly in run-tests
6020b00 [jerryshao] Add more debug info in Shell
8102d6e [jerryshao] Fix bug in Jenkins test
fde1213 [jerryshao] Code style changes
5536f95 [jerryshao] Refactor the Kafka unit test and add Python Kafka unittest support
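Note for readers skimming the hunks below: the common thread is that every suite now instantiates the new KafkaTestUtils helper instead of extending the removed KafkaStreamSuiteBase. The following Scala sketch shows that pattern in minimal form, assembled from the calls visible in this diff (setup(), teardown(), createTopic, sendMessages, brokerAddress); the suite name and import path are assumptions for illustration, and the sketch is not part of the patch itself.

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    // Assumed import path; KafkaTestUtils is the helper referenced throughout this diff.
    import org.apache.spark.streaming.kafka.KafkaTestUtils

    // Minimal sketch of the post-refactoring test layout: the suite owns a
    // KafkaTestUtils instance rather than inheriting broker setup from a base class.
    class ExampleKafkaSuite extends FunSuite with BeforeAndAfterAll {

      private var kafkaTestUtils: KafkaTestUtils = _

      override def beforeAll(): Unit = {
        // Starts the embedded Zookeeper and Kafka broker used by the tests.
        kafkaTestUtils = new KafkaTestUtils
        kafkaTestUtils.setup()
      }

      override def afterAll(): Unit = {
        // Shuts the embedded servers down, mirroring the tearDown blocks in the diff.
        if (kafkaTestUtils != null) {
          kafkaTestUtils.teardown()
          kafkaTestUtils = null
        }
      }

      test("produce to a topic and build Kafka params") {
        val topic = "example"
        kafkaTestUtils.createTopic(topic)
        kafkaTestUtils.sendMessages(topic, Map("a" -> 1, "b" -> 2))

        // Direct-stream style tests pass the embedded broker address as a Kafka param.
        val kafkaParams = Map(
          "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
          "auto.offset.reset" -> "smallest")
        assert(kafkaParams("metadata.broker.list").contains(":"))
      }
    }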
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java    28
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java             28
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java          34
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala      56
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala           29
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala               40
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala           211
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala    62
8 files changed, 180 insertions, 308 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index d6ca6d58b5..4c1d6a03eb 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -41,24 +41,28 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class JavaDirectKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
- private transient KafkaStreamSuiteBase suiteBase = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
@Before
public void setUp() {
- suiteBase = new KafkaStreamSuiteBase() { };
- suiteBase.setupKafka();
- System.clearProperty("spark.driver.port");
- SparkConf sparkConf = new SparkConf()
- .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
- ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
}
@After
public void tearDown() {
+ if (ssc != null) {
ssc.stop();
ssc = null;
- System.clearProperty("spark.driver.port");
- suiteBase.tearDownKafka();
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
}
@Test
@@ -74,7 +78,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
sent.addAll(Arrays.asList(topic2data));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+ kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
kafkaParams.put("auto.offset.reset", "smallest");
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
@@ -147,8 +151,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- suiteBase.createTopic(topic);
- suiteBase.sendMessages(topic, data);
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
return data;
}
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index 4477b81827..a9dc6e5061 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -37,13 +37,12 @@ import org.apache.spark.api.java.function.Function;
public class JavaKafkaRDDSuite implements Serializable {
private transient JavaSparkContext sc = null;
- private transient KafkaStreamSuiteBase suiteBase = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
@Before
public void setUp() {
- suiteBase = new KafkaStreamSuiteBase() { };
- suiteBase.setupKafka();
- System.clearProperty("spark.driver.port");
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
sc = new JavaSparkContext(sparkConf);
@@ -51,10 +50,15 @@ public class JavaKafkaRDDSuite implements Serializable {
@After
public void tearDown() {
- sc.stop();
- sc = null;
- System.clearProperty("spark.driver.port");
- suiteBase.tearDownKafka();
+ if (sc != null) {
+ sc.stop();
+ sc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
}
@Test
@@ -66,7 +70,7 @@ public class JavaKafkaRDDSuite implements Serializable {
String[] topic2data = createTopicAndSendData(topic2);
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+ kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
@@ -75,7 +79,7 @@ public class JavaKafkaRDDSuite implements Serializable {
HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
- String[] hostAndPort = suiteBase.brokerAddress().split(":");
+ String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
leaders.put(new TopicAndPartition(topic1, 0), broker);
leaders.put(new TopicAndPartition(topic2, 0), broker);
@@ -144,8 +148,8 @@ public class JavaKafkaRDDSuite implements Serializable {
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- suiteBase.createTopic(topic);
- suiteBase.sendMessages(topic, data);
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, data);
return data;
}
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index bad0a93eb2..540f4ceaba 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -22,9 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Random;
-import scala.Predef;
import scala.Tuple2;
-import scala.collection.JavaConverters;
import kafka.serializer.StringDecoder;
import org.junit.After;
@@ -44,13 +42,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private transient Random random = new Random();
- private transient KafkaStreamSuiteBase suiteBase = null;
+ private transient KafkaTestUtils kafkaTestUtils = null;
@Before
public void setUp() {
- suiteBase = new KafkaStreamSuiteBase() { };
- suiteBase.setupKafka();
- System.clearProperty("spark.driver.port");
+ kafkaTestUtils = new KafkaTestUtils();
+ kafkaTestUtils.setup();
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, new Duration(500));
@@ -58,10 +55,15 @@ public class JavaKafkaStreamSuite implements Serializable {
@After
public void tearDown() {
- ssc.stop();
- ssc = null;
- System.clearProperty("spark.driver.port");
- suiteBase.tearDownKafka();
+ if (ssc != null) {
+ ssc.stop();
+ ssc = null;
+ }
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown();
+ kafkaTestUtils = null;
+ }
}
@Test
@@ -75,15 +77,11 @@ public class JavaKafkaStreamSuite implements Serializable {
sent.put("b", 3);
sent.put("c", 10);
- suiteBase.createTopic(topic);
- HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- suiteBase.sendMessages(topic,
- JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
- Predef.<Tuple2<String, Object>>conforms())
- );
+ kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.sendMessages(topic, sent);
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
+ kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
@@ -126,6 +124,7 @@ public class JavaKafkaStreamSuite implements Serializable {
);
ssc.start();
+
long startTime = System.currentTimeMillis();
boolean sizeMatches = false;
while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
@@ -136,6 +135,5 @@ public class JavaKafkaStreamSuite implements Serializable {
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
- ssc.stop();
}
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 17ca9d145d..415730f555 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -27,31 +27,41 @@ import scala.language.postfixOps
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.Eventually
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
- with BeforeAndAfter with BeforeAndAfterAll with Eventually {
+class DirectKafkaStreamSuite
+ extends FunSuite
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with Eventually
+ with Logging {
val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
- var sc: SparkContext = _
- var ssc: StreamingContext = _
- var testDir: File = _
+ private var sc: SparkContext = _
+ private var ssc: StreamingContext = _
+ private var testDir: File = _
+
+ private var kafkaTestUtils: KafkaTestUtils = _
override def beforeAll {
- setupKafka()
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
}
override def afterAll {
- tearDownKafka()
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
}
after {
@@ -72,12 +82,12 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
val topics = Set("basic1", "basic2", "basic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
- createTopic(t)
- sendMessages(t, data)
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
}
val totalSent = data.values.sum * topics.size
val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerAddress",
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)
@@ -121,9 +131,9 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
val topic = "largest"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
- createTopic(topic)
+ kafkaTestUtils.createTopic(topic)
val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerAddress",
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "largest"
)
val kc = new KafkaCluster(kafkaParams)
@@ -132,7 +142,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
// Send some initial messages before starting context
- sendMessages(topic, data)
+ kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() > 3)
}
@@ -154,7 +164,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
- sendMessages(topic, newData)
+ kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
@@ -166,9 +176,9 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
val topic = "offset"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
- createTopic(topic)
+ kafkaTestUtils.createTopic(topic)
val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerAddress",
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "largest"
)
val kc = new KafkaCluster(kafkaParams)
@@ -177,7 +187,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
}
// Send some initial messages before starting context
- sendMessages(topic, data)
+ kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() >= 10)
}
@@ -200,7 +210,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
ssc.start()
val newData = Map("b" -> 10)
- sendMessages(topic, newData)
+ kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
@@ -210,18 +220,18 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
// Test to verify the offset ranges can be recovered from the checkpoints
test("offset recovery") {
val topic = "recovery"
- createTopic(topic)
+ kafkaTestUtils.createTopic(topic)
testDir = Utils.createTempDir()
val kafkaParams = Map(
- "metadata.broker.list" -> s"$brokerAddress",
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)
// Send data to Kafka and wait for it to be received
def sendDataAndWaitForReceive(data: Seq[Int]) {
val strings = data.map { _.toString}
- sendMessages(topic, strings.map { _ -> 1}.toMap)
+ kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
index fc9275b720..2b33d2a220 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -20,28 +20,35 @@ package org.apache.spark.streaming.kafka
import scala.util.Random
import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
-class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
- val topic = "kcsuitetopic" + Random.nextInt(10000)
- val topicAndPartition = TopicAndPartition(topic, 0)
- var kc: KafkaCluster = null
+class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll {
+ private val topic = "kcsuitetopic" + Random.nextInt(10000)
+ private val topicAndPartition = TopicAndPartition(topic, 0)
+ private var kc: KafkaCluster = null
+
+ private var kafkaTestUtils: KafkaTestUtils = _
override def beforeAll() {
- setupKafka()
- createTopic(topic)
- sendMessages(topic, Map("a" -> 1))
- kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress"))
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
+
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
+ kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
}
override def afterAll() {
- tearDownKafka()
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
}
test("metadata apis") {
val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
val leaderAddress = s"${leader._1}:${leader._2}"
- assert(leaderAddress === brokerAddress, "didn't get leader")
+ assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
val parts = kc.getPartitions(Set(topic)).right.get
assert(parts(topicAndPartition), "didn't get partitions")
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index a223da70b0..7d26ce5087 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -22,18 +22,22 @@ import scala.util.Random
import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark._
-import org.apache.spark.SparkContext._
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
- val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
- var sc: SparkContext = _
+class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
+
+ private var kafkaTestUtils: KafkaTestUtils = _
+
+ private val sparkConf = new SparkConf().setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ private var sc: SparkContext = _
+
override def beforeAll {
sc = new SparkContext(sparkConf)
-
- setupKafka()
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
}
override def afterAll {
@@ -41,17 +45,21 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
sc.stop
sc = null
}
- tearDownKafka()
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
}
test("basic usage") {
val topic = "topicbasic"
- createTopic(topic)
+ kafkaTestUtils.createTopic(topic)
val messages = Set("the", "quick", "brown", "fox")
- sendMessages(topic, messages.toArray)
+ kafkaTestUtils.sendMessages(topic, messages.toArray)
- val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+ val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
@@ -67,15 +75,15 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- createTopic(topic)
+ kafkaTestUtils.createTopic(topic)
- val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+ val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
val kc = new KafkaCluster(kafkaParams)
// this is the "lots of messages" case
- sendMessages(topic, sent)
+ kafkaTestUtils.sendMessages(topic, sent)
// rdd defined from leaders after sending messages, should get the number sent
val rdd = getRdd(kc, Set(topic))
@@ -92,14 +100,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
// shouldn't get anything, since message is sent after rdd was defined
val sentOnlyOne = Map("d" -> 1)
- sendMessages(topic, sentOnlyOne)
+ kafkaTestUtils.sendMessages(topic, sentOnlyOne)
assert(rdd2.isDefined)
assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
// this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
val rdd3 = getRdd(kc, Set(topic))
// send lots of messages after rdd was defined, they shouldn't show up
- sendMessages(topic, Map("extra" -> 22))
+ kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
assert(rdd3.isDefined)
assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index e4966eebb9..24699dfc33 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -17,209 +17,38 @@
package org.apache.spark.streaming.kafka
-import java.io.File
-import java.net.InetSocketAddress
-import java.util.Properties
-
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
-import kafka.admin.AdminUtils
-import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.{StringDecoder, StringEncoder}
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.Eventually
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
-
-/**
- * This is an abstract base class for Kafka testsuites. This has the functionality to set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- */
-abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
-
- private val zkHost = "localhost"
- private var zkPort: Int = 0
- private val zkConnectionTimeout = 6000
- private val zkSessionTimeout = 6000
- private var zookeeper: EmbeddedZookeeper = _
- private val brokerHost = "localhost"
- private var brokerPort = 9092
- private var brokerConf: KafkaConfig = _
- private var server: KafkaServer = _
- private var producer: Producer[String, String] = _
- private var zkReady = false
- private var brokerReady = false
-
- protected var zkClient: ZkClient = _
-
- def zkAddress: String = {
- assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
- s"$zkHost:$zkPort"
- }
- def brokerAddress: String = {
- assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
- s"$brokerHost:$brokerPort"
- }
-
- def setupKafka() {
- // Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
- // Get the actual zookeeper binding port
- zkPort = zookeeper.actualPort
- zkReady = true
- logInfo("==================== Zookeeper Started ====================")
+class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll {
+ private var ssc: StreamingContext = _
+ private var kafkaTestUtils: KafkaTestUtils = _
- zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
- logInfo("==================== Zookeeper Client Created ====================")
-
- // Kafka broker startup
- var bindSuccess: Boolean = false
- while(!bindSuccess) {
- try {
- val brokerProps = getBrokerConfig()
- brokerConf = new KafkaConfig(brokerProps)
- server = new KafkaServer(brokerConf)
- server.startup()
- logInfo("==================== Kafka Broker Started ====================")
- bindSuccess = true
- } catch {
- case e: KafkaException =>
- if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
- brokerPort += 1
- }
- case e: Exception => throw new Exception("Kafka server create failed", e)
- }
- }
-
- Thread.sleep(2000)
- logInfo("==================== Kafka + Zookeeper Ready ====================")
- brokerReady = true
+ override def beforeAll(): Unit = {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
}
- def tearDownKafka() {
- brokerReady = false
- zkReady = false
- if (producer != null) {
- producer.close()
- producer = null
- }
-
- if (server != null) {
- server.shutdown()
- server = null
- }
-
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
- if (zkClient != null) {
- zkClient.close()
- zkClient = null
- }
-
- if (zookeeper != null) {
- zookeeper.shutdown()
- zookeeper = null
- }
- }
-
- def createTopic(topic: String) {
- AdminUtils.createTopic(zkClient, topic, 1, 1)
- // wait until metadata is propagated
- waitUntilMetadataIsPropagated(topic, 0)
- logInfo(s"==================== Topic $topic Created ====================")
- }
-
- def sendMessages(topic: String, messageToFreq: Map[String, Int]) {
- val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
- sendMessages(topic, messages)
- }
-
- def sendMessages(topic: String, messages: Array[String]) {
- producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
- producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
- producer.close()
- logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================")
- }
-
- private def getBrokerConfig(): Properties = {
- val props = new Properties()
- props.put("broker.id", "0")
- props.put("host.name", "localhost")
- props.put("port", brokerPort.toString)
- props.put("log.dir", Utils.createTempDir().getAbsolutePath)
- props.put("zookeeper.connect", zkAddress)
- props.put("log.flush.interval.messages", "1")
- props.put("replica.socket.timeout.ms", "1500")
- props
- }
-
- private def getProducerConfig(): Properties = {
- val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
- val props = new Properties()
- props.put("metadata.broker.list", brokerAddr)
- props.put("serializer.class", classOf[StringEncoder].getName)
- props
- }
-
- private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
- eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- assert(
- server.apis.metadataCache.containsTopicAndPartition(topic, partition),
- s"Partition [$topic, $partition] metadata not propagated after timeout"
- )
- }
- }
-
- class EmbeddedZookeeper(val zkConnect: String) {
- val random = new Random()
- val snapshotDir = Utils.createTempDir()
- val logDir = Utils.createTempDir()
-
- val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
- val (ip, port) = {
- val splits = zkConnect.split(":")
- (splits(0), splits(1).toInt)
- }
- val factory = new NIOServerCnxnFactory()
- factory.configure(new InetSocketAddress(ip, port), 16)
- factory.startup(zookeeper)
-
- val actualPort = factory.getLocalPort
-
- def shutdown() {
- factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
- }
- }
-}
-
-
-class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
- var ssc: StreamingContext = _
-
- before {
- setupKafka()
- }
-
- after {
+ override def afterAll(): Unit = {
if (ssc != null) {
ssc.stop()
ssc = null
}
- tearDownKafka()
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
}
test("Kafka input stream") {
@@ -227,10 +56,10 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- createTopic(topic)
- sendMessages(topic, sent)
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, sent)
- val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+ val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")
@@ -244,14 +73,14 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
result.put(kv._1, count)
}
}
+
ssc.start()
+
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sent.size === result.size)
sent.keys.foreach { k =>
assert(sent(k) === result(k).toInt)
}
}
- ssc.stop()
}
}
-
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 3cd960d1fd..38548dd73b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.streaming.kafka
-
import java.io.File
import scala.collection.mutable
@@ -27,7 +26,7 @@ import scala.util.Random
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
@@ -35,47 +34,61 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils
-class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+class ReliableKafkaStreamSuite extends FunSuite
+ with BeforeAndAfterAll with BeforeAndAfter with Eventually {
- val sparkConf = new SparkConf()
+ private val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
- val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+ private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+ private var kafkaTestUtils: KafkaTestUtils = _
- var groupId: String = _
- var kafkaParams: Map[String, String] = _
- var ssc: StreamingContext = _
- var tempDirectory: File = null
+ private var groupId: String = _
+ private var kafkaParams: Map[String, String] = _
+ private var ssc: StreamingContext = _
+ private var tempDirectory: File = null
+
+ override def beforeAll() : Unit = {
+ kafkaTestUtils = new KafkaTestUtils
+ kafkaTestUtils.setup()
- before {
- setupKafka()
groupId = s"test-consumer-${Random.nextInt(10000)}"
kafkaParams = Map(
- "zookeeper.connect" -> zkAddress,
+ "zookeeper.connect" -> kafkaTestUtils.zkAddress,
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)
- ssc = new StreamingContext(sparkConf, Milliseconds(500))
tempDirectory = Utils.createTempDir()
+ }
+
+ override def afterAll(): Unit = {
+ Utils.deleteRecursively(tempDirectory)
+
+ if (kafkaTestUtils != null) {
+ kafkaTestUtils.teardown()
+ kafkaTestUtils = null
+ }
+ }
+
+ before {
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
ssc.checkpoint(tempDirectory.getAbsolutePath)
}
after {
if (ssc != null) {
ssc.stop()
+ ssc = null
}
- Utils.deleteRecursively(tempDirectory)
- tearDownKafka()
}
-
test("Reliable Kafka input stream with single topic") {
- var topic = "test-topic"
- createTopic(topic)
- sendMessages(topic, data)
+ val topic = "test-topic"
+ kafkaTestUtils.createTopic(topic)
+ kafkaTestUtils.sendMessages(topic, data)
// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === None)
@@ -91,6 +104,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
}
}
ssc.start()
+
eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
// A basic process verification for ReliableKafkaReceiver.
// Verify whether received message number is equal to the sent message number.
@@ -100,14 +114,13 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
// Verify the offset number whether it is equal to the total message number.
assert(getCommitOffset(groupId, topic, 0) === Some(29L))
}
- ssc.stop()
}
test("Reliable Kafka input stream with multiple topics") {
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
- createTopic(t)
- sendMessages(t, data)
+ kafkaTestUtils.createTopic(t)
+ kafkaTestUtils.sendMessages(t, data)
}
// Before started, verify all the group/topic/partition offsets are 0.
@@ -118,19 +131,18 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
+
eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
// Verify the offset for each group/topic to see whether they are equal to the expected one.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
}
- ssc.stop()
}
/** Getting partition offset from Zookeeper. */
private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
- assert(zkClient != null, "Zookeeper client is not initialized")
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
- ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+ ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
}
}