aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java')
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java38
1 files changed, 17 insertions, 21 deletions
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0ea631ea14..5322fcd781 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.sasl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
@@ -197,26 +198,23 @@ public class SaslIntegrationSuite {
final AtomicReference<Throwable> exception = new AtomicReference<>();
+ final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
- public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- notifyAll();
+ public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+ blockFetchLatch.countDown();
}
-
@Override
- public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
+ public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
- notifyAll();
+ blockFetchLatch.countDown();
}
};
- String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
- OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
- blockIds, listener);
- synchronized (listener) {
- fetcher.start();
- listener.wait();
- }
+ String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
+ OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+ fetcher.start();
+ blockFetchLatch.await();
checkSecurityException(exception.get());
// Register an executor so that the next steps work.
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
+ final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
- public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- notifyAll();
+ public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+ chunkReceivedLatch.countDown();
}
-
@Override
- public synchronized void onFailure(int chunkIndex, Throwable t) {
+ public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
- notifyAll();
+ chunkReceivedLatch.countDown();
}
};
exception.set(null);
- synchronized (callback) {
- client2.fetchChunk(streamId, 0, callback);
- callback.wait();
- }
+ client2.fetchChunk(streamId, 0, callback);
+ chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {