author    Sean Owen <sowen@cloudera.com>  2017-04-10 20:11:56 +0100
committer Sean Owen <sowen@cloudera.com>  2017-04-10 20:11:56 +0100
commit    a26e3ed5e414d0a350cfe65dd511b154868b9f1d (patch)
tree      f8bf8feabae7acdd5b2c29e38273fddb80e3de33 /external
parent    fd711ea13e558f0e7d3e01f08e01444d394499a6 (diff)
[SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request?

Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #17527 from srowen/SPARK-20156.
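For context, a minimal Scala sketch (not part of this patch) of the locale-sensitive behavior that `Locale.ROOT` guards against; the option key `KAFKA.GROUP.ID` here is a hypothetical example standing in for any key that gets lowercased:

```scala
import java.util.Locale

object TurkishLocaleDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical user-supplied option key; option keys are matched case-insensitively.
    val key = "KAFKA.GROUP.ID"

    // Under the Turkish locale, 'I' lowercases to dotless 'ı' (U+0131),
    // so the normalized key no longer equals "kafka.group.id".
    val turkish = key.toLowerCase(new Locale("tr", "TR"))

    // Locale.ROOT is locale-neutral: 'I' always lowercases to 'i'.
    val root = key.toLowerCase(Locale.ROOT)

    println(turkish == "kafka.group.id") // false ("kafka.group.ıd")
    println(root == "kafka.group.id")    // true
  }
}
```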
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala  | 22
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala   |  3
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala       | 24
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala     |  6
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala   |  9
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala             |  4
6 files changed, 38 insertions(+), 30 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 58b52692b5..ab1ce347cb 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.UUID
+import java.util.{Locale, UUID}
import scala.collection.JavaConverters._
@@ -74,11 +74,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
parameters
.keySet
- .filter(_.toLowerCase.startsWith("kafka."))
+ .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
@@ -115,11 +115,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
parameters
.keySet
- .filter(_.toLowerCase.startsWith("kafka."))
+ .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
@@ -192,7 +192,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
@@ -207,7 +207,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
parameters
.keySet
- .filter(_.toLowerCase.startsWith("kafka."))
+ .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
@@ -272,7 +272,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
private def validateGeneralOptions(parameters: Map[String, String]): Unit = {
// Validate source options
- val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+ val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedStrategies =
caseInsensitiveParams.filter { case (k, _) => STRATEGY_OPTION_KEYS.contains(k) }.toSeq
@@ -451,8 +451,10 @@ private[kafka010] object KafkaSourceProvider {
offsetOptionKey: String,
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
params.get(offsetOptionKey).map(_.trim) match {
- case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
- case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
+ case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
+ LatestOffsetRangeLimit
+ case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
+ EarliestOffsetRangeLimit
case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
case None => defaultOffsets
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 68bc3e3e2e..91893df4ec 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.kafka010
+import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.TopicPartition
@@ -195,7 +196,7 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
reader.load()
}
expectedMsgs.foreach { m =>
- assert(ex.getMessage.toLowerCase.contains(m.toLowerCase))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
}
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 490535623c..4bd052d249 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.kafka010
+import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.ProducerConfig
@@ -75,7 +76,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"null topic present in the data"))
}
@@ -92,7 +93,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Ignore)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode ignore not allowed for kafka"))
// Test bad save mode Overwrite
@@ -103,7 +104,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
.mode(SaveMode.Overwrite)
.save()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
s"save mode overwrite not allowed for kafka"))
}
@@ -233,7 +234,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
writer.stop()
}
assert(ex.getMessage
- .toLowerCase
+ .toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))
try {
@@ -248,7 +249,8 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+ "required attribute 'value' not found"))
}
test("streaming - write data with valid schema but wrong types") {
@@ -270,7 +272,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
try {
/* value field wrong type */
@@ -284,7 +286,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binarytype"))
try {
@@ -299,7 +301,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binarytype"))
}
@@ -318,7 +320,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
} finally {
writer.stop()
}
- assert(ex.getMessage.toLowerCase.contains("job aborted"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
@@ -330,7 +332,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
ex = intercept[IllegalArgumentException] {
@@ -338,7 +340,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
}
- assert(ex.getMessage.toLowerCase.contains(
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 0046ba7e43..2034b9be07 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import java.io._
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
-import java.util.Properties
+import java.util.{Locale, Properties}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
@@ -491,7 +491,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
reader.load()
}
expectedMsgs.foreach { m =>
- assert(ex.getMessage.toLowerCase.contains(m.toLowerCase))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
}
}
@@ -524,7 +524,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
.option(s"$key", value)
reader.load()
}
- assert(ex.getMessage.toLowerCase.contains("not supported"))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
}
testUnsupportedConfig("kafka.group.id")
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 778c06ea16..d2100fc5a4 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming.kafka010
-import java.{ lang => jl, util => ju }
+import java.{lang => jl, util => ju}
+import java.util.Locale
import scala.collection.JavaConverters._
@@ -93,7 +94,8 @@ private case class Subscribe[K, V](
// but cant seek to a position before poll, because poll is what gets subscription partitions
// So, poll, suppress the first exception, then seek
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
- val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ val shouldSuppress =
+ aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
consumer.poll(0)
} catch {
@@ -145,7 +147,8 @@ private case class SubscribePattern[K, V](
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
- val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ val shouldSuppress =
+ aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
consumer.poll(0)
} catch {
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index d5aef8184f..78230725f3 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka
import java.io.OutputStream
import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
import java.nio.charset.StandardCharsets
-import java.util.{List => JList, Map => JMap, Set => JSet}
+import java.util.{List => JList, Locale, Map => JMap, Set => JSet}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -206,7 +206,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Set[String]
): Map[TopicAndPartition, Long] = {
- val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+ val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase(Locale.ROOT))
val result = for {
topicPartitions <- kc.getPartitions(topics).right
leaderOffsets <- (if (reset == Some("smallest")) {