aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/streaming/tests.py8
1 files changed, 5 insertions, 3 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 7c06c20345..33ea8c9293 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -606,7 +606,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
result = {}
for i in rdd.map(lambda x: x[1]).collect():
result[i] = result.get(i, 0) + 1
-
self.assertEqual(sendData, result)
def test_kafka_stream(self):
@@ -616,6 +615,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
@@ -631,6 +631,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
self._validateStreamResult(sendData, stream)
@@ -659,7 +661,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
-
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)
@@ -675,7 +677,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
-
+ self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)