diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e1af14f95d..2d6ccb22dd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest { val mapped = kafka.map(kv => kv._2.toInt + 1) testStream(mapped)( + StartStream(trigger = ProcessingTime(1)), makeSureGetOffsetCalled, AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), - AssertOnLastQueryStatus { status => - assert(status.triggerDetails.get("numRows.input.total").toInt > 0) - assert(status.sourceStatuses(0).processingRate > 0.0) + AssertOnQuery { query => + val recordsRead = query.recentProgresses.map(_.numInputRows).sum + recordsRead == 3 } ) } |