author     Sean Owen <sowen@cloudera.com>      2016-03-13 21:03:49 -0700
committer  Reynold Xin <rxin@databricks.com>   2016-03-13 21:03:49 -0700
commit     184085284185011d7cc6d054b54d2d38eaf1dd77 (patch)
tree       7b068f5bcf02ea959ab3a49c49fbc1cdae979a26 /streaming/src
parent     473263f9598d1cf880f421aae1b51eb0b6e3cf79 (diff)
[SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request?

- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on the platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes on specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which means handling `UnsupportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c )

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11657 from srowen/SPARK-13823.
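The pattern applied throughout the diff below is the same substitution. A minimal before/after sketch in Scala (the helper names are illustrative, not from the patch):

```scala
import java.io.{InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Before: these depend on the platform default encoding, and in Java the
// string form "UTF-8" forces UnsupportedEncodingException handling:
//   val reader = new InputStreamReader(in)
//   val bytes  = s.getBytes("UTF-8")

// After: the charset is explicit, and StandardCharsets.UTF_8 is a Charset
// constant, so there is no name lookup and no checked exception.
def encode(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
def decode(b: Array[Byte]): String = new String(b, StandardCharsets.UTF_8)
def readerOf(in: InputStream): InputStreamReader =
  new InputStreamReader(in, StandardCharsets.UTF_8)
```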
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 7
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 4
6 files changed, 19 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 4414774791..f7519c10c8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.dstream
import java.io._
import java.net.{ConnectException, Socket}
+import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -113,7 +114,8 @@ object SocketReceiver {
* to '\n' delimited strings and returns an iterator to access the strings.
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
- val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
+ val dataInputStream = new BufferedReader(
+ new InputStreamReader(inputStream, StandardCharsets.UTF_8))
new NextIterator[String] {
protected override def getNext() = {
val nextValue = dataInputStream.readLine()
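`NextIterator` here is a Spark-internal helper that tracks a `finished` flag and a `close()` hook; the hunk shows only its first lines. A rough standalone sketch of the same behavior using only the standard library (not the patch's actual code):

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Lazily yields '\n'-delimited lines, decoding the stream as UTF-8.
// readLine() returns null at end of stream, which terminates the iterator.
def linesOf(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(
    new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}
```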
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 806cea24ca..66448fd400 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -18,7 +18,7 @@
package org.apache.spark.streaming;
import java.io.*;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1866,7 +1866,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Override
public Iterable<String> call(InputStream in) throws IOException {
List<String> out = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(in, StandardCharsets.UTF_8))) {
for (String line; (line = reader.readLine()) != null;) {
out.add(line);
}
@@ -1930,7 +1931,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
File existingFile = new File(testDir, "0");
- Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+ Files.write("0\n", existingFile, StandardCharsets.UTF_8);
Assert.assertTrue(existingFile.setLastModified(1000));
Assert.assertEquals(1000, existingFile.lastModified());
return Arrays.asList(Arrays.asList("0"));
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index d09258e0e4..091ccbfd85 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -38,6 +38,7 @@ import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
public class JavaReceiverAPISuite implements Serializable {
@@ -126,7 +127,8 @@ public class JavaReceiverAPISuite implements Serializable {
BufferedReader in = null;
try {
socket = new Socket(host, port);
- in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ in = new BufferedReader(
+ new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
String userInput;
while ((userInput = in.readLine()) != null) {
store(userInput);
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index ca716cf4e6..9a3248b3e8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,12 +18,12 @@
package org.apache.spark.streaming
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -609,7 +609,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
*/
def writeFile(i: Int, clock: Clock): Unit = {
val file = new File(testDir, i.toString)
- Files.write(i + "\n", file, Charsets.UTF_8)
+ Files.write(i + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
// Check that the file's modification date is actually the value we wrote, since rounding or
// truncation will break the test:
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index fa17b3a15c..cc2a67187e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
import java.io.{BufferedWriter, File, OutputStreamWriter}
import java.net.{ServerSocket, Socket, SocketException}
-import java.nio.charset.Charset
+import java.nio.charset.StandardCharsets
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
@@ -146,7 +146,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ Files.write("0\n", existingFile, StandardCharsets.UTF_8)
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
// Set up the streaming context and input streams
@@ -369,7 +369,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ Files.write("0\n", existingFile, StandardCharsets.UTF_8)
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
// Set up the streaming context and input streams
@@ -393,7 +393,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
val file = new File(testDir, i.toString)
- Files.write(i + "\n", file, Charset.forName("UTF-8"))
+ Files.write(i + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
@@ -448,7 +448,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
try {
clientSocket.setTcpNoDelay(true)
val outputStream = new BufferedWriter(
- new OutputStreamWriter(clientSocket.getOutputStream))
+ new OutputStreamWriter(clientSocket.getOutputStream, StandardCharsets.UTF_8))
while (clientSocket.isConnected) {
val msg = queue.poll(100, TimeUnit.MILLISECONDS)
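The `TestServer` change is the encode-side mirror of `bytesToLines`: outgoing messages are now written as UTF-8 rather than the platform default, so the tests exchange the same bytes on any JVM. A minimal sketch of that pairing (the `sendLine` helper is hypothetical):

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Encodes one '\n'-terminated message as UTF-8, matching the UTF-8
// decode performed by SocketReceiver.bytesToLines on the other end.
def sendLine(socket: Socket, msg: String): Unit = {
  val out = new BufferedWriter(
    new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
  out.write(msg + "\n")
  out.flush()
}
```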
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index faa9c4f0cb..6406d53f89 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming
import java.io.{File, IOException}
-import java.nio.charset.Charset
+import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.collection.JavaConverters._
@@ -371,7 +371,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
- Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
+ Files.write(input(i) + "\n", localFile, StandardCharsets.UTF_8)
var tries = 0
var done = false
while (!done && tries < maxTries) {