path: root/external
author     cody koeninger <cody@koeninger.org>  2015-06-07 21:42:45 +0100
committer  Sean Owen <sowen@cloudera.com>       2015-06-07 21:42:45 +0100
commit     b127ff8a0c5fb704da574d101a2d0e27ac5f463a (patch)
tree       c36566079d8bd031caf69899f1c0440cd0e5b206 /external
parent     e84815dc333a69368a48e0152f02934980768a14 (diff)
[SPARK-2808] [STREAMING] [KAFKA] cleanup tests from
see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests

Author: cody koeninger <cody@koeninger.org>

Closes #5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits:

1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class
af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup
4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt
1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests
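The change itself is small: with the Kafka 0.8.x producer API these tests use, setting "request.required.acks" to "-1" makes each send block until all in-sync replicas have acknowledged the write, so the leader offset already reflects the sent messages by the time send() returns. A minimal standalone sketch of a producer configured this way (the broker address, topic, and message below are placeholders, not values from the patch):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // -1 = wait for all in-sync replicas to acknowledge before send() returns
    props.put("request.required.acks", "-1")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
    producer.close()

This is exactly the property the patch adds to the producer configuration in KafkaTestUtils below.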
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala   17
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java   3
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala     4
3 files changed, 2 insertions, 22 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 6dc4e9517d..b608b75952 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
     val props = new Properties()
     props.put("metadata.broker.list", brokerAddress)
     props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
     props
   }
@@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  /** Wait until the leader offset for the given topic/partition equals the specified offset */
-  def waitUntilLeaderOffset(
-      topic: String,
-      partition: Int,
-      offset: Long): Unit = {
-    eventually(Time(10000), Time(100)) {
-      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
-      val tp = TopicAndPartition(topic, partition)
-      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
-      assert(
-        llo == offset,
-        s"$topic $partition $offset not reached after timeout")
-    }
-  }
-
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
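The helper deleted above polled in an eventually() retry loop until the leader's latest offset matched the expected value. For reference, reading that offset once, using the same KafkaCluster calls the removed method used, looks roughly like this (a sketch only: KafkaCluster is a private class inside org.apache.spark.streaming.kafka, and brokerAddress, topic, and partition are placeholders):

    import kafka.common.TopicAndPartition

    def latestLeaderOffset(brokerAddress: String, topic: String, partition: Int): Long = {
      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
      val tp = TopicAndPartition(topic, partition)
      // getLatestLeaderOffsets returns Either[Err, Map[TopicAndPartition, LeaderOffset]];
      // .right.get throws if the lookup failed, which is acceptable in a test helper
      kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
    }

With acks set to -1 on the test producer, sendMessages() does not return until the replicas have the data, so the two test suites below can drop their waitUntilLeaderOffset calls entirely.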
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index 5cf3796353..a9dc6e5061 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -72,9 +72,6 @@ public class JavaKafkaRDDSuite implements Serializable {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
-    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
-
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 054487269a..d5baf5fd89 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -61,8 +61,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt}")
 
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
-
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
     val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
@@ -86,7 +84,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
     val sentCount = sent.values.sum
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
 
     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))
@@ -113,7 +110,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     val sentOnlyOne = Map("d" -> 1)
     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
 
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
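Taken together, the test pattern after this patch collapses to "send, then read". A condensed sketch, assuming sc, kafkaTestUtils, and topic are set up the way these suites do elsewhere (the message map is a hypothetical payload):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val messages = Map("a" -> 5, "b" -> 3, "c" -> 10) // hypothetical payload
    kafkaTestUtils.sendMessages(topic, messages)      // blocks until replicas ack

    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress)
    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.values.sum))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    assert(rdd.count == messages.values.sum)

No offset polling is needed between the send and the read, which is the point of the change.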