aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f95d..2d6ccb22dd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
+ StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
- AssertOnLastQueryStatus { status =>
- assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
- assert(status.sourceStatuses(0).processingRate > 0.0)
+ AssertOnQuery { query =>
+ val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+ recordsRead == 3
}
)
}