aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-07-26 00:41:46 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-07-26 00:41:46 -0700
commit03c27435aee4e319abe290771ba96e69469109ac (patch)
tree83d7803bec5ae4d1e9cbf7cf455c2118ac0a9380 /external/kafka-0-10
parent6959061f02b02afd4cef683b5eea0b7097eedee7 (diff)
downloadspark-03c27435aee4e319abe290771ba96e69469109ac.tar.gz
spark-03c27435aee4e319abe290771ba96e69469109ac.tar.bz2
spark-03c27435aee4e319abe290771ba96e69469109ac.zip
[TEST][STREAMING] Fix flaky Kafka rate controlling test
## What changes were proposed in this pull request? The current test is incorrect, because - The expected number of messages does not take into account that the topic has 2 partitions, and rate is set per partition. - Also in some cases, the test ran out of data in Kafka while waiting for the right amount of data per batch. The PR - Reduces the number of partitions to 1 - Adds more data to Kafka - Runs with a 0.5 second batch interval so that batches are created slowly ## How was this patch tested? Ran many times locally, going to run it many times in Jenkins (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14361 from tdas/kafka-rate-test-fix.
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r--external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala9
1 file changed, 4 insertions, 5 deletions
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index c9e15bcba0..b1d90b8a82 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -544,15 +544,14 @@ class DirectKafkaStreamSuite
test("using rate controller") {
val topic = "backpressure"
- val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
- kafkaTestUtils.createTopic(topic, 2)
+ kafkaTestUtils.createTopic(topic, 1)
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
KafkaUtils.fixKafkaParams(executorKafkaParams)
- val batchIntervalMilliseconds = 100
+ val batchIntervalMilliseconds = 500
val estimator = new ConstantEstimator(100)
- val messages = Map("foo" -> 200)
+ val messages = Map("foo" -> 5000)
kafkaTestUtils.sendMessages(topic, messages)
val sparkConf = new SparkConf()
@@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
- eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+ eventually(timeout(5.seconds), interval(10 milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.asScala.exists(_.size == expectedSize),