diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-11-07 10:43:36 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-07 10:43:36 -0800 |
commit | b06c23db9aedae48c9eba9d702ae82fa5647cfe5 (patch) | |
tree | 84e046272c886e092f9008c44cab49dd6f5134a6 /external | |
parent | daa975f4bfa4f904697bf3365a4be9987032e490 (diff) | |
download | spark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.tar.gz spark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.tar.bz2 spark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.zip |
[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset in latest
## What changes were proposed in this pull request?
Added test to check whether default starting offset in latest
## How was this patch tested?
new unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #15778 from tdas/SPARK-18283.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index ed4cc75920..89e713f92d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest { ) } + test("starting offset is latest by default") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("0")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + + val kafka = reader.load() + .selectExpr("CAST(value AS STRING)") + .as[String] + val mapped = kafka.map(_.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(1, 2, 3) // should not have 0 + ) + } + test("bad source options") { def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { val ex = intercept[IllegalArgumentException] { |