diff options
author | Sean Owen <sowen@cloudera.com> | 2017-04-10 20:11:56 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-04-10 20:11:56 +0100 |
commit | a26e3ed5e414d0a350cfe65dd511b154868b9f1d (patch) | |
tree | f8bf8feabae7acdd5b2c29e38273fddb80e3de33 /streaming/src | |
parent | fd711ea13e558f0e7d3e01f08e01444d394499a6 (diff) | |
download | spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.gz spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.bz2 spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.zip |
[SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request?
Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem").
The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #17527 from srowen/SPARK-20156.
Diffstat (limited to 'streaming/src')
4 files changed, 11 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 9a760e2947..931f015f03 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.dstream +import java.util.Locale + import scala.reflect.ClassTag import org.apache.spark.SparkContext @@ -60,7 +62,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) .split("(?=[A-Z])") .filter(_.nonEmpty) .mkString(" ") - .toLowerCase + .toLowerCase(Locale.ROOT) .capitalize s"$newName [$id]" } @@ -74,7 +76,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) protected[streaming] override val baseScope: Option[String] = { val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)) .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" } - .getOrElse(name.toLowerCase) + .getOrElse(name.toLowerCase(Locale.ROOT)) Some(new RDDOperationScope(scopeName).toJson) } diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 80513de4ee..90d1f8c503 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -101,7 +101,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaDStream<String> mapped = stream.mapPartitions(in -> { String out = ""; while (in.hasNext()) { - out = out + in.next().toUpperCase(); + out = out + in.next().toUpperCase(Locale.ROOT); } return Arrays.asList(out).iterator(); }); @@ -806,7 +806,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); + JavaPairDStream<String, String> mapped = + pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT)); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index 96f8d9593d..6c86cacec8 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -267,7 +267,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> mapped = stream.mapPartitions(in -> { StringBuilder out = new StringBuilder(); while (in.hasNext()) { - out.append(in.next().toUpperCase(Locale.ENGLISH)); + out.append(in.next().toUpperCase(Locale.ROOT)); } return Arrays.asList(out.toString()).iterator(); }); @@ -1315,7 +1315,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, String> mapped = - pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH)); + pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT)); JavaTestUtils.attachTestOutputStream(mapped); List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5645996de5..eb996c93ff 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} +import java.util.Locale import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger @@ -745,7 +746,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo val ex = intercept[IllegalStateException] { body } - assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg)) + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectedErrorMsg)) } } |