path: root/common/network-shuffle
author     Sean Owen <sowen@cloudera.com>    2017-02-19 09:42:50 -0800
committer  Sean Owen <sowen@cloudera.com>    2017-02-19 09:42:50 -0800
commit     1487c9af20a333ead55955acf4c0aa323bea0d07 (patch)
tree       5f47daa77e0f73da1e009cc3dcf0a5c0073246aa /common/network-shuffle
parent     de14d35f77071932963a994fac5aec0e5df838a1 (diff)
download   spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.gz
           spark-1487c9af20a333ead55955acf4c0aa323bea0d07.tar.bz2
           spark-1487c9af20a333ead55955acf4c0aa323bea0d07.zip
[SPARK-19534][TESTS] Convert Java tests to use lambdas, Java 8 features
## What changes were proposed in this pull request?

Convert tests to use Java 8 lambdas, and modest related fixes to surrounding code.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16964 from srowen/SPARK-19534.
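The recurring pattern in the diff below is replacing single-method anonymous inner classes with lambdas, Guava's `Lists.<T>newArrayList(...)` with the JDK's `Arrays.asList(...)`, and manual `try`/`finally` cleanup with try-with-resources. A minimal, self-contained sketch of the before/after shape of the lambda and collection changes (the `LambdaConversionSketch` class name is illustrative only; the `Executor` example mirrors the change in `ExternalShuffleCleanupSuite`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class LambdaConversionSketch {
  public static void main(String[] args) {
    AtomicBoolean cleanupCalled = new AtomicBoolean(false);

    // Before (Java 7 style): an anonymous inner class implementing the
    // single-method Executor interface.
    Executor oldStyle = new Executor() {
      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
    };

    // After (Java 8 style): a lambda for the same functional interface.
    Executor newStyle = runnable -> cleanupCalled.set(true);

    // Before: Guava's Lists.<String>newArrayList("a", "b") with an explicit
    // type witness. After: plain JDK Arrays.asList, which infers the type.
    List<String> items = Arrays.asList("a", "b");

    oldStyle.execute(() -> {});
    newStyle.execute(() -> {});
    System.out.println(cleanupCalled.get() + " " + items);
  }
}
```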
Diffstat (limited to 'common/network-shuffle')
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java             | 34
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java |  2
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java      |  6
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java  | 13
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java       | 78
-rw-r--r--  common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java        | 64
6 files changed, 87 insertions, 110 deletions
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 52f50a3409..c0e170e5b9 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -19,11 +19,11 @@ package org.apache.spark.network.sasl;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -38,7 +38,6 @@ import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
@@ -105,8 +104,7 @@ public class SaslIntegrationSuite {
@Test
public void testGoodClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+ Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
String msg = "Hello, World!";
@@ -120,8 +118,7 @@ public class SaslIntegrationSuite {
when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password");
clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
+ Arrays.asList(new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
try {
// Bootstrap should fail on startup.
@@ -134,8 +131,7 @@ public class SaslIntegrationSuite {
@Test
public void testNoSaslClient() throws IOException, InterruptedException {
- clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList());
+ clientFactory = context.createClientFactory(new ArrayList<>());
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
try {
@@ -159,15 +155,11 @@ public class SaslIntegrationSuite {
RpcHandler handler = new TestRpcHandler();
TransportContext context = new TransportContext(conf, handler);
clientFactory = context.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
- TransportServer server = context.createServer();
- try {
+ Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+ try (TransportServer server = context.createServer()) {
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
} catch (Exception e) {
assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation"));
- } finally {
- server.close();
}
}
@@ -191,14 +183,13 @@ public class SaslIntegrationSuite {
try {
// Create a client, and make a request to fetch blocks from a different app.
clientFactory = blockServerContext.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+ Arrays.asList(new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
client1 = clientFactory.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
- final AtomicReference<Throwable> exception = new AtomicReference<>();
+ AtomicReference<Throwable> exception = new AtomicReference<>();
- final CountDownLatch blockFetchLatch = new CountDownLatch(1);
+ CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
@@ -235,12 +226,11 @@ public class SaslIntegrationSuite {
// Create a second client, authenticated with a different app ID, and try to read from
// the stream created for the previous app.
clientFactory2 = blockServerContext.createClientFactory(
- Lists.<TransportClientBootstrap>newArrayList(
- new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
+ Arrays.asList(new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
- final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
+ CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
@@ -284,7 +274,7 @@ public class SaslIntegrationSuite {
}
}
- private void checkSecurityException(Throwable t) {
+ private static void checkSecurityException(Throwable t) {
assertNotNull("No exception was caught.", t);
assertTrue("Expected SecurityException.",
t.getMessage().contains(SecurityException.class.getName()));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index c036bc2e8d..e47a72c9d1 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -93,7 +93,7 @@ public class ExternalShuffleBlockHandlerSuite {
ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
verify(callback, times(1)).onSuccess(response.capture());
- verify(callback, never()).onFailure((Throwable) any());
+ verify(callback, never()).onFailure(any());
StreamHandle handle =
(StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 7757500b41..47c087088a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -60,12 +60,10 @@ public class ExternalShuffleCleanupSuite {
public void cleanupUsesExecutor() throws IOException {
TestShuffleDataContext dataContext = createSomeData();
- final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+ AtomicBoolean cleanupCalled = new AtomicBoolean(false);
// Executor which does nothing to ensure we're actually using it.
- Executor noThreadExecutor = new Executor() {
- @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
- };
+ Executor noThreadExecutor = runnable -> cleanupCalled.set(true);
ExternalShuffleBlockResolver manager =
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 88de6fb83c..b8ae04eefb 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
@@ -29,7 +30,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.AfterClass;
@@ -173,7 +173,7 @@ public class ExternalShuffleIntegrationSuite {
FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
assertTrue(exec0Fetch.failedBlocks.isEmpty());
- assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
+ assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0]));
exec0Fetch.releaseBuffers();
}
@@ -185,7 +185,7 @@ public class ExternalShuffleIntegrationSuite {
assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
exec0Fetch.successBlocks);
assertTrue(exec0Fetch.failedBlocks.isEmpty());
- assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
+ assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
exec0Fetch.releaseBuffers();
}
@@ -241,7 +241,7 @@ public class ExternalShuffleIntegrationSuite {
assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
}
- private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
+ private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
throws IOException, InterruptedException {
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init(APP_ID);
@@ -249,7 +249,7 @@ public class ExternalShuffleIntegrationSuite {
executorId, executorInfo);
}
- private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
+ private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
throws Exception {
assertEquals(list0.size(), list1.size());
for (int i = 0; i < list0.size(); i ++) {
@@ -257,7 +257,8 @@ public class ExternalShuffleIntegrationSuite {
}
}
- private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
+ private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1)
+ throws Exception {
ByteBuffer nio0 = buffer0.nioByteBuffer();
ByteBuffer nio1 = buffer1.nioByteBuffer();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
index 2590b9ce4c..3e51fea3cf 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -85,8 +83,8 @@ public class OneForOneBlockFetcherSuite {
// Each failure will cause a failure to be invoked in all remaining block fetches.
verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
- verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any());
+ verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
+ verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any());
}
@Test
@@ -100,15 +98,15 @@ public class OneForOneBlockFetcherSuite {
// We may call both success and failure for the same block.
verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+ verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
- verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any());
+ verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any());
}
@Test
public void testEmptyBlockFetch() {
try {
- fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
+ fetchBlocks(Maps.newLinkedHashMap());
fail();
} catch (IllegalArgumentException e) {
assertEquals("Zero-sized blockIds array", e.getMessage());
@@ -123,52 +121,46 @@ public class OneForOneBlockFetcherSuite {
*
* If a block's buffer is "null", an exception will be thrown instead.
*/
- private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
+ private static BlockFetchingListener fetchBlocks(LinkedHashMap<String, ManagedBuffer> blocks) {
TransportClient client = mock(TransportClient.class);
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+ String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
OneForOneBlockFetcher fetcher =
new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
- // Respond to the "OpenBlocks" message with an appropirate ShuffleStreamHandle with streamId 123
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
- (ByteBuffer) invocationOnMock.getArguments()[0]);
- RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1];
- callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
- assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
- return null;
- }
+ // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123
+ doAnswer(invocationOnMock -> {
+ BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
+ (ByteBuffer) invocationOnMock.getArguments()[0]);
+ RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1];
+ callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
+ assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
+ return null;
}).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
// Respond to each chunk request with a single buffer from our blocks array.
- final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
- final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- try {
- long streamId = (Long) invocation.getArguments()[0];
- int myChunkIndex = (Integer) invocation.getArguments()[1];
- assertEquals(123, streamId);
- assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
-
- ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
- ManagedBuffer result = blockIterator.next();
- if (result != null) {
- callback.onSuccess(myChunkIndex, result);
- } else {
- callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail("Unexpected failure");
+ AtomicInteger expectedChunkIndex = new AtomicInteger(0);
+ Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
+ doAnswer(invocation -> {
+ try {
+ long streamId = (Long) invocation.getArguments()[0];
+ int myChunkIndex = (Integer) invocation.getArguments()[1];
+ assertEquals(123, streamId);
+ assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
+
+ ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
+ ManagedBuffer result = blockIterator.next();
+ if (result != null) {
+ callback.onSuccess(myChunkIndex, result);
+ } else {
+ callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
}
- return null;
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected failure");
}
- }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any());
+ return null;
+ }).when(client).fetchChunk(anyLong(), anyInt(), any());
fetcher.start();
return listener;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 6db71eea6e..a530e16734 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -28,7 +28,6 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;
@@ -84,7 +83,7 @@ public class RetryingBlockFetcherSuite {
performInteractions(interactions, listener);
- verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any());
+ verify(listener).onBlockFetchFailure(eq("b0"), any());
verify(listener).onBlockFetchSuccess("b1", block1);
verifyNoMoreInteractions(listener);
}
@@ -190,7 +189,7 @@ public class RetryingBlockFetcherSuite {
performInteractions(interactions, listener);
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
- verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+ verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
verifyNoMoreInteractions(listener);
}
@@ -220,7 +219,7 @@ public class RetryingBlockFetcherSuite {
performInteractions(interactions, listener);
verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
- verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+ verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
verifyNoMoreInteractions(listener);
}
@@ -249,40 +248,37 @@ public class RetryingBlockFetcherSuite {
Stubber stub = null;
// Contains all blockIds that are referenced across all interactions.
- final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
+ LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
- for (final Map<String, Object> interaction : interactions) {
+ for (Map<String, Object> interaction : interactions) {
blockIds.addAll(interaction.keySet());
- Answer<Void> answer = new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- try {
- // Verify that the RetryingBlockFetcher requested the expected blocks.
- String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
- String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
- assertArrayEquals(desiredBlockIds, requestedBlockIds);
-
- // Now actually invoke the success/failure callbacks on each block.
- BlockFetchingListener retryListener =
- (BlockFetchingListener) invocationOnMock.getArguments()[1];
- for (Map.Entry<String, Object> block : interaction.entrySet()) {
- String blockId = block.getKey();
- Object blockValue = block.getValue();
-
- if (blockValue instanceof ManagedBuffer) {
- retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
- } else if (blockValue instanceof Exception) {
- retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
- } else {
- fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
- }
+ Answer<Void> answer = invocationOnMock -> {
+ try {
+ // Verify that the RetryingBlockFetcher requested the expected blocks.
+ String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
+ String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
+ assertArrayEquals(desiredBlockIds, requestedBlockIds);
+
+ // Now actually invoke the success/failure callbacks on each block.
+ BlockFetchingListener retryListener =
+ (BlockFetchingListener) invocationOnMock.getArguments()[1];
+ for (Map.Entry<String, Object> block : interaction.entrySet()) {
+ String blockId = block.getKey();
+ Object blockValue = block.getValue();
+
+ if (blockValue instanceof ManagedBuffer) {
+ retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
+ } else if (blockValue instanceof Exception) {
+ retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
+ } else {
+ fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
}
- return null;
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
}
+ return null;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
}
};
@@ -295,7 +291,7 @@ public class RetryingBlockFetcherSuite {
}
assertNotNull(stub);
- stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
+ stub.when(fetchStarter).createAndStart(any(), anyObject());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}