From b127ff8a0c5fb704da574d101a2d0e27ac5f463a Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sun, 7 Jun 2015 21:42:45 +0100 Subject: [SPARK-2808] [STREAMING] [KAFKA] cleanup tests from see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests Author: cody koeninger Closes #5921 from koeninger/kafka-0.8.2-test-cleanup and squashes the following commits: 1e89dc8 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup 4662828 [cody koeninger] [Streaming][Kafka] filter mima issue for removal of method from private test class af1e083 [cody koeninger] Merge branch 'master' into kafka-0.8.2-test-cleanup 4298ac2 [cody koeninger] [Streaming][Kafka] update comment to trigger jenkins attempt 1274afb [cody koeninger] [Streaming][Kafka] see if requiring producer acks eliminates the need for waitUntilLeaderOffset calls in tests --- python/pyspark/streaming/tests.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'python/pyspark/streaming/tests.py') diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 46cb18b2e8..57049beea4 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -615,7 +615,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, @@ -631,7 +630,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) self._validateStreamResult(sendData, stream) @@ -646,7 +644,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets) self._validateStreamResult(sendData, stream) @@ -661,7 +658,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -677,7 +673,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) self._validateRddResult(sendData, rdd) -- cgit v1.2.3