author    Sean Owen <sowen@cloudera.com>  2016-03-16 09:36:34 +0000
committer Sean Owen <sowen@cloudera.com>  2016-03-16 09:36:34 +0000
commit    3b461d9ecd633c4fd659998b99e700d76f58d18a (patch)
tree      09e9923fc17bada794d01bc365f130039972a8b7 /common
parent    05ab2948ab357fc07222bb3505df80b1886f7310 (diff)
[SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up
## What changes were proposed in this pull request?

Follow up to https://github.com/apache/spark/pull/11657

- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11725 from srowen/SPARK-13823.2.
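The test changes below all follow the same two patterns: callbacks that previously coordinated with `synchronized`/`wait()`/`notifyAll()` now signal a `java.util.concurrent.CountDownLatch`, and ad-hoc `getBytes("utf-8")` calls become `StandardCharsets.UTF_8`. A minimal sketch of both patterns, with illustrative names only (this is not code from the commit):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class CallbackSketch {
  // Before: callers did `synchronized (cb) { start(); cb.wait(timeout); }`
  // while the callback called notifyAll(); an unguarded wait() can miss a
  // notification that arrives first, or wake spuriously.
  private final CountDownLatch latch = new CountDownLatch(1);
  volatile Throwable failure;

  void onSuccess() {
    latch.countDown();          // releases the waiter even if it arrives later
  }

  void onFailure(Throwable t) {
    failure = t;
    latch.countDown();
  }

  boolean awaitCompletion(long timeoutSeconds) throws InterruptedException {
    // After: the waiter simply blocks on the latch; no lock or wait() needed.
    return latch.await(timeoutSeconds, TimeUnit.SECONDS);
  }

  static byte[] utf8(String s) {
    // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
    // that String.getBytes("UTF-8") forces callers to handle.
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```

Because a latch that has already counted down returns immediately from `await()`, a callback that fires before the test starts waiting cannot be missed, which is exactly the race the unguarded `wait()` calls were exposed to.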
Diffstat (limited to 'common')
-rw-r--r--  common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java | 94
-rw-r--r--  common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java             | 13
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java      | 38
-rw-r--r--  common/sketch/pom.xml                                                                             | 24
-rw-r--r--  common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java                               |  8
-rw-r--r--  common/unsafe/pom.xml                                                                             |  2
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java                         | 10
-rw-r--r--  common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java                    |  8
8 files changed, 93 insertions(+), 104 deletions(-)
diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index c0ff9dc5f5..dd0171d1d1 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class RequestTimeoutIntegrationSuite {
ByteBuffer message,
RpcResponseCallback callback) {
try {
- semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
+ semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
@@ -113,20 +114,17 @@ public class RequestTimeoutIntegrationSuite {
// First completes quickly (semaphore starts at 1).
TestCallback callback0 = new TestCallback();
- synchronized (callback0) {
- client.sendRpc(ByteBuffer.allocate(0), callback0);
- callback0.wait(FOREVER);
- assertEquals(responseSize, callback0.successLength);
- }
+ client.sendRpc(ByteBuffer.allocate(0), callback0);
+ callback0.latch.await();
+ assertEquals(responseSize, callback0.successLength);
// Second times out after 2 seconds, with slack. Must be IOException.
TestCallback callback1 = new TestCallback();
- synchronized (callback1) {
- client.sendRpc(ByteBuffer.allocate(0), callback1);
- callback1.wait(4 * 1000);
- assertNotNull(callback1.failure);
- assertTrue(callback1.failure instanceof IOException);
- }
+ client.sendRpc(ByteBuffer.allocate(0), callback1);
+ callback1.latch.await(4, TimeUnit.SECONDS);
+ assertNotNull(callback1.failure);
+ assertTrue(callback1.failure instanceof IOException);
+
semaphore.release();
}
@@ -143,7 +141,7 @@ public class RequestTimeoutIntegrationSuite {
ByteBuffer message,
RpcResponseCallback callback) {
try {
- semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
+ semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
@@ -164,24 +162,20 @@ public class RequestTimeoutIntegrationSuite {
TransportClient client0 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback0 = new TestCallback();
- synchronized (callback0) {
- client0.sendRpc(ByteBuffer.allocate(0), callback0);
- callback0.wait(FOREVER);
- assertTrue(callback0.failure instanceof IOException);
- assertFalse(client0.isActive());
- }
+ client0.sendRpc(ByteBuffer.allocate(0), callback0);
+ callback0.latch.await();
+ assertTrue(callback0.failure instanceof IOException);
+ assertFalse(client0.isActive());
// Increment the semaphore and the second request should succeed quickly.
semaphore.release(2);
TransportClient client1 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback1 = new TestCallback();
- synchronized (callback1) {
- client1.sendRpc(ByteBuffer.allocate(0), callback1);
- callback1.wait(FOREVER);
- assertEquals(responseSize, callback1.successLength);
- assertNull(callback1.failure);
- }
+ client1.sendRpc(ByteBuffer.allocate(0), callback1);
+ callback1.latch.await();
+ assertEquals(responseSize, callback1.successLength);
+ assertNull(callback1.failure);
}
// The timeout is relative to the LAST request sent, which is kinda weird, but still.
@@ -226,18 +220,14 @@ public class RequestTimeoutIntegrationSuite {
client.fetchChunk(0, 1, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
- synchronized (callback0) {
- // not complete yet, but should complete soon
- assertEquals(-1, callback0.successLength);
- assertNull(callback0.failure);
- callback0.wait(2 * 1000);
- assertTrue(callback0.failure instanceof IOException);
- }
+ // not complete yet, but should complete soon
+ assertEquals(-1, callback0.successLength);
+ assertNull(callback0.failure);
+ callback0.latch.await(2, TimeUnit.SECONDS);
+ assertTrue(callback0.failure instanceof IOException);
- synchronized (callback1) {
- // failed at same time as previous
- assertTrue(callback0.failure instanceof IOException);
- }
+ // failed at same time as previous
+ assertTrue(callback1.failure instanceof IOException);
}
/**
@@ -248,41 +238,35 @@ public class RequestTimeoutIntegrationSuite {
int successLength = -1;
Throwable failure;
+ final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onSuccess(ByteBuffer response) {
- synchronized(this) {
- successLength = response.remaining();
- this.notifyAll();
- }
+ successLength = response.remaining();
+ latch.countDown();
}
@Override
public void onFailure(Throwable e) {
- synchronized(this) {
- failure = e;
- this.notifyAll();
- }
+ failure = e;
+ latch.countDown();
}
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- synchronized(this) {
- try {
- successLength = buffer.nioByteBuffer().remaining();
- this.notifyAll();
- } catch (IOException e) {
- // weird
- }
+ try {
+ successLength = buffer.nioByteBuffer().remaining();
+ } catch (IOException e) {
+ // weird
+ } finally {
+ latch.countDown();
}
}
@Override
public void onFailure(int chunkIndex, Throwable e) {
- synchronized(this) {
- failure = e;
- this.notifyAll();
- }
+ failure = e;
+ latch.countDown();
}
}
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 045773317a..45cc03df43 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -276,7 +277,7 @@ public class SparkSaslSuite {
ctx = new SaslTestCtx(rpcHandler, true, false);
- final Object lock = new Object();
+ final CountDownLatch lock = new CountDownLatch(1);
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
doAnswer(new Answer<Void>() {
@@ -284,17 +285,13 @@ public class SparkSaslSuite {
public Void answer(InvocationOnMock invocation) {
response.set((ManagedBuffer) invocation.getArguments()[1]);
response.get().retain();
- synchronized (lock) {
- lock.notifyAll();
- }
+ lock.countDown();
return null;
}
}).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));
- synchronized (lock) {
- ctx.client.fetchChunk(0, 0, callback);
- lock.wait(10 * 1000);
- }
+ ctx.client.fetchChunk(0, 0, callback);
+ lock.await(10, TimeUnit.SECONDS);
verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0ea631ea14..5322fcd781 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.sasl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
@@ -197,26 +198,23 @@ public class SaslIntegrationSuite {
final AtomicReference<Throwable> exception = new AtomicReference<>();
+ final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
- public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- notifyAll();
+ public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+ blockFetchLatch.countDown();
}
-
@Override
- public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
+ public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
- notifyAll();
+ blockFetchLatch.countDown();
}
};
- String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
- OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
- blockIds, listener);
- synchronized (listener) {
- fetcher.start();
- listener.wait();
- }
+ String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
+ OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+ fetcher.start();
+ blockFetchLatch.await();
checkSecurityException(exception.get());
// Register an executor so that the next steps work.
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
+ final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
- public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- notifyAll();
+ public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+ chunkReceivedLatch.countDown();
}
-
@Override
- public synchronized void onFailure(int chunkIndex, Throwable t) {
+ public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
- notifyAll();
+ chunkReceivedLatch.countDown();
}
};
exception.set(null);
- synchronized (callback) {
- client2.fetchChunk(streamId, 0, callback);
- callback.wait();
- }
+ client2.fetchChunk(streamId, 0, callback);
+ chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index 442043cb51..8bc1f52798 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -45,5 +45,29 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <configuration>
+ <javacArgs combine.children="append">
+ <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+ <javacArg>-XDignore.symbol.file</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerArgs combine.children="append">
+ <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+ <arg>-XDignore.symbol.file</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
index feb601d44f..81461f0300 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
@@ -17,15 +17,11 @@
package org.apache.spark.util.sketch;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
class Utils {
public static byte[] getBytesFromUTF8String(String str) {
- try {
- return str.getBytes("utf-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException("Only support utf-8 string", e);
- }
+ return str.getBytes(StandardCharsets.UTF_8);
}
public static long integralToLong(Object i) {
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 5250014739..93b9580f26 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,7 +98,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <compilerArgs>
+ <compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index e16166ade4..54a5456924 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -106,15 +106,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
- if (str == null) return null;
- try {
- return fromBytes(str.getBytes("utf-8"));
- } catch (UnsupportedEncodingException e) {
- // Turn the exception into unchecked so we can find out about it at runtime, but
- // don't need to add lots of boilerplate code everywhere.
- throwException(e);
- return null;
- }
+ return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8));
}
/**
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index bef5d712cf..d4160ad029 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -17,7 +17,7 @@
package org.apache.spark.unsafe.types;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,9 +30,9 @@ import static org.apache.spark.unsafe.types.UTF8String.*;
public class UTF8StringSuite {
- private static void checkBasic(String str, int len) throws UnsupportedEncodingException {
+ private static void checkBasic(String str, int len) {
UTF8String s1 = fromString(str);
- UTF8String s2 = fromBytes(str.getBytes("utf8"));
+ UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8));
assertEquals(s1.numChars(), len);
assertEquals(s2.numChars(), len);
@@ -51,7 +51,7 @@ public class UTF8StringSuite {
}
@Test
- public void basicTest() throws UnsupportedEncodingException {
+ public void basicTest() {
checkBasic("", 0);
checkBasic("hello", 5);
checkBasic("大 千 世 界", 7);