aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/main
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2015-06-07 21:42:45 +0100
committerSean Owen <sowen@cloudera.com>2015-06-07 21:42:45 +0100
commitb127ff8a0c5fb704da574d101a2d0e27ac5f463a (patch)
treec36566079d8bd031caf69899f1c0440cd0e5b206 /external/kafka/src/main
parente84815dc333a69368a48e0152f02934980768a14 (diff)
downloadspark-b127ff8a0c5fb704da574d101a2d0e27ac5f463a.tar.gz
spark-b127ff8a0c5fb704da574d101a2d0e27ac5f463a.tar.bz2
spark-b127ff8a0c5fb704da574d101a2d0e27ac5f463a.zip
[SPARK-2808] [STREAMING] [KAFKA] cleanup tests from
see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests Author: cody koeninger <cody@koeninger.org> Closes #5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits: 1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup 4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup 4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt 1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala17
1 files changed, 2 insertions, 15 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 6dc4e9517d..b608b75952 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -195,6 +195,8 @@ private class KafkaTestUtils extends Logging {
val props = new Properties()
props.put("metadata.broker.list", brokerAddress)
props.put("serializer.class", classOf[StringEncoder].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("request.required.acks", "-1")
props
}
@@ -229,21 +231,6 @@ private class KafkaTestUtils extends Logging {
tryAgain(1)
}
- /** Wait until the leader offset for the given topic/partition equals the specified offset */
- def waitUntilLeaderOffset(
- topic: String,
- partition: Int,
- offset: Long): Unit = {
- eventually(Time(10000), Time(100)) {
- val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
- val tp = TopicAndPartition(topic, partition)
- val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
- assert(
- llo == offset,
- s"$topic $partition $offset not reached after timeout")
- }
- }
-
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>