author    Sean Owen <sowen@cloudera.com>    2016-03-13 21:03:49 -0700
committer Reynold Xin <rxin@databricks.com>    2016-03-13 21:03:49 -0700
commit    184085284185011d7cc6d054b54d2d38eaf1dd77 (patch)
tree      7b068f5bcf02ea959ab3a49c49fbc1cdae979a26 /external
parent    473263f9598d1cf880f421aae1b51eb0b6e3cf79 (diff)
[SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request?

- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on the platform default encoding, to use UTF-8 instead
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes on specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which means handling `UnsupportedEncodingException`)
- Also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11657 from srowen/SPARK-13823.
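For context, a minimal self-contained Scala sketch of the pattern this change standardizes on; the `CharsetExample` object and its values are illustrative only, not part of the patch:

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object CharsetExample {
  def main(args: Array[String]): Unit = {
    val s = "héllo"

    // Before: platform-default encoding, which varies with JVM/OS settings.
    //   val bytes = s.getBytes()
    //   val back  = new String(bytes)

    // After: the encoding is explicit. The StandardCharsets overloads take a
    // Charset, so no UnsupportedEncodingException handling is needed.
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    val back  = new String(bytes, StandardCharsets.UTF_8)
    assert(back == s)

    // InputStreamReader (and OutputStreamWriter) get the same treatment.
    val reader = new InputStreamReader(
      new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)
    reader.close()
  }
}
```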
Diffstat (limited to 'external')
-rw-r--r-- external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala | 4
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala | 4
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala | 5
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 4
-rw-r--r-- external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala | 3
-rw-r--r-- external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala | 3
-rw-r--r-- external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 4
-rw-r--r-- external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala | 4
8 files changed, 19 insertions, 12 deletions
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index 7f6cecf9cd..e8ca1e7163 100644
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.flume.sink
import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@@ -184,7 +185,8 @@ class SparkSinkSuite extends FunSuite {
private def putEvents(ch: MemoryChannel, count: Int): Unit = {
val tx = ch.getTransaction
tx.begin()
- (1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes)))
+ (1 to count).foreach(x =>
+ ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
tx.commit()
tx.close()
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
index 3f87ce46e5..945cfa7295 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -19,12 +19,12 @@ package org.apache.spark.streaming.flume
import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.{List => JList}
import java.util.Collections
import scala.collection.JavaConverters._
-import com.google.common.base.Charsets.UTF_8
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.commons.lang3.RandomUtils
@@ -65,7 +65,7 @@ private[flume] class FlumeTestUtils {
val inputEvents = input.asScala.map { item =>
val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
+ event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
event.setHeaders(Collections.singletonMap("test", "header"))
event
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 9515d07c5e..1a96df6e94 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.flume
+import java.nio.charset.StandardCharsets
import java.util.{Collections, List => JList, Map => JMap}
import java.util.concurrent._
import scala.collection.mutable.ArrayBuffer
-import com.google.common.base.Charsets.UTF_8
import org.apache.flume.event.EventBuilder
import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
@@ -193,7 +193,8 @@ private[flume] class PollingFlumeTestUtils {
val tx = channel.getTransaction
tx.begin()
for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
+ channel.put(EventBuilder.withBody(
+ s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
Collections.singletonMap(s"test-$t", "header")))
t += 1
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0cb875c975..72d9053355 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -19,12 +19,12 @@ package org.apache.spark.streaming.kafka
import java.io.OutputStream
import java.lang.{Integer => JInt, Long => JLong}
+import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap, Set => JSet}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import com.google.common.base.Charsets.UTF_8
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
@@ -787,7 +787,7 @@ private object KafkaUtilsPythonHelper {
def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
if (obj == this) {
out.write(Opcodes.GLOBAL)
- out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8))
+ out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
} else {
pickler.save(this)
val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 0ace453ee9..026387ed65 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -242,7 +243,7 @@ private[kinesis] class SimpleDataGenerator(
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
.withData(data)
.withPartitionKey(str)
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index fdb270eaad..0b455e574e 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -51,7 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
data.foreach { num =>
val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
+ val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8))
val future = producer.addUserRecord(streamName, str, data)
val kinesisCallBack = new FutureCallback[UserRecordResult]() {
override def onFailure(t: Throwable): Unit = {} // do nothing
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 079bd8a9a8..cbad6f7fe4 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.mqtt
+import java.nio.charset.StandardCharsets
+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
@@ -75,7 +77,7 @@ class MQTTReceiver(
// Handles Mqtt message
override def messageArrived(topic: String, message: MqttMessage) {
- store(new String(message.getPayload(), "utf-8"))
+ store(new String(message.getPayload(), StandardCharsets.UTF_8))
}
override def deliveryComplete(token: IMqttDeliveryToken) {
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
index 26c6dc45d5..3680c13605 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
@@ -18,10 +18,10 @@
package org.apache.spark.streaming.mqtt
import java.net.{ServerSocket, URI}
+import java.nio.charset.StandardCharsets
import scala.language.postfixOps
-import com.google.common.base.Charsets.UTF_8
import org.apache.activemq.broker.{BrokerService, TransportConnector}
import org.apache.commons.lang3.RandomUtils
import org.eclipse.paho.client.mqttv3._
@@ -85,7 +85,7 @@ private[mqtt] class MQTTTestUtils extends Logging {
client.connect()
if (client.isConnected) {
val msgTopic = client.getTopic(topic)
- val message = new MqttMessage(data.getBytes(UTF_8))
+ val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8))
message.setQos(1)
message.setRetained(true)