aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala24
1 files changed, 13 insertions, 11 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 490535623c..4bd052d249 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.kafka010
+import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.ProducerConfig
@@ -75,7 +76,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"null topic present in the data"))
}
@@ -92,7 +93,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Ignore)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode ignore not allowed for kafka"))
// Test bad save mode Overwrite
@@ -103,7 +104,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Overwrite)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode overwrite not allowed for kafka"))
}
@@ -233,7 +234,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
writer.stop()
}
assert(ex.getMessage
- .toLowerCase
+ .toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))
try {
@@ -248,7 +249,8 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ "required attribute 'value' not found"))
}
test("streaming - write data with valid schema but wrong types") {
@@ -270,7 +272,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
try {
/* value field wrong type */
@@ -284,7 +286,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binarytype"))
try {
@@ -299,7 +301,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binarytype"))
}
@@ -318,7 +320,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("job aborted"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
@@ -330,7 +332,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
ex = intercept[IllegalArgumentException] {
@@ -338,7 +340,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}