about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
author: Sean Owen <sowen@cloudera.com> 2017-04-10 20:11:56 +0100
committer: Sean Owen <sowen@cloudera.com> 2017-04-10 20:11:56 +0100
commita26e3ed5e414d0a350cfe65dd511b154868b9f1d (patch)
treef8bf8feabae7acdd5b2c29e38273fddb80e3de33 /streaming
parentfd711ea13e558f0e7d3e01f08e01444d394499a6 (diff)
downloadspark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.gz
spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.tar.bz2
spark-a26e3ed5e414d0a350cfe65dd511b154868b9f1d.zip
[SPARK-20156][CORE][SQL][STREAMING][MLLIB] Java String toLowerCase "Turkish locale bug" causes Spark problems
## What changes were proposed in this pull request?

Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #17527 from srowen/SPARK-20156.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala6
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java5
-rw-r--r--streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala3
4 files changed, 11 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 9a760e2947..931f015f03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.dstream
+import java.util.Locale
+
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
@@ -60,7 +62,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
.split("(?=[A-Z])")
.filter(_.nonEmpty)
.mkString(" ")
- .toLowerCase
+ .toLowerCase(Locale.ROOT)
.capitalize
s"$newName [$id]"
}
@@ -74,7 +76,7 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
protected[streaming] override val baseScope: Option[String] = {
val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
.map { json => RDDOperationScope.fromJson(json).name + s" [$id]" }
- .getOrElse(name.toLowerCase)
+ .getOrElse(name.toLowerCase(Locale.ROOT))
Some(new RDDOperationScope(scopeName).toJson)
}
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 80513de4ee..90d1f8c503 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -101,7 +101,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<String> mapped = stream.mapPartitions(in -> {
String out = "";
while (in.hasNext()) {
- out = out + in.next().toUpperCase();
+ out = out + in.next().toUpperCase(Locale.ROOT);
}
return Arrays.asList(out).iterator();
});
@@ -806,7 +806,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
+ JavaPairDStream<String, String> mapped =
+ pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 96f8d9593d..6c86cacec8 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -267,7 +267,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> mapped = stream.mapPartitions(in -> {
StringBuilder out = new StringBuilder();
while (in.hasNext()) {
- out.append(in.next().toUpperCase(Locale.ENGLISH));
+ out.append(in.next().toUpperCase(Locale.ROOT));
}
return Arrays.asList(out.toString()).iterator();
});
@@ -1315,7 +1315,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, String> mapped =
- pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
+ pairStream.mapValues(s -> s.toUpperCase(Locale.ROOT));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5645996de5..eb996c93ff 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
+import java.util.Locale
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@@ -745,7 +746,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
val ex = intercept[IllegalStateException] {
body
}
- assert(ex.getMessage.toLowerCase().contains(expectedErrorMsg))
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectedErrorMsg))
}
}