diff options
author | Sean Owen <sowen@cloudera.com> | 2017-04-10 20:11:56 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-04-10 20:11:56 +0100 |
commit | a26e3ed5e414d0a350cfe65dd511b154868b9f1d (patch) | |
tree | f8bf8feabae7acdd5b2c29e38273fddb80e3de33 /external | |
parent | fd711ea13e558f0e7d3e01f08e01444d394499a6 (diff) | |
download | spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.gz spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.bz2 spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.zip |
[SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request?
Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem").
The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #17527 from srowen/SPARK-20156.
Diffstat (limited to 'external')
6 files changed, 38 insertions, 30 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 58b52692b5..ab1ce347cb 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.UUID +import java.util.{Locale, UUID} import scala.collection.JavaConverters._ @@ -74,11 +74,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // id. Hence, we should generate a unique id for each query. val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) } + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } val specifiedKafkaParams = parameters .keySet - .filter(_.toLowerCase.startsWith("kafka.")) + .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) .map { k => k.drop(6).toString -> parameters(k) } .toMap @@ -115,11 +115,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // partial data since Kafka will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}" - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) } + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } val specifiedKafkaParams = parameters .keySet - .filter(_.toLowerCase.startsWith("kafka.")) + .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) .map { k => k.drop(6).toString -> parameters(k) } .toMap @@ -192,7 +192,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = { - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) } + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { throw new IllegalArgumentException( s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " @@ -207,7 +207,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } parameters .keySet - .filter(_.toLowerCase.startsWith("kafka.")) + .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) .map { k => k.drop(6).toString -> parameters(k) } .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) @@ -272,7 +272,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister private def validateGeneralOptions(parameters: Map[String, String]): Unit = { // Validate source options - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) } + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } val specifiedStrategies = caseInsensitiveParams.filter { case (k, _) => STRATEGY_OPTION_KEYS.contains(k) }.toSeq @@ -451,8 +451,10 @@ private[kafka010] object KafkaSourceProvider { offsetOptionKey: String, defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = { params.get(offsetOptionKey).map(_.trim) match { - case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit - case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" => + LatestOffsetRangeLimit + case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" => + EarliestOffsetRangeLimit case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json)) case None => defaultOffsets } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 68bc3e3e2e..91893df4ec 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.kafka010 +import java.util.Locale import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.common.TopicPartition @@ -195,7 +196,7 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon reader.load() } expectedMsgs.foreach { m => - assert(ex.getMessage.toLowerCase.contains(m.toLowerCase)) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT))) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 490535623c..4bd052d249 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.kafka010 +import java.util.Locale import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.producer.ProducerConfig @@ -75,7 +76,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .option("kafka.bootstrap.servers", testUtils.brokerAddress) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "null topic present in the data")) } @@ -92,7 +93,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .mode(SaveMode.Ignore) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( s"save mode ignore not allowed for kafka")) // Test bad save mode Overwrite @@ -103,7 +104,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { .mode(SaveMode.Overwrite) .save() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( s"save mode overwrite not allowed for kafka")) } @@ -233,7 +234,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { writer.stop() } assert(ex.getMessage - .toLowerCase + .toLowerCase(Locale.ROOT) .contains("topic option required when no 'topic' attribute is present")) try { @@ -248,7 +249,8 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( + "required attribute 'value' not found")) } test("streaming - write data with valid schema but wrong types") { @@ -270,7 +272,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("topic type must be a string")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string")) try { /* value field wrong type */ @@ -284,7 +286,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "value attribute type must be a string or binarytype")) try { @@ -299,7 +301,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "key attribute type must be a string or binarytype")) } @@ -318,7 +320,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { } finally { writer.stop() } - assert(ex.getMessage.toLowerCase.contains("job aborted")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted")) } test("streaming - exception on config serializer") { @@ -330,7 +332,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { input.toDF(), withOptions = Map("kafka.key.serializer" -> "foo"))() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'key.serializer' is not supported")) ex = intercept[IllegalArgumentException] { @@ -338,7 +340,7 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { input.toDF(), withOptions = Map("kafka.value.serializer" -> "foo"))() } - assert(ex.getMessage.toLowerCase.contains( + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "kafka option 'value.serializer' is not supported")) } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 0046ba7e43..2034b9be07 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import java.io._ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.{Files, Paths} -import java.util.Properties +import java.util.{Locale, Properties} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger @@ -491,7 +491,7 @@ class KafkaSourceSuite extends KafkaSourceTest { reader.load() } expectedMsgs.foreach { m => - assert(ex.getMessage.toLowerCase.contains(m.toLowerCase)) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT))) } } @@ -524,7 +524,7 @@ class KafkaSourceSuite extends KafkaSourceTest { .option(s"$key", value) reader.load() } - assert(ex.getMessage.toLowerCase.contains("not supported")) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported")) } testUnsupportedConfig("kafka.group.id") diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 778c06ea16..d2100fc5a4 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -17,7 +17,8 @@ package org.apache.spark.streaming.kafka010 -import java.{ lang => jl, util => ju } +import java.{lang => jl, util => ju} +import java.util.Locale import scala.collection.JavaConverters._ @@ -93,7 +94,8 @@ private case class Subscribe[K, V]( // but cant seek to a position before poll, because poll is what gets subscription partitions // So, poll, suppress the first exception, then seek val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) - val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE" + val shouldSuppress = + aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE" try { consumer.poll(0) } catch { @@ -145,7 +147,8 @@ private case class SubscribePattern[K, V]( if (!toSeek.isEmpty) { // work around KAFKA-3370 when reset is none, see explanation in Subscribe above val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) - val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE" + val shouldSuppress = + aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE" try { consumer.poll(0) } catch { diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index d5aef8184f..78230725f3 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka import java.io.OutputStream import java.lang.{Integer => JInt, Long => JLong, Number => JNumber} import java.nio.charset.StandardCharsets -import java.util.{List => JList, Map => JMap, Set => JSet} +import java.util.{List => JList, Locale, Map => JMap, Set => JSet} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -206,7 +206,7 @@ object KafkaUtils { kafkaParams: Map[String, String], topics: Set[String] ): Map[TopicAndPartition, Long] = { - val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase) + val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase(Locale.ROOT)) val result = for { topicPartitions <- kc.getPartitions(topics).right leaderOffsets <- (if (reset == Some("smallest")) { |