diff options
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r-- | external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 9 |
1 files changed, 4 insertions, 5 deletions
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index c9e15bcba0..b1d90b8a82 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -544,15 +544,14 @@ class DirectKafkaStreamSuite test("using rate controller") { val topic = "backpressure" - val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - kafkaTestUtils.createTopic(topic, 2) + kafkaTestUtils.createTopic(topic, 1) val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest") val executorKafkaParams = new JHashMap[String, Object](kafkaParams) KafkaUtils.fixKafkaParams(executorKafkaParams) - val batchIntervalMilliseconds = 100 + val batchIntervalMilliseconds = 500 val estimator = new ConstantEstimator(100) - val messages = Map("foo" -> 200) + val messages = Map("foo" -> 5000) kafkaTestUtils.sendMessages(topic, messages) val sparkConf = new SparkConf() @@ -596,7 +595,7 @@ class DirectKafkaStreamSuite estimator.updateRate(rate) // Set a new rate. // Expect blocks of data equal to "rate", scaled by the interval length in secs. val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) - eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { + eventually(timeout(5.seconds), interval(10 milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. assert(collectedData.asScala.exists(_.size == expectedSize), |