diff options
author | cody koeninger <cody@koeninger.org> | 2015-05-01 17:54:56 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-01 17:54:56 -0700 |
commit | 4786484076865c56c3fc23c49819b9be2933d287 (patch) | |
tree | 32d44db8cd54dcc1a7a13c0501a71b5618cf0e3a /external/kafka/src/test | |
parent | b88c275e6ef6b17cd34d1c2c780b8959b41222c0 (diff) | |
download | spark-4786484076865c56c3fc23c49819b9be2933d287.tar.gz spark-4786484076865c56c3fc23c49819b9be2933d287.tar.bz2 spark-4786484076865c56c3fc23c49819b9be2933d287.zip |
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
i don't think this should be merged until after 1.3.0 is final
Author: cody koeninger <cody@koeninger.org>
Author: Helena Edelson <helena.edelson@datastax.com>
Closes #4537 from koeninger/wip-2808-kafka-0.8.2-upgrade and squashes the following commits:
803aa2c [cody koeninger] [SPARK-2808][Streaming][Kafka] code cleanup per TD
e6dfaf6 [cody koeninger] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again
1770abc [cody koeninger] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
d4267e9 [cody koeninger] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script
30d991d [cody koeninger] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax
1d896e2 [cody koeninger] [SPARK-2808][Streaming][Kafka] add even even more logging to python test
4c4557f [cody koeninger] [SPARK-2808][Streaming][Kafka] add even more logging to python test
115aeee [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
2712649 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins
2b92d3f [cody koeninger] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well
3824ce3 [cody koeninger] [SPARK-2808][Streaming][Kafka] naming / comments per tdas
61b3464 [cody koeninger] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test
af6f3ec [cody koeninger] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value
9edab4c [cody koeninger] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test
c70ee43 [cody koeninger] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally
1d10751 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
ed02d2c [cody koeninger] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat
407382e [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
77de6c2 [cody koeninger] Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
6953429 [cody koeninger] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
2e67c66 [Helena Edelson] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
d9dc2bc [Helena Edelson] Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.8.2-upgrade
e768164 [Helena Edelson] #2808 update kafka to version 0.8.2
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r-- | external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 3 | ||||
-rw-r--r-- | external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 32 |
2 files changed, 26 insertions, 9 deletions
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e5061..5cf3796353 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,6 +72,9 @@ public class JavaKafkaRDDSuite implements Serializable { HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length); + OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), OffsetRange.create(topic2, 0, 0, 1) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 7d26ce5087..39c3fb448f 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { } test("basic usage") { - val topic = "topicbasic" + val topic = s"topicbasic-${Random.nextInt}" kafkaTestUtils.createTopic(topic) val messages = Set("the", "quick", "brown", "fox") kafkaTestUtils.sendMessages(topic, messages.toArray) - val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") + + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size) val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -73,27 +74,38 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { test("iterator boundary conditions") { // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd - val topic = "topic1" + val topic = s"topicboundary-${Random.nextInt}" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) kafkaTestUtils.createTopic(topic) val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") val kc = new KafkaCluster(kafkaParams) // this is the "lots of messages" case kafkaTestUtils.sendMessages(topic, sent) + val sentCount = sent.values.sum + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount) + // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) assert(rdd.isDefined) - assert(rdd.get.count === sent.values.sum, "didn't get all sent messages") - val ranges = rdd.get.asInstanceOf[HasOffsetRanges] - .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges + val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum - kc.setConsumerOffsets(kafkaParams("group.id"), ranges) + assert(rangeCount === sentCount, "offset range didn't include all sent messages") + assert(rdd.get.count === sentCount, "didn't get all sent messages") + + val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + + // make sure consumer offsets are committed before the next getRdd call + kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold( + err => throw new Exception(err.mkString("\n")), + _ => () + ) // this is the "0 messages" case val rdd2 = getRdd(kc, Set(topic)) @@ -101,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val sentOnlyOne = Map("d" -> 1) kafkaTestUtils.sendMessages(topic, sentOnlyOne) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1) + assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") |