diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 19 |
1 files changed, 19 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 534fb77c9c..bf6aad671a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} import org.apache.spark.util.Utils @@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest { assert(query.exception.isEmpty) } + test("get offsets from case insensitive parameters") { + for ((optionKey, optionValue, answer) <- Seq( + (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit), + (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit), + (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""", + SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) { + val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer) + assert(offset === answer) + } + + for ((optionKey, answer) <- Seq( + (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit), + (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) { + val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer) + assert(offset === answer) + } + } + private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" private def assignString(topic: String, partitions: Iterable[Int]): String = { |