aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-04-10 20:11:56 +0100
committerSean Owen <sowen@cloudera.com>2017-04-10 20:11:56 +0100
commita26e3ed5e414d0a350cfe65dd511b154868b9f1d (patch)
treef8bf8feabae7acdd5b2c29e38273fddb80e3de33 /external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
parentfd711ea13e558f0e7d3e01f08e01444d394499a6 (diff)
downloadspark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.gz
spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.bz2
spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.zip
[SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request? Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods. ## How was this patch tested? Existing tests. Author: Sean Owen <sowen@cloudera.com> Closes #17527 from srowen/SPARK-20156.
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala24
1 files changed, 13 insertions, 11 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 490535623c..4bd052d249 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.kafka010
+import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.ProducerConfig
@@ -75,7 +76,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"null topic present in the data"))
}
@@ -92,7 +93,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Ignore)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode ignore not allowed for kafka"))
// Test bad save mode Overwrite
@@ -103,7 +104,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Overwrite)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode overwrite not allowed for kafka"))
}
@@ -233,7 +234,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
writer.stop()
}
assert(ex.getMessage
- .toLowerCase
+ .toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))
try {
@@ -248,7 +249,8 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ "required attribute 'value' not found"))
}
test("streaming - write data with valid schema but wrong types") {
@@ -270,7 +272,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
try {
/* value field wrong type */
@@ -284,7 +286,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binarytype"))
try {
@@ -299,7 +301,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binarytype"))
}
@@ -318,7 +320,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("job aborted"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
@@ -330,7 +332,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
ex = intercept[IllegalArgumentException] {
@@ -338,7 +340,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}