From 3b461d9ecd633c4fd659998b99e700d76f58d18a Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 16 Mar 2016 09:36:34 +0000 Subject: [SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up ## What changes were proposed in this pull request? Follow up to https://github.com/apache/spark/pull/11657 - Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8` - And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests) - And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings ## How was this patch tested? Jenkins tests Author: Sean Owen Closes #11725 from srowen/SPARK-13823.2. --- .../network/RequestTimeoutIntegrationSuite.java | 94 +++++++++------------- .../apache/spark/network/sasl/SparkSaslSuite.java | 13 ++- .../spark/network/sasl/SaslIntegrationSuite.java | 38 ++++----- common/sketch/pom.xml | 24 ++++++ .../java/org/apache/spark/util/sketch/Utils.java | 8 +- common/unsafe/pom.xml | 2 +- .../org/apache/spark/unsafe/types/UTF8String.java | 10 +-- .../apache/spark/unsafe/types/UTF8StringSuite.java | 8 +- 8 files changed, 93 insertions(+), 104 deletions(-) (limited to 'common') diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java index c0ff9dc5f5..dd0171d1d1 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java @@ -36,6 +36,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -93,7 +94,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -113,20 +114,17 @@ public class RequestTimeoutIntegrationSuite { // First completes quickly (semaphore starts at 1). TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertEquals(responseSize, callback0.successLength); - } + client.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertEquals(responseSize, callback0.successLength); // Second times out after 2 seconds, with slack. Must be IOException. TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(4 * 1000); - assertNotNull(callback1.failure); - assertTrue(callback1.failure instanceof IOException); - } + client.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(4, TimeUnit.SECONDS); + assertNotNull(callback1.failure); + assertTrue(callback1.failure instanceof IOException); + semaphore.release(); } @@ -143,7 +141,7 @@ public class RequestTimeoutIntegrationSuite { ByteBuffer message, RpcResponseCallback callback) { try { - semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS); + semaphore.acquire(); callback.onSuccess(ByteBuffer.allocate(responseSize)); } catch (InterruptedException e) { // do nothing @@ -164,24 +162,20 @@ public class RequestTimeoutIntegrationSuite { TransportClient client0 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback0 = new TestCallback(); - synchronized (callback0) { - client0.sendRpc(ByteBuffer.allocate(0), callback0); - callback0.wait(FOREVER); - assertTrue(callback0.failure instanceof IOException); - assertFalse(client0.isActive()); - } + client0.sendRpc(ByteBuffer.allocate(0), callback0); + callback0.latch.await(); + assertTrue(callback0.failure instanceof IOException); + assertFalse(client0.isActive()); // Increment the semaphore and the second request should succeed quickly. semaphore.release(2); TransportClient client1 = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); TestCallback callback1 = new TestCallback(); - synchronized (callback1) { - client1.sendRpc(ByteBuffer.allocate(0), callback1); - callback1.wait(FOREVER); - assertEquals(responseSize, callback1.successLength); - assertNull(callback1.failure); - } + client1.sendRpc(ByteBuffer.allocate(0), callback1); + callback1.latch.await(); + assertEquals(responseSize, callback1.successLength); + assertNull(callback1.failure); } // The timeout is relative to the LAST request sent, which is kinda weird, but still. @@ -226,18 +220,14 @@ public class RequestTimeoutIntegrationSuite { client.fetchChunk(0, 1, callback1); Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS); - synchronized (callback0) { - // not complete yet, but should complete soon - assertEquals(-1, callback0.successLength); - assertNull(callback0.failure); - callback0.wait(2 * 1000); - assertTrue(callback0.failure instanceof IOException); - } + // not complete yet, but should complete soon + assertEquals(-1, callback0.successLength); + assertNull(callback0.failure); + callback0.latch.await(2, TimeUnit.SECONDS); + assertTrue(callback0.failure instanceof IOException); - synchronized (callback1) { - // failed at same time as previous - assertTrue(callback0.failure instanceof IOException); - } + // failed at same time as previous + assertTrue(callback1.failure instanceof IOException); } /** @@ -248,41 +238,35 @@ public class RequestTimeoutIntegrationSuite { int successLength = -1; Throwable failure; + final CountDownLatch latch = new CountDownLatch(1); @Override public void onSuccess(ByteBuffer response) { - synchronized(this) { - successLength = response.remaining(); - this.notifyAll(); - } + successLength = response.remaining(); + latch.countDown(); } @Override public void onFailure(Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } @Override public void onSuccess(int chunkIndex, ManagedBuffer buffer) { - synchronized(this) { - try { - successLength = buffer.nioByteBuffer().remaining(); - this.notifyAll(); - } catch (IOException e) { - // weird - } + try { + successLength = buffer.nioByteBuffer().remaining(); + } catch (IOException e) { + // weird + } finally { + latch.countDown(); } } @Override public void onFailure(int chunkIndex, Throwable e) { - synchronized(this) { - failure = e; - this.notifyAll(); - } + failure = e; + latch.countDown(); } } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index 045773317a..45cc03df43 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -276,7 +277,7 @@ public class SparkSaslSuite { ctx = new SaslTestCtx(rpcHandler, true, false); - final Object lock = new Object(); + final CountDownLatch lock = new CountDownLatch(1); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); doAnswer(new Answer() { @@ -284,17 +285,13 @@ public class SparkSaslSuite { public Void answer(InvocationOnMock invocation) { response.set((ManagedBuffer) invocation.getArguments()[1]); response.get().retain(); - synchronized (lock) { - lock.notifyAll(); - } + lock.countDown(); return null; } }).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class)); - synchronized (lock) { - ctx.client.fetchChunk(0, 0, callback); - lock.wait(10 * 1000); - } + ctx.client.fetchChunk(0, 0, callback); + lock.await(10, TimeUnit.SECONDS); verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class)); verify(callback, never()).onFailure(anyInt(), any(Throwable.class)); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 0ea631ea14..5322fcd781 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.network.sasl; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Lists; @@ -197,26 +198,23 @@ public class SaslIntegrationSuite { final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch blockFetchLatch = new CountDownLatch(1); BlockFetchingListener listener = new BlockFetchingListener() { @Override - public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - notifyAll(); + public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { + blockFetchLatch.countDown(); } - @Override - public synchronized void onBlockFetchFailure(String blockId, Throwable t) { + public void onBlockFetchFailure(String blockId, Throwable t) { exception.set(t); - notifyAll(); + blockFetchLatch.countDown(); } }; - String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" }; - OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", - blockIds, listener); - synchronized (listener) { - fetcher.start(); - listener.wait(); - } + String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" }; + OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener); + fetcher.start(); + blockFetchLatch.await(); checkSecurityException(exception.get()); // Register an executor so that the next steps work. @@ -240,24 +238,22 @@ public class SaslIntegrationSuite { client2 = clientFactory2.createClient(TestUtils.getLocalHost(), blockServer.getPort()); + final CountDownLatch chunkReceivedLatch = new CountDownLatch(1); ChunkReceivedCallback callback = new ChunkReceivedCallback() { @Override - public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) { - notifyAll(); + public void onSuccess(int chunkIndex, ManagedBuffer buffer) { + chunkReceivedLatch.countDown(); } - @Override - public synchronized void onFailure(int chunkIndex, Throwable t) { + public void onFailure(int chunkIndex, Throwable t) { exception.set(t); - notifyAll(); + chunkReceivedLatch.countDown(); } }; exception.set(null); - synchronized (callback) { - client2.fetchChunk(streamId, 0, callback); - callback.wait(); - } + client2.fetchChunk(streamId, 0, callback); + chunkReceivedLatch.await(); checkSecurityException(exception.get()); } finally { if (client1 != null) { diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 442043cb51..8bc1f52798 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -45,5 +45,29 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + + net.alchim31.maven + scala-maven-plugin + + + + -XDignore.symbol.file + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + -XDignore.symbol.file + + + + + diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java index feb601d44f..81461f0300 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java @@ -17,15 +17,11 @@ package org.apache.spark.util.sketch; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; class Utils { public static byte[] getBytesFromUTF8String(String str) { - try { - return str.getBytes("utf-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException("Only support utf-8 string", e); - } + return str.getBytes(StandardCharsets.UTF_8); } public static long integralToLong(Object i) { diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index 5250014739..93b9580f26 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -98,7 +98,7 @@ org.apache.maven.plugins maven-compiler-plugin - + -XDignore.symbol.file diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index e16166ade4..54a5456924 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -106,15 +106,7 @@ public final class UTF8String implements Comparable, Externalizable, * Creates an UTF8String from String. */ public static UTF8String fromString(String str) { - if (str == null) return null; - try { - return fromBytes(str.getBytes("utf-8")); - } catch (UnsupportedEncodingException e) { - // Turn the exception into unchecked so we can find out about it at runtime, but - // don't need to add lots of boilerplate code everywhere. - throwException(e); - return null; - } + return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8)); } /** diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index bef5d712cf..d4160ad029 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -17,7 +17,7 @@ package org.apache.spark.unsafe.types; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -30,9 +30,9 @@ import static org.apache.spark.unsafe.types.UTF8String.*; public class UTF8StringSuite { - private static void checkBasic(String str, int len) throws UnsupportedEncodingException { + private static void checkBasic(String str, int len) { UTF8String s1 = fromString(str); - UTF8String s2 = fromBytes(str.getBytes("utf8")); + UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8)); assertEquals(s1.numChars(), len); assertEquals(s2.numChars(), len); @@ -51,7 +51,7 @@ public class UTF8StringSuite { } @Test - public void basicTest() throws UnsupportedEncodingException { + public void basicTest() { checkBasic("", 0); checkBasic("hello", 5); checkBasic("大 千 世 界", 7); -- cgit v1.2.3