aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-03-13 21:03:49 -0700
committerReynold Xin <rxin@databricks.com>2016-03-13 21:03:49 -0700
commit184085284185011d7cc6d054b54d2d38eaf1dd77 (patch)
tree7b068f5bcf02ea959ab3a49c49fbc1cdae979a26 /common/network-shuffle
parent473263f9598d1cf880f421aae1b51eb0b6e3cf79 (diff)
downloadspark-184085284185011d7cc6d054b54d2d38eaf1dd77.tar.gz
spark-184085284185011d7cc6d054b54d2d38eaf1dd77.tar.bz2
spark-184085284185011d7cc6d054b54d2d38eaf1dd77.zip
[SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request? - Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8 - Same for `InputStreamReader` and `OutputStreamWriter` constructors - Standardizes on UTF-8 everywhere - Standardizes specifying the encoding with `StandardCharsets.UTF-8`, not the Guava constant or "UTF-8" (which means handling `UnuspportedEncodingException`) - (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c ) ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11657 from srowen/SPARK-13823.
Diffstat (limited to 'common/network-shuffle')
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java12
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java36
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java21
3 files changed, 40 insertions, 29 deletions
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index fe933ed650..460110d78f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -18,6 +18,7 @@
package org.apache.spark.network.shuffle;
import java.io.*;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -27,7 +28,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -152,7 +152,7 @@ public class ExternalShuffleBlockResolver {
try {
if (db != null) {
byte[] key = dbAppExecKey(fullId);
- byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+ byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
}
} catch (Exception e) {
@@ -350,7 +350,7 @@ public class ExternalShuffleBlockResolver {
// we stick a common prefix on all the keys so we can find them in the DB
String appExecJson = mapper.writeValueAsString(appExecId);
String key = (APP_KEY_PREFIX + ";" + appExecJson);
- return key.getBytes(Charsets.UTF_8);
+ return key.getBytes(StandardCharsets.UTF_8);
}
private static AppExecId parseDbAppExecKey(String s) throws IOException {
@@ -368,10 +368,10 @@ public class ExternalShuffleBlockResolver {
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
if (db != null) {
DBIterator itr = db.iterator();
- itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+ itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
- String key = new String(e.getKey(), Charsets.UTF_8);
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
}
@@ -418,7 +418,7 @@ public class ExternalShuffleBlockResolver {
public static class StoreVersion {
- static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+ static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
public final int major;
public final int minor;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 60a1b8b045..d9b5f0261a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.shuffle;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CharStreams;
@@ -34,15 +35,16 @@ import org.junit.Test;
import static org.junit.Assert.*;
public class ExternalShuffleBlockResolverSuite {
- static String sortBlock0 = "Hello!";
- static String sortBlock1 = "World!";
+ private static final String sortBlock0 = "Hello!";
+ private static final String sortBlock1 = "World!";
- static String hashBlock0 = "Elementary";
- static String hashBlock1 = "Tabular";
+ private static final String hashBlock0 = "Elementary";
+ private static final String hashBlock1 = "Tabular";
- static TestShuffleDataContext dataContext;
+ private static TestShuffleDataContext dataContext;
- static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+ private static final TransportConf conf =
+ new TransportConf("shuffle", new SystemPropertyConfigProvider());
@BeforeClass
public static void beforeAll() throws IOException {
@@ -50,10 +52,12 @@ public class ExternalShuffleBlockResolverSuite {
dataContext.create();
// Write some sort and hash data.
- dataContext.insertSortShuffleData(0, 0,
- new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
- dataContext.insertHashShuffleData(1, 0,
- new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+ dataContext.insertSortShuffleData(0, 0, new byte[][] {
+ sortBlock0.getBytes(StandardCharsets.UTF_8),
+ sortBlock1.getBytes(StandardCharsets.UTF_8)});
+ dataContext.insertHashShuffleData(1, 0, new byte[][] {
+ hashBlock0.getBytes(StandardCharsets.UTF_8),
+ hashBlock1.getBytes(StandardCharsets.UTF_8)});
}
@AfterClass
@@ -100,13 +104,15 @@ public class ExternalShuffleBlockResolverSuite {
InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+ String block0 = CharStreams.toString(
+ new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);
InputStream block1Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+ String block1 = CharStreams.toString(
+ new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(sortBlock1, block1);
}
@@ -119,13 +125,15 @@ public class ExternalShuffleBlockResolverSuite {
InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+ String block0 = CharStreams.toString(
+ new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(hashBlock0, block0);
InputStream block1Stream =
resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+ String block1 = CharStreams.toString(
+ new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(hashBlock1, block1);
}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 532d7ab8d0..43d0201405 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,8 +35,8 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
- Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
- TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+ private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+ private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
@Test
public void noCleanupAndCleanup() throws IOException {
@@ -123,27 +124,29 @@ public class ExternalShuffleCleanupSuite {
assertCleanedUp(dataContext1);
}
- private void assertStillThere(TestShuffleDataContext dataContext) {
+ private static void assertStillThere(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
}
}
- private void assertCleanedUp(TestShuffleDataContext dataContext) {
+ private static void assertCleanedUp(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
}
}
- private TestShuffleDataContext createSomeData() throws IOException {
+ private static TestShuffleDataContext createSomeData() throws IOException {
Random rand = new Random(123);
TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
dataContext.create();
- dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
- new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
- dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
- new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+ dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
+ "ABC".getBytes(StandardCharsets.UTF_8),
+ "DEF".getBytes(StandardCharsets.UTF_8)});
+ dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
+ "GHI".getBytes(StandardCharsets.UTF_8),
+ "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
return dataContext;
}
}