aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala')
-rw-r--r--external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 778c06ea16..d2100fc5a4 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming.kafka010
-import java.{ lang => jl, util => ju }
+import java.{lang => jl, util => ju}
+import java.util.Locale
import scala.collection.JavaConverters._
@@ -93,7 +94,8 @@ private case class Subscribe[K, V](
// but cant seek to a position before poll, because poll is what gets subscription partitions
// So, poll, suppress the first exception, then seek
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
- val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ val shouldSuppress =
+ aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
consumer.poll(0)
} catch {
@@ -145,7 +147,8 @@ private case class SubscribePattern[K, V](
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
- val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+ val shouldSuppress =
+ aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
consumer.poll(0)
} catch {