aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-11-07 10:43:36 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-11-07 10:43:36 -0800
commitb06c23db9aedae48c9eba9d702ae82fa5647cfe5 (patch)
tree84e046272c886e092f9008c44cab49dd6f5134a6 /external
parentdaa975f4bfa4f904697bf3365a4be9987032e490 (diff)
downloadspark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.tar.gz
spark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.tar.bz2
spark-b06c23db9aedae48c9eba9d702ae82fa5647cfe5.zip
[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset is latest
## What changes were proposed in this pull request? Added test to check whether default starting offset is latest ## How was this patch tested? new unit test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15778 from tdas/SPARK-18283.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala24
1 file changed, 24 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75920..89e713f92d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
+ test("starting offset is latest by default") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("0"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+
+ val kafka = reader.load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ val mapped = kafka.map(_.toInt)
+
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(1, 2, 3) // should not have 0
+ )
+ }
+
test("bad source options") {
def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
val ex = intercept[IllegalArgumentException] {