author     Tathagata Das <tathagata.das1565@gmail.com>  2016-07-26 00:41:46 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-07-26 00:41:46 -0700
commit     03c27435aee4e319abe290771ba96e69469109ac
tree       83d7803bec5ae4d1e9cbf7cf455c2118ac0a9380
parent     6959061f02b02afd4cef683b5eea0b7097eedee7
[TEST][STREAMING] Fix flaky Kafka rate controlling test
## What changes were proposed in this pull request?
The current test is incorrect because:
- The expected number of messages does not take into account that the topic has 2 partitions, and the rate is set per partition.
- In some cases, the test ran out of data in Kafka while waiting for the right amount of data per batch.
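The first bullet can be sketched as follows. This is a hypothetical illustration of the per-partition rate math, not code from the patch; the object and method names are invented for this sketch:

```scala
// Hypothetical sketch: Spark's backpressure rate limit is applied per Kafka
// partition, so the expected batch size must scale with the partition count.
// A test that ignores the partition count under-counts the expected records.
object RateMath {
  def expectedBatchSize(ratePerPartition: Long,
                        numPartitions: Int,
                        batchIntervalMs: Long): Long =
    Math.round(ratePerPartition * numPartitions * batchIntervalMs * 0.001)

  def main(args: Array[String]): Unit = {
    // With the old test's 2 partitions, the real batch is twice as large
    // as a per-partition calculation predicts:
    println(expectedBatchSize(100, 1, 100)) // 10 records per batch
    println(expectedBatchSize(100, 2, 100)) // 20 records per batch
  }
}
```

Reducing the topic to a single partition (as this PR does) makes the per-partition expectation and the whole-batch expectation coincide.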
This PR:
- Reduces the number of partitions to 1
- Adds more data to Kafka
- Runs with a 0.5 second batch interval so that batches are created slowly
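The "adds more data" point can be motivated with a rough data budget. This is a back-of-the-envelope sketch with assumed rate values, not code from the test suite; `recordsNeeded` and the rate sequence are invented for illustration:

```scala
// Hypothetical sketch: over an eventually-timeout window, the stream can
// drain up to rate * timeoutSecs records for each rate the test cycles
// through, so the topic must be pre-loaded with at least that many records.
object DataBudget {
  def recordsNeeded(rates: Seq[Int], timeoutSecs: Int): Int =
    rates.map(_ * timeoutSecs).sum

  def main(args: Array[String]): Unit = {
    // Assumed example rates of 100, 50, and 20 records/sec with the
    // test's 5-second eventually timeout:
    println(recordsNeeded(Seq(100, 50, 20), 5)) // 850 records
  }
}
```

A budget on this order comfortably exceeds the old 200 messages, which is why bumping the pre-loaded data to 5000 removes the "ran out of data" flakiness.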
## How was this patch tested?
Ran the test many times locally; it will also be run many times in Jenkins.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #14361 from tdas/kafka-rate-test-fix.
-rw-r--r-- | external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 9 |
1 file changed, 4 insertions, 5 deletions
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index c9e15bcba0..b1d90b8a82 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -544,15 +544,14 @@ class DirectKafkaStreamSuite
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
+    kafkaTestUtils.createTopic(topic, 1)
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
     val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
     KafkaUtils.fixKafkaParams(executorKafkaParams)
-    val batchIntervalMilliseconds = 100
+    val batchIntervalMilliseconds = 500
     val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
+    val messages = Map("foo" -> 5000)
     kafkaTestUtils.sendMessages(topic, messages)

     val sparkConf = new SparkConf()
@@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate)  // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+      eventually(timeout(5.seconds), interval(10 milliseconds)) {
        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
        // Funky "-" in message makes the complete assertion message read better.
        assert(collectedData.asScala.exists(_.size == expectedSize),