diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 490535623c..4bd052d249 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.kafka010 +import java.util.Locale import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.producer.ProducerConfig @@ -75,7 +76,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "null topic present in the data")) } @@ -92,7 +93,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .mode(SaveMode.Ignore) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( s"save mode ignore not allowed for kafka")) // Test bad save mode Overwrite @@ -103,7 +104,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .mode(SaveMode.Overwrite) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( s"save mode overwrite not allowed for kafka")) } @@ -233,7 +234,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { writer.stop() } assert(ex.getMessage - .toLowerCase + .toLowerCase(Locale.ROOT) .contains("topic option required when no 'topic' attribute is present")) try { @@ -248,7 +249,8 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( + "required attribute 'value' not found")) } test("streaming - write data with valid schema but wrong types") { @@ -270,7 +272,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("topic type must be a string")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string")) try { /* value field wrong type */ @@ -284,7 +286,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "value attribute type must be a string or binarytype")) try { @@ -299,7 +301,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "key attribute type must be a string or binarytype")) } @@ -318,7 +320,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("job aborted")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted")) } test("streaming - exception on config serializer") { @@ -330,7 +332,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { input.toDF(), withOptions = Map("kafka.key.serializer" -> "foo"))() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'key.serializer' is not supported")) ex = intercept[IllegalArgumentException] { @@ -338,7 +340,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { input.toDF(), withOptions = Map("kafka.value.serializer" -> "foo"))() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'value.serializer' is not supported")) } |