aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-03-16 09:36:34 +0000
committerSean Owen <sowen@cloudera.com>2016-03-16 09:36:34 +0000
commit3b461d9ecd633c4fd659998b99e700d76f58d18a (patch)
tree09e9923fc17bada794d01bc365f130039972a8b7 /common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
parent05ab2948ab357fc07222bb3505df80b1886f7310 (diff)
downloadspark-3b461d9ecd633c4fd659998b99e700d76f58d18a.tar.gz
spark-3b461d9ecd633c4fd659998b99e700d76f58d18a.tar.bz2
spark-3b461d9ecd633c4fd659998b99e700d76f58d18a.zip
[SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up
## What changes were proposed in this pull request? Follow up to https://github.com/apache/spark/pull/11657 - Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8` - And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests) - And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11725 from srowen/SPARK-13823.2.
Diffstat (limited to 'common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java')
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java38
1 files changed, 17 insertions, 21 deletions
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0ea631ea14..5322fcd781 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.sasl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
@@ -197,26 +198,23 @@ public class SaslIntegrationSuite {
final AtomicReference<Throwable> exception = new AtomicReference<>();
+ final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
- public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- notifyAll();
+ public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+ blockFetchLatch.countDown();
}
-
@Override
- public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
+ public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
- notifyAll();
+ blockFetchLatch.countDown();
}
};
- String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
- OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
- blockIds, listener);
- synchronized (listener) {
- fetcher.start();
- listener.wait();
- }
+ String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
+ OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+ fetcher.start();
+ blockFetchLatch.await();
checkSecurityException(exception.get());
// Register an executor so that the next steps work.
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
+ final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
- public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- notifyAll();
+ public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+ chunkReceivedLatch.countDown();
}
-
@Override
- public synchronized void onFailure(int chunkIndex, Throwable t) {
+ public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
- notifyAll();
+ chunkReceivedLatch.countDown();
}
};
exception.set(null);
- synchronized (callback) {
- client2.fetchChunk(streamId, 0, callback);
- callback.wait();
- }
+ client2.fetchChunk(streamId, 0, callback);
+ chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {