Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala')
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 19
1 file changed, 19 insertions(+), 0 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 534fb77c9c..bf6aad671a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.util.Utils
@@ -606,6 +607,24 @@ class KafkaSourceSuite extends KafkaSourceTest {
assert(query.exception.isEmpty)
}
+ test("get offsets from case insensitive parameters") {
+ for ((optionKey, optionValue, answer) <- Seq(
+ (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
+ (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
+ (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
+ SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
+ val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
+ assert(offset === answer)
+ }
+
+ for ((optionKey, answer) <- Seq(
+ (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
+ (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
+ val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
+ assert(offset === answer)
+ }
+ }
+
private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
private def assignString(topic: String, partitions: Iterable[Int]): String = {
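For context, the added test exercises KafkaSourceProvider.getKafkaOffsetRangeLimit with mixed-case option values ("earLiEst", "laTest") and with a missing key that should fall back to a default. The snippet below is a minimal standalone sketch of that behavior, not the actual Spark implementation: the trait and case-class names mirror those referenced in the diff, the literal option keys and the hard-coded JSON handling are assumptions made only to keep the example self-contained.

// Standalone sketch of case-insensitive offset option resolution, modeled on the
// behavior the new test checks. Names mirror the diff; bodies are illustrative only.
object OffsetRangeLimitSketch {
  sealed trait KafkaOffsetRangeLimit
  case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
  case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
  // Partitions are modeled as (topic, partition) pairs to avoid the Kafka client dependency.
  case class SpecificOffsetRangeLimit(partitionOffsets: Map[(String, Int), Long])
    extends KafkaOffsetRangeLimit

  // Resolve an offset option without regard to key or keyword case,
  // falling back to the supplied default when the option is absent.
  def getKafkaOffsetRangeLimit(
      params: Map[String, String],
      offsetOptionKey: String,
      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
    params.collectFirst {
      case (k, v) if k.equalsIgnoreCase(offsetOptionKey) => v
    } match {
      case Some(v) if v.trim.equalsIgnoreCase("earliest") => EarliestOffsetRangeLimit
      case Some(v) if v.trim.equalsIgnoreCase("latest") => LatestOffsetRangeLimit
      case Some(json) => parseSpecificOffsets(json)
      case None => defaultOffsets
    }
  }

  // Placeholder: the real provider parses JSON of the form {"topic":{"partition":offset}};
  // here the result is hard-coded to match the example used in the test.
  private def parseSpecificOffsets(json: String): KafkaOffsetRangeLimit =
    SpecificOffsetRangeLimit(Map(("topic-A", 0) -> 23L))

  def main(args: Array[String]): Unit = {
    // "startingoffsets"/"endingoffsets" are assumed literals standing in for the
    // STARTING_OFFSETS_OPTION_KEY / ENDING_OFFSETS_OPTION_KEY constants in the diff.
    assert(getKafkaOffsetRangeLimit(Map("startingoffsets" -> "earLiEst"),
      "startingoffsets", LatestOffsetRangeLimit) == EarliestOffsetRangeLimit)
    assert(getKafkaOffsetRangeLimit(Map.empty, "endingoffsets",
      LatestOffsetRangeLimit) == LatestOffsetRangeLimit)
    println("sketch behaves as the test expects")
  }
}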