path: root/python/pyspark/streaming/tests.py
author: cody koeninger <cody@koeninger.org> 2015-05-01 17:54:56 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-05-01 17:54:56 -0700
commit: 4786484076865c56c3fc23c49819b9be2933d287
tree: 32d44db8cd54dcc1a7a13c0501a71b5618cf0e3a /python/pyspark/streaming/tests.py
parent: b88c275e6ef6b17cd34d1c2c780b8959b41222c0
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
i don't think this should be merged until after 1.3.0 is final

Author: cody koeninger <cody@koeninger.org>
Author: Helena Edelson <helena.edelson@datastax.com>

Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits:

803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD
e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again
1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax
1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test
4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test
115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins
2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well
3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas
61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test
af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test
c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally
1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat
407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade
e768164 [Helena Edelson] #2808 update kafka to version 0.8.2
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- python/pyspark/streaming/tests.py | 8
1 files changed, 5 insertions, 3 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 7c06c20345..33ea8c9293 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -606,7 +606,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         result = {}
         for i in rdd.map(lambda x: x[1]).collect():
             result[i] = result.get(i, 0) + 1
-
         self.assertEqual(sendData, result)
     def test_kafka_stream(self):
@@ -616,6 +615,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
@@ -631,6 +631,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
@@ -659,7 +661,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)
@@ -675,7 +677,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
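
Each hunk above adds the same guard: after sending messages, the test waits until the leader offset of partition 0 equals `sum(sendData.values())`, the total number of messages sent, before reading. The implementation of `waitUntilLeaderOffset` is not part of this diff, so the following is only a minimal sketch of the general poll-until-offset pattern, not Spark's code. `wait_until`, `FakeLeaderLog`, and the sample `send_data` values are hypothetical names and data introduced purely for illustration.

```python
import time


def wait_until(condition, timeout=10.0, interval=0.1):
    """Poll `condition` until it returns True or `timeout` seconds elapse.

    Hypothetical helper: returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakeLeaderLog:
    """Hypothetical stand-in for a partition leader.

    Sent messages only become visible in `latest_offset` after a short
    delay, which mimics the race the tests guard against.
    """

    def __init__(self, visible_after=0.2):
        self._ready_at = time.monotonic() + visible_after
        self._pending = 0

    def send(self, send_data):
        # send_data maps each message to the number of times it is sent.
        self._pending += sum(send_data.values())

    def latest_offset(self):
        return self._pending if time.monotonic() >= self._ready_at else 0


# Illustrative data only: message -> send count.
send_data = {"a": 3, "b": 5, "c": 10}
expected_offset = sum(send_data.values())

log = FakeLeaderLog()
log.send(send_data)

# Block until every sent message is visible, then it is safe to read.
reached = wait_until(lambda: log.latest_offset() == expected_offset, timeout=5.0)
```

Returning a boolean rather than raising lets the caller decide how to fail; a test would typically assert on `reached` so that a timeout produces a clear failure instead of a confusing mismatch in the consumed data.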